api2cursor/adapters/responses_cc_adapter.py
2026-03-15 16:26:07 +08:00

1134 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Responses API 适配
Cursor 对 GPT、Claude-Opus 等模型使用 `/v1/responses` 格式。
本模块负责在 Responses 与 Chat Completions 两种表示之间做双向转换:
- 请求方向Responses → Chat Completions
- 响应方向Chat Completions → Responses非流式
- 流式方向CC chunk / Anthropic SSE → Responses SSE
这个模块之所以相对复杂,是因为 Responses 在“输出项”层面比 Chat Completions 更细:
它会把思考、文本、工具调用拆成独立项目,因此流式场景必须靠状态机把不同来源的
增量重新组织成稳定的 Responses 事件序列。
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any
from utils.http import gen_id
# Alias for the loosely typed JSON objects (request/response payloads, SSE
# event bodies) handled throughout this module.
JsonDict = dict[str, Any]
# ═══════════════════════════════════════════════════════════
# Request conversion: Responses → CC
# ═══════════════════════════════════════════════════════════
def responses_to_cc(payload: JsonDict) -> JsonDict:
    """Translate a `/v1/responses` request body into a `/v1/chat/completions` body.

    `instructions` becomes a leading system message. `input` may be a plain
    string (one user message) or a list of input items, which is expanded by
    `_convert_input_items`. Any other `input` type contributes no messages.
    Shared options are copied by `_copy_request_options`.
    """
    chat_messages: list[JsonDict] = []

    system_prompt = payload.get('instructions')
    if system_prompt:
        chat_messages.append({'role': 'system', 'content': system_prompt})

    raw_input = payload.get('input', [])
    if isinstance(raw_input, list):
        _convert_input_items(raw_input, chat_messages)
    elif isinstance(raw_input, str):
        chat_messages.append({'role': 'user', 'content': raw_input})

    converted: JsonDict = {}
    converted['model'] = payload.get('model', '')
    converted['messages'] = chat_messages
    converted['stream'] = payload.get('stream', False)
    _copy_request_options(payload, converted)
    return converted
def cc_to_responses_request(payload: JsonDict) -> JsonDict:
    """Translate a Chat Completions request body into a native Responses request.

    Every entry of `messages` is handed to `_append_responses_input_item`,
    which sorts it into either the collected system texts or the `input`
    list. Collected system texts are joined with a blank line and sent as
    `instructions`; the key is omitted when there are none. Shared options
    are copied by `_copy_responses_request_options`.
    """
    system_texts: list[str] = []
    converted_input: list[JsonDict] = []
    for chat_message in payload.get('messages', []):
        _append_responses_input_item(chat_message, system_texts, converted_input)

    request: JsonDict = dict(
        model=payload.get('model', ''),
        input=converted_input,
        stream=payload.get('stream', False),
    )
    if system_texts:
        request['instructions'] = '\n\n'.join(system_texts)
    _copy_responses_request_options(payload, request)
    return request
# ═══════════════════════════════════════════════════════════
# Non-streaming response conversion: CC → Responses
# ═══════════════════════════════════════════════════════════
def cc_to_responses(cc_resp: JsonDict, model: str = '') -> JsonDict:
    """Convert a Chat Completions response into the Responses format.

    Only the first choice is used. `model`, when non-empty, overrides the
    model name reported by the upstream response.

    Explicit JSON nulls are tolerated: a null `usage` is treated as an empty
    usage object and a null `id` is replaced by a generated one, instead of
    crashing in `_build_responses_usage` or emitting a `None` id.
    """
    first_choice = (cc_resp.get('choices') or [{}])[0]
    message = first_choice.get('message') or {}
    finish_reason = first_choice.get('finish_reason') or 'stop'
    return {
        # `or` (not a `.get` default) so an explicit null also falls back.
        'id': cc_resp.get('id') or gen_id('resp_'),
        'object': 'response',
        'status': _response_status_from_finish_reason(finish_reason),
        'model': model or cc_resp.get('model', ''),
        'output': _build_responses_output(message),
        'usage': _build_responses_usage(cc_resp.get('usage') or {}),
    }
def responses_to_cc_response(response_data: JsonDict, model: str = '') -> JsonDict:
    """Convert a native non-streaming Responses result into a Chat Completions response.

    Text, reasoning summary and tool calls are gathered from `output` and
    folded into a single assistant message. `model`, when non-empty,
    overrides the upstream model name.

    Explicit JSON nulls are tolerated: a null `usage` is treated as empty
    and a null `id` is replaced by a generated one.
    """
    output_items = response_data.get('output', [])
    content_text, reasoning_text, tool_calls = _collect_cc_parts_from_responses_output(output_items)
    finish_reason = _cc_finish_reason_from_responses(response_data, tool_calls)

    message: JsonDict = {
        'role': 'assistant',
        # Empty text is reported as null content.
        'content': content_text or None,
    }
    if reasoning_text:
        message['reasoning_content'] = reasoning_text
    if tool_calls:
        message['tool_calls'] = tool_calls

    # `or {}` so that `"usage": null` does not break the `.get` calls below.
    usage = response_data.get('usage') or {}
    return {
        'id': response_data.get('id') or gen_id('chatcmpl-'),
        'object': 'chat.completion',
        'model': model or response_data.get('model', ''),
        'choices': [{
            'index': 0,
            'message': message,
            'finish_reason': finish_reason,
        }],
        'usage': {
            'prompt_tokens': usage.get('input_tokens', 0),
            'completion_tokens': usage.get('output_tokens', 0),
            'total_tokens': usage.get('total_tokens', 0),
        },
    }
