OpenAI Agents SDK #9:让 Agent「边跑边说」——Streaming 流式输出全解析

从「盯着空白屏幕干等 Agent 结果」的开发者痛点切入,系统拆解 OpenAI Agents SDK 的 Streaming 机制。覆盖 Runner.run_streamed() 完整接口签名与 RunResultStreaming 关键成员、StreamEvent 三层事件类型(RawResponsesStreamEvent / RunItemStreamEvent / AgentUpdatedStreamEvent)及 11 种 RunItemStreamEvent name 枚举、Tool Call 事件的「调用-输出」两阶段处理与 Sub-agent 事件透传机制、流式 Guardrail 的输入/输出不对称行为。附两个完整带注释代码示例(打字机效果 + 工作流追踪),结尾给出 3 条可直接落地的实践建议,并预告 #10 Context 变量管理。

리서치 브리프

你有没有遇到过这种体验:把一个任务甩给 Agent,然后盯着空白屏幕干等,不知道它在干什么、跑了多久,也不知道它是卡死了还是在思考——直到某一刻,满屏结果一下子全涌出来。
这种「黑盒等待感」,是 Runner.run() 的设计取舍。
run() 只会在整个 Agent Loop 跑完后才把结果交给你。一个需要多次工具调用的任务,可能要等十几秒甚至更久——对聊天类应用几乎不可接受,对长时间任务来说更是直接「失联」。
解法很直接:Runner.run_streamed()1

run_streamed() 接口

完整签名如下2
Runner.run_streamed(
    starting_agent: Agent[TContext],
    input: str | list[TResponseInputItem] | RunState[TContext],
    context: TContext | None = None,
    max_turns: int = DEFAULT_MAX_TURNS,
    hooks: RunHooks[TContext] | None = None,
    run_config: RunConfig | None = None,
    previous_response_id: str | None = None,
    auto_previous_response_id: bool = False,
    conversation_id: str | None = None,
    session: Session | None = None,
    *,
    error_handlers: RunErrorHandlers[TContext] | None = None,
) -> RunResultStreaming
参数和 run() 几乎一模一样,区别在于返回值:不是 RunResult,而是 RunResultStreaming
RunResultStreaming 上有这几个关键成员:
  • stream_events() —— 异步迭代器,消费事件流的核心入口
  • interruptions —— 用于 Human-in-the-Loop 工具审批的中断列表
  • cancel() / cancel(mode="after_turn") —— 中止任务(立即 vs 当前轮结束后)
  • to_state() —— 序列化当前状态,用于持久化或恢复
  • is_complete —— 布尔属性,表示流是否已结束
整个 Agent Loop 要等 stream_events() 迭代完成才真正结束。期间 SDK 在背后做 session 持久化和历史压缩,你不用管——用 async for 消费就好3

StreamEvent 三种类型全解析

事件类型定义为 Union 三元组,每种类型有不同的语义和用途4
正在加载统计卡片...
① RawResponsesStreamEvent
type == "raw_response_event",携带 data: TResponseStreamEvent,即 Responses API 的原始流事件。这是粒度最细的事件,包含逐 token 的文字增量(ResponseTextDeltaEvent)。想实现聊天界面「字字涌现」的打字效果,就靠它。
② RunItemStreamEvent
type == "run_item_stream_event",带 nameitem 两个字段。name 的完整枚举值:
name 值含义
message_output_created模型完成一条消息输出
tool_called函数工具被调用(含参数)
tool_output函数工具执行完毕(含返回值)
tool_search_called搜索工具被触发
tool_search_output_created搜索工具返回结果
handoff_requestedAgent 发起 Handoff 请求
handoff_occuredHandoff 完成,控制权已转移
reasoning_item_created模型产生推理步骤(CoT)
mcp_approval_requestedMCP 工具请求用户审批
mcp_approval_responseMCP 审批结果写入
mcp_list_tools列出可用 MCP 工具
用这层事件,你不关心模型在输出哪个字,只关心「调了什么工具」「工具返回了什么」「任务有没有被甩给其他 Agent」。
③ AgentUpdatedStreamEvent
type == "agent_updated_stream_event",携带 new_agent: Agent[Any]。每次 Handoff 完成后触发,告诉你现在是哪个 Agent 在跑。如果你的工作流里有多 Agent 协作,这个事件可以用来更新 UI 上的「当前处理中:XX Agent」进度提示。

