OpenAI Agents SDK #14:Agent 刚要删库,你却毫无感知——Human in the Loop 全解析

从「Agent 自动调用删库接口却无法阻止」的危险场景切入,系统拆解 OpenAI Agents SDK 的 Human in the Loop 机制。覆盖 needs_approval 静态/动态标注、RunState 序列化跨进程持久化、state.approve()/reject() 审批操作、Runner.run() 与 run_streamed() 的中断工作流,附完整基础示例与生产级持久化审批代码,兑现上期预告的 Sessions + HITL 结合用法,并澄清「InterruptedError」常见误解,结尾对比 Guardrails 边界,给出 3 条落地建议。

リサーチノート

你把 delete_database 注册成了一个工具。
Agent 在某次推理中判断「清理旧数据」是合理操作,然后直接调用了它。等你收到报警,生产数据已经没了。
这不是假设。这是任何一个没有人工审批机制的 Agent 系统都可能出现的真实事故1
OpenAI Agents SDK 对这个问题的回答是:Human in the Loop(HITL)。不是靠 prompt 里写「请先确认再操作」,而是在 SDK 层面提供一套显式的暂停-审批-恢复原语,让你真正控制住 Agent 的执行边界。
上一期(#13)讲了 Sessions 如何帮你管理多轮对话历史2,最后预告了 HITL 中断恢复与 Sessions 结合的用法。今天完整兑现。

2. 核心机制:needs_approval 标注工具

最小改动是在工具定义上加一个参数。
from agents import Agent, Runner, function_tool
from agents.run import RunConfig

# 普通工具——直接执行,无需审批
@function_tool
async def read_file(path: str) -> str:
    """读取文件内容"""
    with open(path) as f:
        return f.read()

# 危险工具——标注 needs_approval=True
# SDK 在执行此工具前会暂停,等待人工决定
@function_tool(needs_approval=True)
async def delete_database(db_name: str) -> str:
    """删除数据库(不可逆操作)"""
    # ... 实际删库逻辑
    return f"数据库 {db_name} 已删除"

agent = Agent(
    name="DBAdmin",
    instructions="你是数据库管理助手,按用户要求操作数据库。",
    tools=[read_file, delete_database],
)
needs_approval=True 的效果不是拒绝工具调用,而是在调用前挂起整个 Agent 运行,把待审批项放入 result.interruptions,然后等你拿到这个列表、做出决定、再继续1
needs_approval 也支持传入动态函数,实现条件审批:
from agents.run import ToolApprovalFunctionContext

def needs_approval(ctx: ToolApprovalFunctionContext) -> bool:
    """只有当操作目标是生产数据库时才需要审批"""
    db_name = ctx.tool_call.arguments.get("db_name", "")
    return "prod" in db_name or "production" in db_name

@function_tool(needs_approval=needs_approval)
async def delete_database(db_name: str) -> str:
    """删除数据库"""
    return f"数据库 {db_name} 已删除"

3. 完整工作流:从捕获到恢复

needs_approval 只是声明。真正的控制流由 Runner + RunState 串联起来。
下面是一个完整的基础示例,展示「捕获中断 → 人工审批 → 恢复运行」的完整循环:
import asyncio
import json
from agents import Agent, Runner, function_tool

@function_tool(needs_approval=True)
async def send_email(to: str, subject: str, body: str) -> str:
    """发送邮件(需要审批)"""
    return f"邮件已发送至 {to}"

@function_tool
async def draft_email(to: str, subject: str, body: str) -> str:
    """起草邮件(无需审批,仅预览)"""
    return f"草稿已生成:收件人={to}, 主题={subject}"

agent = Agent(
    name="EmailAssistant",
    instructions="你是邮件助手,帮助用户起草和发送邮件。",
    tools=[draft_email, send_email],
)

async def main():
    user_input = "给 [email protected] 发一封主题为'Q2 报告'的邮件"

# 第一次运行:Agent 会生成发邮件的工具调用,然后因 needs_approval 暂停
    result = await Runner.run(agent, user_input)

# 处理中断循环:直到所有中断都被处理完毕
    while result.interruptions:
        print(f"\n⏸️  检测到 {len(result.interruptions)} 个待审批项:\n")

# 将当前运行状态转换为可操作的 RunState 对象
        # RunState 包含 SDK 管理的全部运行时元数据(审批决定、工具输入、嵌套 Agent 恢复点等)
        state = result.to_state()

for interruption in result.interruptions:
            print(f"  工具: {interruption.tool_name}")
            print(f"  代理: {interruption.agent_name}")
            print(f"  参数: {json.dumps(interruption.tool_call.arguments, ensure_ascii=False, indent=2)}")

# 请求人工确认
            decision = input("\n  是否批准?(y=批准 / n=拒绝 / e=修改参数): ").strip().lower()

if decision == "y":
                # 批准:允许工具继续执行
                # always_approve=True 表示本次运行中后续相同工具调用自动批准
                state.approve(interruption, always_approve=False)
                print("  ✅ 已批准")

elif decision == "n":
                # 拒绝:工具不执行,Agent 会收到拒绝消息并重新推理
                reason = input("  拒绝原因(可选): ").strip()
                state.reject(interruption, rejection_message=reason or "用户拒绝")
                print("  ❌ 已拒绝")

else:
                # 修改参数后批准(此处示例直接批准,实际可修改 interruption)
                state.approve(interruption)
                print("  ✅ 已批准(参数未修改)")

# 使用处理后的 RunState 恢复运行
        # Runner.run() 接受 RunState 作为 input 参数,从中断点继续执行
        result = await Runner.run(agent, state)

print(f"\n✅ 运行完成:{result.final_output}")

asyncio.run(main())
关键点拆解:
  • result.interruptions 是一个 ToolApprovalItem 列表,每项包含 agent_nametool_nametool_call(含参数)1
  • result.to_state()RunResult 转换为 RunState,这是执行审批操作的前提
  • state.approve(interruption)state.reject(interruption, rejection_message=...) 记录决定但不立即执行
  • Runner.run(agent, state) 接受 RunState 作为 input,从中断点继续 Agent 循环

4. RunState:中断状态的完整结构与持久化

这是 HITL 最被低估的核心特性:中断状态可以跨进程持久化1
RunState 里面装着什么?
正在加载统计卡片...
序列化 API:
# 序列化:RunState → 字符串(适合写入数据库/文件/Redis)
state_str = state.to_string()
state_json = state.to_json()   # 返回 dict,方便进一步处理

# 反序列化:字符串 → RunState(在另一个进程/机器上恢复)
state = RunState.from_string(state_str)
state = RunState.from_json(state_json)
序列化可选参数(用于 to_string() / to_json()):
参数类型说明
context_serializer可选函数非 JSON-serializable 的 context 对象的自定义序列化器
context_deserializer可选函数反序列化时还原 context 对象
strict_contextbool(默认 FalseTrue 时,context 类型不匹配会抛出异常
context_override可选对象加载时替换 context(适合跨进程传递时注入新 context)
include_tracing_api_keybool(默认 False序列化时保留追踪 API Key(⚠️ 注意安全)
实际生产的工作流可以是这样的:Agent 在 Web 服务器进程里跑,遇到中断;state.to_json() 写入 Redis;审批通知推给 Slack 或内部审批系统;审核员在另一台机器上点确认;审批服务从 Redis 读回状态,RunState.from_json() 恢复,继续运行1
Agent 进程和审批进程可以完全分离,甚至跨机器。这是 RunState 序列化的核心价值。

5. 生产级场景:持久化审批流程完整示例

import asyncio
import json
from pathlib import Path
from agents import Agent, Runner, function_tool
from agents.run import RunState

# ── 工具定义 ────────────────────────────────────────────────────────────────

def needs_database_approval(ctx) -> bool:
    """只有涉及 prod 环境的操作才需要审批"""
    args = ctx.tool_call.arguments
    return args.get("env", "dev") == "prod"

@function_tool(needs_approval=needs_database_approval)
async def run_sql(env: str, sql: str) -> str:
    """执行 SQL 语句
    Args:
        env: 环境(dev/staging/prod)
        sql: SQL 语句
    """
    # 实际执行逻辑...
    return f"[{env}] SQL 执行成功:{sql[:50]}..."

@function_tool
async def list_tables(env: str) -> str:
    """列出数据表(只读,无需审批)"""
    return f"[{env}] 表列表:users, orders, products"

# ── 持久化层(示例用文件,生产用 Redis/DB)──────────────────────────────────

PENDING_FILE = Path("/tmp/pending_approval.json")

async def save_pending_state(state: RunState, interruptions: list):
    """将待审批状态保存到持久化存储"""
    payload = {
        "state": state.to_json(),        # 序列化 RunState
        "interruptions": [
            {
                "tool_name": item.tool_name,
                "agent_name": item.agent_name,
                "arguments": item.tool_call.arguments,
                # interruption 本身不能直接序列化,这里保存 index 用于查找
                "index": i,
            }
            for i, item in enumerate(interruptions)
        ]
    }
    PENDING_FILE.write_text(json.dumps(payload, ensure_ascii=False, indent=2))
    print(f"✉️  审批请求已保存,等待人工处理...")

async def load_and_resume(agent: Agent):
    """从持久化存储加载状态并恢复运行(模拟审批服务)"""
    if not PENDING_FILE.exists():
        print("❌ 无待审批状态")
        return None

payload = json.loads(PENDING_FILE.read_text())

# 从 JSON 反序列化 RunState
    state = RunState.from_json(payload["state"])

print("\n📋 待审批操作列表:")
    for item in payload["interruptions"]:
        print(f"  [{item['index']}] 工具={item['tool_name']}, 环境={item['arguments'].get('env')}")
        print(f"      SQL={item['arguments'].get('sql', '')[:80]}")

# 注意:state.approve/reject 需要原始 interruption 对象
    # 这里需要重新运行一次来获取 interruptions(实际生产中可存储完整对象)
    # 简化示例:直接从状态推断并批准第一个
    print("\n✅ 自动批准所有操作(生产环境应弹出审批 UI)")

PENDING_FILE.unlink()  # 清理待审批文件
    return state

# ── 主流程 ───────────────────────────────────────────────────────────────────

async def main():
    agent = Agent(
        name="DBAAgent",
        instructions="你是数据库管理员助手。按指令执行 SQL 操作,需要时列出表信息。",
        tools=[list_tables, run_sql],
    )

user_input = "在 prod 环境执行:DELETE FROM orders WHERE status='cancelled' AND created_at < '2025-01-01'"

print("▶️  启动 Agent 运行...\n")
    result = await Runner.run(agent, user_input)

while result.interruptions:
        state = result.to_state()

# 将状态持久化(模拟发送审批通知)
        await save_pending_state(state, result.interruptions)

# 模拟等待审批(实际是异步等待人工操作)
        print("\n⏳ 模拟等待审批(实际场景中这里等待人工点击确认)...")
        await asyncio.sleep(1)

# 模拟审批服务加载状态
        state = await load_and_resume(agent)
        if state is None:
            break

# 重新运行以获取原始 interruption 对象并批准
        # 实际生产:审批服务持有 interruption 对象引用,直接调用 state.approve()
        result = await Runner.run(agent, state)

if result.final_output:
        print(f"\n✅ 运行完成:{result.final_output}")

asyncio.run(main())

6. Streaming 模式下的 HITL

Runner.run_streamed() 同样支持中断。区别在于中断发生时,流式事件会停止,你需要从 RunResultStreaming 对象上读取 interruptions
async def streaming_with_hitl():
    agent = Agent(
        name="StreamAgent",
        instructions="你是一个流式输出的助手。",
        tools=[send_email],  # send_email 带 needs_approval=True
    )

async with Runner.run_streamed(agent, "给 [email protected] 发一封周报邮件") as result:
        # 消费流式事件,直到中断发生
        async for event in result.stream_events():
            if hasattr(event, 'delta') and event.delta:
                print(event.delta, end="", flush=True)

# 流结束后检查中断(流式模式下中断会终止流)
    if result.interruptions:
        state = result.to_state()
        for interruption in result.interruptions:
            print(f"\n⏸️  需要批准:{interruption.tool_name}({interruption.tool_call.arguments})")
            state.approve(interruption)  # 或 state.reject()

# 用批准后的 RunState 恢复(可以继续用 run_streamed 或 run)
        final = await Runner.run(agent, state)
        print(f"\n结果:{final.final_output}")
同步场景用 Runner.run_sync(),签名和行为与 Runner.run() 一致,只是去掉了 await3

7. Sessions + Human in the Loop:上期预告兑现

Sessions 负责管理多轮对话历史,HITL 负责在单次运行中插入人工决策点。两者结合时,需要注意一个关键约束:中断恢复时必须使用同一个 Session 实例(或指向相同后端存储的实例)2
原因是:Session 在运行前注入历史对话、运行后写入新消息。如果中断后换了 Session,历史就对不上了。
import asyncio
from agents import Agent, Runner, function_tool
from agents.sessions import SQLiteSession
from agents.run import RunState

@function_tool(needs_approval=True)
async def transfer_money(amount: float, to_account: str) -> str:
    """转账操作(需要审批)"""
    return f"已转账 {amount} 元至 {to_account}"

@function_tool
async def check_balance() -> str:
    """查询余额(无需审批)"""
    return "当前余额:¥12,580.00"

agent = Agent(
    name="BankAgent",
    instructions="你是银行智能助手,帮助用户查询和管理账户。",
    tools=[check_balance, transfer_money],
)

async def main():
    # Session 实例:贯穿整个对话生命周期
    # 中断前后必须复用同一个实例
    session = SQLiteSession("bank_session_001", db_path="/tmp/bank_session.db")

user_messages = [
        "查一下我的余额",
        "把 5000 元转到账户 6217xxxxxx",  # 这条会触发审批
        "转账成功了吗",
    ]

for user_msg in user_messages:
        print(f"\n👤 用户:{user_msg}")

# 每次运行传入同一个 session 实例
        # SDK 自动注入历史,无需手动 to_input_list()
        result = await Runner.run(agent, user_msg, session=session)

# 处理可能的中断
        while result.interruptions:
            state = result.to_state()

for interruption in result.interruptions:
                print(f"\n⚠️  审批请求:{interruption.tool_name}")
                print(f"   参数:{interruption.tool_call.arguments}")

decision = input("   批准?(y/n): ").strip().lower()
                if decision == "y":
                    state.approve(interruption)
                else:
                    state.reject(interruption, rejection_message="用户取消")

# 关键:恢复运行时继续传入同一个 session 实例
            # 这样 Session 才能正确追踪整条对话链
            result = await Runner.run(agent, state, session=session)

print(f"🤖 Agent:{result.final_output}")

asyncio.run(main())
代码跑起来后:Session 自动追踪全部对话历史(包括中断前后的消息),HITL 在转账前插入人工决策点,恢复后的 Agent 仍然「记得」完整上下文。
没有 Session,中断恢复后 Agent 失忆;没有 HITL,转账在你眼皮底下悄悄执行完了。两者都需要21

8. 关于 InterruptedError:官方文档说明

一个常见误解需要澄清:网上有些教程提到「捕获 InterruptedError 异常来处理中断」。这在当前版本的 OpenAI Agents SDK 中并不准确4
官方异常层级中,中断相关的处理不是通过异常机制完成的。实际机制是:
  1. Runner.run() 正常返回(不抛出异常)
  2. 返回的 RunResult 对象上的 result.interruptions 字段非空
  3. 通过检查 result.interruptions 来判断是否发生了中断
这个设计更合理:异常适合「出错了」的场景,而「等待人工决策」不是错误,是预期的正常流程4
SDK 实际存在的异常包括 MaxTurnsExceeded(超过最大轮次)、ModelBehaviorErrorGuardrailTripwireTriggered 等,详见 #12 期的完整异常体系拆解5

9. HITL 与 Guardrails 的边界

这两个机制都能「拦截」Agent 行为,但职责不同:
维度Guardrails(护栏)Human in the Loop
触发时机输入/输出/工具调用阶段工具调用前
决策主体预设规则(代码逻辑)人类(实时审批)
触发结果抛出异常,终止执行暂停执行,等待决定
恢复能力❌ 触发后不可恢复✅ 批准后继续执行
适用场景已知规则的自动过滤不确定性高、需判断的关键操作
实际生产中两者配合使用:Guardrails 过滤明确违规,HITL 处理需要人类判断的边界场景6

10. 社区视角:2026 年的行业共识

值得关注的一个背景:LangChain 在 2026 年将首届开发者大会正式命名为「Interrupt 2026」7
这不是巧合。中断与人机协作机制,已经成为 Agent 工程化从 Demo 走向生产的核心关卡。
这个判断在国内技术圈也得到认可。有开发者明确写道:真正从 Demo 走向生产的难点,不是工具越接越多,而是「工具调用的可控性和权限审批」8。另一位做过深度拆解的开发者也得出类似结论:SDK 的核心设计哲学是「把 Agent Loop 里每一个可能出错、可能被打断、可能需要人介入的点,都显式地建成一个 SDK 原语」9
没有 HITL 的 Agent,功能再强也只是一个危险的原型。

3 条可直接落地的实践建议

① 用动态 needs_approval 函数替代静态布尔值
静态 needs_approval=True 会让低风险调用也触发审批,造成摩擦。用函数基于参数动态判断:只有 env="prod" 才审批,env="dev" 直接放行。审批率下去了,重要操作的审批依然保住了。
② 持久化 RunState 到你已有的数据库
不要让审批等待阻塞你的主进程。state.to_json() 结果直接塞进 Redis 或 PostgreSQL,把审批请求发给 Slack/钉钉/自建审批系统,审批完成后再恢复。这才是生产级的异步审批链路1
③ Sessions + HITL 搭配时,恢复运行必须传入同一 Session 实例
这是最容易遗漏的细节。中断后用 Runner.run(agent, state, session=session) 恢复,session 参数不能省。省掉之后 Agent 会「失忆」,历史对话丢失,后续推理质量直线下降2

下一篇预告
#15 将进入 Multi-agent 编排:当一个 Agent 不够用,如何用 Handoffs 和 as_tool() 把多个专业 Agent 组织成协作系统?Orchestrator-Worker 模式、并行子任务、跨 Agent 的 Context 流转,全部拆开讲。

封面图:AI 生成

このコンテンツについて、さらに観点や背景を補足しましょう。

  • ログインするとコメントできます。