# ═══════════════════════════════════════════════════════════
# Streaming converters: CC chunks / Anthropic SSE → Responses SSE
# ═══════════════════════════════════════════════════════════
@dataclass
class _ToolBuffer:
    """Streaming state buffered for a single tool call.

    Responses-style function_call output spreads the name, the argument
    increments and the completion across several events, so the converter
    keeps the tool identifiers and the accumulated arguments here until the
    remaining events can be emitted in order.
    """
    # Function name announced for the call.
    name: str
    # Argument text accumulated so far from the streamed increments.
    args: str
    # Call identifier emitted as `call_id` on the function_call item.
    call_id: str
    # Identifier emitted as `id` of the function_call output item.
    fc_id: str
class ResponsesStreamConverter:
    """Stateful converter that turns CC chunks, Anthropic SSE or native Responses SSE into Responses SSE.

    This is the state machine behind the streaming `/v1/responses` path. It
    has to be stateful rather than a plain event mapping because:

    - Responses emits events per output-item lifecycle, not per token.
    - Reasoning, text and tool calls interleave, so the close order of the
      items has to be tracked.
    - A tool call's name, argument increments and completion arrive in
      separate events and must be reassembled from a buffer.
    - The same converter handles CC chunks, Anthropic SSE and native
      Responses SSE.

    Three groups of state are kept: the reasoning item, the assistant text
    item, and the function_call items.
    """

    def __init__(self, response_id: str | None = None, model: str = ''):
        """Set up identifiers and the buffers used by the state machine.

        Args:
            response_id: Response id to report; generated when falsy.
            model: Model name reported in the emitted events.
        """
        self.resp_id = response_id or gen_id('resp_')
        self.model = model
        # Reasoning output item: accumulated text, lifecycle flags, item id.
        self._rs_buf = ''
        self._rs_started = False
        self._rs_closed = False
        self._rs_id = gen_id('rs_')
        # Assistant text output item: accumulated text, lifecycle flags, item id.
        self._text_buf = ''
        self._text_started = False
        self._text_closed = False
        self._msg_id = gen_id('msg_')
        # Tool-call buffers keyed by tool index.
        self._tools: dict[int, _ToolBuffer] = {}
        # Closed output items, reported in the final `response.completed`.
        self._output_items: list[JsonDict] = []
        # Set once the final `response.completed` has been produced or forwarded.
        self._finished = False
        # `input_tokens` remembered from Anthropic's `message_start`.
        self._input_tokens = 0

    def start_events(self) -> list[str]:
        """Return the opening `response.created` event of the stream.

        Callers are expected to send this before forwarding any upstream
        data, so the client sees `response.created` first.
        """
        return [self._sse('response.created', {
            'id': self.resp_id,
            'object': 'response',
            'status': 'in_progress',
            'model': self.model,
            'output': [],
        })]

    def process_cc_chunk(self, chunk: JsonDict) -> list[str]:
        """Convert one Chat Completions chunk into Responses SSE strings.

        Every choice in the chunk is processed; the chunk-level `usage` is
        passed along so it can be reported if a choice finishes the stream.
        """
        events: list[str] = []
        usage = chunk.get('usage')
        for choice in chunk.get('choices') or []:
            events.extend(self._process_cc_choice(choice, usage))
        return events

    def process_anthropic_event(self, event_type: str, event_data: JsonDict) -> list[str]:
        """Convert one Anthropic SSE event directly, without a CC intermediate form.

        Handles `message_start`, `content_block_start`, `content_block_delta`
        and `message_delta`; every other event type yields no output.
        """
        if event_type == 'message_start':
            return self._handle_anthropic_message_start(event_data)
        if event_type == 'content_block_start':
            return self._handle_anthropic_content_block_start(event_data)
        if event_type == 'content_block_delta':
            return self._handle_anthropic_content_block_delta(event_data)
        if event_type == 'message_delta':
            return self._handle_anthropic_message_delta(event_data)
        return []

    def finalize(self) -> list[str]:
        """Emit the closing events if the upstream ended without finishing.

        Closes any reasoning / text / tool-call item that is still open and
        emits `response.completed`. Returns nothing when the stream has
        already been finished.
        """
        if self._finished:
            return []
        self._finished = True
        return self._finish_stream('stop', None)

    def process_responses_event(self, event_type: str, event_data: JsonDict) -> list[str]:
        """Forward a native Responses SSE event with a light rewrite.

        `response.created` and `response.completed` get their model name
        replaced by the display model; `response.completed` also marks the
        stream as finished. All other events are forwarded unchanged.
        """
        if event_type == 'response.created':
            return [self._sse(event_type, self._rewrite_top_level_model(event_data))]
        if event_type == 'response.completed':
            self._finished = True
            return [self._sse(event_type, self._rewrite_top_level_model(event_data))]
        return [self._sse(event_type, event_data)]

    def _process_cc_choice(self, choice: JsonDict, usage: Any) -> list[str]:
        """Split one CC choice into reasoning, text, tool-call and finish events.

        A single chunk may carry all of these at once; they are processed in
        that fixed order.
        """
        events: list[str] = []
        delta = choice.get('delta') or {}
        finish_reason = choice.get('finish_reason')
        if delta.get('reasoning_content'):
            events.extend(self._append_reasoning_delta(delta['reasoning_content']))
        if delta.get('content') is not None and delta['content'] != '':
            events.extend(self._append_text_delta(delta['content']))
        for tool_call in delta.get('tool_calls') or []:
            events.extend(self._on_tool_call(tool_call))
        # Only the first finish_reason closes the stream.
        if finish_reason and not self._finished:
            self._finished = True
            events.extend(self._finish_stream(finish_reason, usage))
        return events

    def _handle_anthropic_message_start(self, event_data: JsonDict) -> list[str]:
        """Remember `input_tokens` from `message_start` for the final usage."""
        usage = event_data.get('message', {}).get('usage', {})
        self._input_tokens = usage.get('input_tokens', 0)
        return []

    def _handle_anthropic_content_block_start(self, event_data: JsonDict) -> list[str]:
        """Open the output item that matches an Anthropic content block.

        - `thinking` → reasoning item
        - `text` → message item
        - `tool_use` → function_call item
        """
        block = event_data.get('content_block', {})
        block_type = block.get('type', '')
        if block_type == 'thinking':
            return self._ensure_reasoning_started()
        if block_type == 'text':
            return self._ensure_text_started()
        if block_type == 'tool_use':
            return self._start_tool_from_block(block)
        return []

    def _handle_anthropic_content_block_delta(self, event_data: JsonDict) -> list[str]:
        """Map `thinking_delta` / `text_delta` / `input_json_delta` to Responses deltas."""
        delta = event_data.get('delta', {})
        delta_type = delta.get('type', '')
        if delta_type == 'thinking_delta' and delta.get('thinking'):
            return self._append_reasoning_delta(delta['thinking'])
        if delta_type == 'text_delta' and delta.get('text'):
            return self._append_text_delta(delta['text'])
        if delta_type == 'input_json_delta' and delta.get('partial_json') and self._tools:
            # JSON fragments go to the tool with the highest index, i.e. the
            # one opened most recently by `_start_tool_from_block`.
            index = max(self._tools.keys())
            return self._append_tool_arguments(index, delta['partial_json'])
        return []

    def _handle_anthropic_message_delta(self, event_data: JsonDict) -> list[str]:
        """Finish the stream on Anthropic's `message_delta`.

        Combines the remembered `input_tokens` with the event's
        `output_tokens` into the usage reported in `response.completed`.
        """
        if self._finished:
            return []
        delta = event_data.get('delta', {})
        usage = event_data.get('usage', {})
        finish_reason = _map_anthropic_stop_reason(delta.get('stop_reason', 'end_turn'))
        self._finished = True
        usage_payload = {
            'input_tokens': self._input_tokens,
            'output_tokens': usage.get('output_tokens', 0),
            'total_tokens': self._input_tokens + usage.get('output_tokens', 0),
        }
        return self._finish_stream(finish_reason, usage_payload)

    def _ensure_reasoning_started(self) -> list[str]:
        """Open the reasoning item once; later calls return no events."""
        if self._rs_started:
            return []
        self._rs_started = True
        return [self._sse('response.output_item.added', {
            'type': 'reasoning',
            'id': self._rs_id,
            'summary': [],
        })]

    def _append_reasoning_delta(self, text: str) -> list[str]:
        """Buffer a reasoning increment and emit its delta event."""
        events = self._ensure_reasoning_started()
        self._rs_buf += text
        events.append(self._sse('response.reasoning_summary_text.delta', {
            'type': 'summary_text',
            'delta': text,
        }))
        return events

    def _append_text_delta(self, text: str) -> list[str]:
        """Buffer an assistant text increment and emit its delta event."""
        events = self._ensure_text_started()
        self._text_buf += text
        events.append(self._sse('response.output_text.delta', {
            'type': 'output_text',
            'delta': text,
        }))
        return events

    def _on_tool_call(self, tool_call: JsonDict) -> list[str]:
        """Map one CC `tool_calls` delta entry to function_call events.

        The first entry seen for an index opens a new function_call item. A
        missing or null `id` is replaced by a generated call id, and a null
        name is reported as an empty string, so the emitted item never
        carries a null `call_id` or `name`.
        """
        events: list[str] = []
        index = tool_call.get('index', 0)
        function_data = tool_call.get('function') or {}
        if index not in self._tools:
            call_id = tool_call.get('id') or gen_id('call_')
            name = function_data.get('name') or ''
            events.extend(self._start_tool(index=index, call_id=call_id, name=name))
        if function_data.get('name'):
            self._tools[index].name = function_data['name']
        if function_data.get('arguments', ''):
            events.extend(self._append_tool_arguments(index, function_data['arguments']))
        return events

    def _ensure_text_started(self) -> list[str]:
        """Open the assistant message item, closing an open reasoning item first."""
        events: list[str] = []
        if self._rs_started and not self._rs_closed:
            events.extend(self._close_reasoning())
        if not self._text_started:
            self._text_started = True
            events.append(self._sse('response.output_item.added', {
                'type': 'message',
                'id': self._msg_id,
                'status': 'in_progress',
                'role': 'assistant',
                'content': [],
            }))
            events.append(self._sse('response.content_part.added', {
                'type': 'output_text',
                'text': '',
            }))
        return events

    def _start_tool_from_block(self, block: JsonDict) -> list[str]:
        """Open a function_call item for an Anthropic `tool_use` block."""
        return self._start_tool(
            # Next free index: tools are numbered in arrival order.
            index=len(self._tools),
            call_id=block.get('id', gen_id('toolu_')),
            name=block.get('name', ''),
        )

    def _start_tool(self, *, index: int, call_id: str, name: str) -> list[str]:
        """Open a new function_call item, closing open reasoning/text items first."""
        events: list[str] = []
        if self._rs_started and not self._rs_closed:
            events.extend(self._close_reasoning())
        if self._text_started and not self._text_closed:
            events.extend(self._close_text())
        function_call_id = gen_id('fc_')
        self._tools[index] = _ToolBuffer(
            name=name,
            args='',
            call_id=call_id,
            fc_id=function_call_id,
        )
        events.append(self._sse('response.output_item.added', {
            'type': 'function_call',
            'id': function_call_id,
            'status': 'in_progress',
            'call_id': call_id,
            'name': name,
            'arguments': '',
        }))
        return events

    def _append_tool_arguments(self, index: int, arguments_delta: str) -> list[str]:
        """Buffer an argument fragment for tool `index` and emit its delta event."""
        buffer = self._tools[index]
        buffer.args += arguments_delta
        return [self._sse('response.function_call_arguments.delta', {
            'type': 'function_call',
            'delta': arguments_delta,
        })]

    def _close_reasoning(self) -> list[str]:
        """Close the reasoning item and emit its done events (at most once)."""
        if self._rs_closed:
            return []
        self._rs_closed = True
        reasoning_item = {
            'type': 'reasoning',
            'id': self._rs_id,
            'summary': [{'type': 'summary_text', 'text': self._rs_buf}],
        }
        self._output_items.append(reasoning_item)
        return [
            self._sse('response.reasoning_summary_text.done', {
                'type': 'summary_text',
                'text': self._rs_buf,
            }),
            self._sse('response.output_item.done', reasoning_item),
        ]

    def _close_text(self) -> list[str]:
        """Close the assistant message item and emit its done events (at most once)."""
        if self._text_closed:
            return []
        self._text_closed = True
        message_item = {
            'type': 'message',
            'id': self._msg_id,
            'status': 'completed',
            'role': 'assistant',
            'content': [{'type': 'output_text', 'text': self._text_buf}],
        }
        self._output_items.append(message_item)
        return [
            self._sse('response.output_text.done', {
                'type': 'output_text',
                'text': self._text_buf,
            }),
            self._sse('response.output_item.done', message_item),
        ]

    def _finish_stream(self, finish_reason: str, usage: Any) -> list[str]:
        """Close every open item and emit the final `response.completed`.

        This is the single exit point of the state machine, used by the CC
        path, the Anthropic path and `finalize`. `usage` is normalised to
        the Responses key names before it is reported, because the CC path
        supplies Chat Completions key names.
        """
        events: list[str] = []
        if self._rs_started and not self._rs_closed:
            events.extend(self._close_reasoning())
        if self._text_started and not self._text_closed:
            events.extend(self._close_text())
        events.extend(self._finish_tool_calls())
        events.append(self._sse('response.completed', {
            'id': self.resp_id,
            'object': 'response',
            'status': _response_status_from_finish_reason(finish_reason),
            'model': self.model,
            'output': self._output_items,
            'usage': self._normalize_usage(usage),
        }))
        return events

    @staticmethod
    def _normalize_usage(usage: Any) -> JsonDict:
        """Return `usage` with Responses key names.

        A dict that carries Chat Completions keys (`prompt_tokens`,
        `completion_tokens`) and no Responses keys is remapped to
        `input_tokens` / `output_tokens` / `total_tokens`. A dict already in
        Responses form is returned unchanged, and anything that is not a
        dict becomes an empty dict.
        """
        if not isinstance(usage, dict):
            return {}
        has_cc_keys = 'prompt_tokens' in usage or 'completion_tokens' in usage
        has_responses_keys = 'input_tokens' in usage or 'output_tokens' in usage
        if has_cc_keys and not has_responses_keys:
            return {
                'input_tokens': usage.get('prompt_tokens', 0),
                'output_tokens': usage.get('completion_tokens', 0),
                'total_tokens': usage.get('total_tokens', 0),
            }
        return usage

    def _finish_tool_calls(self) -> list[str]:
        """Emit done events for every buffered function_call, in index order."""
        events: list[str] = []
        for index in sorted(self._tools.keys()):
            buffer = self._tools[index]
            events.append(self._sse('response.function_call_arguments.done', {
                'type': 'function_call',
                'arguments': buffer.args,
            }))
            function_call_item = {
                'type': 'function_call',
                'id': buffer.fc_id,
                'status': 'completed',
                'call_id': buffer.call_id,
                'name': buffer.name,
                'arguments': buffer.args,
            }
            events.append(self._sse('response.output_item.done', function_call_item))
            self._output_items.append(function_call_item)
        return events

    def _sse(self, event_type: str, data: JsonDict) -> str:
        """Encode an event type and JSON payload as one SSE frame."""
        return f'event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n'

    def _rewrite_top_level_model(self, payload: JsonDict) -> JsonDict:
        """Return a copy of `payload` with the model name replaced by the display model.

        The model name may sit at the top level or inside a nested
        `response` object; both places are rewritten. Existing names are
        kept when no display model is configured. The input is not mutated.
        """
        if not isinstance(payload, dict):
            return payload
        copied = dict(payload)
        if copied.get('model'):
            copied['model'] = self.model or copied['model']
        if isinstance(copied.get('response'), dict) and copied['response'].get('model'):
            # Copy the nested dict too so the caller's object stays untouched.
            copied['response'] = dict(copied['response'])
            copied['response']['model'] = self.model or copied['response']['model']
        return copied
