StreamEvent 三层类型检查
每层检查过滤掉不同层级的噪声,确保只处理目标数据

默认模式下 Agent 运行像个黑盒,结果出来前什么都看不到。设置 include_partial_messages=True,SDK 开始向 async generator 插入 StreamEvent,文本逐字输出、工具调用实时显示状态。本篇完整拆解三层嵌套类型检查(StreamEvent→content_block_delta→text_delta)的设计逻辑、工具调用流的追踪方式、流式 UI 的构建模式,以及结构化输出在流式模式下的已知限制,附五条可落地的实践建议。
リサーチノート
include_partial_messages=True,把 SDK 从「结果型 API」变成「进程可观测的运行时」——文本逐字输出、工具调用实时显示状态、Agent 每一步行为都变得可感知。本篇完整拆解这套机制的工作原理、三层嵌套类型检查的设计逻辑,以及如何用它构建流式 UI。include_partial_messages=True,TypeScript 中设置 includePartialMessages: true,就能开启流式模式。AssistantMessage / ResultMessage 的基础上,额外往 async generator 里插入 StreamEvent 消息——这些是从 Claude API 实时接收的原始流式事件,未经积累,逐个到达。from claude_agent_sdk import query, ClaudeAgentOptions
from claude_agent_sdk.types import StreamEvent
import asyncio
async def stream_response():
options = ClaudeAgentOptions(
include_partial_messages=True,
allowed_tools=["Bash", "Read"],
)
async for message in query(prompt="列出项目里的文件", options=options):
if isinstance(message, StreamEvent):
event = message.event
if event.get("type") == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta":
print(delta.get("text", ""), end="", flush=True)
asyncio.run(stream_response())StreamEvent 是一个 dataclass:@dataclass
class StreamEvent:
uuid: str # 事件唯一 ID
session_id: str # 会话 ID
event: dict[str, Any] # 原始 Claude API 流式事件
parent_tool_use_id: str | None # 来自子 Agent 时的父工具 IDSDKPartialAssistantMessage,type 字段值为 'stream_event'。event 字段是原始的 Claude API 流式事件,不是 SDK 封装后的对象。这意味着两件事:一是你获得了最大灵活度,可以直接操作底层事件;二是 SDK 不会帮你积累文本——你需要自己把 text_delta 的碎片拼起来。event.type 有六种:1| 事件类型 | 含义 |
|---|---|
message_start | 新消息开始 |
content_block_start | 新内容块开始(文本块或工具调用块) |
content_block_delta | 内容块的增量更新 |
content_block_stop | 内容块结束 |
message_delta | 消息级更新(stop reason、usage 统计) |
message_stop | 消息结束 |
text_delta 和 input_json_delta。content_block_delta 就直接读 delta.text,当这个 delta 实际上是工具调用的 JSON 输入片段时,你拿到的是 None 或错误数据。message.type 是 StreamEvent(区分于 AssistantMessage / ResultMessage)event["type"] == "content_block_delta"(过滤掉 start/stop/message 类事件)delta["type"] == "text_delta"(区分文本片段与工具输入片段)input_json_delta 走的是另一条路。StreamEvent (message_start)
StreamEvent (content_block_start) ← 文本块开始
StreamEvent (content_block_delta) ← 文字逐字到达...
StreamEvent (content_block_stop)
StreamEvent (content_block_start) ← 工具调用块开始
StreamEvent (content_block_delta) ← 工具 JSON 输入逐步到达...
StreamEvent (content_block_stop)
StreamEvent (message_delta)
StreamEvent (message_stop)
AssistantMessage ← 完整消息
... 工具执行 ...
... 下一轮流式事件 ...
ResultMessage ← 最终结果AssistantMessage 和 ResultMessage 依然存在,流式事件只是「插在前面」。如果你的代码同时处理多种消息类型,只需要在 isinstance 分支里分别处理即可。content_block_start(工具开始调用)、input_json_delta(工具输入 JSON 逐步到达)、content_block_stop(工具调用完成)。async def stream_tool_calls():
options = ClaudeAgentOptions(
include_partial_messages=True,
allowed_tools=["Read", "Bash"],
)
current_tool = None
tool_input = ""
async for message in query(prompt="读取 README.md 文件", options=options):
if isinstance(message, StreamEvent):
event = message.event
event_type = event.get("type")
if event_type == "content_block_start":
content_block = event.get("content_block", {})
if content_block.get("type") == "tool_use":
current_tool = content_block.get("name")
tool_input = ""
print(f"开始调用工具: {current_tool}")
elif event_type == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "input_json_delta":
chunk = delta.get("partial_json", "")
tool_input += chunk
elif event_type == "content_block_stop":
if current_tool:
print(f"工具 {current_tool} 调用完成,参数: {tool_input}")
current_tool = Nonein_tool 标志区分「当前是否在执行工具」:工具执行期间暂停文本流,显示进度提示;工具完成后恢复文本流。async def streaming_ui():
options = ClaudeAgentOptions(
include_partial_messages=True,
allowed_tools=["Read", "Bash", "Grep"],
)
in_tool = False
async for message in query(
prompt="找出代码库里所有的 TODO 注释", options=options
):
if isinstance(message, StreamEvent):
event = message.event
event_type = event.get("type")
if event_type == "content_block_start":
content_block = event.get("content_block", {})
if content_block.get("type") == "tool_use":
tool_name = content_block.get("name")
print(f"\n[正在使用 {tool_name}...]", end="", flush=True)
in_tool = True
elif event_type == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta" and not in_tool:
# 只在工具未运行时输出文本
import sys
sys.stdout.write(delta.get("text", ""))
sys.stdout.flush()
elif event_type == "content_block_stop":
if in_tool:
print(" 完成", flush=True)
in_tool = False
elif isinstance(message, ResultMessage):
print("\n\n--- 全部完成 ---")outputFormat / output_format 开启结构化输出时,JSON 结果不会作为流式 delta 到达,而是等到任务完全结束,才出现在 ResultMessage.structured_output 里。1ResultMessage。如果你的业务需要既展示进度又拿结构化输出,把两个逻辑分开处理即可——流式事件用于 UI 状态更新,ResultMessage 用于数据入库。break 提前退出 async for 循环。这是 SDK 文档和频道之前提到的一个坑:提前 break 可能触发 asyncio 的清理问题,导致连接状态异常。想在某条消息到达后停止处理,用标志位控制后续逻辑,而不是退出循环。text_delta 只包含当前片段。如果你需要完整文本(比如做 token 统计或日志),自己维护一个 full_text += delta["text"] 变量,而不是每次从头拼接。streaming_ui 示例,in_tool 标志是必要的——混在一起处理容易出现「工具输入 JSON 片段被当成用户可见文本输出」的问题。isinstance(message, StreamEvent) 检查,TypeScript 需要检查 message.type === 'stream_event',拿到事件后再用 message.event 读底层数据,语义一致但语法不同。parent_tool_use_id 是子 Agent 追踪的锚点。当你的 Agent 编排里有子 Agent 时,子 Agent 产生的 StreamEvent 会带上 parent_tool_use_id,指向父 Agent 触发这个子 Agent 的工具调用 ID。多层编排下可以据此构建树状执行视图,区分哪些流式事件来自主 Agent、哪些来自子 Agent。permission_mode 的五档设置、allowed_tools / disallowed_tools 的精确控制,以及 Hooks 与权限评估的组合用法。
このコンテンツについて、さらに観点や背景を補足しましょう。