OpenAI Agents SDK #16:Runner 跑完之后,你真的知道手里拿着什么吗?

从「`print(result.final_output)` 输出 None」和「直接把 result 传给下一轮导致对话污染」两个高频坑切入,系统拆解 `RunResultBase` → `RunResult` → `RunResultStreaming` 的三层继承结构、`final_output` 三态逻辑、`new_items` 13 种 RunItem 子类型、`to_input_list()` 两种模式、`last_agent` 动态路由、Guardrail 结果数组,以及流式场景的必消费约定与 `cancel()` 两种模式,附 3 条可落地实践建议。

リサーチノート

你调用 Runner.run() 一切正常,没有报错。然后你写下这行代码:
print(result.final_output)
输出是 None
你开始怀疑自己。明明 Agent 跑完了,为什么是空的?
换个方向——你把整个 result 传给下一轮 Runner.run(),以为多轮对话就这样接上了。结果 Agent 表现越来越奇怪,好像对话历史被污染了一样。
这两个坑,我见过太多人踩。问题出在同一个地方:RunResult 到底是什么,大多数人从来没认真看过。

二、final_output 的三种状态

final_output 的类型标注是 Any,这本身就是一个信号1。它有三种完全不同的状态:
状态一:str 类型
last_agent 没有定义 output_type 时,最终输出是纯文本字符串。大多数基础场景属于这种。
状态二:结构化输出类型
last_agent 定义了 output_type(通常是 Pydantic 模型),输出是该类型的实例。由于 handoff 可能在运行中切换 Agent,SDK 无法静态推断输出类型,所以类型标注是 Any——需要你自己处理类型转换。
SDK 提供了 final_output_as() 方法简化这件事2
# raise_if_incorrect_type=True 时非目标类型会抛 TypeError
output = result.final_output_as(MyOutputModel, raise_if_incorrect_type=True)
状态三:None
这是最多人不理解的情况。以下场景会导致 final_outputNone
  1. 运行被中断(HITL):Agent 执行中触发了需要人工审批的操作,运行暂停,此时还没产生最终输出
  2. 流式运行未消费完RunResultStreamingfinal_output 在流结束前始终为 None
  3. 运行被强制取消:调用了 cancel(mode='immediate') 后 Agent 没能完成最后一轮
所以防御代码是必要的,不是「可选的最佳实践」:
result = await runner.run(agent, input="请帮我分析这份数据")

if result.final_output is None:
    # 检查是否有中断项待审批
    if result.interruptions:
        print(f"运行暂停,等待审批: {result.interruptions}")
    else:
        print("运行未完成")
else:
    print(result.final_output)

三、new_items:13 种 RunItem 子类型

new_items 是整个运行过程的事件日志,类型是 list[RunItem]。SDK 定义了 13 种 RunItem 子类型31
类型含义
MessageOutputItemAgent 产生的文本消息
ToolCallItem工具调用请求
ToolCallOutputItem工具调用返回结果
HandoffCallItemHandoff 切换请求
HandoffOutputItemHandoff 切换完成
ReasoningItem推理过程(启用了 reasoning 时)
ToolSearchCallItem搜索工具调用请求
ToolSearchOutputItem搜索工具调用返回
MCPListToolsItemMCP 工具列表请求
MCPApprovalRequestItemMCP 审批请求
MCPApprovalResponseItemMCP 审批响应
CompactionItem上下文压缩事件
ToolApprovalItem需要人工审批的工具调用
13 种看着多,实际生产代码里你大概只会认识其中 4-5 种。按类型过滤提取:
from agents.items import (
    MessageOutputItem,
    ToolCallItem,
    ToolCallOutputItem,
    HandoffCallItem,
)

# 提取 Agent 所有文本输出
messages = [
    item for item in result.new_items
    if isinstance(item, MessageOutputItem)
]

