Claude Code SDK #5:消息流全解——6 种消息类型 × async for,把 Agent 执行过程变成可编程事件流

Claude Code SDK #5:消息流全解——6 种消息类型 × async for,把 Agent 执行过程变成可编程事件流

query() 返回的不是字符串,而是持续吐出消息对象的异步流。SDK 定义了 6 种消息类型:SystemMessage(会话初始化与事件)、AssistantMessage(每轮 Claude 回复,含工具调用决策)、UserMessage(用户输入与工具结果回传)、ResultMessage(任务终态 + 成本统计)、StreamEvent 和 RateLimitEvent。本篇完整拆解每种类型的数据结构、五种典型消费模式的代码写法、async for 的正确使用姿势(含「不要用 break 提前退出」的坑),以及 CLI 侧 stream-json 模式的对应关系。

Claude Code SDK 每日技术拆解
2026. 5. 29. · 09:05
구독 3개 · 콘텐츠 39개

리서치 브리프

每次调用 query(),你得到的不是一个字符串,而是一条持续吐出消息对象的异步流
这条流里有初始化事件、每轮 Assistant 回复、工具调用记录、最终结果、成本统计……读懂它,等于拿到了 Agent 整个执行过程的可观测接口。
本篇完整拆解 SDK 的 6 种消息类型、async for 消费方式的正确写法,以及五种典型过滤场景的代码模式。

为什么 query() 要返回「流」而不是「结果」

传统 API 调用是请求-响应:等它跑完,拿返回值。Agent 不一样——它要读文件、执行命令、调 Web 搜索,可能跑几十秒甚至几分钟。
把整个执行过程包装成流,有三个实际好处:
  • 进度可见:工具被调用时立刻知道,不用盲等
  • 中间态可干预:配合 Hooks,可以在工具调用前/后插入自己的逻辑(详见后续 Hooks 篇)
  • 成本可核ResultMessage 携带精确 token 用量和估算费用,方便计量
query() 的返回值是一个 async generator,每产生一条消息对象就 yield 一次,直到 Agent 完成。

消息类型总览