完整代码示例:从 token 级到语义级

深色渐变背景中流动的数据流光带,代表 Agent 流式事件持续输出的过程
深色渐变背景中流动的数据流光带,代表 Agent 流式事件持续输出的过程

示例一:实现打字机效果(token 粒度)5

import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner

agent = Agent(
    name="chat_agent",
    instructions="你是一个友好的助手,用简洁的中文回答问题。",
)

async def main():
    # run_streamed() 立即返回,不等 Agent Loop 完成
    result = await Runner.run_streamed(agent, input="用三句话介绍一下量子计算。")

print("=== 开始流式输出 ===")
    async for event in result.stream_events():
        # 只关心原始 token 事件
        if event.type == "raw_response_event" and isinstance(
            event.data, ResponseTextDeltaEvent
        ):
            # 逐 token 打印,end="" 不换行,flush=True 立即刷新缓冲区
            print(event.data.delta, end="", flush=True)

print("\n=== 流式完成 ===")

asyncio.run(main())
核心就三行:调用 run_streamed()async for 遍历事件,检测 ResponseTextDeltaEvent 打印 delta

示例二:追踪工具调用与 Agent 切换(语义粒度)6

import asyncio
from agents import Agent, Runner
from agents.items import ToolCallItem, ToolCallOutputItem, MessageOutputItem

# 假设已定义一个带工具调用的 agent
agent = Agent(
    name="research_agent",
    instructions="你是一个研究助手,使用工具查找信息后综合回答。",
    tools=[search_web],  # 自定义工具
)

async def main():
    result = await Runner.run_streamed(agent, input="搜索 OpenAI 最新动态")

current_agent_name = agent.name
    print(f"▶ 开始(Agent: {current_agent_name})")

async for event in result.stream_events():
        # 跳过 token 级噪音,只看语义事件
        if event.type == "raw_response_event":
            continue

# Agent 切换事件 —— 更新当前 Agent 名称
        if event.type == "agent_updated_stream_event":
            current_agent_name = event.new_agent.name
            print(f"\n🔀 Agent 切换 → {current_agent_name}")
            continue

# 语义事件处理
        if event.type == "run_item_stream_event":
            item = event.item

if isinstance(item, ToolCallItem):
                # 工具被触发:打印工具名和参数
                print(f"\n🔧 工具调用: {item.raw_item.name}")
                print(f"   参数: {item.raw_item.arguments}")

elif isinstance(item, ToolCallOutputItem):
                # 工具执行完毕:打印返回结果(截断避免刷屏)
                output_preview = str(item.output)[:100]
                print(f"   ✓ 结果: {output_preview}...")

elif isinstance(item, MessageOutputItem):
                # 最终消息输出完成
                print(f"\n💬 消息输出完成")

print("\n◼ 完成")

asyncio.run(main())
这段代码完整捕获了 Agent 工作流的全部「动作节点」。没有黑盒,每一步都看得见。

处理工具调用事件的细节

流式模式下处理 Tool Call,有几个细节容易踩坑6
区分「调用」和「输出」tool_called 在工具触发时立刻发出,这时工具还没跑完;tool_output 在工具返回结果后才发出。想在 UI 上显示「正在调用工具 X...」和「工具 X 完成」两种状态,就得分别处理这两个事件。
工具便捷属性:SDK v0.14.7 起,ToolCallItem 上新增了 tool_namecall_id 两个属性7,不用再手动从 raw_item 里挖。
Sub-agent 事件透传:Agent 通过 Handoffs 调用子 Agent 时,子 Agent 的流式事件会透传到外层的 stream_events()8。嵌套工作流里,从顶层就能拿到所有层级的事件,不需要在每一层单独订阅。
MCP 审批流程:工具需要用户确认时(Human-in-the-Loop),流里会先出现 mcp_approval_requested 事件,此时流暂停,你处理 result.interruptions 完成审批后,流才继续推进。

