StreamEvent 三种类型
OpenAI Agents SDK 流式事件系统
从「盯着空白屏幕干等 Agent 结果」的开发者痛点切入,系统拆解 OpenAI Agents SDK 的 Streaming 机制。覆盖 Runner.run_streamed() 完整接口签名与 RunResultStreaming 关键成员、StreamEvent 三层事件类型(RawResponsesStreamEvent / RunItemStreamEvent / AgentUpdatedStreamEvent)及 11 种 RunItemStreamEvent name 枚举、Tool Call 事件的「调用-输出」两阶段处理与 Sub-agent 事件透传机制、流式 Guardrail 的输入/输出不对称行为。附两个完整带注释代码示例(打字机效果 + 工作流追踪),结尾给出 3 条可直接落地的实践建议,并预告 #10 Context 变量管理。
Research Brief
Runner.run() 的设计取舍。run() 只会在整个 Agent Loop 跑完后才把结果交给你。一个需要多次工具调用的任务,可能要等十几秒甚至更久——对聊天类应用几乎不可接受,对长时间任务来说更是直接「失联」。Runner.run_streamed()1。Runner.run_streamed(
starting_agent: Agent[TContext],
input: str | list[TResponseInputItem] | RunState[TContext],
context: TContext | None = None,
max_turns: int = DEFAULT_MAX_TURNS,
hooks: RunHooks[TContext] | None = None,
run_config: RunConfig | None = None,
previous_response_id: str | None = None,
auto_previous_response_id: bool = False,
conversation_id: str | None = None,
session: Session | None = None,
*,
error_handlers: RunErrorHandlers[TContext] | None = None,
) -> RunResultStreamingrun() 几乎一模一样,区别在于返回值:不是 RunResult,而是 RunResultStreaming。RunResultStreaming 上有这几个关键成员:stream_events() —— 异步迭代器,消费事件流的核心入口interruptions —— 用于 Human-in-the-Loop 工具审批的中断列表cancel() / cancel(mode="after_turn") —— 中止任务(立即 vs 当前轮结束后)to_state() —— 序列化当前状态,用于持久化或恢复is_complete —— 布尔属性,表示流是否已结束Union 三元组,每种类型有不同的语义和用途4:type == "raw_response_event",携带 data: TResponseStreamEvent,即 Responses API 的原始流事件。这是粒度最细的事件,包含逐 token 的文字增量(ResponseTextDeltaEvent)。想实现聊天界面「字字涌现」的打字效果,就靠它。type == "run_item_stream_event",带 name 和 item 两个字段。name 的完整枚举值:| name 值 | 含义 |
|---|---|
message_output_created | 模型完成一条消息输出 |
tool_called | 函数工具被调用(含参数) |
tool_output | 函数工具执行完毕(含返回值) |
tool_search_called | 搜索工具被触发 |
tool_search_output_created | 搜索工具返回结果 |
handoff_requested | Agent 发起 Handoff 请求 |
handoff_occured | Handoff 完成,控制权已转移 |
reasoning_item_created | 模型产生推理步骤(CoT) |
mcp_approval_requested | MCP 工具请求用户审批 |
mcp_approval_response | MCP 审批结果写入 |
mcp_list_tools | 列出可用 MCP 工具 |
type == "agent_updated_stream_event",携带 new_agent: Agent[Any]。每次 Handoff 完成后触发,告诉你现在是哪个 Agent 在跑。如果你的工作流里有多 Agent 协作,这个事件可以用来更新 UI 上的「当前处理中:XX Agent」进度提示。
import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner
agent = Agent(
name="chat_agent",
instructions="你是一个友好的助手,用简洁的中文回答问题。",
)
async def main():
# run_streamed() 立即返回,不等 Agent Loop 完成
result = await Runner.run_streamed(agent, input="用三句话介绍一下量子计算。")
print("=== 开始流式输出 ===")
async for event in result.stream_events():
# 只关心原始 token 事件
if event.type == "raw_response_event" and isinstance(
event.data, ResponseTextDeltaEvent
):
# 逐 token 打印,end="" 不换行,flush=True 立即刷新缓冲区
print(event.data.delta, end="", flush=True)
print("\n=== 流式完成 ===")
asyncio.run(main())run_streamed(),async for 遍历事件,检测 ResponseTextDeltaEvent 打印 delta。import asyncio
from agents import Agent, Runner
from agents.items import ToolCallItem, ToolCallOutputItem, MessageOutputItem
# 假设已定义一个带工具调用的 agent
agent = Agent(
name="research_agent",
instructions="你是一个研究助手,使用工具查找信息后综合回答。",
tools=[search_web], # 自定义工具
)
async def main():
result = await Runner.run_streamed(agent, input="搜索 OpenAI 最新动态")
current_agent_name = agent.name
print(f"▶ 开始(Agent: {current_agent_name})")
async for event in result.stream_events():
# 跳过 token 级噪音,只看语义事件
if event.type == "raw_response_event":
continue
# Agent 切换事件 —— 更新当前 Agent 名称
if event.type == "agent_updated_stream_event":
current_agent_name = event.new_agent.name
print(f"\n🔀 Agent 切换 → {current_agent_name}")
continue
# 语义事件处理
if event.type == "run_item_stream_event":
item = event.item
if isinstance(item, ToolCallItem):
# 工具被触发:打印工具名和参数
print(f"\n🔧 工具调用: {item.raw_item.name}")
print(f" 参数: {item.raw_item.arguments}")
elif isinstance(item, ToolCallOutputItem):
# 工具执行完毕:打印返回结果(截断避免刷屏)
output_preview = str(item.output)[:100]
print(f" ✓ 结果: {output_preview}...")
elif isinstance(item, MessageOutputItem):
# 最终消息输出完成
print(f"\n💬 消息输出完成")
print("\n◼ 完成")
asyncio.run(main())tool_called 在工具触发时立刻发出,这时工具还没跑完;tool_output 在工具返回结果后才发出。想在 UI 上显示「正在调用工具 X...」和「工具 X 完成」两种状态,就得分别处理这两个事件。stream_events() 里8。嵌套工作流里,从顶层就能拿到所有层级的事件,不需要在每一层单独订阅。mcp_approval_requested 事件,此时流暂停,你处理 result.interruptions 完成审批后,流才继续推进。blocking 模式下,Input Guardrail 在流开始前就跑完了。输入触发 Tripwire 的话,流根本不会启动——run_streamed() 返回后立刻抛 InputGuardrailTripwireTriggered,stream_events() 没有任何数据。OutputGuardrailTripwireTriggered 在流结束时抛出,但内容已被部分消费。应用不能接受「先流出再撤回」的话,需要在自己这层做缓冲——先攒着,检查通过后再渲染给用户。run_in_parallel 的影响:Input Guardrail 支持 run_in_parallel=True 并发执行。对流式场景来说,多个 Guardrail 并发意味着结果更快收拢,流的等待时延更短。但任一 Guardrail 触发 Tripwire,整个流还是会被中止。RawResponsesStreamEvent + ResponseTextDeltaEvent,每收到一个 delta 就 append 到前端文本框。用户感受从「等 10 秒看到完整回答」变成「0.3 秒开始出现文字,持续流入」。RunItemStreamEvent 驱动一个进度面板:tool_called → 在 UI 显示「正在搜索...」tool_output → 更新为「搜索完成,正在分析...」agent_updated_stream_event → 显示「当前 Agent:Summarizer」message_output_created → 显示「任务完成」RawResponsesStreamEvent 和 RunItemStreamEvent 的用途不一样。聊天输出用 Raw,任务监控用 Item。两者混在同一个 async for 里,条件判断会越写越乱。建议封装成两个独立函数:handle_token_stream() 和 handle_item_stream(),按场景选用或组合。stream_events() 迭代结束不等于任务成功。Output Guardrail 触发、max_turns 超限、工具超时,都可能在流结束后才抛异常。包一个 try/except:try:
async for event in result.stream_events():
# 处理事件
pass
# 流正常结束后,访问最终结果
final_output = result.final_output
except OutputGuardrailTripwireTriggered as e:
# 输出检查失败
handle_guardrail_failure(e)
except MaxTurnsExceeded:
# 超过最大轮次
handle_timeout()cancel(mode="after_turn") 优雅退出cancel() 立刻终止,工具可能卡在中间态。cancel(mode="after_turn") 等当前轮动作跑完再停,工具状态完整,history 也是干净的,后续想从 to_state() 恢复会容易很多。RunContextWrapper、本地 Context 与 LLM-visible Context 的边界在哪里、ToolContext 的额外元数据怎么用,以及多 Agent 工作流下的状态共享策略。
Add more perspectives or context around this content.