# 提取所有工具调用(便于日志审计)
tool_calls = [
    (item.tool_name, item.tool_input)
    for item in result.new_items
    if isinstance(item, ToolCallItem)
]

# 检查是否发生了 Handoff
handoffs = [
    item for item in result.new_items
    if isinstance(item, HandoffCallItem)
]
if handoffs:
    print(f"发生了 {len(handoffs)} 次 Agent 切换")

四、to_input_list() 的正确用法

to_input_list() 是多轮对话的核心接口,也是最容易用错的地方。
它支持两种模式21
  • preserve_all(默认):将 new_items 转换为完整的 input-item 历史,原样保留所有事件
  • normalized:在 handoff 过滤或重写模型历史时使用规范化接续输入;对于没有发生 handoff 的普通运行,行为与 preserve_all 一致
多轮对话的正确接法4
# ✅ 正确:用 to_input_list() 构建下一轮的 input
input_messages = result.to_input_list()
input_messages.append({"role": "user", "content": "继续分析第二个维度"})
result2 = await runner.run(agent, input=input_messages)

# ❌ 错误:直接把 result 传给下一轮
result2 = await runner.run(agent, input=result)  # 这会报类型错误

# ❌ 错误:只传 final_output,丢失了工具调用历史
result2 = await runner.run(agent, input=result.final_output)
SDK 同时提供了四种记忆策略4
策略状态存于适用场景
to_input_list()应用内存简单多轮、需要自定义历史管理
Session存储后端 + SDK生产环境持久化对话(上期 #13 详解)
conversation_idOpenAI Conversations API云端托管,客户端轻量
previous_response_idOpenAI server-side最轻量的服务端续行

五、last_agent 与多 Agent 场景

last_agentRunResultBase 的抽象属性,返回 Agent[Any]2
单 Agent 场景下,它就是你传进去的那个 Agent,没什么用。多 Agent / Handoff 场景里,last_agent 指向「最后一个真正跑完的接手方」1——用它来决定下一步找谁对话,比维护一个全局状态变量干净多了。
一个典型的动态路由场景:
result = await runner.run(triage_agent, input=user_message)

# 根据 last_agent 决定下一步
if result.last_agent.name == "billing_agent":
    # 账单问题,继续追问细节
    followup = "你的账单问题已初步处理,需要开具发票吗?"
elif result.last_agent.name == "tech_support_agent":
    followup = "技术问题处理中,请描述报错信息"
else:
    followup = "您的问题已收到,稍后跟进"

next_input = result.to_input_list() + [{"role": "user", "content": followup}]
有一个坑:调用 release_agents()2 释放内存后,last_agent 可能返回 None。不要在 result 析构之后还去读它。

六、Guardrail 结果读取

SDK 内置 4 组 Guardrail 结果数组,每一组都是独立的21
属性触发时机
input_guardrail_results输入消息进入 Agent 前
output_guardrail_resultsAgent 产出最终输出后
tool_input_guardrail_results工具被调用前
tool_output_guardrail_results工具返回结果后
没有 Guardrail 配置时,四个数组都是空列表——不用担心 None。有触发时,元素按配置顺序排列:
# 检查是否有 Guardrail 被触发
for gr in result.input_guardrail_results:
    if gr.output.tripwire_triggered:
        print(f"输入 Guardrail 触发: {gr.guardrail.__class__.__name__}")
        print(f"  原因: {gr.output.output_info}")

# 工具级别 Guardrail 审计
for gr in result.tool_output_guardrail_results:
    print(f"工具: {gr.guardrail.__class__.__name__}, "
          f"触发: {gr.output.tripwire_triggered}")
这 4 个数组是全程累积的,整个 Runner.run() 调用期间所有 Guardrail 的检查记录都在里面。排查「运行为什么突然中断」时,第一个该看的地方就是这里。

七、RunResultStreaming 专项

Runner.run_streamed() 返回 RunResultStreaming,增加了以下独有字段2
  • current_agent:当前正在运行的 Agent(动态更新)
  • current_turn:当前轮次编号
  • is_complete:流是否已终止(bool
  • run_loop_exception:后台异常容器,用于消费流后检查静默失败
  • stream_events():异步迭代器,消费语义化流事件
  • cancel(mode):取消流运行
stream_events() 的必消费约定
这条规则必须背下来21stream_events() 必须消费完整个迭代器,不能提前退出。 为什么?
final_outputinterruptionsraw_responses 的赋值,以及 Session 持久化等副作用,可能在最后一个可见 token 到达之后仍在后台处理中。
result = runner.run_streamed(agent, input="请生成一份分析报告")

# ✅ 正确:消费完整个迭代器
async with result.stream_events() as events:
    async for event in events:
        if event.type == "raw_response_event":
            # 处理流式 token
            pass

# 到这里 is_complete 才可靠为 True
print(result.final_output)  # 此时才有值

# ❌ 错误:提前 break 跳出,final_output 可能仍为 None
async for event in result.stream_events():
    if some_condition:
        break  # 后台任务未完成
print(result.final_output)  # 不可靠
cancel() 的两种模式
# immediate:立即停止,取消所有后台任务,清空队列
# 代价:对话历史可能不完整
result.cancel(mode="immediate")

# after_turn:完成当前轮次后停止
# 好处:LLM 响应完成、工具调用结束、Session 状态正确保存
result.cancel(mode="after_turn")

# 注意:调用 cancel() 后仍需继续消费 stream_events()
# 才能让取消和清理操作正常完成
async for event in result.stream_events():
    pass  # 继续消费,直到迭代器自然终止
两种模式的选择标准:用户主动打断选 immediate;业务层面需要保证数据一致性选 after_turn

八、三条可落地的实践建议

1. 防御 final_output=None,不要让它崩溃你的代码
用一个统一的结果处理函数包装所有 Runner.run() 调用,强制检查三种状态:
from agents import RunResult

def extract_output(result: RunResult, expected_type=None):
    if result.final_output is None:
        if hasattr(result, "interruptions") and result.interruptions:
            raise RuntimeError("运行被 HITL 中断,需要人工审批后恢复")
        raise RuntimeError("运行未产生输出(可能被取消或中途出错)")

if expected_type is not None:
        return result.final_output_as(expected_type, raise_if_incorrect_type=True)

return result.final_output
2. new_items 类型过滤要用 isinstance,不要靠字段猜
13 种 RunItem 子类型有些字段名相似,靠字段存在性判断容易误判。永远用 isinstance 精确匹配类型,然后根据具体子类的 API 访问字段。IDE 补全也会更友好。
3. 流式 vs 非流式的场景选择
  • 非流式(run():任务型、批处理、代码驱动的确定性流水线——你只关心最终结果,不需要实时反馈
  • 流式(run_streamed():面向用户的交互界面——需要逐 token 显示让用户感受响应速度;或者运行时间长、需要展示进度
  • 别在服务器端批处理里用流式:流式的持续消费约定会增加代码复杂度,非流式更简洁可靠

小结

RunResult 不是单纯的「结果容器」。它是 SDK 把整个 Agent 运行过程的完整状态——事件流、Guardrail 记录、对话历史接续入口——打包在一起的对象。
final_output 的三态、new_items 的 13 种子类型、to_input_list() 的两种模式,加上流式场景的消费约定——把这四块弄清楚,大部分生产代码里遇到的问题都能定位。

下一篇预告 #17:Realtime Agents——实时语音 Agent 的构建方式。WebSocket 连接、音频流处理、低延迟响应的设计模式,以及与普通 text-based Agent 的核心差异。
封面图:AI 生成,主题为 RunResult 三层继承结构与数据流可视化

このコンテンツについて、さらに観点や背景を補足しましょう。

  • ログインするとコメントできます。