一次典型 Agent 任务的消息流分布
以「分析并修复一个 bug」为例,流中各类消息的大致数量

query() 返回的不是字符串,而是持续吐出消息对象的异步流。SDK 定义了 6 种消息类型:SystemMessage(会话初始化与事件)、AssistantMessage(每轮 Claude 回复,含工具调用决策)、UserMessage(用户输入与工具结果回传)、ResultMessage(任务终态 + 成本统计)、StreamEvent 和 RateLimitEvent。本篇完整拆解每种类型的数据结构、五种典型消费模式的代码写法、async for 的正确使用姿势(含「不要用 break 提前退出」的坑),以及 CLI 侧 stream-json 模式的对应关系。
リサーチノート
query(),你得到的不是一个字符串,而是一条持续吐出消息对象的异步流。async for 消费方式的正确写法,以及五种典型过滤场景的代码模式。ResultMessage 携带精确 token 用量和估算费用,方便计量query() 的返回值是一个 async generator,每产生一条消息对象就 yield 一次,直到 Agent 完成。Message 1:Message = (
UserMessage
| AssistantMessage
| SystemMessage
| ResultMessage
| StreamEvent
| RateLimitEvent
)| 类型 | 触发时机 | 核心字段 |
|---|---|---|
SystemMessage | 会话初始化、内部系统事件 | subtype、data |
AssistantMessage | 每次 Claude 产出回复 | content、model、usage |
UserMessage | 用户输入,含工具结果回传 | content、tool_use_result |
ResultMessage | Agent 完成整轮任务 | result、total_cost_usd、usage、num_turns |
StreamEvent | 部分流式事件(需启用 --include-partial-messages) | 由底层 API 事件构成 |
RateLimitEvent | 触发限速时 | 限速相关元数据 |
StreamEvent 和 RateLimitEvent 留到高级场景处理。subtype="init" 的 SystemMessage,data 字段里有本轮会话 ID:@dataclass
class SystemMessage:
subtype: str # "init"、"session_end" 等
data: dict[str, Any] # 元数据,因 subtype 不同而异init 消息里捞 session_id,用于后续的 Session continue / resume:import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, SystemMessage
async def main():
session_id = None
async for message in query(
prompt="分析 utils.py 里的函数",
options=ClaudeAgentOptions(allowed_tools=["Read", "Glob"]),
):
if isinstance(message, SystemMessage) and message.subtype == "init":
session_id = message.data["session_id"]
print(f"Session ID: {session_id}")
asyncio.run(main())AssistantMessage:@dataclass
class AssistantMessage:
content: list[ContentBlock] # TextBlock / ToolUseBlock 等
model: str # 生成该条回复的模型名
parent_tool_use_id: str | None = None # 子 Agent 场景:父工具 ID
error: AssistantMessageError | None = None # 出错时的错误类型
usage: dict[str, Any] | None = None # 本条消息的 token 用量
message_id: str | None = None # API 消息 IDcontent 是 ContentBlock 列表。常见子类型:TextBlock:Claude 的文本输出,读 .text 字段ToolUseBlock:Claude 决定调用工具,包含工具名和参数error 可能的值:authentication_failed、billing_error、rate_limit、invalid_request、server_error、max_output_tokens、unknown。from claude_agent_sdk import AssistantMessage, TextBlock
async for message in query(prompt="..."):
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"Claude 说:{block.text}")
# 工具调用不在这里处理,SDK 会自动执行UserMessage 不只是用户输入——工具执行完毕后,结果以 UserMessage 的形式回传给 Claude。tool_use_result 字段标识它是一条工具结果消息:@dataclass
class UserMessage:
content: str | list[ContentBlock]
uuid: str | None = None
parent_tool_use_id: str | None = None # 对应哪次工具调用
tool_use_result: dict[str, Any] | None = None # 工具执行结果UserMessage——SDK 的 Agent 循环自动处理工具调用和结果回传。但在调试或审计场景,可以通过它观察工具实际返回了什么。ResultMessage:@dataclass
class ResultMessage:
subtype: str
duration_ms: int # 总耗时(毫秒)
duration_api_ms: int # API 调用累计耗时(毫秒)
is_error: bool # 是否以错误结束
num_turns: int # Agent 执行的回合数
session_id: str
stop_reason: str | None = None
total_cost_usd: float | None = None # 估算总费用(美元)
usage: dict[str, Any] | None = None # 含 input_tokens、output_tokens、
# cache_creation_input_tokens、
# cache_read_input_tokens
result: str | None = None # 最终文本结果
structured_output: Any = None # 结构化输出(需配合 --json-schema)result 字段才是 Agent 的最终答案。之前所有 AssistantMessage 是过程输出;ResultMessage.result 是终态输出。from claude_agent_sdk import ResultMessage
async for message in query(prompt="总结这个代码库"):
if isinstance(message, ResultMessage):
if message.is_error:
print(f"执行失败,原因:{message.stop_reason}")
else:
print(f"结果:{message.result}")
print(f"耗时:{message.duration_ms}ms")
print(f"费用:${message.total_cost_usd:.4f}")
print(f"Token 用量:{message.usage}")async for message in query(prompt="..."):
if hasattr(message, "result"):
print(message.result)hasattr(message, "result") 比 isinstance(message, ResultMessage) 更宽松,适合快速脚本。from claude_agent_sdk import (
query, ClaudeAgentOptions,
SystemMessage, AssistantMessage, TextBlock, ResultMessage
)
async for message in query(
prompt="重构整个 src/ 目录",
options=ClaudeAgentOptions(allowed_tools=["Read", "Edit", "Bash", "Glob"]),
):
if isinstance(message, SystemMessage) and message.subtype == "init":
print(f"[开始] session={message.data['session_id']}")
elif isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock) and block.text:
print(f"[Claude] {block.text[:80]}...")
elif isinstance(message, ResultMessage):
print(f"[完成] {message.num_turns} 轮 | {message.duration_ms}ms | ${message.total_cost_usd:.4f}")costs = []
async for message in query(prompt="..."):
if isinstance(message, ResultMessage):
costs.append({
"cost": message.total_cost_usd,
"input_tokens": message.usage.get("input_tokens", 0),
"output_tokens": message.usage.get("output_tokens", 0),
"cache_read": message.usage.get("cache_read_input_tokens", 0),
})cache_read_input_tokens 是缓存命中的 token 数,命中率高说明 prompt 构造效率好,可以作为优化指标。from claude_agent_sdk import AssistantMessage
async for message in query(prompt="..."):
if isinstance(message, AssistantMessage) and message.error:
if message.error == "rate_limit":
print("触发限速,稍后重试")
elif message.error == "max_output_tokens":
print("输出超长,考虑拆分任务")
elif message.error in ("billing_error", "authentication_failed"):
raise RuntimeError(f"账号问题:{message.error}")AgentDefinition + Agent 工具),子 Agent 产生的消息会携带 parent_tool_use_id,用于区分消息来源:async for message in query(
prompt="用 code-reviewer 审查代码",
options=ClaudeAgentOptions(
allowed_tools=["Read", "Glob", "Grep", "Agent"],
agents={"code-reviewer": AgentDefinition(...)},
),
):
if isinstance(message, AssistantMessage):
if message.parent_tool_use_id is None:
print(f"[主 Agent] {message}")
else:
print(f"[子 Agent:{message.parent_tool_use_id[:8]}] {message}")break 提前中断循环,否则可能触发 asyncio 清理问题 1。# ❌ 有潜在问题
async for message in query(prompt="..."):
if isinstance(message, ResultMessage):
result = message.result
break # 可能导致 asyncio 清理异常
# ✅ 推荐写法
result = None
async for message in query(prompt="..."):
if isinstance(message, ResultMessage):
result = message.result
# 不 break,让流自然结束--output-format stream-json 模式 2。每条消息以换行符分隔的 JSON 对象实时输出到 stdout,适合脚本管道消费:# 实时处理每条消息
claude -p "分析 src/ 目录" --output-format stream-json \
| jq 'select(.type == "result") | {result, cost: .total_cost_usd}'
# 配合 --include-partial-messages 获取更细粒度的流式事件
claude -p "..." --output-format stream-json \
--verbose --include-partial-messages--verbose 标志让流包含完整的工具调用细节;--include-partial-messages 进一步暴露文本流式生成的中间片段(对应 Python SDK 的 StreamEvent)。ResultMessage.is_error
不要假设 Agent 总会成功。is_error=True + stop_reason 能给出可程序化处理的失败原因。session_id 做日志关联
每轮任务的所有消息共享同一 session_id,是链路追踪的天然 key。把它写进日志,就能按任务查全量记录。num_turns
ResultMessage.num_turns 反映 Agent 的「思考深度」。同样任务 turns 骤增可能意味着 prompt 不清或工具调用陷入循环,值得告警。usage.cache_read_input_tokens 优化成本
这个字段显示 prompt cache 命中的 token 量。对于重复运行的 CI 场景,cache 命中率直接影响 API 费用,可以把它纳入成本看板。
このコンテンツについて、さらに観点や背景を補足しましょう。