class ResponsesToCCStreamConverter:
    """Convert native Responses SSE events into Chat Completions chunks.

    Used when the upstream only speaks `/v1/responses` but the caller
    expects a Chat Completions stream. The converter:

    1. maps text and reasoning-summary deltas to `content` /
       `reasoning_content` deltas,
    2. reassembles function_call items and their argument deltas into
       `tool_calls` deltas,
    3. emits `finish_reason` and `usage` when the response ends.
    """

    def __init__(self, request_id: str | None = None, model: str = ''):
        """Set up the chunk id, model name and tool-call bookkeeping.

        Args:
            request_id: Chunk id to report; generated when falsy.
            model: Model name reported in every chunk.
        """
        self._id = request_id or gen_id('chatcmpl-')
        self._model = model
        # Next free `tool_calls` index.
        self._tool_index = 0
        # call_id → tool_calls index.
        self._tool_slots: dict[str, int] = {}
        # Output-item id → tool_calls index, used to route argument deltas.
        self._item_slots: dict[str, int] = {}
        self._usage: JsonDict = {}

    def process_event(self, event_type: str, event_data: JsonDict) -> list[JsonDict]:
        """Convert one Responses event into zero or more CC chunks.

        Events are dispatched one at a time so the caller keeps a true
        streaming experience. `response.incomplete` is treated as a terminal
        event like `response.completed`, so a truncated response still ends
        with a finish chunk. Unhandled event types yield no chunks.
        """
        if event_type == 'response.created':
            return [self._make_chunk(delta={'role': 'assistant', 'content': ''})]
        if event_type == 'response.output_text.delta':
            return [self._make_chunk(delta={'content': event_data.get('delta', '')})]
        if event_type == 'response.reasoning_summary_text.delta':
            return [self._make_chunk(delta={'reasoning_content': event_data.get('delta', '')})]
        if event_type == 'response.output_item.added':
            return self._handle_output_item_added(event_data)
        if event_type == 'response.function_call_arguments.delta':
            return self._handle_function_arguments_delta(event_data)
        if event_type in ('response.completed', 'response.incomplete'):
            return self._handle_completed(event_data)
        return []

    def _handle_output_item_added(self, event_data: JsonDict) -> list[JsonDict]:
        """Open a `tool_calls` entry for a newly added function_call item.

        The item is read from the `item` sub-object when present, otherwise
        from the event itself. Items of any other type are ignored.
        """
        item = event_data.get('item') or event_data
        if item.get('type') != 'function_call':
            return []
        call_id = item.get('call_id') or gen_id('call_')
        index = self._tool_slots.setdefault(call_id, self._tool_index)
        # Advance only when this call_id was assigned a fresh slot.
        if index == self._tool_index:
            self._tool_index += 1
        item_id = item.get('id')
        if item_id:
            self._item_slots[item_id] = index
        return [self._make_chunk(delta={
            'tool_calls': [{
                'index': index,
                'id': call_id,
                'type': 'function',
                'function': {
                    'name': item.get('name', ''),
                    'arguments': '',
                },
            }]
        })]

    def _handle_function_arguments_delta(self, event_data: JsonDict) -> list[JsonDict]:
        """Forward an argument fragment to the tool call it belongs to.

        The target is looked up by the event's `item_id`. When the event has
        no `item_id`, or it is unknown, the fragment goes to the most
        recently opened tool call. Nothing is emitted before any tool call
        has been opened.
        """
        if not self._tool_slots:
            return []
        index = self._item_slots.get(event_data.get('item_id'))
        if index is None:
            index = max(self._tool_slots.values())
        return [self._make_chunk(delta={
            'tool_calls': [{
                'index': index,
                'function': {
                    'arguments': event_data.get('delta', ''),
                },
            }]
        })]

    def _handle_completed(self, event_data: JsonDict) -> list[JsonDict]:
        """Emit the final chunk carrying `finish_reason` and `usage`.

        `output`, `usage` and `status` are read from the `response`
        sub-object when present, otherwise from the event itself. The finish
        reason is `tool_calls` when the output contains a function_call,
        `length` when the status is `incomplete`, and `stop` otherwise. Null
        `output` or `usage` are treated as empty.
        """
        resp = event_data.get('response') or event_data
        self._usage = resp.get('usage') or {}
        output_items = resp.get('output') or []
        has_function_call = any(
            isinstance(item, dict) and item.get('type') == 'function_call'
            for item in output_items
        )
        if has_function_call:
            finish_reason = 'tool_calls'
        elif resp.get('status') == 'incomplete':
            finish_reason = 'length'
        else:
            finish_reason = 'stop'
        chunk = self._make_chunk(delta={}, finish_reason=finish_reason)
        chunk['usage'] = {
            'prompt_tokens': self._usage.get('input_tokens', 0),
            'completion_tokens': self._usage.get('output_tokens', 0),
            'total_tokens': self._usage.get('total_tokens', 0),
        }
        return [chunk]

    def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict:
        """Build one Chat Completions chunk; `finish_reason` is added only when set."""
        choice: JsonDict = {'index': 0, 'delta': delta}
        if finish_reason:
            choice['finish_reason'] = finish_reason
        return {
            'id': self._id,
            'object': 'chat.completion.chunk',
            'model': self._model,
            'choices': [choice],
        }