流式 Guardrail 的特殊行为

Guardrail 在流式模式下的行为,和非流式有一处关键差异9
Input Guardrail(输入检查)blocking 模式下,Input Guardrail 在流开始前就跑完了。输入触发 Tripwire 的话,流根本不会启动——run_streamed() 返回后立刻抛 InputGuardrailTripwireTriggeredstream_events() 没有任何数据。
Output Guardrail(输出检查):流式模式下,Output Guardrail 在所有事件流发出之后才执行。token 已经流给你了,最后才检查。如果 Tripwire 触发,OutputGuardrailTripwireTriggered 在流结束时抛出,但内容已被部分消费。应用不能接受「先流出再撤回」的话,需要在自己这层做缓冲——先攒着,检查通过后再渲染给用户。
run_in_parallel 的影响:Input Guardrail 支持 run_in_parallel=True 并发执行。对流式场景来说,多个 Guardrail 并发意味着结果更快收拢,流的等待时延更短。但任一 Guardrail 触发 Tripwire,整个流还是会被中止。
一句话总结:流式模式下,Input Guardrail 是「前门」,Output Guardrail 是「事后复核」——两者不对称,设计检查逻辑时要分开考虑。

两种实际应用场景

场景一:聊天界面实时打字效果
最常见的用法。用 RawResponsesStreamEvent + ResponseTextDeltaEvent,每收到一个 delta 就 append 到前端文本框。用户感受从「等 10 秒看到完整回答」变成「0.3 秒开始出现文字,持续流入」。
ChatGPT、Claude、Gemini 默认开流式输出不是偶然——不是因为更快,是因为让用户感觉更快,等待焦虑大幅降低。
场景二:进度感知与任务监控
对于需要多步工具调用的 Agent(比如搜索 + 总结 + 格式化的 Research Agent),用 RunItemStreamEvent 驱动一个进度面板:
  • 收到 tool_called → 在 UI 显示「正在搜索...」
  • 收到 tool_output → 更新为「搜索完成,正在分析...」
  • 收到 agent_updated_stream_event → 显示「当前 Agent:Summarizer」
  • 收到 message_output_created → 显示「任务完成」
长周期任务里,这种可见性让用户有事可盯,不至于盯着空白屏幕发呆;开发者调试时也能直接看哪个工具最慢,不用猜10

3 条落地建议

① 按需分层订阅,别把两个层次混在一起
RawResponsesStreamEventRunItemStreamEvent 的用途不一样。聊天输出用 Raw,任务监控用 Item。两者混在同一个 async for 里,条件判断会越写越乱。建议封装成两个独立函数:handle_token_stream()handle_item_stream(),按场景选用或组合。
② 永远处理 stream 结束后的异常
stream_events() 迭代结束不等于任务成功。Output Guardrail 触发、max_turns 超限、工具超时,都可能在流结束后才抛异常。包一个 try/except
try:
    async for event in result.stream_events():
        # 处理事件
        pass
    # 流正常结束后,访问最终结果
    final_output = result.final_output
except OutputGuardrailTripwireTriggered as e:
    # 输出检查失败
    handle_guardrail_failure(e)
except MaxTurnsExceeded:
    # 超过最大轮次
    handle_timeout()
③ 长任务用 cancel(mode="after_turn") 优雅退出
用户点「停止」时,直接 cancel() 立刻终止,工具可能卡在中间态。cancel(mode="after_turn") 等当前轮动作跑完再停,工具状态完整,history 也是干净的,后续想从 to_state() 恢复会容易很多。

下一篇预告

#10 将深入 Context 变量管理RunContextWrapper、本地 Context 与 LLM-visible Context 的边界在哪里、ToolContext 的额外元数据怎么用,以及多 Agent 工作流下的状态共享策略。

封面图:AI 生成,视觉概念来自 Streaming 数据流主题

이 콘텐츠를 둘러싼 관점이나 맥락을 계속 보강해 보세요.

  • 로그인하면 댓글을 작성할 수 있습니다.