Python SDK 定义了 6 种消息类型,构成一个联合类型 Message 1
Message = (
    UserMessage
    | AssistantMessage
    | SystemMessage
    | ResultMessage
    | StreamEvent
    | RateLimitEvent
)
类型触发时机核心字段
SystemMessage会话初始化、内部系统事件subtypedata
AssistantMessage每次 Claude 产出回复contentmodelusage
UserMessage用户输入,含工具结果回传contenttool_use_result
ResultMessageAgent 完成整轮任务resulttotal_cost_usdusagenum_turns
StreamEvent部分流式事件(需启用 --include-partial-messages由底层 API 事件构成
RateLimitEvent触发限速时限速相关元数据
日常开发中打交道最多的是前 4 种。StreamEventRateLimitEvent 留到高级场景处理。
통계 카드를 불러오는 중…

逐类型解剖

SystemMessage:会话的「开机广播」

会话启动时,SDK 首先 yield 一条 subtype="init"SystemMessagedata 字段里有本轮会话 ID:
@dataclass
class SystemMessage:
    subtype: str          # "init"、"session_end" 等
    data: dict[str, Any]  # 元数据,因 subtype 不同而异
最常用的场景:从 init 消息里捞 session_id,用于后续的 Session continue / resume:
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, SystemMessage

async def main():
    session_id = None
    async for message in query(
        prompt="分析 utils.py 里的函数",
        options=ClaudeAgentOptions(allowed_tools=["Read", "Glob"]),
    ):
        if isinstance(message, SystemMessage) and message.subtype == "init":
            session_id = message.data["session_id"]
            print(f"Session ID: {session_id}")

asyncio.run(main())

AssistantMessage:Claude 的每轮回复

每次 Claude 产出内容,无论是文字还是工具调用决策,都包装成一条 AssistantMessage
@dataclass
class AssistantMessage:
    content: list[ContentBlock]  # TextBlock / ToolUseBlock 等
    model: str                   # 生成该条回复的模型名
    parent_tool_use_id: str | None = None  # 子 Agent 场景:父工具 ID
    error: AssistantMessageError | None = None  # 出错时的错误类型
    usage: dict[str, Any] | None = None   # 本条消息的 token 用量
    message_id: str | None = None         # API 消息 ID
contentContentBlock 列表。常见子类型:
  • TextBlock:Claude 的文本输出,读 .text 字段
  • ToolUseBlock:Claude 决定调用工具,包含工具名和参数
error 可能的值:authentication_failedbilling_errorrate_limitinvalid_requestserver_errormax_output_tokensunknown
提取文本输出的标准写法:
from claude_agent_sdk import AssistantMessage, TextBlock

async for message in query(prompt="..."):
    if isinstance(message, AssistantMessage):
        for block in message.content:
            if isinstance(block, TextBlock):
                print(f"Claude 说:{block.text}")
            # 工具调用不在这里处理,SDK 会自动执行

UserMessage:工具结果的回传通道

UserMessage 不只是用户输入——工具执行完毕后,结果以 UserMessage 的形式回传给 Claude。tool_use_result 字段标识它是一条工具结果消息:
@dataclass
class UserMessage:
    content: str | list[ContentBlock]
    uuid: str | None = None
    parent_tool_use_id: str | None = None    # 对应哪次工具调用
    tool_use_result: dict[str, Any] | None = None  # 工具执行结果
通常不需要手动处理 UserMessage——SDK 的 Agent 循环自动处理工具调用和结果回传。但在调试或审计场景,可以通过它观察工具实际返回了什么。

ResultMessage:任务终结符,带成本清单

整轮 Agent 任务完成后,SDK yield 最后一条 ResultMessage
@dataclass
class ResultMessage:
    subtype: str
    duration_ms: int          # 总耗时(毫秒)
    duration_api_ms: int      # API 调用累计耗时(毫秒)
    is_error: bool            # 是否以错误结束
    num_turns: int            # Agent 执行的回合数
    session_id: str
    stop_reason: str | None = None
    total_cost_usd: float | None = None  # 估算总费用(美元)
    usage: dict[str, Any] | None = None  # 含 input_tokens、output_tokens、
                                          # cache_creation_input_tokens、
                                          # cache_read_input_tokens
    result: str | None = None            # 最终文本结果
    structured_output: Any = None        # 结构化输出(需配合 --json-schema)
关键点result 字段才是 Agent 的最终答案。之前所有 AssistantMessage 是过程输出;ResultMessage.result 是终态输出。
차트를 불러오는 중…
快速提取结果的最简写法:
from claude_agent_sdk import ResultMessage

async for message in query(prompt="总结这个代码库"):
    if isinstance(message, ResultMessage):
        if message.is_error:
            print(f"执行失败,原因:{message.stop_reason}")
        else:
            print(f"结果:{message.result}")
            print(f"耗时:{message.duration_ms}ms")
            print(f"费用:${message.total_cost_usd:.4f}")
            print(f"Token 用量:{message.usage}")

五种典型消费模式

1. 只要最终结果(最简模式)

async for message in query(prompt="..."):
    if hasattr(message, "result"):
        print(message.result)
hasattr(message, "result")isinstance(message, ResultMessage) 更宽松,适合快速脚本。

2. 实时进度跟踪(长任务必备)

from claude_agent_sdk import (
    query, ClaudeAgentOptions,
    SystemMessage, AssistantMessage, TextBlock, ResultMessage
)

async for message in query(
    prompt="重构整个 src/ 目录",
    options=ClaudeAgentOptions(allowed_tools=["Read", "Edit", "Bash", "Glob"]),
):
    if isinstance(message, SystemMessage) and message.subtype == "init":
        print(f"[开始] session={message.data['session_id']}")
    elif isinstance(message, AssistantMessage):
        for block in message.content:
            if isinstance(block, TextBlock) and block.text:
                print(f"[Claude] {block.text[:80]}...")
    elif isinstance(message, ResultMessage):
        print(f"[完成] {message.num_turns} 轮 | {message.duration_ms}ms | ${message.total_cost_usd:.4f}")

3. 成本监控(生产环境必备)

costs = []
async for message in query(prompt="..."):
    if isinstance(message, ResultMessage):
        costs.append({
            "cost": message.total_cost_usd,
            "input_tokens": message.usage.get("input_tokens", 0),
            "output_tokens": message.usage.get("output_tokens", 0),
            "cache_read": message.usage.get("cache_read_input_tokens", 0),
        })
cache_read_input_tokens 是缓存命中的 token 数,命中率高说明 prompt 构造效率好,可以作为优化指标。

4. 错误分级处理

from claude_agent_sdk import AssistantMessage

async for message in query(prompt="..."):
    if isinstance(message, AssistantMessage) and message.error:
        if message.error == "rate_limit":
            print("触发限速,稍后重试")
        elif message.error == "max_output_tokens":
            print("输出超长,考虑拆分任务")
        elif message.error in ("billing_error", "authentication_failed"):
            raise RuntimeError(f"账号问题:{message.error}")

5. 子 Agent 消息过滤(多 Agent 场景)

在子 Agent 场景(使用 AgentDefinition + Agent 工具),子 Agent 产生的消息会携带 parent_tool_use_id,用于区分消息来源:
async for message in query(
    prompt="用 code-reviewer 审查代码",
    options=ClaudeAgentOptions(
        allowed_tools=["Read", "Glob", "Grep", "Agent"],
        agents={"code-reviewer": AgentDefinition(...)},
    ),
):
    if isinstance(message, AssistantMessage):
        if message.parent_tool_use_id is None:
            print(f"[主 Agent] {message}")
        else:
            print(f"[子 Agent:{message.parent_tool_use_id[:8]}] {message}")

一个坑:不要用 break 提前退出

SDK 文档明确提到:迭代消息流时,避免用 break 提前中断循环,否则可能触发 asyncio 清理问题 1
正确做法是用标记变量控制逻辑,让迭代自然结束:
# ❌ 有潜在问题
async for message in query(prompt="..."):
    if isinstance(message, ResultMessage):
        result = message.result
        break  # 可能导致 asyncio 清理异常

# ✅ 推荐写法
result = None
async for message in query(prompt="..."):
    if isinstance(message, ResultMessage):
        result = message.result
        # 不 break,让流自然结束

CLI 侧的流式输出:stream-json

SDK 的消息流对应 CLI 的 --output-format stream-json 模式 2。每条消息以换行符分隔的 JSON 对象实时输出到 stdout,适合脚本管道消费:
# 实时处理每条消息
claude -p "分析 src/ 目录" --output-format stream-json \
  | jq 'select(.type == "result") | {result, cost: .total_cost_usd}'

# 配合 --include-partial-messages 获取更细粒度的流式事件
claude -p "..." --output-format stream-json \
  --verbose --include-partial-messages
--verbose 标志让流包含完整的工具调用细节;--include-partial-messages 进一步暴露文本流式生成的中间片段(对应 Python SDK 的 StreamEvent)。
Python SDK 完整源码与类型定义可在官方仓库查看:
콘텐츠 카드를 불러오는 중…

实践建议

1. 生产脚本必须监听 ResultMessage.is_error 不要假设 Agent 总会成功。is_error=True + stop_reason 能给出可程序化处理的失败原因。
2. 用 session_id 做日志关联 每轮任务的所有消息共享同一 session_id,是链路追踪的天然 key。把它写进日志,就能按任务查全量记录。
3. 关注 num_turns ResultMessage.num_turns 反映 Agent 的「思考深度」。同样任务 turns 骤增可能意味着 prompt 不清或工具调用陷入循环,值得告警。
4. 用 usage.cache_read_input_tokens 优化成本 这个字段显示 prompt cache 命中的 token 量。对于重复运行的 CI 场景,cache 命中率直接影响 API 费用,可以把它纳入成本看板。

下一篇:Hooks 系统——在 PreToolUse / PostToolUse / Stop 等生命周期节点注入自定义逻辑,用回调函数实现审计、拦截和副作用处理。

이 콘텐츠를 둘러싼 관점이나 맥락을 계속 보강해 보세요.

  • 로그인하면 댓글을 작성할 수 있습니다.