# ═══════════════════════════════════════════════════════════
# Request conversion helpers
# ═══════════════════════════════════════════════════════════
def _copy_request_options(payload: JsonDict, result: JsonDict) -> None:
"""将 Responses 请求中的通用选项复制到 CC 请求体。"""
if 'tools' in payload:
result['tools'] = _convert_tools(payload['tools'])
for key in ('temperature', 'top_p'):
if key in payload:
result[key] = payload[key]
if 'max_output_tokens' in payload:
result['max_tokens'] = payload['max_output_tokens']
if 'tool_choice' in payload:
result['tool_choice'] = payload['tool_choice']
def _copy_responses_request_options(payload: JsonDict, result: JsonDict) -> None:
"""将聊天补全请求中的通用选项复制到原生 Responses 请求体。"""
if 'tools' in payload:
result['tools'] = _convert_cc_tools_to_responses(payload['tools'])
for key in ('temperature', 'top_p', 'tool_choice'):
if key in payload:
result[key] = payload[key]
if 'max_tokens' in payload:
result['max_output_tokens'] = payload['max_tokens']
def _append_responses_input_item(
message: Any,
instructions: list[str],
input_items: list[JsonDict],
) -> None:
"""将单条 Chat Completions 消息追加为 Responses `input` 项。
尽量使用 EasyInputMessage 格式({role, content})以减少 token 开销,
提高上游 prompt caching 的前缀匹配命中率。
"""
if not isinstance(message, dict):
return
role = message.get('role', '')
content = message.get('content')
if role == 'system':
text = _content_to_text(content)
if text:
instructions.append(text)
return
if role == 'tool':
input_items.append({
'type': 'function_call_output',
'call_id': message.get('tool_call_id', ''),
'output': _stringify_output(content),
})
return
text = _content_to_text(content)
has_tool_calls = bool(message.get('tool_calls'))
if role == 'assistant' and has_tool_calls:
if text:
input_items.append({
'type': 'message',
'role': 'assistant',
'content': [{'type': 'output_text', 'text': text}],
})
for tool_call in message.get('tool_calls') or []:
input_items.append(_build_responses_function_call_item(tool_call))
else:
input_items.append({'role': role or 'user', 'content': text or ''})
def _convert_input_items(items: list[Any], messages: list[JsonDict]) -> None:
"""将 Responses `input` 数组重建为 Chat Completions `messages` 列表。"""
index = 0
pending_reasoning: str | None = None
while index < len(items):
item = items[index]
if isinstance(item, str):
messages.append({'role': 'user', 'content': item})
index += 1
continue
if not isinstance(item, dict):
index += 1
continue
item_type = item.get('type', '')
role = item.get('role', '')
if item_type == 'reasoning':
pending_reasoning = _extract_reasoning_text(item)
index += 1
continue
if role and not item_type:
msg: JsonDict = {
'role': role,
'content': _normalize_simple_content(item.get('content', '')),
}
if role == 'assistant' and pending_reasoning:
msg['reasoning_content'] = pending_reasoning
pending_reasoning = None
messages.append(msg)
index += 1
continue
if item_type == 'message':
consumed = _append_message_item(items, start=index, messages=messages)
if item.get('role') == 'assistant' and pending_reasoning and messages:
messages[-1]['reasoning_content'] = pending_reasoning
pending_reasoning = None
index += consumed
continue
if item_type == 'function_call':
if pending_reasoning and messages and messages[-1].get('role') == 'assistant':
messages[-1]['reasoning_content'] = pending_reasoning
pending_reasoning = None
_append_function_call_item(item, messages)
index += 1
continue
if item_type == 'function_call_output':
messages.append(_convert_function_call_output_item(item))
index += 1
continue
if role:
messages.append({'role': role, 'content': str(item.get('content', ''))})
index += 1
def _append_message_item(items: list[Any], *, start: int, messages: list[JsonDict]) -> int:
    """Convert the `message` item at `start` and append it to `messages`.

    For assistant messages, the function_call items that directly follow
    are merged in as `tool_calls`; when that leaves the message without
    text, its content is set to None.

    Returns:
        The number of input items consumed (the message itself plus any
        merged function_call items).
    """
    entry = items[start]
    role = entry.get('role', 'assistant')
    text = _extract_text(entry.get('content', []))
    built: JsonDict = {'role': role, 'content': text or ''}

    consumed = 1
    if role == 'assistant':
        calls, extra = _collect_function_calls(items, start + 1)
        if calls:
            built['tool_calls'] = calls
            if not built['content']:
                built['content'] = None
        consumed += extra

    messages.append(built)
    return consumed
