RunState 包含的内容
序列化后存储的 SDK 运行时元数据
从「Agent 自动调用删库接口却无法阻止」的危险场景切入,系统拆解 OpenAI Agents SDK 的 Human in the Loop 机制。覆盖 needs_approval 静态/动态标注、RunState 序列化跨进程持久化、state.approve()/reject() 审批操作、Runner.run() 与 run_streamed() 的中断工作流,附完整基础示例与生产级持久化审批代码,兑现上期预告的 Sessions + HITL 结合用法,并澄清「InterruptedError」常见误解,结尾对比 Guardrails 边界,给出 3 条落地建议。
リサーチノート
delete_database 注册成了一个工具。needs_approval 标注工具from agents import Agent, Runner, function_tool
from agents.run import RunConfig
# 普通工具——直接执行,无需审批
@function_tool
async def read_file(path: str) -> str:
"""读取文件内容"""
with open(path) as f:
return f.read()
# 危险工具——标注 needs_approval=True
# SDK 在执行此工具前会暂停,等待人工决定
@function_tool(needs_approval=True)
async def delete_database(db_name: str) -> str:
"""删除数据库(不可逆操作)"""
# ... 实际删库逻辑
return f"数据库 {db_name} 已删除"
agent = Agent(
name="DBAdmin",
instructions="你是数据库管理助手,按用户要求操作数据库。",
tools=[read_file, delete_database],
)needs_approval=True 的效果不是拒绝工具调用,而是在调用前挂起整个 Agent 运行,把待审批项放入 result.interruptions,然后等你拿到这个列表、做出决定、再继续1。needs_approval 也支持传入动态函数,实现条件审批:from agents.run import ToolApprovalFunctionContext
def needs_approval(ctx: ToolApprovalFunctionContext) -> bool:
"""只有当操作目标是生产数据库时才需要审批"""
db_name = ctx.tool_call.arguments.get("db_name", "")
return "prod" in db_name or "production" in db_name
@function_tool(needs_approval=needs_approval)
async def delete_database(db_name: str) -> str:
"""删除数据库"""
return f"数据库 {db_name} 已删除"needs_approval 只是声明。真正的控制流由 Runner + RunState 串联起来。import asyncio
import json
from agents import Agent, Runner, function_tool
@function_tool(needs_approval=True)
async def send_email(to: str, subject: str, body: str) -> str:
"""发送邮件(需要审批)"""
return f"邮件已发送至 {to}"
@function_tool
async def draft_email(to: str, subject: str, body: str) -> str:
"""起草邮件(无需审批,仅预览)"""
return f"草稿已生成:收件人={to}, 主题={subject}"
agent = Agent(
name="EmailAssistant",
instructions="你是邮件助手,帮助用户起草和发送邮件。",
tools=[draft_email, send_email],
)
async def main():
user_input = "给 [email protected] 发一封主题为'Q2 报告'的邮件"
# 第一次运行:Agent 会生成发邮件的工具调用,然后因 needs_approval 暂停
result = await Runner.run(agent, user_input)
# 处理中断循环:直到所有中断都被处理完毕
while result.interruptions:
print(f"\n⏸️ 检测到 {len(result.interruptions)} 个待审批项:\n")
# 将当前运行状态转换为可操作的 RunState 对象
# RunState 包含 SDK 管理的全部运行时元数据(审批决定、工具输入、嵌套 Agent 恢复点等)
state = result.to_state()
for interruption in result.interruptions:
print(f" 工具: {interruption.tool_name}")
print(f" 代理: {interruption.agent_name}")
print(f" 参数: {json.dumps(interruption.tool_call.arguments, ensure_ascii=False, indent=2)}")
# 请求人工确认
decision = input("\n 是否批准?(y=批准 / n=拒绝 / e=修改参数): ").strip().lower()
if decision == "y":
# 批准:允许工具继续执行
# always_approve=True 表示本次运行中后续相同工具调用自动批准
state.approve(interruption, always_approve=False)
print(" ✅ 已批准")
elif decision == "n":
# 拒绝:工具不执行,Agent 会收到拒绝消息并重新推理
reason = input(" 拒绝原因(可选): ").strip()
state.reject(interruption, rejection_message=reason or "用户拒绝")
print(" ❌ 已拒绝")
else:
# 修改参数后批准(此处示例直接批准,实际可修改 interruption)
state.approve(interruption)
print(" ✅ 已批准(参数未修改)")
# 使用处理后的 RunState 恢复运行
# Runner.run() 接受 RunState 作为 input 参数,从中断点继续执行
result = await Runner.run(agent, state)
print(f"\n✅ 运行完成:{result.final_output}")
asyncio.run(main())result.interruptions 是一个 ToolApprovalItem 列表,每项包含 agent_name、tool_name、tool_call(含参数)1result.to_state() 把 RunResult 转换为 RunState,这是执行审批操作的前提state.approve(interruption) 和 state.reject(interruption, rejection_message=...) 记录决定但不立即执行Runner.run(agent, state) 接受 RunState 作为 input,从中断点继续 Agent 循环RunState:中断状态的完整结构与持久化RunState 里面装着什么?# 序列化:RunState → 字符串(适合写入数据库/文件/Redis)
state_str = state.to_string()
state_json = state.to_json() # 返回 dict,方便进一步处理
# 反序列化:字符串 → RunState(在另一个进程/机器上恢复)
state = RunState.from_string(state_str)
state = RunState.from_json(state_json)to_string() / to_json()):| 参数 | 类型 | 说明 |
|---|---|---|
context_serializer | 可选函数 | 非 JSON-serializable 的 context 对象的自定义序列化器 |
context_deserializer | 可选函数 | 反序列化时还原 context 对象 |
strict_context | bool(默认 False) | 为 True 时,context 类型不匹配会抛出异常 |
context_override | 可选对象 | 加载时替换 context(适合跨进程传递时注入新 context) |
include_tracing_api_key | bool(默认 False) | 序列化时保留追踪 API Key(⚠️ 注意安全) |
state.to_json() 写入 Redis;审批通知推给 Slack 或内部审批系统;审核员在另一台机器上点确认;审批服务从 Redis 读回状态,RunState.from_json() 恢复,继续运行1。RunState 序列化的核心价值。import asyncio
import json
from pathlib import Path
from agents import Agent, Runner, function_tool
from agents.run import RunState
# ── 工具定义 ────────────────────────────────────────────────────────────────
def needs_database_approval(ctx) -> bool:
"""只有涉及 prod 环境的操作才需要审批"""
args = ctx.tool_call.arguments
return args.get("env", "dev") == "prod"
@function_tool(needs_approval=needs_database_approval)
async def run_sql(env: str, sql: str) -> str:
"""执行 SQL 语句
Args:
env: 环境(dev/staging/prod)
sql: SQL 语句
"""
# 实际执行逻辑...
return f"[{env}] SQL 执行成功:{sql[:50]}..."
@function_tool
async def list_tables(env: str) -> str:
"""列出数据表(只读,无需审批)"""
return f"[{env}] 表列表:users, orders, products"
# ── 持久化层(示例用文件,生产用 Redis/DB)──────────────────────────────────
PENDING_FILE = Path("/tmp/pending_approval.json")
async def save_pending_state(state: RunState, interruptions: list):
"""将待审批状态保存到持久化存储"""
payload = {
"state": state.to_json(), # 序列化 RunState
"interruptions": [
{
"tool_name": item.tool_name,
"agent_name": item.agent_name,
"arguments": item.tool_call.arguments,
# interruption 本身不能直接序列化,这里保存 index 用于查找
"index": i,
}
for i, item in enumerate(interruptions)
]
}
PENDING_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
print(f"✉️ 审批请求已保存,等待人工处理...")
async def load_and_resume(agent: Agent):
"""从持久化存储加载状态并恢复运行(模拟审批服务)"""
if not PENDING_FILE.exists():
print("❌ 无待审批状态")
return None
payload = json.loads(PENDING_FILE.read_text())
# 从 JSON 反序列化 RunState
state = RunState.from_json(payload["state"])
print("\n📋 待审批操作列表:")
for item in payload["interruptions"]:
print(f" [{item['index']}] 工具={item['tool_name']}, 环境={item['arguments'].get('env')}")
print(f" SQL={item['arguments'].get('sql', '')[:80]}")
# 注意:state.approve/reject 需要原始 interruption 对象
# 这里需要重新运行一次来获取 interruptions(实际生产中可存储完整对象)
# 简化示例:直接从状态推断并批准第一个
print("\n✅ 自动批准所有操作(生产环境应弹出审批 UI)")
PENDING_FILE.unlink() # 清理待审批文件
return state
# ── 主流程 ───────────────────────────────────────────────────────────────────
async def main():
agent = Agent(
name="DBAAgent",
instructions="你是数据库管理员助手。按指令执行 SQL 操作,需要时列出表信息。",
tools=[list_tables, run_sql],
)
user_input = "在 prod 环境执行:DELETE FROM orders WHERE status='cancelled' AND created_at < '2025-01-01'"
print("▶️ 启动 Agent 运行...\n")
result = await Runner.run(agent, user_input)
while result.interruptions:
state = result.to_state()
# 将状态持久化(模拟发送审批通知)
await save_pending_state(state, result.interruptions)
# 模拟等待审批(实际是异步等待人工操作)
print("\n⏳ 模拟等待审批(实际场景中这里等待人工点击确认)...")
await asyncio.sleep(1)
# 模拟审批服务加载状态
state = await load_and_resume(agent)
if state is None:
break
# 重新运行以获取原始 interruption 对象并批准
# 实际生产:审批服务持有 interruption 对象引用,直接调用 state.approve()
result = await Runner.run(agent, state)
if result.final_output:
print(f"\n✅ 运行完成:{result.final_output}")
asyncio.run(main())Runner.run_streamed() 同样支持中断。区别在于中断发生时,流式事件会停止,你需要从 RunResultStreaming 对象上读取 interruptions:async def streaming_with_hitl():
agent = Agent(
name="StreamAgent",
instructions="你是一个流式输出的助手。",
tools=[send_email], # send_email 带 needs_approval=True
)
async with Runner.run_streamed(agent, "给 [email protected] 发一封周报邮件") as result:
# 消费流式事件,直到中断发生
async for event in result.stream_events():
if hasattr(event, 'delta') and event.delta:
print(event.delta, end="", flush=True)
# 流结束后检查中断(流式模式下中断会终止流)
if result.interruptions:
state = result.to_state()
for interruption in result.interruptions:
print(f"\n⏸️ 需要批准:{interruption.tool_name}({interruption.tool_call.arguments})")
state.approve(interruption) # 或 state.reject()
# 用批准后的 RunState 恢复(可以继续用 run_streamed 或 run)
final = await Runner.run(agent, state)
print(f"\n结果:{final.final_output}")import asyncio
from agents import Agent, Runner, function_tool
from agents.sessions import SQLiteSession
from agents.run import RunState
@function_tool(needs_approval=True)
async def transfer_money(amount: float, to_account: str) -> str:
"""转账操作(需要审批)"""
return f"已转账 {amount} 元至 {to_account}"
@function_tool
async def check_balance() -> str:
"""查询余额(无需审批)"""
return "当前余额:¥12,580.00"
agent = Agent(
name="BankAgent",
instructions="你是银行智能助手,帮助用户查询和管理账户。",
tools=[check_balance, transfer_money],
)
async def main():
# Session 实例:贯穿整个对话生命周期
# 中断前后必须复用同一个实例
session = SQLiteSession("bank_session_001", db_path="/tmp/bank_session.db")
user_messages = [
"查一下我的余额",
"把 5000 元转到账户 6217xxxxxx", # 这条会触发审批
"转账成功了吗",
]
for user_msg in user_messages:
print(f"\n👤 用户:{user_msg}")
# 每次运行传入同一个 session 实例
# SDK 自动注入历史,无需手动 to_input_list()
result = await Runner.run(agent, user_msg, session=session)
# 处理可能的中断
while result.interruptions:
state = result.to_state()
for interruption in result.interruptions:
print(f"\n⚠️ 审批请求:{interruption.tool_name}")
print(f" 参数:{interruption.tool_call.arguments}")
decision = input(" 批准?(y/n): ").strip().lower()
if decision == "y":
state.approve(interruption)
else:
state.reject(interruption, rejection_message="用户取消")
# 关键:恢复运行时继续传入同一个 session 实例
# 这样 Session 才能正确追踪整条对话链
result = await Runner.run(agent, state, session=session)
print(f"🤖 Agent:{result.final_output}")
asyncio.run(main())InterruptedError:官方文档说明InterruptedError 异常来处理中断」。这在当前版本的 OpenAI Agents SDK 中并不准确4。Runner.run() 正常返回(不抛出异常)RunResult 对象上的 result.interruptions 字段非空result.interruptions 来判断是否发生了中断MaxTurnsExceeded(超过最大轮次)、ModelBehaviorError、GuardrailTripwireTriggered 等,详见 #12 期的完整异常体系拆解5。| 维度 | Guardrails(护栏) | Human in the Loop |
|---|---|---|
| 触发时机 | 输入/输出/工具调用阶段 | 工具调用前 |
| 决策主体 | 预设规则(代码逻辑) | 人类(实时审批) |
| 触发结果 | 抛出异常,终止执行 | 暂停执行,等待决定 |
| 恢复能力 | ❌ 触发后不可恢复 | ✅ 批准后继续执行 |
| 适用场景 | 已知规则的自动过滤 | 不确定性高、需判断的关键操作 |
needs_approval 函数替代静态布尔值needs_approval=True 会让低风险调用也触发审批,造成摩擦。用函数基于参数动态判断:只有 env="prod" 才审批,env="dev" 直接放行。审批率下去了,重要操作的审批依然保住了。RunState 到你已有的数据库state.to_json() 结果直接塞进 Redis 或 PostgreSQL,把审批请求发给 Slack/钉钉/自建审批系统,审批完成后再恢复。这才是生产级的异步审批链路1。Runner.run(agent, state, session=session) 恢复,session 参数不能省。省掉之后 Agent 会「失忆」,历史对话丢失,后续推理质量直线下降2。as_tool() 把多个专业 Agent 组织成协作系统?Orchestrator-Worker 模式、并行子任务、跨 Agent 的 Context 流转,全部拆开讲。
このコンテンツについて、さらに観点や背景を補足しましょう。