Session 后端一览
OpenAI Agents SDK 内置 Session 实现(含 extensions 包)
从「每轮对话都要手写 .to_input_list() 拼接历史」这个高频痛点切入,系统拆解 OpenAI Agents SDK Sessions 机制。覆盖 Sessions 工作原理(Runner 运行前自动注入历史、运行后自动追加)、SQLiteSession 快速上手、10 种 Session 后端全景对比(SQLite / AsyncSQLite / Redis / SQLAlchemy / MongoDB / Dapr / OpenAI Conversations / OpenAI Responses 压缩 / AdvancedSQLite / EncryptedSession)、session_input_callback 自定义历史合并逻辑、SessionSettings(limit=N) 控制历史长度、pop_item() 修正对话技巧、SessionABC 自定义后端实现,以及 Sessions + RunState 结合 Human in the Loop 中断恢复的核心用法。结尾给出 3 条生产建议,预告 #14 Human in the Loop。
Research Brief
# 第一轮
result1 = await Runner.run(agent, "你好")
# 第二轮,手动拼接历史
result2 = await Runner.run(agent, result1.to_input_list() + [{"role": "user", "content": "继续"}])
# 第三轮,再拼一次……
result3 = await Runner.run(agent, result2.to_input_list() + [{"role": "user", "content": "你刚才说的那个,展开讲讲"}]).to_input_list() 调用,多一次手动拼接,多一个潜在的 off-by-one 错误。session 参数后,Runner.run() 的执行流程变成这样:session.get_items(limit) → 拉取历史对话条目 → 自动拼接到当前输入前面。session.add_items() 里。to_input_list() 连碰都不用碰2。session 和 conversation_id、previous_response_id、auto_previous_response_id 是互斥的,同一次 run() 调用里只能选一个3。原因很简单,这三组参数都在做「告诉模型上下文是什么」这件事,两套同时开会打架。SQLiteSession 是最省事的选择:import asyncio
from agents import Agent, Runner
from agents.extensions.memory import SQLiteSession
async def main():
agent = Agent(
name="chat_agent",
instructions="你是一个乐于助人的助手。"
)
# 用同一个 session_id 跨轮共享历史
session = SQLiteSession(
session_id="user_123",
db_path="./chat_history.db" # 也可以传 ":memory:" 用内存数据库
)
# 第一轮——直接传字符串,不需要 to_input_list()
result1 = await Runner.run(agent, "我叫张三,记住我的名字", session=session)
print(result1.final_output)
# 第二轮——历史自动携带,模型能记住上文
result2 = await Runner.run(agent, "我叫什么名字?", session=session)
print(result2.final_output) # 会正确回答「张三」
asyncio.run(main())| 后端 | 适用场景 | 特点 |
|---|---|---|
SQLiteSession | 本地开发、单进程应用 | 同步 + 异步,WAL 模式,零依赖 |
AsyncSQLiteSession | 高并发本地场景 | 全异步 SQLite |
AdvancedSQLiteSession | 需要精细查询的本地场景 | 支持索引和复杂查询 |
RedisSession | 多进程 / 水平扩展 | 需要 Redis 实例,低延迟 |
SQLAlchemySession | 已有关系数据库的项目 | 支持 PostgreSQL / MySQL 等 |
MongoDBSession | 已用 MongoDB 或需要文档存储 | v0.14.2 新增6 |
DaprSession | Dapr sidecar 环境 | 通过 Dapr state store 接入 |
EncryptedSession | 对数据加密有要求 | 包装其他后端,加一层加密 |
OpenAIConversationsSession | 需要 OpenAI 托管对话 | 由 OpenAI Conversations API 管理 |
OpenAIResponsesCompactionSession | 超长对话场景 | 自动压缩历史,减少 token 占用 |
OpenAIResponsesCompactionSession 针对的是一个真实痛点:对话轮次一多,历史条目会把上下文窗口撑满7。它会自动压缩早期历史,代价是丢失部分细节,换来的是更长的对话寿命。SessionSettings(limit=N) — 限制历史长度get_items() 会拉取全部历史。轮次多了,token 成本会直线上升。from agents.extensions.memory import SQLiteSession, SessionSettings
from agents import RunConfig
session = SQLiteSession("user_123", "./chat.db")
# 只携带最近 10 条历史
config = RunConfig(session_settings=SessionSettings(limit=10))
result = await Runner.run(agent, "继续上次的话题", session=session, run_config=config)session_input_callback — 自定义历史合并逻辑Runner 在合并历史和新输入之前,会先调用这个回调,把控制权交给你:from agents import RunConfig
from agents.items import TResponseInputItem
def my_history_merger(
history: list[TResponseInputItem],
new_input: list[TResponseInputItem]
) -> list[TResponseInputItem]:
# 示例:只保留最近 5 轮,并在历史前插入系统摘要
recent_history = history[-10:] if len(history) > 10 else history
summary_message = [{
"role": "system",
"content": "以下是对话历史摘要:用户正在咨询产品 A 的退款问题。"
}]
return summary_message + recent_history + new_input
config = RunConfig(session_input_callback=my_history_merger)
result = await Runner.run(agent, "我刚才问了什么?", session=session, run_config=config)pop_item() — 撤销上一条消息# 用户认为上一轮回答有误,需要重新回答
last_item = await session.pop_item()
print(f"已撤销: {last_item}")
# 重新提问(不携带被撤销的那条)
result = await Runner.run(agent, "刚才的问题我换个说法", session=session)pop_item() 删除并返回最近一项4。注意是「最近一项」,不是「最近一轮对话」——如果一轮对话产生了多个条目(工具调用 + 工具结果 + 模型响应),你需要多次调用 pop_item() 才能完整清除。SessionABC 是 Session Protocol 的抽象基类,实现四个方法就够了4:from typing import Optional
from agents.extensions.memory import SessionABC
from agents.items import TResponseInputItem
class RedisClusterSession(SessionABC):
def __init__(self, session_id: str, redis_client):
self.session_id = session_id
self.redis = redis_client
self._key = f"session:{session_id}:items"
async def get_items(self, limit: Optional[int] = None) -> list[TResponseInputItem]:
"""从 Redis 拉取历史,limit 控制返回条数"""
raw_items = await self.redis.lrange(self._key, 0, -1)
items = [json.loads(i) for i in raw_items]
return items[-limit:] if limit else items
async def add_items(self, items: list[TResponseInputItem]) -> None:
"""追加新条目到 Redis list"""
pipeline = self.redis.pipeline()
for item in items:
pipeline.rpush(self._key, json.dumps(item))
await pipeline.execute()
async def pop_item(self) -> Optional[TResponseInputItem]:
"""删除并返回最后一条"""
raw = await self.redis.rpop(self._key)
return json.loads(raw) if raw else None
async def clear_session(self) -> None:
"""清空整个会话"""
await self.redis.delete(self._key)from agents import Runner, RunConfig, RunState
# 第一阶段:Agent 跑到某个检查点,需要人类确认
result = await Runner.run(
agent,
"帮我给 production 数据库执行这段 SQL",
session=session
)
# 检查是否触发了人工审批
if result.interrupted:
# 将 run_state 序列化,等待人类审批
pending_state: RunState = result.run_state
# 存到数据库、消息队列等……
await save_pending_approval(pending_state)
return # 先返回,等审批
# 第二阶段:审批通过后,从 RunState 恢复
pending_state = await load_pending_approval()
final_result = await Runner.run(
agent,
input=None, # 从 RunState 恢复,无需重传输入
session=session,
run_state=pending_state
)session_id 绑定用户身份,不要用临时字符串。
session_id 决定历史的隔离边界。用 user_id + conversation_id 的组合更安全,避免不同用户或不同会话的历史意外混用。SessionSettings(limit=N)。
不设上限的历史会随时间线性增长,直到某天某个长期用户的请求把你的 token 预算打穿。50 到 100 条是常见的合理区间,结合你的对话平均长度调整。conversation_id)二选一,不要混用。
两套状态并行维护是维护噩梦。如果你的 Agent 要跨多个 worker 或微服务共享对话,选 OpenAIConversationsSession 或直接用 conversation_id;如果在单进程或少量 worker 内,用本地 Sessions 后端7。session 参数,剩下的由 Runner 负责。不用再手写 .to_input_list(),不用担心拼接顺序,不用管历史什么时候存、怎么存。
Add more perspectives or context around this content.