def _append_function_call_item(item: JsonDict, messages: list[JsonDict]) -> None:
    """Attach a stand-alone Responses `function_call` item to the conversation.

    When the last message is an assistant message, the call is added to its
    `tool_calls` and empty content is set to None. Otherwise a new assistant
    message carrying only this call is appended.
    """
    call = _build_cc_tool_call(item)

    if not messages or messages[-1]['role'] != 'assistant':
        messages.append({'role': 'assistant', 'content': None, 'tool_calls': [call]})
        return

    last_message = messages[-1]
    last_message.setdefault('tool_calls', []).append(call)
    if not last_message.get('content'):
        last_message['content'] = None
def _convert_function_call_output_item(item: JsonDict) -> JsonDict:
"""将 Responses 的 `function_call_output` 项转换为 OpenAI `tool` 消息。"""
output = item.get('output', '')
if not isinstance(output, str):
output = json.dumps(output, ensure_ascii=False)
return {
'role': 'tool',
'tool_call_id': item.get('call_id', ''),
'content': output,
}
def _normalize_simple_content(content: Any) -> str:
"""将简单 content 载荷规范化为纯文本字符串。"""
if isinstance(content, list):
return _extract_text(content) or ''
return str(content) if content is not None else ''
def _collect_function_calls(items: list[Any], start: int) -> tuple[list[JsonDict], int]:
"""收集从指定位置开始连续出现的 `function_call` 项。"""
tool_calls: list[JsonDict] = []
index = start
while index < len(items):
next_item = items[index]
if isinstance(next_item, dict) and next_item.get('type') == 'function_call':
tool_calls.append(_build_cc_tool_call(next_item))
index += 1
else:
break
return tool_calls, index - start
def _build_cc_tool_call(item: JsonDict) -> JsonDict:
"""将单个 Responses `function_call` 项转换为 CC `tool_call` 结构。"""
return {
'id': item.get('call_id') or gen_id('call_'),
'type': 'function',
'function': {
'name': item.get('name', ''),
'arguments': item.get('arguments', '{}'),
},
}
# ═══════════════════════════════════════════════════════════
# Non-streaming response conversion helpers
# ═══════════════════════════════════════════════════════════
def _build_responses_output(message: JsonDict) -> list[JsonDict]:
"""将单条聊天补全消息展开为 Responses `output` 数组。"""
output: list[JsonDict] = []
if message.get('reasoning_content'):
output.append(_make_reasoning_output_item(message['reasoning_content']))
if message.get('content'):
output.append(_make_message_output_item(message['content']))
for tool_call in message.get('tool_calls') or []:
output.append(_make_function_call_output_item(tool_call))
return output
def _make_reasoning_output_item(text: str) -> JsonDict:
    """Build a Responses reasoning output item whose summary holds `text`."""
    summary_part = {'type': 'summary_text', 'text': text}
    return dict(type='reasoning', id=gen_id('rs_'), summary=[summary_part])
def _make_message_output_item(text: str) -> JsonDict:
    """Build a completed Responses assistant message item containing `text`."""
    text_part = {'type': 'output_text', 'text': text}
    return dict(
        type='message',
        id=gen_id('msg_'),
        status='completed',
        role='assistant',
        content=[text_part],
    )
def _make_function_call_output_item(tool_call: JsonDict) -> JsonDict:
    """Build a completed Responses function_call item from a CC tool call.

    A generated id is used for `call_id` when the tool call has no `id` key;
    missing arguments default to the string '{}'.
    """
    function_part = tool_call.get('function') or {}
    item: JsonDict = {
        'type': 'function_call',
        'id': gen_id('fc_'),
        'status': 'completed',
    }
    item['call_id'] = tool_call.get('id', gen_id('call_'))
    item['name'] = function_part.get('name', '')
    item['arguments'] = function_part.get('arguments', '{}')
    return item
def _build_responses_usage(usage: JsonDict) -> JsonDict:
"""将 Chat Completions 的 usage 字段映射为 Responses usage 结构。"""
return {
'input_tokens': usage.get('prompt_tokens', 0),
'output_tokens': usage.get('completion_tokens', 0),
'total_tokens': usage.get('total_tokens', 0),
}
def _collect_cc_parts_from_responses_output(output_items: Any) -> tuple[str, str, list[JsonDict]]:
"""从 Responses `output` 中提取文本、思考摘要和工具调用。"""
content_text = ''
reasoning_text = ''
tool_calls: list[JsonDict] = []
if not isinstance(output_items, list):
return content_text, reasoning_text, tool_calls
for item in output_items:
if not isinstance(item, dict):
continue
item_type = item.get('type', '')
if item_type == 'message':
content_text += _extract_text(item.get('content', []))
elif item_type == 'reasoning':
reasoning_text += _extract_reasoning_text(item)
elif item_type == 'function_call':
tool_calls.append(_build_cc_tool_call_from_responses_output(item, index=len(tool_calls)))
return content_text, reasoning_text, tool_calls
def _extract_reasoning_text(item: JsonDict) -> str:
"""从 Responses reasoning 项中拼接出完整的摘要文本。"""
summary = item.get('summary', [])
if not isinstance(summary, list):
return ''
texts: list[str] = []
for part in summary:
if isinstance(part, dict) and part.get('type') == 'summary_text':
texts.append(part.get('text', ''))
return ''.join(texts)
def _build_cc_tool_call_from_responses_output(item: JsonDict, *, index: int) -> JsonDict:
"""将 Responses `function_call` 输出项转换为 CC `tool_call`。"""
return {
'index': index,
'id': item.get('call_id') or gen_id('call_'),
'type': 'function',
'function': {
'name': item.get('name', ''),
'arguments': item.get('arguments', '{}'),
},
}
def _cc_finish_reason_from_responses(response_data: JsonDict, tool_calls: list[JsonDict]) -> str:
"""根据 Responses 完成状态推断聊天补全的 finish_reason。"""
if tool_calls:
return 'tool_calls'
if response_data.get('status') == 'incomplete':
return 'length'
return 'stop'
def _response_status_from_finish_reason(finish_reason: str) -> str:
"""将聊天补全 finish_reason 映射为 Responses 顶层状态。"""
return 'incomplete' if finish_reason == 'length' else 'completed'
def _map_anthropic_stop_reason(stop_reason: str) -> str:
"""将 Anthropic 的 stop_reason 映射为聊天补全风格的结束原因。"""
return {'tool_use': 'tool_calls', 'max_tokens': 'length'}.get(stop_reason, 'stop')
# ═══════════════════════════════════════════════════════════
# Shared helpers
# ═══════════════════════════════════════════════════════════
def _extract_text(content: Any) -> str:
"""从多种内容块结构中提取并拼接纯文本。"""
if isinstance(content, str):
return content
if not isinstance(content, list):
return str(content) if content else ''
texts: list[str] = []
for part in content:
if isinstance(part, str):
texts.append(part)
elif isinstance(part, dict):
part_type = part.get('type', '')
if part_type in ('output_text', 'input_text', 'text'):
texts.append(part.get('text', ''))
elif part_type == 'refusal':
texts.append(part.get('refusal', ''))
return '\n'.join(texts) if texts else ''
def _content_to_text(content: Any) -> str:
"""将任意 content 载荷转换为单个字符串。"""
if isinstance(content, str):
return content
if isinstance(content, list):
return _extract_text(content)
return str(content) if content is not None else ''
def _content_to_responses_parts(content: Any, role: str = 'user') -> list[JsonDict]:
    """Convert message content into a Responses content-part list.

    The content is reduced to text first. Assistant text becomes an
    `output_text` part, any other role an `input_text` part; empty text
    yields an empty list.
    """
    to_text = _extract_text if isinstance(content, list) else _content_to_text
    text = to_text(content)
    if text:
        kind = 'output_text' if role == 'assistant' else 'input_text'
        return [{'type': kind, 'text': text}]
    return []
def _stringify_output(content: Any) -> str:
"""将工具输出统一序列化为字符串,便于放入 `function_call_output`。"""
if isinstance(content, str):
return content
if content is None:
return ''
return json.dumps(content, ensure_ascii=False) if not isinstance(content, str) else content
def _build_responses_function_call_item(tool_call: JsonDict) -> JsonDict:
    """Convert a CC `tool_call` into a Responses `function_call` input item.

    A generated id is used for `call_id` when the tool call has no `id` key;
    missing arguments default to the string '{}'.
    """
    function_part = tool_call.get('function') or {}
    fallback_call_id = gen_id('call_')
    item: JsonDict = {'type': 'function_call'}
    item['call_id'] = tool_call.get('id', fallback_call_id)
    item['name'] = function_part.get('name', '')
    item['arguments'] = function_part.get('arguments', '{}')
    return item
def _convert_cc_tools_to_responses(tools: Any) -> list[JsonDict]:
"""将聊天补全风格的工具定义转换为 Responses `tools` 列表。"""
if not isinstance(tools, list):
return []
result: list[JsonDict] = []
for tool in tools:
if not isinstance(tool, dict):
continue
if tool.get('type') == 'function' and 'function' in tool:
function_data = tool['function']
result.append({
'type': 'function',
'name': function_data.get('name', ''),
'description': function_data.get('description', ''),
'parameters': function_data.get('parameters', {'type': 'object', 'properties': {}}),
})
elif tool.get('type') == 'function':
result.append(tool)
return result
def _convert_tools(tools: Any) -> list[JsonDict]:
"""规范化 Responses 请求中的工具定义列表。"""
if not isinstance(tools, list):
return []
result: list[JsonDict] = []
for tool in tools:
converted = _convert_tool_definition(tool)
if converted is not None:
result.append(converted)
return result
def _convert_tool_definition(tool: Any) -> JsonDict | None:
"""将扁平工具定义补成标准 Chat Completions `function` 工具格式。"""
if not isinstance(tool, dict):
return None
if tool.get('type') != 'function':
return None
if 'function' in tool:
return tool
return {
'type': 'function',
'function': {
'name': tool.get('name', ''),
'description': tool.get('description', ''),
'parameters': tool.get('parameters', {'type': 'object', 'properties': {}}),
},
}