diff --git a/README.md b/README.md index 6cd334c..c73ad0e 100644 --- a/README.md +++ b/README.md @@ -11,22 +11,32 @@ Cursor 根据模型名发送不同格式的请求: | `claude-sonnet-*`、`glm-*` | `/v1/chat/completions` (OpenAI CC) | | `gpt-*`、`claude-opus-*` | `/v1/responses` (OpenAI Responses) | -而中转站通常只支持 `/v1/chat/completions` 或 `/v1/messages`。 +而中转站通常只支持 `/v1/chat/completions`、`/v1/messages` 或 `/v1/responses`。 本项目在中间做协议转换,**不管 Cursor 发什么格式,都能正确转发到中转站;不管中转站返回什么格式,都让 Cursor 能正确接收**。 ## 架构 +可以把这个项目理解成“三种入口协议 + 三种上游后端协议”的协议桥: + +```text +Cursor API 2 Cursor 中转站 + │ │ │ + ├─ /v1/chat/completions ─────→ chat.py ─────┬─ openai 后端 ─────────→ /v1/chat/completions + │ ├─ anthropic 后端 ─────→ /v1/messages + │ └─ responses 后端 ─────→ /v1/responses + │ + ├─ /v1/responses ────────────→ responses.py ─┬─ openai 后端 ───────→ /v1/chat/completions + │ ├─ anthropic 后端 ───→ /v1/messages + │ └─ responses 后端 ───→ /v1/responses + │ + └─ /v1/messages ─────────────→ messages.py ─────────────────────────→ /v1/messages ``` -Cursor API 2 Cursor 中转站 - │ │ │ - ├─ /v1/chat/completions ──→ chat.py ─┬─ openai 后端 ────────→ /v1/chat/completions - │ └─ anthropic 后端 ────→ /v1/messages - │ │ - ├─ /v1/responses ──────→ responses.py → 转为 CC → 同上 → 转回 Responses - │ │ - └─ /v1/messages ───────→ messages.py → 直接透传 ────────────→ /v1/messages -``` + +其中: +- `chat.py` 负责接住 Cursor 的 Chat Completions 请求,并根据模型映射决定发往哪种后端协议 +- `responses.py` 负责接住 Cursor 的 Responses 请求,并在需要时做 `Responses ↔ CC` 或 `Responses ↔ Messages` 桥接 +- `messages.py` 负责 Anthropic 原生消息的直通场景 ## 快速开始 @@ -70,11 +80,13 @@ docker compose up -d - **Cursor 模型名** — 在 Cursor 自定义模型中填入的名称 - **上游模型名** — 发送到中转站的实际模型名 -- **后端类型** — `openai` (CC 格式) / `anthropic` (Messages 格式) / `auto` (自动检测) +- **后端类型** — `openai` (CC 格式) / `anthropic` (Messages 格式) / `responses` (Responses 格式) / `auto` (自动检测) - **自定义地址/密钥** — 可选,覆盖全局设置,实现分流到不同中转站 **示例**:在 Cursor 中添加 `claude-sonnet-4-5-20250929`,映射到上游 `gpt-5.3-codex`,后端选 `openai`。Cursor 会用 CC 格式发送请求,代理直接转发到中转站的 `/v1/chat/completions`。 +如果你的中转站只支持 
`/v1/responses`,可以把后端类型选成 `responses`。此时代理会把 Cursor 发来的请求转换或透传为 Responses 格式,再发往中转站的 `/v1/responses`。 + > **提示**:使用 Claude 风格的模型名(如 `claude-sonnet-4-5-20250929`)可以让 Cursor 显示思考过程(thinking)。 ### 在 Cursor 中配置 @@ -86,26 +98,27 @@ docker compose up -d ## 项目结构 -``` +```text api2cursor/ -├── start.py # 启动入口 -├── app.py # Flask 应用工厂 -├── config.py # 环境变量配置 -├── settings.py # 持久化配置管理 -├── routes/ # 路由层 -│ ├── chat.py # /v1/chat/completions -│ ├── responses.py # /v1/responses -│ ├── messages.py # /v1/messages (透传) -│ └── admin.py # 管理面板 + API -├── adapters/ # 适配层(格式转换) -│ ├── openai_anthropic.py# CC ↔ Messages 双向转换 -│ ├── openai_fixer.py # OpenAI 请求/响应修复 -│ └── responses_adapter.py# Responses ↔ CC 双向转换 -├── utils/ # 工具层 -│ ├── http.py # 请求转发、SSE 解析 -│ ├── tool_fixer.py # 工具参数修复 -│ └── think_tag.py # 标签提取 -└── static/ # 管理面板前端 +├── start.py # 启动入口 +├── app.py # Flask 应用工厂 +├── config.py # 环境变量配置 +├── settings.py # 持久化配置管理 +├── routes/ # 路由层:按对外 API 入口拆分 +│ ├── chat.py # /v1/chat/completions +│ ├── responses.py # /v1/responses +│ ├── messages.py # /v1/messages(透传) +│ ├── admin.py # 管理面板 + API +│ └── common.py # 路由公共上下文、日志与 SSE 辅助 +├── adapters/ # 适配层:按协议桥接职责拆分 +│ ├── cc_anthropic_adapter.py # Chat Completions ↔ Anthropic Messages +│ ├── openai_compat_fixer.py # OpenAI / Chat Completions 兼容修复 +│ └── responses_cc_adapter.py # Responses ↔ Chat Completions + 原生 Responses 流桥接 +├── utils/ # 通用工具层 +│ ├── http.py # 请求转发、SSE 解析 +│ ├── tool_fixer.py # 工具参数修复 +│ └── think_tag.py # 标签提取 +└── static/ # 管理面板前端 ├── admin.html ├── admin.css └── admin.js diff --git a/adapters/cc_anthropic_adapter.py b/adapters/cc_anthropic_adapter.py new file mode 100644 index 0000000..55d7107 --- /dev/null +++ b/adapters/cc_anthropic_adapter.py @@ -0,0 +1,561 @@ +"""OpenAI Chat Completions ↔ Anthropic Messages 格式转换 + +这个模块是项目里最核心的协议桥之一,负责在两套主流对话协议之间做双向适配: +- 请求方向:OpenAI Chat Completions → Anthropic Messages +- 响应方向:Anthropic Messages → OpenAI Chat Completions +- 流式方向:Anthropic SSE 事件 → OpenAI Chat 
Completions chunk + +这里的代码看起来会比普通字段映射更重,是因为它不仅要做字段重命名,还要处理: +- system 消息上提 +- tool_calls / tool_use 双向映射 +- tool 消息 / tool_result 双向映射 +- 图片块转换 +- 思考内容与流式工具参数的时序保留 +""" + +from __future__ import annotations + +import json +from typing import Any + +from utils.http import gen_id +from utils.tool_fixer import fix_anthropic_tool_use, normalize_args, repair_str_replace_args + +JsonDict = dict[str, Any] + + +# Anthropic stop_reason → OpenAI finish_reason +_STOP_REASON_MAP = { + 'end_turn': 'stop', + 'max_tokens': 'length', + 'tool_use': 'tool_calls', + 'stop_sequence': 'stop', +} + + +# ═══════════════════════════════════════════════════════════ +# 请求转换: CC → Messages +# ═══════════════════════════════════════════════════════════ + + +def cc_to_messages_request(payload: JsonDict) -> JsonDict: + """将 OpenAI Chat Completions 请求转换为 Anthropic Messages 请求。 + + 这一步不是简单替换字段名,而是主动把 OpenAI 世界中的几类特殊语义映射到 + Anthropic 世界: + - `system` 消息提取到顶层 `system` + - assistant 的 `tool_calls` 变成 `tool_use` 内容块 + - `tool` 角色消息变成 user 侧的 `tool_result` 内容块 + + 另外,这里会把相邻同角色消息做合并,因为 Anthropic 对消息角色交替的要求更严格。 + """ + messages = payload.get('messages', []) + anthropic_messages: list[JsonDict] = [] + system_parts: list[str] = [] + + for message in messages: + converted, system_text = _convert_request_message(message) + if system_text is not None: + system_parts.append(system_text) + continue + if converted is not None: + anthropic_messages.append(converted) + + anthropic_messages = _merge_same_role(anthropic_messages) + return _build_messages_request(payload, anthropic_messages, system_parts) + + +# ═══════════════════════════════════════════════════════════ +# 非流式响应转换: Messages → CC +# ═══════════════════════════════════════════════════════════ + + +def messages_to_cc_response(data: JsonDict, request_id: str | None = None) -> JsonDict: + """将 Anthropic Messages 非流式响应转换为 OpenAI CC 响应。""" + request_id = request_id or gen_id('chatcmpl-') + data = fix_anthropic_tool_use(data) + + content_text, 
reasoning_text, tool_calls = _collect_response_parts(data.get('content', [])) + message = _build_cc_message(content_text, reasoning_text, tool_calls) + usage = data.get('usage', {}) + + return { + 'id': request_id, + 'object': 'chat.completion', + 'model': data.get('model', 'claude'), + 'choices': [{ + 'index': 0, + 'message': message, + 'finish_reason': _STOP_REASON_MAP.get(data.get('stop_reason', 'end_turn'), 'stop'), + }], + 'usage': _build_cc_usage( + input_tokens=usage.get('input_tokens', 0), + output_tokens=usage.get('output_tokens', 0), + ), + } + + +# ═══════════════════════════════════════════════════════════ +# 流式响应转换: Anthropic SSE → CC chunks +# ═══════════════════════════════════════════════════════════ + + +class AnthropicStreamConverter: + """将 Anthropic SSE 事件逐个转换为 OpenAI Chat Completions chunk。 + + 之所以做成有状态转换器,而不是单纯的函数映射,是因为 Anthropic 的流式工具调用 + 会把名字、参数、结束信号拆散在多个事件中,而 OpenAI chunk 语义要求我们按顺序 + 组装出连续的 `tool_calls` 增量。 + + 这个类主要维护三类状态: + 1. 当前请求的 chunk ID + 2. 当前工具调用的索引位置 + 3. 
输入 / 输出令牌统计 + + 最终目标是把 Anthropic 的事件流稳定映射成 Cursor 能直接消费的 CC chunk 流。 + """ + + def __init__(self, request_id: str | None = None): + self._id = request_id or gen_id('chatcmpl-') + self._tool_index = -1 + self._input_tokens = 0 + self._output_tokens = 0 + + def process_event(self, event_type: str, event_data: JsonDict) -> list[str]: + """处理单个 Anthropic SSE 事件。 + + 调用方会按事件顺序不断喂入 event/data,这里根据事件类型拆成一个或多个 CC chunk + 字符串,交给上层直接作为 SSE data 发送给 Cursor。 + """ + if event_type == 'message_start': + return self._handle_message_start(event_data) + if event_type == 'content_block_start': + return self._handle_content_block_start(event_data) + if event_type == 'content_block_delta': + return self._handle_content_block_delta(event_data) + if event_type == 'message_delta': + return self._handle_message_delta(event_data) + return [] + + def _handle_message_start(self, event_data: JsonDict) -> list[str]: + """处理消息开始事件,产出 assistant 角色起始 chunk。 + + 这个起始 chunk 很重要,因为 Cursor 侧通常会依赖首个带 role 的 chunk 来初始化 + 当前 assistant 消息。 + """ + message = event_data.get('message', {}) + self._input_tokens = message.get('usage', {}).get('input_tokens', 0) + + chunk = self._make_chunk(delta={'role': 'assistant', 'content': ''}) + if message.get('model'): + chunk['model'] = message['model'] + return [self._dump_chunk(chunk)] + + def _handle_content_block_start(self, event_data: JsonDict) -> list[str]: + """处理内容块开始事件。 + + 目前这里只需要显式处理 `tool_use`,因为文本和 thinking 的真正内容都在后续 delta + 事件里;而 tool_use 需要先开一个空 arguments 的 tool_call 槽位。 + """ + block = event_data.get('content_block', {}) + if block.get('type') != 'tool_use': + return [] + + self._tool_index += 1 + return [self._dump_chunk(self._make_chunk(delta={ + 'tool_calls': [{ + 'index': self._tool_index, + 'id': block.get('id', gen_id('toolu_')), + 'type': 'function', + 'function': { + 'name': block.get('name', ''), + 'arguments': '', + }, + }] + }))] + + def _handle_content_block_delta(self, event_data: JsonDict) -> list[str]: + """处理内容块增量事件。 + + Anthropic 
会把文本、思考内容、工具参数拆成不同 delta 类型,这里要分别映射成 + OpenAI chunk 里的 `content`、`reasoning_content` 和 `tool_calls.function.arguments`。 + """ + delta = event_data.get('delta', {}) + delta_type = delta.get('type', '') + + if delta_type == 'text_delta' and delta.get('text'): + return [self._dump_chunk(self._make_chunk(delta={'content': delta['text']}))] + + if delta_type == 'thinking_delta' and delta.get('thinking'): + return [self._dump_chunk(self._make_chunk(delta={'reasoning_content': delta['thinking']}))] + + if delta_type == 'input_json_delta' and delta.get('partial_json'): + return [self._dump_chunk(self._make_chunk(delta={ + 'tool_calls': [{ + 'index': self._tool_index, + 'function': {'arguments': delta['partial_json']}, + }] + }))] + + return [] + + def _handle_message_delta(self, event_data: JsonDict) -> list[str]: + """处理消息收尾事件,补出 finish_reason 和 usage。 + + 当 Anthropic 发出 `message_delta` 时,说明这一轮 assistant 输出已经收束, + 这里会统一生成最后一个带 usage 的收尾 chunk。 + """ + delta = event_data.get('delta', {}) + usage = event_data.get('usage', {}) + self._output_tokens = usage.get('output_tokens', 0) + + chunk = self._make_chunk( + delta={}, + finish_reason=_STOP_REASON_MAP.get(delta.get('stop_reason', ''), 'stop'), + ) + chunk['usage'] = _build_cc_usage( + input_tokens=self._input_tokens, + output_tokens=self._output_tokens, + ) + return [self._dump_chunk(chunk)] + + def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: + """构造标准 OpenAI Chat Completions chunk 对象。""" + choice: JsonDict = {'index': 0, 'delta': delta} + if finish_reason: + choice['finish_reason'] = finish_reason + return { + 'id': self._id, + 'object': 'chat.completion.chunk', + 'model': 'claude', + 'choices': [choice], + } + + @staticmethod + def _dump_chunk(chunk: JsonDict) -> str: + """统一序列化 chunk,方便上层直接写入 SSE data。""" + return json.dumps(chunk) + + +# ═══════════════════════════════════════════════════════════ +# 请求转换辅助 +# ═══════════════════════════════════════════════════════════ + + +def 
_convert_request_message(message: Any) -> tuple[JsonDict | None, str | None]: + """将单条 OpenAI 消息转换为 Anthropic 消息或 system 文本。""" + if not isinstance(message, dict): + return None, None + + role = message.get('role', '') + content = message.get('content', '') + + if role == 'system': + return None, _flatten_text(content) + if role == 'tool': + return _convert_tool_role_message(message), None + + anthropic_role = 'assistant' if role == 'assistant' else 'user' + anthropic_content = _convert_content(message) + + if role == 'assistant' and 'tool_calls' in message: + anthropic_content = _append_tool_use_blocks(anthropic_content, message.get('tool_calls', [])) + + if not anthropic_content and anthropic_content != 0: + return None, None + return {'role': anthropic_role, 'content': anthropic_content}, None + + +def _convert_tool_role_message(message: JsonDict) -> JsonDict | None: + """将 OpenAI 的 tool 角色消息转换为 Anthropic 的 tool_result 内容块。""" + content = message.get('content', '') + text = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False) + anthropic_content = [{ + 'type': 'tool_result', + 'tool_use_id': message.get('tool_call_id', ''), + 'content': text, + }] + + if not anthropic_content: + return None + return {'role': 'user', 'content': anthropic_content} + + +def _append_tool_use_blocks(content: Any, tool_calls: list[Any]) -> list[JsonDict]: + """把 OpenAI assistant.tool_calls 追加成 Anthropic tool_use 内容块。""" + blocks = _to_blocks(content) + for tool_call in tool_calls: + if not isinstance(tool_call, dict): + continue + function_data = tool_call.get('function', {}) + blocks.append({ + 'type': 'tool_use', + 'id': tool_call.get('id', gen_id('toolu_')), + 'name': function_data.get('name', ''), + 'input': _parse_tool_arguments(function_data.get('arguments', '{}')), + }) + return blocks + + +def _build_messages_request( + payload: JsonDict, + anthropic_messages: list[JsonDict], + system_parts: list[str], +) -> JsonDict: + """组装最终的 Anthropic Messages 
请求体。""" + result: JsonDict = { + 'model': payload.get('model', 'claude-sonnet-4-20250514'), + 'messages': anthropic_messages, + # 沿用项目当前策略:未设置或设置过小都兜底到 8192,避免上游因默认值过小过早截断。 + 'max_tokens': max(payload.get('max_tokens') or 8192, 8192), + } + + if system_parts: + result['system'] = '\n\n'.join(system_parts) + if 'tools' in payload: + result['tools'] = _convert_tools(payload['tools']) + + for key in ('temperature', 'top_p', 'stream'): + if key in payload: + result[key] = payload[key] + + return result + + +# ═══════════════════════════════════════════════════════════ +# 非流式响应转换辅助 +# ═══════════════════════════════════════════════════════════ + + +def _collect_response_parts(content_blocks: Any) -> tuple[str, str, list[JsonDict]]: + """从 Anthropic content 块中提取文本、思考内容和工具调用。""" + content_text = '' + reasoning_text = '' + tool_calls: list[JsonDict] = [] + + if not isinstance(content_blocks, list): + return content_text, reasoning_text, tool_calls + + for block in content_blocks: + if not isinstance(block, dict): + continue + + block_type = block.get('type', '') + if block_type == 'text': + content_text += block.get('text', '') + elif block_type == 'thinking': + reasoning_text += block.get('thinking', '') + elif block_type == 'tool_use': + tool_calls.append(_convert_tool_use_block(block, index=len(tool_calls))) + + return content_text, reasoning_text, tool_calls + + +def _convert_tool_use_block(block: JsonDict, *, index: int) -> JsonDict: + """将 Anthropic 的 tool_use 块转换为 OpenAI tool_call。""" + tool_name = block.get('name', '') + input_data = block.get('input', {}) + + if isinstance(input_data, dict): + input_data = normalize_args(input_data) + input_data = repair_str_replace_args(tool_name, input_data) + arguments_text = json.dumps(input_data, ensure_ascii=False) + else: + arguments_text = str(input_data) + + return { + 'index': index, + 'id': block.get('id', gen_id('toolu_')), + 'type': 'function', + 'function': { + 'name': tool_name, + 'arguments': arguments_text, + }, + 
} + + +def _build_cc_message(content_text: str, reasoning_text: str, tool_calls: list[JsonDict]) -> JsonDict: + """构造 OpenAI CC 响应中的 assistant message。""" + message: JsonDict = { + 'role': 'assistant', + 'content': content_text or None, + } + if reasoning_text: + message['reasoning_content'] = reasoning_text + if tool_calls: + message['tool_calls'] = tool_calls + return message + + +def _build_cc_usage(*, input_tokens: int, output_tokens: int) -> JsonDict: + """将 Anthropic usage 字段映射为 OpenAI usage。""" + return { + 'prompt_tokens': input_tokens, + 'completion_tokens': output_tokens, + 'total_tokens': input_tokens + output_tokens, + } + + +# ═══════════════════════════════════════════════════════════ +# 通用辅助 +# ═══════════════════════════════════════════════════════════ + + +def _parse_tool_arguments(arguments: Any) -> Any: + """将 tool_call.arguments 尽量解析为对象,供 Anthropic tool_use.input 使用。 + + Anthropic 的 `tool_use.input` 天然期望对象结构;如果这里直接保留原始字符串, + 后续上游会把它当普通文本而不是工具参数对象。 + """ + if not isinstance(arguments, str): + return arguments if arguments is not None else {} + try: + return json.loads(arguments) + except json.JSONDecodeError: + return {} + + +def _flatten_text(content: Any) -> str: + """将 content 扁平化为纯文本,主要用于 system 消息上提。""" + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for part in content: + if isinstance(part, str): + parts.append(part) + elif isinstance(part, dict) and part.get('type') == 'text': + parts.append(part.get('text', '')) + return '\n'.join(parts) + return str(content) + + +def _convert_content(message: JsonDict) -> Any: + """将 OpenAI 消息的 content 字段转换为 Anthropic 内容格式。""" + content = message.get('content', '') + if content is None: + return '' + if isinstance(content, str): + return content + if not isinstance(content, list): + return str(content) + + blocks: list[JsonDict] = [] + for part in content: + converted = _convert_content_part(part) + if converted is not None: + 
blocks.append(converted) + return blocks + + +def _convert_content_part(part: Any) -> JsonDict | None: + """将单个 OpenAI content part 转为 Anthropic block。""" + if isinstance(part, str): + return {'type': 'text', 'text': part} + if not isinstance(part, dict): + return None + + part_type = part.get('type', '') + if part_type == 'text': + return {'type': 'text', 'text': part.get('text', '')} + if part_type == 'image_url': + return _convert_image(part) + if part_type in ('tool_use', 'tool_result'): + return part + return None + + +def _convert_image(part: JsonDict) -> JsonDict: + """将 OpenAI image_url 格式转换为 Anthropic image 格式。""" + url_data = part.get('image_url', {}) + url = url_data.get('url', '') if isinstance(url_data, dict) else str(url_data) + + if url.startswith('data:'): + media_type, _, base64_data = url.partition(';base64,') + return { + 'type': 'image', + 'source': { + 'type': 'base64', + 'media_type': media_type.replace('data:', '') or 'image/png', + 'data': base64_data, + }, + } + + return { + 'type': 'image', + 'source': { + 'type': 'url', + 'url': url, + }, + } + + +def _convert_tools(tools: Any) -> list[JsonDict]: + """将 OpenAI tools 转为 Anthropic tools 格式。 + + 这里兼容两种常见输入: + - 标准 OpenAI `{"type": "function", "function": {...}}` + - Cursor 常见的扁平工具格式 `{"name": ..., "input_schema": ...}` + """ + if not isinstance(tools, list): + return [] + + result: list[JsonDict] = [] + for tool in tools: + converted = _convert_tool_definition(tool) + if converted is not None: + result.append(converted) + return result + + +def _convert_tool_definition(tool: Any) -> JsonDict | None: + """转换单个工具定义。""" + if not isinstance(tool, dict): + return None + + if tool.get('type') == 'function' and 'function' in tool: + function_data = tool['function'] + return { + 'name': function_data.get('name', ''), + 'description': function_data.get('description', ''), + 'input_schema': function_data.get('parameters', {'type': 'object', 'properties': {}}), + } + + if 'name' in tool and 
'input_schema' in tool: + return { + 'name': tool.get('name', ''), + 'description': tool.get('description', ''), + 'input_schema': tool.get('input_schema', {'type': 'object', 'properties': {}}), + } + + return None + + +def _to_blocks(content: Any) -> list[JsonDict]: + """将内容统一转换成 block 列表。""" + if isinstance(content, str): + return [{'type': 'text', 'text': content}] if content else [] + if isinstance(content, list): + return list(content) + return [{'type': 'text', 'text': str(content)}] if content else [] + + +def _merge_same_role(messages: list[JsonDict]) -> list[JsonDict]: + """合并相邻同角色消息。 + + Anthropic 要求消息角色严格交替,而 OpenAI/调用方不一定遵守这一点。 + 这里仅合并“相邻同角色”消息,以最小改动满足 Anthropic 约束,同时尽量保留 + 原本的消息顺序和内容块排列。 + """ + if not messages: + return messages + + merged = [messages[0]] + for message in messages[1:]: + if message['role'] == merged[-1]['role']: + previous_blocks = _to_blocks(merged[-1]['content']) + current_blocks = _to_blocks(message['content']) + merged[-1]['content'] = previous_blocks + current_blocks + else: + merged.append(message) + return merged diff --git a/adapters/openai_anthropic.py b/adapters/openai_anthropic.py deleted file mode 100644 index 50c508d..0000000 --- a/adapters/openai_anthropic.py +++ /dev/null @@ -1,350 +0,0 @@ -"""OpenAI Chat Completions ↔ Anthropic Messages 格式转换 - -请求方向: CC → Messages(Cursor 的 CC 请求转为 Anthropic 格式发给上游) -响应方向: Messages → CC(上游 Anthropic 响应转为 CC 格式返回给 Cursor) -包含非流式和流式两种转换。 -""" - -import json -import uuid -import logging - -from utils.tool_fixer import normalize_args, repair_str_replace_args, fix_anthropic_tool_use -from utils.http import gen_id - -logger = logging.getLogger(__name__) - -# Anthropic stop_reason → OpenAI finish_reason -_STOP_REASON_MAP = { - 'end_turn': 'stop', - 'max_tokens': 'length', - 'tool_use': 'tool_calls', - 'stop_sequence': 'stop', -} - - -# ═══════════════════════════════════════════════════════════ -# 请求转换: CC → Messages -# ═══════════════════════════════════════════════════════════ - - -def 
cc_to_messages_request(payload): - """将 OpenAI CC 格式请求转换为 Anthropic Messages 格式""" - messages = payload.get('messages', []) - anthropic_msgs = [] - system_parts = [] - - for msg in messages: - role = msg.get('role', '') - content = msg.get('content', '') - - # system 消息提取到顶层 - if role == 'system': - system_parts.append(_flatten_text(content)) - continue - - anthropic_role = 'assistant' if role == 'assistant' else 'user' - anthropic_content = _convert_content(msg) - - # assistant 的 tool_calls → tool_use content blocks - if role == 'assistant' and 'tool_calls' in msg: - blocks = _to_blocks(anthropic_content) - for tc in msg['tool_calls']: - func = tc.get('function', {}) - arguments = func.get('arguments', '{}') - if isinstance(arguments, str): - try: - arguments = json.loads(arguments) - except json.JSONDecodeError: - arguments = {} - blocks.append({ - 'type': 'tool_use', - 'id': tc.get('id', f'toolu_{uuid.uuid4().hex[:24]}'), - 'name': func.get('name', ''), - 'input': arguments, - }) - anthropic_content = blocks - - # tool 角色 → user + tool_result - if role == 'tool': - text = content if isinstance(content, str) else json.dumps(content) - anthropic_content = [{ - 'type': 'tool_result', - 'tool_use_id': msg.get('tool_call_id', ''), - 'content': text, - }] - anthropic_role = 'user' - - if not anthropic_content and anthropic_content != 0: - continue - - anthropic_msgs.append({'role': anthropic_role, 'content': anthropic_content}) - - # Anthropic 要求角色必须交替 - anthropic_msgs = _merge_same_role(anthropic_msgs) - - result = { - 'model': payload.get('model', 'claude-sonnet-4-20250514'), - 'messages': anthropic_msgs, - 'max_tokens': max(payload.get('max_tokens') or 8192, 8192), - } - - if system_parts: - result['system'] = '\n\n'.join(system_parts) - if 'tools' in payload: - result['tools'] = _convert_tools(payload['tools']) - for key in ('temperature', 'top_p', 'stream'): - if key in payload: - result[key] = payload[key] - - return result - - -# 
═══════════════════════════════════════════════════════════ -# 非流式响应转换: Messages → CC -# ═══════════════════════════════════════════════════════════ - - -def messages_to_cc_response(data, request_id=None): - """将 Anthropic Messages 响应转换为 OpenAI CC 格式""" - request_id = request_id or gen_id('chatcmpl-') - data = fix_anthropic_tool_use(data) - - content_text = '' - reasoning = '' - tool_calls = [] - - for block in data.get('content', []): - if not isinstance(block, dict): - continue - btype = block.get('type', '') - if btype == 'text': - content_text += block.get('text', '') - elif btype == 'thinking': - reasoning += block.get('thinking', '') - elif btype == 'tool_use': - args = block.get('input', {}) - if isinstance(args, dict): - args = normalize_args(args) - args = repair_str_replace_args(block.get('name', ''), args) - tool_calls.append({ - 'index': len(tool_calls), - 'id': block.get('id', f'toolu_{uuid.uuid4().hex[:24]}'), - 'type': 'function', - 'function': { - 'name': block.get('name', ''), - 'arguments': json.dumps(args, ensure_ascii=False) if isinstance(args, dict) else str(args), - }, - }) - - stop_reason = data.get('stop_reason', 'end_turn') - message = {'role': 'assistant', 'content': content_text or None} - if reasoning: - message['reasoning_content'] = reasoning - if tool_calls: - message['tool_calls'] = tool_calls - - usage = data.get('usage', {}) - return { - 'id': request_id, - 'object': 'chat.completion', - 'model': data.get('model', 'claude'), - 'choices': [{ - 'index': 0, - 'message': message, - 'finish_reason': _STOP_REASON_MAP.get(stop_reason, 'stop'), - }], - 'usage': { - 'prompt_tokens': usage.get('input_tokens', 0), - 'completion_tokens': usage.get('output_tokens', 0), - 'total_tokens': usage.get('input_tokens', 0) + usage.get('output_tokens', 0), - }, - } - - -# ═══════════════════════════════════════════════════════════ -# 流式响应转换: Anthropic SSE → CC chunks -# ═══════════════════════════════════════════════════════════ - - -class 
AnthropicStreamConverter: - """将 Anthropic SSE 事件逐个转换为 OpenAI CC 流式 chunk""" - - def __init__(self, request_id=None): - self._id = request_id or gen_id('chatcmpl-') - self._tool_index = -1 - self._input_tokens = 0 - self._output_tokens = 0 - - def process_event(self, event_type, event_data): - """处理一个 Anthropic SSE 事件,返回 CC chunk JSON 字符串列表""" - chunks = [] - - if event_type == 'message_start': - msg = event_data.get('message', {}) - self._input_tokens = msg.get('usage', {}).get('input_tokens', 0) - chunk = self._make_chunk(delta={'role': 'assistant', 'content': ''}) - if msg.get('model'): - chunk['model'] = msg['model'] - chunks.append(json.dumps(chunk)) - - elif event_type == 'content_block_start': - block = event_data.get('content_block', {}) - if block.get('type') == 'tool_use': - self._tool_index += 1 - chunks.append(json.dumps(self._make_chunk(delta={ - 'tool_calls': [{ - 'index': self._tool_index, - 'id': block.get('id', f'toolu_{uuid.uuid4().hex[:24]}'), - 'type': 'function', - 'function': {'name': block.get('name', ''), 'arguments': ''}, - }] - }))) - - elif event_type == 'content_block_delta': - delta = event_data.get('delta', {}) - dtype = delta.get('type', '') - if dtype == 'text_delta' and delta.get('text'): - chunks.append(json.dumps(self._make_chunk( - delta={'content': delta['text']}))) - elif dtype == 'thinking_delta' and delta.get('thinking'): - chunks.append(json.dumps(self._make_chunk( - delta={'reasoning_content': delta['thinking']}))) - elif dtype == 'input_json_delta' and delta.get('partial_json'): - chunks.append(json.dumps(self._make_chunk(delta={ - 'tool_calls': [{ - 'index': self._tool_index, - 'function': {'arguments': delta['partial_json']}, - }] - }))) - - elif event_type == 'message_delta': - delta = event_data.get('delta', {}) - usage = event_data.get('usage', {}) - self._output_tokens = usage.get('output_tokens', 0) - finish = _STOP_REASON_MAP.get(delta.get('stop_reason', ''), 'stop') - chunk = self._make_chunk(delta={}, 
finish_reason=finish) - chunk['usage'] = { - 'prompt_tokens': self._input_tokens, - 'completion_tokens': self._output_tokens, - 'total_tokens': self._input_tokens + self._output_tokens, - } - chunks.append(json.dumps(chunk)) - - return chunks - - def _make_chunk(self, delta, finish_reason=None): - choice = {'index': 0, 'delta': delta} - if finish_reason: - choice['finish_reason'] = finish_reason - return { - 'id': self._id, - 'object': 'chat.completion.chunk', - 'model': 'claude', - 'choices': [choice], - } - - -# ═══════════════════════════════════════════════════════════ -# 内部辅助函数 -# ═══════════════════════════════════════════════════════════ - - -def _flatten_text(content): - """将 content 扁平化为纯文本""" - if isinstance(content, str): - return content - if isinstance(content, list): - parts = [] - for p in content: - if isinstance(p, str): - parts.append(p) - elif isinstance(p, dict) and p.get('type') == 'text': - parts.append(p.get('text', '')) - return '\n'.join(parts) - return str(content) - - -def _convert_content(msg): - """将 OpenAI 消息的 content 字段转为 Anthropic 格式""" - content = msg.get('content', '') - if content is None: - return '' - if isinstance(content, str): - return content - if isinstance(content, list): - blocks = [] - for part in content: - if isinstance(part, str): - blocks.append({'type': 'text', 'text': part}) - elif isinstance(part, dict): - ptype = part.get('type', '') - if ptype == 'text': - blocks.append({'type': 'text', 'text': part.get('text', '')}) - elif ptype == 'image_url': - blocks.append(_convert_image(part)) - elif ptype in ('tool_use', 'tool_result'): - blocks.append(part) - return blocks - return str(content) - - -def _convert_image(part): - """将 OpenAI image_url 格式转为 Anthropic image 格式""" - url_data = part.get('image_url', {}) - url = url_data.get('url', '') if isinstance(url_data, dict) else str(url_data) - if url.startswith('data:'): - media_type, _, b64 = url.partition(';base64,') - return { - 'type': 'image', - 'source': { - 
'type': 'base64', - 'media_type': media_type.replace('data:', '') or 'image/png', - 'data': b64, - }, - } - return {'type': 'image', 'source': {'type': 'url', 'url': url}} - - -def _convert_tools(tools): - """将 OpenAI tools 转为 Anthropic tools 格式(兼容 Cursor 扁平格式)""" - result = [] - for tool in tools: - if tool.get('type') == 'function' and 'function' in tool: - func = tool['function'] - result.append({ - 'name': func.get('name', ''), - 'description': func.get('description', ''), - 'input_schema': func.get('parameters', {'type': 'object', 'properties': {}}), - }) - elif 'name' in tool and 'input_schema' in tool: - result.append({ - 'name': tool.get('name', ''), - 'description': tool.get('description', ''), - 'input_schema': tool.get('input_schema', {'type': 'object', 'properties': {}}), - }) - return result - - -def _to_blocks(content): - """将 content 统一转为 blocks 列表""" - if isinstance(content, str): - return [{'type': 'text', 'text': content}] if content else [] - if isinstance(content, list): - return list(content) - return [{'type': 'text', 'text': str(content)}] if content else [] - - -def _merge_same_role(messages): - """合并相邻同角色消息(Anthropic 要求角色必须交替)""" - if not messages: - return messages - merged = [messages[0]] - for msg in messages[1:]: - if msg['role'] == merged[-1]['role']: - prev = _to_blocks(merged[-1]['content']) - curr = _to_blocks(msg['content']) - merged[-1]['content'] = prev + curr - else: - merged.append(msg) - return merged diff --git a/adapters/openai_compat_fixer.py b/adapters/openai_compat_fixer.py new file mode 100644 index 0000000..e73d7d1 --- /dev/null +++ b/adapters/openai_compat_fixer.py @@ -0,0 +1,405 @@ +"""OpenAI 格式修复 + +这个模块专门处理 OpenAI Chat Completions 兼容层里的“脏活”: +- 请求方向:把 Cursor 发来的近似 OpenAI 格式修整成更标准的请求 +- 响应方向:把上游返回的近似 OpenAI 格式修整成 Cursor 更容易消费的结果 + +这里之所以集中做兼容性修复,而不是散落在路由层,是因为这些规则本质上属于 +“协议清洗”而不是“请求编排”。路由层只应该关心把请求送到哪里,修复规则则应该 +在适配层统一收口,避免两条主链路各自维护一份类似逻辑。 +""" + +from __future__ import annotations + +import json +import logging +from 
typing import Any + +from utils.http import gen_id +from utils.think_tag import extract_from_text +from utils.tool_fixer import normalize_args, repair_str_replace_args + +logger = logging.getLogger(__name__) + +JsonDict = dict[str, Any] + + +# ─── 请求预处理 ─────────────────────────────────── + + +def normalize_request(payload: JsonDict, upstream_model: str | None = None) -> JsonDict: + """预处理 Cursor 发来的 OpenAI 风格请求。 + + 这个函数只做“让请求更像标准 OpenAI CC”的整理,不负责路由或网络层决策。 + 当前处理的重点有两类: + 1. Cursor 偶尔会在 CC 端点混入 Anthropic 风格内容块,需要先转回 OpenAI 语义。 + 2. 工具定义和 tool_choice 可能是 Cursor 的便捷写法,需要标准化后再发给上游。 + """ + if upstream_model: + payload['model'] = upstream_model + + if 'messages' in payload: + payload['messages'] = _convert_anthropic_messages(payload['messages']) + + if 'tools' not in payload: + return payload + + payload['tools'] = [_normalize_tool_definition(tool) for tool in payload['tools']] + _normalize_tool_choice(payload) + return payload + + +# ─── 消息兼容转换 ───────────────────────────────── + + +def _convert_anthropic_messages(messages: Any) -> Any: + """将消息中的 Anthropic tool_use/tool_result 块转回 OpenAI 风格消息。 + + Cursor 在少数场景下会把 Anthropic 风格内容块直接发到 + `/v1/chat/completions`。如果不在这里先转换,后续上游即使是 OpenAI 兼容接口, + 也未必能理解这类内容块。 + """ + if not isinstance(messages, list): + return messages + + converted: list[JsonDict] = [] + for message in messages: + converted.extend(_convert_single_message(message)) + return converted + + +def _convert_single_message(message: Any) -> list[JsonDict]: + """将单条消息转换为 1 条或多条 OpenAI 风格消息。""" + if not isinstance(message, dict): + return [message] + + content = message.get('content') + if not isinstance(content, list): + return [message] + + has_tool_use, has_tool_result = _detect_tool_blocks(content) + if not has_tool_use and not has_tool_result: + return [message] + + role = message.get('role', '') + if role == 'assistant' and has_tool_use: + return [_convert_assistant_tool_use_message(content)] + if has_tool_result: + return _convert_tool_result_message(role, 
content) + return [message] + + +def _detect_tool_blocks(content: list[Any]) -> tuple[bool, bool]: + """识别内容块里是否包含 Anthropic 风格工具调用或工具结果。""" + has_tool_use = any( + isinstance(block, dict) and block.get('type') == 'tool_use' + for block in content + ) + has_tool_result = any( + isinstance(block, dict) and block.get('type') == 'tool_result' + for block in content + ) + return has_tool_use, has_tool_result + + +def _convert_assistant_tool_use_message(content: list[Any]) -> JsonDict: + """将 assistant 的 tool_use 内容块转为 OpenAI tool_calls。""" + text_parts: list[str] = [] + tool_calls: list[JsonDict] = [] + + for block in content: + if not isinstance(block, dict): + continue + if block.get('type') == 'text': + text_parts.append(block.get('text', '')) + elif block.get('type') == 'tool_use': + tool_calls.append({ + 'id': block.get('id', gen_id('call_')), + 'type': 'function', + 'function': { + 'name': block.get('name', ''), + 'arguments': json.dumps(block.get('input', {}), ensure_ascii=False), + }, + }) + + result: JsonDict = { + 'role': 'assistant', + 'content': '\n'.join(text_parts) if text_parts else None, + } + if tool_calls: + result['tool_calls'] = tool_calls + return result + + +def _convert_tool_result_message(role: str, content: list[Any]) -> list[JsonDict]: + """将 tool_result 块拆成 OpenAI 的 tool 消息,并保留其余内容块。""" + converted: list[JsonDict] = [] + other_parts: list[Any] = [] + + for block in content: + if not isinstance(block, dict): + continue + if block.get('type') == 'tool_result': + converted.append({ + 'role': 'tool', + 'tool_call_id': block.get('tool_use_id', ''), + 'content': _stringify_tool_result_content(block.get('content', '')), + }) + else: + other_parts.append(block) + + if other_parts: + converted.append({'role': role, 'content': other_parts}) + return converted + + +def _stringify_tool_result_content(content: Any) -> str: + """将 tool_result 的 content 规范为字符串。 + + OpenAI 的 tool 消息内容天然更偏向字符串;而 Anthropic 的 tool_result 允许列表块。 + 这里做一次降维,避免后续上游把结构化结果误当成普通消息块。 + 
""" + if isinstance(content, str): + return content + if isinstance(content, list): + return '\n'.join( + block.get('text', '') + for block in content + if isinstance(block, dict) and block.get('type') == 'text' + ) + return str(content) + + +def _normalize_tool_definition(tool: Any) -> Any: + """将 Cursor 可能使用的扁平工具定义补成标准 OpenAI function tool。 + + 这里不主动过滤未知字段,只做最小标准化,避免在兼容层里过早丢失调用方提供的 + 额外上下文。 + """ + if not isinstance(tool, dict): + return tool + if tool.get('type') == 'function' and 'function' in tool: + return tool + if 'name' not in tool: + return tool + return { + 'type': 'function', + 'function': { + 'name': tool.get('name', ''), + 'description': tool.get('description', ''), + 'parameters': ( + tool.get('input_schema') + or tool.get('parameters') + or {'type': 'object', 'properties': {}} + ), + }, + } + + +def _normalize_tool_choice(payload: JsonDict) -> None: + """规范化 tool_choice。 + + 这里保留当前项目已有的映射约定: + - `{"type": "auto"}` → `"auto"` + - `{"type": "any"}` → `"required"` + + 这样做是因为部分上游只接受 OpenAI 常见的字符串写法,而不接受 Cursor/Anthropic + 风格的对象写法。 + """ + tool_choice = payload.get('tool_choice') + if not isinstance(tool_choice, dict): + return + if tool_choice.get('type') == 'auto': + payload['tool_choice'] = 'auto' + elif tool_choice.get('type') == 'any': + payload['tool_choice'] = 'required' + + +# ─── 非流式响应修复 ─────────────────────────────── + + +def fix_response(data: Any) -> Any: + """修复上游返回的非流式 OpenAI 响应。""" + if not isinstance(data, dict): + return data + + for choice in data.get('choices') or []: + _fix_response_choice(choice) + return data + + +def _fix_response_choice(choice: Any) -> None: + """修复单个非流式 choice。""" + if not isinstance(choice, dict): + return + + message = choice.get('message') or {} + if not isinstance(message, dict): + return + + _promote_reasoning_field(message) + _extract_reasoning_from_content(message) + _convert_legacy_message_function_call(message, choice) + _fix_tool_calls(message, choice) + + +def _promote_reasoning_field(container: 
JsonDict) -> None: + """兼容不同上游返回的 reasoning 字段命名差异。""" + if 'reasoningContent' in container and 'reasoning_content' not in container: + container['reasoning_content'] = container.pop('reasoningContent') + + +def _extract_reasoning_from_content(message: JsonDict) -> None: + """从 `...` 中提取 reasoning_content。 + + 有些上游把思考内容直接塞进 content 字符串里,而不是单独返回 reasoning 字段。 + 这里主动提取,是为了让 Cursor 端更稳定地展示思考过程。 + """ + content = message.get('content') or '' + if not isinstance(content, str): + return + if '' not in content or message.get('reasoning_content'): + return + + cleaned, reasoning = extract_from_text(content) + if not reasoning: + return + + message['reasoning_content'] = reasoning + message['content'] = cleaned + logger.info('已提取 标签内容并映射为 reasoning_content,长度=%s', len(reasoning)) + + +def _convert_legacy_message_function_call(message: JsonDict, choice: JsonDict) -> None: + """将旧版 function_call 字段升级为新版 tool_calls。""" + if 'function_call' not in message or 'tool_calls' in message: + return + + function_call = message.pop('function_call') or {} + message['tool_calls'] = [{ + 'id': gen_id('call_'), + 'type': 'function', + 'function': { + 'name': function_call.get('name', ''), + 'arguments': function_call.get('arguments', '{}'), + }, + }] + _rewrite_function_call_finish_reason(choice) + + +# ─── 流式 chunk 修复 ────────────────────────────── + + +def fix_stream_chunk(data: Any) -> Any: + """修复上游返回的流式 OpenAI chunk。""" + if not isinstance(data, dict): + return data + + for choice in data.get('choices') or []: + _fix_stream_choice(choice) + return data + + +def _fix_stream_choice(choice: Any) -> None: + """修复单个流式 choice。""" + if not isinstance(choice, dict): + return + + delta = choice.get('delta') or {} + if not isinstance(delta, dict): + return + + _promote_reasoning_field(delta) + _convert_legacy_delta_function_call(delta, choice) + _ensure_stream_tool_calls(delta) + _rewrite_function_call_finish_reason(choice) + + +def _convert_legacy_delta_function_call(delta: JsonDict, choice: 
JsonDict) -> None: + """将流式旧版 function_call 增量升级为 tool_calls 增量。""" + if 'function_call' not in delta or 'tool_calls' in delta: + return + + function_call = delta.pop('function_call') or {} + tool_call: JsonDict = {'index': 0, 'type': 'function', 'function': {}} + if 'name' in function_call: + tool_call['id'] = gen_id('call_') + tool_call['function']['name'] = function_call['name'] + if 'arguments' in function_call: + tool_call['function']['arguments'] = function_call['arguments'] + + delta['tool_calls'] = [tool_call] + _rewrite_function_call_finish_reason(choice) + + +def _ensure_stream_tool_calls(delta: JsonDict) -> None: + """补全流式 tool_calls 的最小必需字段。 + + 流式增量中的 tool_calls 往往是不完整片段,这里只补齐索引、ID、类型等元信息, + 不主动改写 arguments 内容,避免破坏增量拼接语义。 + """ + for tool_call in delta.get('tool_calls') or []: + if 'index' not in tool_call: + tool_call['index'] = 0 + function_data = tool_call.get('function') or {} + if 'id' in tool_call or 'name' in function_data: + if not tool_call.get('id'): + tool_call['id'] = gen_id('call_') + if 'type' not in tool_call: + tool_call['type'] = 'function' + + +# ─── tool_calls 修复 ────────────────────────────── + + +def _fix_tool_calls(message: JsonDict, choice: JsonDict) -> None: + """修复非流式消息中的 tool_calls 字段。""" + tool_calls = message.get('tool_calls') + if not tool_calls: + return + + for index, tool_call in enumerate(tool_calls): + _fill_tool_call_metadata(tool_call, index=index) + _normalize_tool_call_arguments(tool_call) + + if choice.get('finish_reason') not in ('tool_calls', 'function_call'): + choice['finish_reason'] = 'tool_calls' + + +def _fill_tool_call_metadata(tool_call: JsonDict, *, index: int) -> None: + """补齐非流式 tool_call 的通用元数据。""" + if not tool_call.get('id'): + tool_call['id'] = gen_id('call_') + if 'index' not in tool_call: + tool_call['index'] = index + if tool_call.get('type') != 'function': + tool_call['type'] = 'function' + + +def _normalize_tool_call_arguments(tool_call: JsonDict) -> None: + """规范化 tool_call 参数。 + + 
这里会顺带调用工具参数修复器,原因是很多兼容性问题不在协议层,而在工具参数本身: + 比如 `file_path`/`path` 命名差异、智能引号、StrReplace 精确匹配失败等。 + """ + function_data = tool_call.get('function') or {} + raw_arguments = function_data.get('arguments', '{}') + + try: + arguments = ( + json.loads(raw_arguments) + if isinstance(raw_arguments, str) + else (raw_arguments or {}) + ) + except json.JSONDecodeError: + arguments = {} + + arguments = normalize_args(arguments) + arguments = repair_str_replace_args(function_data.get('name', ''), arguments) + function_data['arguments'] = json.dumps(arguments, ensure_ascii=False) + + +def _rewrite_function_call_finish_reason(choice: JsonDict) -> None: + """将旧版 finish_reason=function_call 升级为 tool_calls。""" + if choice.get('finish_reason') == 'function_call': + choice['finish_reason'] = 'tool_calls' diff --git a/adapters/openai_fixer.py b/adapters/openai_fixer.py deleted file mode 100644 index 971def8..0000000 --- a/adapters/openai_fixer.py +++ /dev/null @@ -1,267 +0,0 @@ -"""OpenAI 格式修复 - -修复 Cursor 发出的 OpenAI 格式请求和上游返回的响应中的各种兼容性问题: - 请求修复: Cursor 扁平格式 tools → 标准嵌套格式, tool_choice 规范化 - 响应修复: reasoningContent → reasoning_content, 标签提取, - function_call → tool_calls, tool_calls 字段补全, 参数修复 -""" - -import json -import logging - -from utils.http import gen_id -from utils.tool_fixer import normalize_args, repair_str_replace_args -from utils.think_tag import extract_from_text - -logger = logging.getLogger(__name__) - - -# ─── 请求预处理 ─────────────────────────────────── - - -def normalize_request(payload, upstream_model=None): - """预处理 Cursor 发来的 OpenAI 格式请求""" - if upstream_model: - payload['model'] = upstream_model - - # Cursor 可能在 CC 端点发送 Anthropic 格式的 tool_use/tool_result 消息 - if 'messages' in payload: - payload['messages'] = _convert_anthropic_messages(payload['messages']) - - if 'tools' not in payload: - return payload - - # 修复 Cursor 可能发出的扁平格式 tools - normalized = [] - for tool in payload['tools']: - if tool.get('type') == 'function' and 'function' in tool: - normalized.append(tool) - 
elif 'name' in tool: - normalized.append({ - 'type': 'function', - 'function': { - 'name': tool.get('name', ''), - 'description': tool.get('description', ''), - 'parameters': tool.get('input_schema') - or tool.get('parameters') - or {'type': 'object', 'properties': {}}, - }, - }) - else: - normalized.append(tool) - payload['tools'] = normalized - - # tool_choice 规范化 - tc = payload.get('tool_choice') - if isinstance(tc, dict): - if tc.get('type') == 'auto': - payload['tool_choice'] = 'auto' - elif tc.get('type') == 'any': - payload['tool_choice'] = 'required' - - return payload - - -def _convert_anthropic_messages(messages): - """将消息中的 Anthropic 格式 tool_use/tool_result 转为 OpenAI 格式 - - Cursor 有时在 CC 端点中发送 Anthropic 风格的内容块: - assistant: [{"type":"tool_use", "id":"...", "name":"Read", "input":{...}}] - user: [{"type":"tool_result", "tool_use_id":"...", "content":[...]}] - OpenAI 格式应为: - assistant: {"tool_calls":[{"id":"...", "function":{"name":"Read","arguments":"..."}}]} - tool: {"tool_call_id":"...", "content":"..."} - """ - converted = [] - for msg in messages: - content = msg.get('content') - if not isinstance(content, list): - converted.append(msg) - continue - - has_tool_use = any( - isinstance(b, dict) and b.get('type') == 'tool_use' for b in content - ) - has_tool_result = any( - isinstance(b, dict) and b.get('type') == 'tool_result' for b in content - ) - - if not has_tool_use and not has_tool_result: - converted.append(msg) - continue - - role = msg.get('role', '') - - if role == 'assistant' and has_tool_use: - text_parts = [] - tool_calls = [] - for block in content: - if not isinstance(block, dict): - continue - if block.get('type') == 'text': - text_parts.append(block.get('text', '')) - elif block.get('type') == 'tool_use': - tool_calls.append({ - 'id': block.get('id', gen_id('call_')), - 'type': 'function', - 'function': { - 'name': block.get('name', ''), - 'arguments': json.dumps( - block.get('input', {}), ensure_ascii=False - ), - }, - }) - new_msg = 
{'role': 'assistant'} - new_msg['content'] = '\n'.join(text_parts) if text_parts else None - if tool_calls: - new_msg['tool_calls'] = tool_calls - converted.append(new_msg) - - elif has_tool_result: - other_parts = [] - for block in content: - if not isinstance(block, dict): - continue - if block.get('type') == 'tool_result': - rc = block.get('content', '') - if isinstance(rc, list): - rc = '\n'.join( - b.get('text', '') for b in rc - if isinstance(b, dict) and b.get('type') == 'text' - ) - elif not isinstance(rc, str): - rc = str(rc) - converted.append({ - 'role': 'tool', - 'tool_call_id': block.get('tool_use_id', ''), - 'content': rc, - }) - else: - other_parts.append(block) - if other_parts: - converted.append({'role': role, 'content': other_parts}) - else: - converted.append(msg) - - return converted - - -# ─── 非流式响应修复 ─────────────────────────────── - - -def fix_response(data): - """修复上游返回的非流式 OpenAI 响应""" - if not isinstance(data, dict): - return data - - for choice in (data.get('choices') or []): - msg = choice.get('message') or {} - - # reasoningContent → reasoning_content - if 'reasoningContent' in msg and 'reasoning_content' not in msg: - msg['reasoning_content'] = msg.pop('reasoningContent') - - # 标签 → reasoning_content - content = msg.get('content') or '' - if isinstance(content, str) and '' in content and not msg.get('reasoning_content'): - cleaned, reasoning = extract_from_text(content) - if reasoning: - msg['reasoning_content'] = reasoning - msg['content'] = cleaned - logger.info(f'提取 标签 → reasoning_content ({len(reasoning)} 字符)') - - # 旧版 function_call → 新版 tool_calls - if 'function_call' in msg and 'tool_calls' not in msg: - fc = msg.pop('function_call') - msg['tool_calls'] = [{ - 'id': gen_id('call_'), - 'type': 'function', - 'function': { - 'name': fc.get('name', ''), - 'arguments': fc.get('arguments', '{}'), - }, - }] - if choice.get('finish_reason') == 'function_call': - choice['finish_reason'] = 'tool_calls' - - # 修复 tool_calls 字段 - 
_fix_tool_calls(msg, choice) - - return data - - -# ─── 流式 chunk 修复 ────────────────────────────── - - -def fix_stream_chunk(data): - """修复上游返回的流式 OpenAI chunk""" - if not isinstance(data, dict): - return data - - for choice in (data.get('choices') or []): - delta = choice.get('delta') or {} - - # reasoningContent → reasoning_content - if 'reasoningContent' in delta and 'reasoning_content' not in delta: - delta['reasoning_content'] = delta.pop('reasoningContent') - - # 旧版 function_call → tool_calls - if 'function_call' in delta and 'tool_calls' not in delta: - fc = delta.pop('function_call') - tc = {'index': 0, 'type': 'function', 'function': {}} - if 'name' in fc: - tc['id'] = gen_id('call_') - tc['function']['name'] = fc['name'] - if 'arguments' in fc: - tc['function']['arguments'] = fc['arguments'] - delta['tool_calls'] = [tc] - if choice.get('finish_reason') == 'function_call': - choice['finish_reason'] = 'tool_calls' - - # 补全 tool_calls 字段 - for tc in (delta.get('tool_calls') or []): - if 'index' not in tc: - tc['index'] = 0 - func = tc.get('function') or {} - if 'id' in tc or 'name' in func: - if not tc.get('id'): - tc['id'] = gen_id('call_') - if 'type' not in tc: - tc['type'] = 'function' - - if choice.get('finish_reason') == 'function_call': - choice['finish_reason'] = 'tool_calls' - - return data - - -# ─── 内部辅助 ───────────────────────────────────── - - -def _fix_tool_calls(msg, choice): - """修复消息中的 tool_calls 字段""" - tool_calls = msg.get('tool_calls') - if not tool_calls: - return - - for i, tc in enumerate(tool_calls): - if not tc.get('id'): - tc['id'] = gen_id('call_') - if 'index' not in tc: - tc['index'] = i - if tc.get('type') != 'function': - tc['type'] = 'function' - - func = tc.get('function', {}) - args_raw = func.get('arguments', '{}') - try: - args = json.loads(args_raw) if isinstance(args_raw, str) else (args_raw or {}) - except json.JSONDecodeError: - args = {} - - args = normalize_args(args) - args = repair_str_replace_args(func.get('name', 
''), args) - func['arguments'] = json.dumps(args, ensure_ascii=False) - - if choice.get('finish_reason') not in ('tool_calls', 'function_call'): - choice['finish_reason'] = 'tool_calls' diff --git a/adapters/responses_adapter.py b/adapters/responses_adapter.py deleted file mode 100644 index 18cd2c8..0000000 --- a/adapters/responses_adapter.py +++ /dev/null @@ -1,533 +0,0 @@ -"""Responses API 适配 - -Cursor 对 GPT/Claude-Opus 等模型使用 /v1/responses 格式。 -本模块将 Responses 格式与 Chat Completions 格式互相转换: - 请求: Responses → CC - 响应: CC → Responses(非流式 + 流式) - 流式: 支持从 CC chunks 或 Anthropic SSE 事件直接转换 -""" - -import json -import logging - -from utils.http import gen_id - -logger = logging.getLogger(__name__) - - -# ═══════════════════════════════════════════════════════════ -# 请求转换: Responses → CC -# ═══════════════════════════════════════════════════════════ - - -def responses_to_cc(payload): - """将 /v1/responses 请求转换为 /v1/chat/completions 格式""" - messages = [] - - if payload.get('instructions'): - messages.append({'role': 'system', 'content': payload['instructions']}) - - input_data = payload.get('input', []) - if isinstance(input_data, str): - messages.append({'role': 'user', 'content': input_data}) - elif isinstance(input_data, list): - _convert_input_items(input_data, messages) - - result = { - 'model': payload.get('model', ''), - 'messages': messages, - 'stream': payload.get('stream', False), - } - - if 'tools' in payload: - result['tools'] = _convert_tools(payload['tools']) - for key in ('temperature', 'top_p'): - if key in payload: - result[key] = payload[key] - if 'max_output_tokens' in payload: - result['max_tokens'] = payload['max_output_tokens'] - if 'tool_choice' in payload: - result['tool_choice'] = payload['tool_choice'] - - return result - - -# ═══════════════════════════════════════════════════════════ -# 非流式响应转换: CC → Responses -# ═══════════════════════════════════════════════════════════ - - -def cc_to_responses(cc_resp, model=''): - """将 CC 响应转换为 Responses 格式""" 
- choice = (cc_resp.get('choices') or [{}])[0] - msg = choice.get('message') or {} - finish = choice.get('finish_reason', 'stop') - - output = [] - - if msg.get('reasoning_content'): - output.append({ - 'type': 'reasoning', - 'id': gen_id('rs_'), - 'summary': [{'type': 'summary_text', 'text': msg['reasoning_content']}], - }) - - if msg.get('content'): - output.append({ - 'type': 'message', - 'id': gen_id('msg_'), - 'status': 'completed', - 'role': 'assistant', - 'content': [{'type': 'output_text', 'text': msg['content']}], - }) - - for tc in (msg.get('tool_calls') or []): - func = tc.get('function') or {} - output.append({ - 'type': 'function_call', - 'id': gen_id('fc_'), - 'status': 'completed', - 'call_id': tc.get('id', gen_id('call_')), - 'name': func.get('name', ''), - 'arguments': func.get('arguments', '{}'), - }) - - usage = cc_resp.get('usage', {}) - return { - 'id': cc_resp.get('id', gen_id('resp_')), - 'object': 'response', - 'status': 'incomplete' if finish == 'length' else 'completed', - 'model': model or cc_resp.get('model', ''), - 'output': output, - 'usage': { - 'input_tokens': usage.get('prompt_tokens', 0), - 'output_tokens': usage.get('completion_tokens', 0), - 'total_tokens': usage.get('total_tokens', 0), - }, - } - - -# ═══════════════════════════════════════════════════════════ -# 流式转换器: CC chunks / Anthropic SSE → Responses SSE -# ═══════════════════════════════════════════════════════════ - - -class ResponsesStreamConverter: - """有状态转换器:将 CC 流式 chunk 或 Anthropic SSE 事件转为 Responses SSE 事件""" - - def __init__(self, response_id=None, model=''): - self.resp_id = response_id or gen_id('resp_') - self.model = model - - # 思考内容缓冲 - self._rs_buf = '' - self._rs_started = False - self._rs_closed = False - self._rs_id = gen_id('rs_') - - # 文本内容缓冲 - self._text_buf = '' - self._text_started = False - self._text_closed = False - self._msg_id = gen_id('msg_') - - # 工具调用缓冲 {index: {name, args, call_id, fc_id}} - self._tools = {} - self._output_items = [] - 
self._finished = False - self._input_tokens = 0 - - # ─── 公开接口 ───────────────────────────────── - - def start_events(self): - """生成流开始事件""" - return [self._sse('response.created', { - 'id': self.resp_id, 'object': 'response', - 'status': 'in_progress', 'model': self.model, 'output': [], - })] - - def process_cc_chunk(self, chunk): - """处理 CC 格式的流式 chunk,返回 Responses SSE 事件列表""" - events = [] - for choice in (chunk.get('choices') or []): - delta = choice.get('delta') or {} - finish = choice.get('finish_reason') - - if delta.get('reasoning_content'): - events.extend(self._on_reasoning(delta['reasoning_content'])) - if delta.get('content') is not None and delta['content'] != '': - events.extend(self._on_text(delta['content'])) - for tc in (delta.get('tool_calls') or []): - events.extend(self._on_tool_call(tc)) - if finish and not self._finished: - self._finished = True - events.extend(self._do_finish(finish, chunk.get('usage'))) - - return events - - def process_anthropic_event(self, event_type, event_data): - """直接处理 Anthropic SSE 事件(跳过 CC 中间转换,更高效)""" - events = [] - - if event_type == 'message_start': - usage = event_data.get('message', {}).get('usage', {}) - self._input_tokens = usage.get('input_tokens', 0) - - elif event_type == 'content_block_start': - block = event_data.get('content_block', {}) - btype = block.get('type', '') - if btype == 'thinking' and not self._rs_started: - self._rs_started = True - events.append(self._sse('response.output_item.added', { - 'type': 'reasoning', 'id': self._rs_id, 'summary': [], - })) - elif btype == 'text': - events.extend(self._ensure_text_started()) - elif btype == 'tool_use': - events.extend(self._start_tool_from_block(block)) - - elif event_type == 'content_block_delta': - delta = event_data.get('delta', {}) - dtype = delta.get('type', '') - if dtype == 'thinking_delta' and delta.get('thinking'): - self._rs_buf += delta['thinking'] - events.append(self._sse('response.reasoning_summary_text.delta', { - 'type': 
'summary_text', 'delta': delta['thinking'], - })) - elif dtype == 'text_delta' and delta.get('text'): - self._text_buf += delta['text'] - events.append(self._sse('response.output_text.delta', { - 'type': 'output_text', 'delta': delta['text'], - })) - elif dtype == 'input_json_delta' and delta.get('partial_json') and self._tools: - idx = max(self._tools.keys()) - self._tools[idx]['args'] += delta['partial_json'] - events.append(self._sse('response.function_call_arguments.delta', { - 'type': 'function_call', 'delta': delta['partial_json'], - })) - - elif event_type == 'message_delta': - delta = event_data.get('delta', {}) - stop = delta.get('stop_reason', 'end_turn') - usage = event_data.get('usage', {}) - finish = {'tool_use': 'tool_calls', 'max_tokens': 'length'}.get(stop, 'stop') - if not self._finished: - self._finished = True - u = { - 'input_tokens': self._input_tokens, - 'output_tokens': usage.get('output_tokens', 0), - 'total_tokens': self._input_tokens + usage.get('output_tokens', 0), - } - events.extend(self._do_finish(finish, u)) - - return events - - def finalize(self): - """流结束时补发未关闭的事件""" - if self._finished: - return [] - self._finished = True - return self._do_finish('stop', None) - - # ─── 内部事件处理 ───────────────────────────── - - def _on_reasoning(self, text): - """处理思考内容 delta""" - events = [] - if not self._rs_started: - self._rs_started = True - events.append(self._sse('response.output_item.added', { - 'type': 'reasoning', 'id': self._rs_id, 'summary': [], - })) - self._rs_buf += text - events.append(self._sse('response.reasoning_summary_text.delta', { - 'type': 'summary_text', 'delta': text, - })) - return events - - def _on_text(self, text): - """处理文本内容 delta""" - events = self._ensure_text_started() - self._text_buf += text - events.append(self._sse('response.output_text.delta', { - 'type': 'output_text', 'delta': text, - })) - return events - - def _on_tool_call(self, tc): - """处理工具调用 delta""" - events = [] - idx = tc.get('index', 0) - func = 
tc.get('function') or {} - - if idx not in self._tools: - if self._rs_started and not self._rs_closed: - events.extend(self._close_reasoning()) - if self._text_started and not self._text_closed: - events.extend(self._close_text()) - call_id = tc.get('id', gen_id('call_')) - name = func.get('name', '') - fc_id = gen_id('fc_') - self._tools[idx] = {'name': name, 'args': '', 'call_id': call_id, 'fc_id': fc_id} - events.append(self._sse('response.output_item.added', { - 'type': 'function_call', 'id': fc_id, - 'status': 'in_progress', 'call_id': call_id, - 'name': name, 'arguments': '', - })) - - if func.get('name'): - self._tools[idx]['name'] = func['name'] - if func.get('arguments', ''): - self._tools[idx]['args'] += func['arguments'] - events.append(self._sse('response.function_call_arguments.delta', { - 'type': 'function_call', 'delta': func['arguments'], - })) - return events - - def _ensure_text_started(self): - """确保文本输出项已开始""" - events = [] - if self._rs_started and not self._rs_closed: - events.extend(self._close_reasoning()) - if not self._text_started: - self._text_started = True - events.append(self._sse('response.output_item.added', { - 'type': 'message', 'id': self._msg_id, - 'status': 'in_progress', 'role': 'assistant', 'content': [], - })) - events.append(self._sse('response.content_part.added', { - 'type': 'output_text', 'text': '', - })) - return events - - def _start_tool_from_block(self, block): - """从 Anthropic tool_use block 开始新的工具调用""" - events = [] - if self._rs_started and not self._rs_closed: - events.extend(self._close_reasoning()) - if self._text_started and not self._text_closed: - events.extend(self._close_text()) - idx = len(self._tools) - tool_id = block.get('id', gen_id('toolu_')) - name = block.get('name', '') - fc_id = gen_id('fc_') - self._tools[idx] = {'name': name, 'args': '', 'call_id': tool_id, 'fc_id': fc_id} - events.append(self._sse('response.output_item.added', { - 'type': 'function_call', 'id': fc_id, - 'status': 
'in_progress', 'call_id': tool_id, - 'name': name, 'arguments': '', - })) - return events - - # ─── 关闭/结束事件 ──────────────────────────── - - def _close_reasoning(self): - if self._rs_closed: - return [] - self._rs_closed = True - rs = { - 'type': 'reasoning', 'id': self._rs_id, - 'summary': [{'type': 'summary_text', 'text': self._rs_buf}], - } - self._output_items.append(rs) - return [ - self._sse('response.reasoning_summary_text.done', { - 'type': 'summary_text', 'text': self._rs_buf, - }), - self._sse('response.output_item.done', rs), - ] - - def _close_text(self): - if self._text_closed: - return [] - self._text_closed = True - msg = { - 'type': 'message', 'id': self._msg_id, - 'status': 'completed', 'role': 'assistant', - 'content': [{'type': 'output_text', 'text': self._text_buf}], - } - self._output_items.append(msg) - return [ - self._sse('response.output_text.done', {'type': 'output_text', 'text': self._text_buf}), - self._sse('response.output_item.done', msg), - ] - - def _do_finish(self, finish_reason, usage): - """生成流结束的所有关闭事件""" - events = [] - if self._rs_started and not self._rs_closed: - events.extend(self._close_reasoning()) - if self._text_started and not self._text_closed: - events.extend(self._close_text()) - - for idx in sorted(self._tools.keys()): - buf = self._tools[idx] - events.append(self._sse('response.function_call_arguments.done', { - 'type': 'function_call', 'arguments': buf['args'], - })) - fc = { - 'type': 'function_call', 'id': buf['fc_id'], - 'status': 'completed', 'call_id': buf['call_id'], - 'name': buf['name'], 'arguments': buf['args'], - } - events.append(self._sse('response.output_item.done', fc)) - self._output_items.append(fc) - - usage_data = usage if isinstance(usage, dict) else {} - events.append(self._sse('response.completed', { - 'id': self.resp_id, 'object': 'response', - 'status': 'incomplete' if finish_reason == 'length' else 'completed', - 'model': self.model, 'output': self._output_items, 'usage': usage_data, - })) 
- return events - - def _sse(self, event_type, data): - """构建 SSE 事件字符串""" - return f'event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n' - - -# ═══════════════════════════════════════════════════════════ -# 内部辅助函数 -# ═══════════════════════════════════════════════════════════ - - -def _convert_input_items(items, messages): - """将 Responses input 数组转换为 CC messages""" - i = 0 - while i < len(items): - item = items[i] - - if isinstance(item, str): - messages.append({'role': 'user', 'content': item}) - i += 1 - continue - - if not isinstance(item, dict): - i += 1 - continue - - item_type = item.get('type', '') - role = item.get('role', '') - - # 简单角色消息(无 type 字段) - if role and not item_type: - content = item.get('content', '') - if isinstance(content, list): - content = _extract_text(content) - messages.append({'role': role, 'content': content or ''}) - i += 1 - continue - - # Responses message 对象 - if item_type == 'message' or (role and not item_type): - role = item.get('role', 'assistant') - content = _extract_text(item.get('content', [])) - msg = {'role': role, 'content': content or ''} - if role == 'assistant': - tool_calls, consumed = _collect_function_calls(items, i + 1) - if tool_calls: - msg['tool_calls'] = tool_calls - if not msg['content']: - msg['content'] = None - messages.append(msg) - i += 1 + consumed - continue - messages.append(msg) - i += 1 - continue - - # function_call(工具调用) - if item_type == 'function_call': - tc = { - 'id': item.get('call_id') or gen_id('call_'), - 'type': 'function', - 'function': { - 'name': item.get('name', ''), - 'arguments': item.get('arguments', '{}'), - }, - } - if messages and messages[-1]['role'] == 'assistant': - messages[-1].setdefault('tool_calls', []).append(tc) - if not messages[-1].get('content'): - messages[-1]['content'] = None - else: - messages.append({'role': 'assistant', 'content': None, 'tool_calls': [tc]}) - i += 1 - continue - - # function_call_output(工具结果) - if item_type == 
'function_call_output': - output = item.get('output', '') - if not isinstance(output, str): - output = json.dumps(output, ensure_ascii=False) - messages.append({ - 'role': 'tool', - 'tool_call_id': item.get('call_id', ''), - 'content': output, - }) - i += 1 - continue - - if role: - messages.append({'role': role, 'content': str(item.get('content', ''))}) - i += 1 - - -def _collect_function_calls(items, start): - """收集紧随 assistant message 之后的连续 function_call 项""" - tool_calls = [] - j = start - while j < len(items): - nxt = items[j] - if isinstance(nxt, dict) and nxt.get('type') == 'function_call': - tool_calls.append({ - 'id': nxt.get('call_id') or gen_id('call_'), - 'type': 'function', - 'function': { - 'name': nxt.get('name', ''), - 'arguments': nxt.get('arguments', '{}'), - }, - }) - j += 1 - else: - break - return tool_calls, j - start - - -def _extract_text(content): - """从 content 中提取纯文本""" - if isinstance(content, str): - return content - if not isinstance(content, list): - return str(content) if content else '' - texts = [] - for part in content: - if isinstance(part, str): - texts.append(part) - elif isinstance(part, dict): - t = part.get('type', '') - if t in ('output_text', 'input_text', 'text'): - texts.append(part.get('text', '')) - elif t == 'refusal': - texts.append(part.get('refusal', '')) - return '\n'.join(texts) if texts else '' - - -def _convert_tools(tools): - """将 Responses tools 转为 CC tools 格式""" - result = [] - for t in tools: - if t.get('type') != 'function': - continue - if 'function' in t: - result.append(t) - else: - result.append({ - 'type': 'function', - 'function': { - 'name': t.get('name', ''), - 'description': t.get('description', ''), - 'parameters': t.get('parameters', {'type': 'object', 'properties': {}}), - }, - }) - return result diff --git a/adapters/responses_cc_adapter.py b/adapters/responses_cc_adapter.py new file mode 100644 index 0000000..a3eb966 --- /dev/null +++ b/adapters/responses_cc_adapter.py @@ -0,0 +1,1052 @@ 
+"""Responses API 适配 + +Cursor 对 GPT、Claude-Opus 等模型使用 `/v1/responses` 格式。 +本模块负责在 Responses 与 Chat Completions 两种表示之间做双向转换: +- 请求方向:Responses → Chat Completions +- 响应方向:Chat Completions → Responses(非流式) +- 流式方向:CC chunk / Anthropic SSE → Responses SSE + +这个模块之所以相对复杂,是因为 Responses 在“输出项”层面比 Chat Completions 更细: +它会把思考、文本、工具调用拆成独立项目,因此流式场景必须靠状态机把不同来源的 +增量重新组织成稳定的 Responses 事件序列。 +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from typing import Any + +from utils.http import gen_id + +JsonDict = dict[str, Any] + + +# ═══════════════════════════════════════════════════════════ +# 请求转换: Responses → CC +# ═══════════════════════════════════════════════════════════ + + +def responses_to_cc(payload: JsonDict) -> JsonDict: + """将 `/v1/responses` 请求转换为 `/v1/chat/completions` 请求。""" + messages: list[JsonDict] = [] + + if payload.get('instructions'): + messages.append({'role': 'system', 'content': payload['instructions']}) + + input_data = payload.get('input', []) + if isinstance(input_data, str): + messages.append({'role': 'user', 'content': input_data}) + elif isinstance(input_data, list): + _convert_input_items(input_data, messages) + + result: JsonDict = { + 'model': payload.get('model', ''), + 'messages': messages, + 'stream': payload.get('stream', False), + } + _copy_request_options(payload, result) + return result + + +def cc_to_responses_request(payload: JsonDict) -> JsonDict: + """将 Chat Completions 请求转换为原生 Responses 请求。""" + instructions: list[str] = [] + input_items: list[JsonDict] = [] + + for message in payload.get('messages', []): + _append_responses_input_item(message, instructions, input_items) + + result: JsonDict = { + 'model': payload.get('model', ''), + 'input': input_items, + 'stream': payload.get('stream', False), + } + if instructions: + result['instructions'] = '\n\n'.join(instructions) + _copy_responses_request_options(payload, result) + return result + + +# 
def responses_to_cc_response(response_data: JsonDict, model: str = '') -> JsonDict:
    """Convert a native, non-streaming Responses result into a Chat Completions response.

    Args:
        response_data: Decoded JSON body returned by the upstream `/v1/responses` call.
        model: Model name to report to the client; when empty, the upstream
            `model` field is used instead.

    Returns:
        A `chat.completion` object with exactly one choice whose message carries
        the concatenated text, optional `reasoning_content`, and optional
        `tool_calls` gathered from the Responses `output` items.
    """
    output_items = response_data.get('output', [])
    content_text, reasoning_text, tool_calls = _collect_cc_parts_from_responses_output(output_items)
    finish_reason = _cc_finish_reason_from_responses(response_data, tool_calls)

    message: JsonDict = {
        'role': 'assistant',
        # Chat Completions uses null (not '') for "no text", e.g. tool-call-only turns.
        'content': content_text or None,
    }
    if reasoning_text:
        message['reasoning_content'] = reasoning_text
    if tool_calls:
        message['tool_calls'] = tool_calls

    # `usage` may be missing or explicitly null; never call `.get` on a non-dict.
    usage = response_data.get('usage')
    if not isinstance(usage, dict):
        usage = {}
    prompt_tokens = usage.get('input_tokens') or 0
    completion_tokens = usage.get('output_tokens') or 0
    # Derive the total when the upstream omits it instead of reporting 0.
    total_tokens = usage.get('total_tokens') or (prompt_tokens + completion_tokens)

    return {
        # `or` also covers an explicit null id.
        'id': response_data.get('id') or gen_id('chatcmpl-'),
        'object': 'chat.completion',
        'model': model or response_data.get('model', ''),
        'choices': [{
            'index': 0,
            'message': message,
            'finish_reason': finish_reason,
        }],
        'usage': {
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens,
            'total_tokens': total_tokens,
        },
    }
str + call_id: str + fc_id: str + + +class ResponsesStreamConverter: + """有状态转换器:将 CC 流式 chunk、Anthropic SSE 或原生 Responses SSE 转换为 Responses SSE。 + + 这个类是 `/v1/responses` 流式链路里的核心状态机,负责把不同来源的增量统一整理成 + Responses 风格的事件序列。之所以必须做成有状态类,而不是简单的事件映射函数,是因为: + - Responses 协议按“输出项生命周期”发事件,而不是只按文本顺序吐 token + - 思考内容、普通文本、工具调用三类输出会彼此交错,需要维护关闭顺序 + - 工具调用的名称、参数增量、完成事件分散在多个事件里,必须用缓冲区重组 + - 同一个转换器既要处理 CC chunk,也要处理 Anthropic SSE 和原生 Responses SSE + + 这里内部主要维护三组状态: + 1. reasoning 输出项缓冲 + 2. assistant 文本输出项缓冲 + 3. function_call 输出项缓冲 + + 最终目标是保证前端或调用方看到的 Responses 事件顺序稳定、字段完整、状态闭合。 + """ + + def __init__(self, response_id: str | None = None, model: str = ''): + self.resp_id = response_id or gen_id('resp_') + self.model = model + + self._rs_buf = '' + self._rs_started = False + self._rs_closed = False + self._rs_id = gen_id('rs_') + + self._text_buf = '' + self._text_started = False + self._text_closed = False + self._msg_id = gen_id('msg_') + + self._tools: dict[int, _ToolBuffer] = {} + + self._output_items: list[JsonDict] = [] + self._finished = False + self._input_tokens = 0 + + def start_events(self) -> list[str]: + """生成 Responses 流式生命周期的起始事件。 + + 调用方在真正转发上游流之前应先发出这个事件,确保前端先拿到一个 + `response.created`,后续再逐步追加 output_item / delta / completed。 + """ + return [self._sse('response.created', { + 'id': self.resp_id, + 'object': 'response', + 'status': 'in_progress', + 'model': self.model, + 'output': [], + })] + + def process_cc_chunk(self, chunk: JsonDict) -> list[str]: + """处理单个 Chat Completions chunk。 + + 这个入口用于“先转成 CC,再转成 Responses”的链路。它会把一个 chunk 中的 + reasoning、文本、tool_calls 和 finish_reason 依次拆解成 Responses 事件。 + """ + events: list[str] = [] + usage = chunk.get('usage') + for choice in chunk.get('choices') or []: + events.extend(self._process_cc_choice(choice, usage)) + return events + + def process_anthropic_event(self, event_type: str, event_data: JsonDict) -> list[str]: + """直接处理 Anthropic SSE 事件。 + + 这里故意跳过 CC 中间态,是为了减少一次协议重组,尽量保留 Anthropic 原始事件 + 的时序关系,同时降低流式转换的中间状态复杂度。 + """ + 
if event_type == 'message_start': + return self._handle_anthropic_message_start(event_data) + if event_type == 'content_block_start': + return self._handle_anthropic_content_block_start(event_data) + if event_type == 'content_block_delta': + return self._handle_anthropic_content_block_delta(event_data) + if event_type == 'message_delta': + return self._handle_anthropic_message_delta(event_data) + return [] + + def finalize(self) -> list[str]: + """在上游流自然结束但尚未显式完成时,补发收尾事件。 + + 有些后端不会补齐所有 completed 事件,这里统一把尚未关闭的 reasoning / text / + tool_call 项收尾,避免前端看到半开的输出项。 + """ + if self._finished: + return [] + self._finished = True + return self._finish_stream('stop', None) + + def process_responses_event(self, event_type: str, event_data: JsonDict) -> list[str]: + """处理上游原生 Responses SSE 事件。 + + 当当前链路本身就是 `/v1/responses -> /v1/responses` 时,这里主要做轻量重写: + 保持事件结构不变,只把顶层模型名改成 Cursor 侧看到的展示模型名。 + """ + if event_type == 'response.created': + return [self._sse(event_type, self._rewrite_top_level_model(event_data))] + if event_type == 'response.completed': + self._finished = True + return [self._sse(event_type, self._rewrite_top_level_model(event_data))] + return [self._sse(event_type, event_data)] + + def _process_cc_choice(self, choice: JsonDict, usage: Any) -> list[str]: + """处理单个 CC choice。 + + 同一个 chunk 里可能同时携带文本增量、思考增量、工具调用增量和结束原因; + 这里按 Responses 协议要求的输出项顺序,把它们拆成一组更细粒度的 SSE 事件。 + """ + events: list[str] = [] + delta = choice.get('delta') or {} + finish_reason = choice.get('finish_reason') + + if delta.get('reasoning_content'): + events.extend(self._append_reasoning_delta(delta['reasoning_content'])) + if delta.get('content') is not None and delta['content'] != '': + events.extend(self._append_text_delta(delta['content'])) + for tool_call in delta.get('tool_calls') or []: + events.extend(self._on_tool_call(tool_call)) + + if finish_reason and not self._finished: + self._finished = True + events.extend(self._finish_stream(finish_reason, usage)) + return events + + def 
_handle_anthropic_message_start(self, event_data: JsonDict) -> list[str]: + """记录输入令牌统计。 + + Anthropic 会在消息开始事件里给出 input_tokens,这里先缓存下来,等消息结束时再与 + output_tokens 合并成 Responses 需要的 usage 结构。 + """ + usage = event_data.get('message', {}).get('usage', {}) + self._input_tokens = usage.get('input_tokens', 0) + return [] + + def _handle_anthropic_content_block_start(self, event_data: JsonDict) -> list[str]: + """处理 Anthropic 内容块起始事件。 + + 不同 block 类型会开启不同的 Responses 输出项: + - thinking → reasoning 项 + - text → message 项 + - tool_use → function_call 项 + """ + block = event_data.get('content_block', {}) + block_type = block.get('type', '') + + if block_type == 'thinking': + return self._ensure_reasoning_started() + if block_type == 'text': + return self._ensure_text_started() + if block_type == 'tool_use': + return self._start_tool_from_block(block) + return [] + + def _handle_anthropic_content_block_delta(self, event_data: JsonDict) -> list[str]: + """处理 Anthropic 内容块增量事件。 + + 这里负责把 thinking_delta / text_delta / input_json_delta 映射成 Responses 的 + summary_text.delta / output_text.delta / function_call_arguments.delta。 + """ + delta = event_data.get('delta', {}) + delta_type = delta.get('type', '') + + if delta_type == 'thinking_delta' and delta.get('thinking'): + return self._append_reasoning_delta(delta['thinking']) + if delta_type == 'text_delta' and delta.get('text'): + return self._append_text_delta(delta['text']) + if delta_type == 'input_json_delta' and delta.get('partial_json') and self._tools: + index = max(self._tools.keys()) + return self._append_tool_arguments(index, delta['partial_json']) + return [] + + def _handle_anthropic_message_delta(self, event_data: JsonDict) -> list[str]: + """处理 Anthropic 消息结束事件。 + + 当上游发出 stop_reason 后,这里会把当前缓冲区统一收尾,并补出最终的 usage 和 + `response.completed` 事件。 + """ + if self._finished: + return [] + + delta = event_data.get('delta', {}) + usage = event_data.get('usage', {}) + finish_reason = 
_map_anthropic_stop_reason(delta.get('stop_reason', 'end_turn')) + self._finished = True + + usage_payload = { + 'input_tokens': self._input_tokens, + 'output_tokens': usage.get('output_tokens', 0), + 'total_tokens': self._input_tokens + usage.get('output_tokens', 0), + } + return self._finish_stream(finish_reason, usage_payload) + + def _ensure_reasoning_started(self) -> list[str]: + """确保 reasoning 输出项已经创建。 + + Responses 协议要求先有 `output_item.added`,后面才能合法发送 reasoning 的 delta。 + 所以这里负责幂等地创建 reasoning 项。 + """ + if self._rs_started: + return [] + self._rs_started = True + return [self._sse('response.output_item.added', { + 'type': 'reasoning', + 'id': self._rs_id, + 'summary': [], + })] + + def _append_reasoning_delta(self, text: str) -> list[str]: + """向 reasoning 输出项追加思考内容增量。""" + events = self._ensure_reasoning_started() + self._rs_buf += text + events.append(self._sse('response.reasoning_summary_text.delta', { + 'type': 'summary_text', + 'delta': text, + })) + return events + + def _append_text_delta(self, text: str) -> list[str]: + """向 assistant 文本输出项追加文本增量。""" + events = self._ensure_text_started() + self._text_buf += text + events.append(self._sse('response.output_text.delta', { + 'type': 'output_text', + 'delta': text, + })) + return events + + def _on_tool_call(self, tool_call: JsonDict) -> list[str]: + events: list[str] = [] + index = tool_call.get('index', 0) + function_data = tool_call.get('function') or {} + + if index not in self._tools: + call_id = tool_call.get('id', gen_id('call_')) + name = function_data.get('name', '') + events.extend(self._start_tool(index=index, call_id=call_id, name=name)) + + if function_data.get('name'): + self._tools[index].name = function_data['name'] + if function_data.get('arguments', ''): + events.extend(self._append_tool_arguments(index, function_data['arguments'])) + return events + + def _ensure_text_started(self) -> list[str]: + events: list[str] = [] + if self._rs_started and not self._rs_closed: + 
events.extend(self._close_reasoning()) + if not self._text_started: + self._text_started = True + events.append(self._sse('response.output_item.added', { + 'type': 'message', + 'id': self._msg_id, + 'status': 'in_progress', + 'role': 'assistant', + 'content': [], + })) + events.append(self._sse('response.content_part.added', { + 'type': 'output_text', + 'text': '', + })) + return events + + def _start_tool_from_block(self, block: JsonDict) -> list[str]: + return self._start_tool( + index=len(self._tools), + call_id=block.get('id', gen_id('toolu_')), + name=block.get('name', ''), + ) + + def _start_tool(self, *, index: int, call_id: str, name: str) -> list[str]: + events: list[str] = [] + if self._rs_started and not self._rs_closed: + events.extend(self._close_reasoning()) + if self._text_started and not self._text_closed: + events.extend(self._close_text()) + + function_call_id = gen_id('fc_') + self._tools[index] = _ToolBuffer( + name=name, + args='', + call_id=call_id, + fc_id=function_call_id, + ) + events.append(self._sse('response.output_item.added', { + 'type': 'function_call', + 'id': function_call_id, + 'status': 'in_progress', + 'call_id': call_id, + 'name': name, + 'arguments': '', + })) + return events + + def _append_tool_arguments(self, index: int, arguments_delta: str) -> list[str]: + buffer = self._tools[index] + buffer.args += arguments_delta + return [self._sse('response.function_call_arguments.delta', { + 'type': 'function_call', + 'delta': arguments_delta, + })] + + def _close_reasoning(self) -> list[str]: + if self._rs_closed: + return [] + self._rs_closed = True + + reasoning_item = { + 'type': 'reasoning', + 'id': self._rs_id, + 'summary': [{'type': 'summary_text', 'text': self._rs_buf}], + } + self._output_items.append(reasoning_item) + return [ + self._sse('response.reasoning_summary_text.done', { + 'type': 'summary_text', + 'text': self._rs_buf, + }), + self._sse('response.output_item.done', reasoning_item), + ] + + def _close_text(self) -> 
list[str]: + if self._text_closed: + return [] + self._text_closed = True + + message_item = { + 'type': 'message', + 'id': self._msg_id, + 'status': 'completed', + 'role': 'assistant', + 'content': [{'type': 'output_text', 'text': self._text_buf}], + } + self._output_items.append(message_item) + return [ + self._sse('response.output_text.done', { + 'type': 'output_text', + 'text': self._text_buf, + }), + self._sse('response.output_item.done', message_item), + ] + + def _finish_stream(self, finish_reason: str, usage: Any) -> list[str]: + """统一关闭所有未完成输出项,并发出最终 completed 事件。 + + 这是整个状态机的统一收口点:无论结束来源是 CC、Anthropic 还是手动 finalize, + 最终都通过这里补齐 reasoning / text / tool_call 的 done 事件。 + """ + events: list[str] = [] + if self._rs_started and not self._rs_closed: + events.extend(self._close_reasoning()) + if self._text_started and not self._text_closed: + events.extend(self._close_text()) + events.extend(self._finish_tool_calls()) + + usage_data = usage if isinstance(usage, dict) else {} + events.append(self._sse('response.completed', { + 'id': self.resp_id, + 'object': 'response', + 'status': _response_status_from_finish_reason(finish_reason), + 'model': self.model, + 'output': self._output_items, + 'usage': usage_data, + })) + return events + + def _finish_tool_calls(self) -> list[str]: + """关闭所有尚未完成的 function_call 输出项。""" + events: list[str] = [] + for index in sorted(self._tools.keys()): + buffer = self._tools[index] + events.append(self._sse('response.function_call_arguments.done', { + 'type': 'function_call', + 'arguments': buffer.args, + })) + function_call_item = { + 'type': 'function_call', + 'id': buffer.fc_id, + 'status': 'completed', + 'call_id': buffer.call_id, + 'name': buffer.name, + 'arguments': buffer.args, + } + events.append(self._sse('response.output_item.done', function_call_item)) + self._output_items.append(function_call_item) + return events + + def _sse(self, event_type: str, data: JsonDict) -> str: + return f'event: {event_type}\ndata: 
{json.dumps(data, ensure_ascii=False)}\n\n' + + def _rewrite_top_level_model(self, payload: JsonDict) -> JsonDict: + """在保持上游事件结构不变的前提下回填展示模型名。 + + Responses 原生事件经常会在顶层带一个 `model` 字段;这里不重写内部结构,只把这个 + 顶层字段替换为当前映射的 Cursor 模型名,避免界面显示上游原始模型名。 + """ + if not isinstance(payload, dict): + return payload + copied = dict(payload) + if copied.get('model'): + copied['model'] = self.model or copied['model'] + return copied + + +class ResponsesToCCStreamConverter: + """将原生 Responses SSE 事件转换为 Chat Completions chunk。 + + 当上游只支持 `/v1/responses` 时,聊天补全接口需要把 Responses 世界中的事件重新映射回 + OpenAI Chat Completions 的流式 chunk,保证 Cursor 仍然能按聊天补全协议消费结果。 + + 这个类主要负责三件事: + 1. 把 Responses 的文本与思考增量映射为 `content` / `reasoning_content` delta + 2. 把 function_call 项与 arguments 增量重组为 `tool_calls` 增量 + 3. 在 response.completed 时补出 finish_reason 与 usage + """ + + def __init__(self, request_id: str | None = None, model: str = ''): + self._id = request_id or gen_id('chatcmpl-') + self._model = model + self._tool_index = 0 + self._tool_slots: dict[str, int] = {} + self._usage: JsonDict = {} + + def process_event(self, event_type: str, event_data: JsonDict) -> list[JsonDict]: + """处理单个 Responses 事件并产出一个或多个 CC chunk。 + + 这里按事件类型分发,而不是一次性拼完整响应,是为了让聊天补全调用方仍然能保留 + 原生的流式体验。 + """ + if event_type == 'response.created': + return [self._make_chunk(delta={'role': 'assistant', 'content': ''})] + if event_type == 'response.output_text.delta': + return [self._make_chunk(delta={'content': event_data.get('delta', '')})] + if event_type == 'response.reasoning_summary_text.delta': + return [self._make_chunk(delta={'reasoning_content': event_data.get('delta', '')})] + if event_type == 'response.output_item.added': + return self._handle_output_item_added(event_data) + if event_type == 'response.function_call_arguments.delta': + return self._handle_function_arguments_delta(event_data) + if event_type == 'response.completed': + return self._handle_completed(event_data) + return [] + + def _handle_output_item_added(self, 
event_data: JsonDict) -> list[JsonDict]: + """处理 Responses 的 output_item.added 事件。 + + 这里主要关心 function_call,因为文本和 reasoning 的真正内容会分别在后续 delta + 事件里补全;function_call 则需要先创建一个空参数的 tool_call 槽位。 + """ + if event_data.get('type') != 'function_call': + return [] + call_id = event_data.get('call_id') or gen_id('call_') + index = self._tool_slots.setdefault(call_id, self._tool_index) + if index == self._tool_index: + self._tool_index += 1 + return [self._make_chunk(delta={ + 'tool_calls': [{ + 'index': index, + 'id': call_id, + 'type': 'function', + 'function': { + 'name': event_data.get('name', ''), + 'arguments': '', + }, + }] + })] + + def _handle_function_arguments_delta(self, event_data: JsonDict) -> list[JsonDict]: + """处理工具参数增量,并追加到最近一次打开的 tool_call 上。""" + if not self._tool_slots: + return [] + index = max(self._tool_slots.values()) + return [self._make_chunk(delta={ + 'tool_calls': [{ + 'index': index, + 'function': { + 'arguments': event_data.get('delta', ''), + }, + }] + })] + + def _handle_completed(self, event_data: JsonDict) -> list[JsonDict]: + """处理 response.completed,补出聊天补全流的最终收尾 chunk。""" + self._usage = event_data.get('usage', {}) or {} + finish_reason = 'tool_calls' if any( + isinstance(item, dict) and item.get('type') == 'function_call' + for item in event_data.get('output', []) + ) else 'stop' + chunk = self._make_chunk(delta={}, finish_reason=finish_reason) + chunk['usage'] = { + 'prompt_tokens': self._usage.get('input_tokens', 0), + 'completion_tokens': self._usage.get('output_tokens', 0), + 'total_tokens': self._usage.get('total_tokens', 0), + } + return [chunk] + + def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: + """构造标准 Chat Completions chunk。""" + choice: JsonDict = {'index': 0, 'delta': delta} + if finish_reason: + choice['finish_reason'] = finish_reason + return { + 'id': self._id, + 'object': 'chat.completion.chunk', + 'model': self._model, + 'choices': [choice], + } + + +# 
def _append_responses_input_item(
    message: Any,
    instructions: list[str],
    input_items: list[JsonDict],
) -> None:
    """Translate one Chat Completions message into Responses request pieces.

    Mapping:
    - `system` messages are collected into *instructions* (text only)
    - `tool` messages become `function_call_output` items
    - every other role becomes a `message` item; for `assistant`, each entry
      of `tool_calls` additionally becomes a `function_call` item

    Args:
        message: One element of the CC `messages` list; non-dicts are ignored.
        instructions: Accumulator for system prompt fragments (mutated in place).
        input_items: Accumulator for Responses `input` items (mutated in place).
    """
    if not isinstance(message, dict):
        return

    role = message.get('role', '')
    content = message.get('content')

    if role == 'system':
        text = _content_to_text(content)
        if text:
            instructions.append(text)
        return

    if role == 'tool':
        input_items.append({
            'type': 'function_call_output',
            'call_id': message.get('tool_call_id', ''),
            'output': _stringify_output(content),
        })
        return

    parts = _content_to_responses_parts(content)
    tool_calls = (message.get('tool_calls') or []) if role == 'assistant' else []

    if role == 'assistant':
        # Assistant history is model output, so its parts are typed
        # `output_text` rather than `input_text`.
        parts = [{**part, 'type': 'output_text'} for part in parts]

    # A tool-call-only assistant turn is fully described by its function_call
    # items. Skipping the empty message keeps this the inverse of
    # `_append_function_call_item`, which builds such turns with `content: None`.
    if parts or not tool_calls:
        input_items.append({
            'type': 'message',
            'role': role or 'user',
            'content': parts,
        })

    for tool_call in tool_calls:
        input_items.append(_build_responses_function_call_item(tool_call))
+ if not isinstance(item, dict): + index += 1 + continue + + item_type = item.get('type', '') + role = item.get('role', '') + + if role and not item_type: + messages.append({ + 'role': role, + 'content': _normalize_simple_content(item.get('content', '')), + }) + index += 1 + continue + + if item_type == 'message': + consumed = _append_message_item(items, start=index, messages=messages) + index += consumed + continue + + if item_type == 'function_call': + _append_function_call_item(item, messages) + index += 1 + continue + + if item_type == 'function_call_output': + messages.append(_convert_function_call_output_item(item)) + index += 1 + continue + + if role: + messages.append({'role': role, 'content': str(item.get('content', ''))}) + index += 1 + + +def _append_message_item(items: list[Any], *, start: int, messages: list[JsonDict]) -> int: + item = items[start] + role = item.get('role', 'assistant') + content = _extract_text(item.get('content', [])) + message: JsonDict = {'role': role, 'content': content or ''} + + if role == 'assistant': + tool_calls, consumed = _collect_function_calls(items, start + 1) + if tool_calls: + message['tool_calls'] = tool_calls + if not message['content']: + message['content'] = None + messages.append(message) + return 1 + consumed + + messages.append(message) + return 1 + + +def _append_function_call_item(item: JsonDict, messages: list[JsonDict]) -> None: + tool_call = _build_cc_tool_call(item) + + if messages and messages[-1]['role'] == 'assistant': + messages[-1].setdefault('tool_calls', []).append(tool_call) + if not messages[-1].get('content'): + messages[-1]['content'] = None + return + + messages.append({'role': 'assistant', 'content': None, 'tool_calls': [tool_call]}) + + +def _convert_function_call_output_item(item: JsonDict) -> JsonDict: + output = item.get('output', '') + if not isinstance(output, str): + output = json.dumps(output, ensure_ascii=False) + return { + 'role': 'tool', + 'tool_call_id': item.get('call_id', ''), + 
'content': output, + } + + +def _normalize_simple_content(content: Any) -> str: + if isinstance(content, list): + return _extract_text(content) or '' + return str(content) if content is not None else '' + + +def _collect_function_calls(items: list[Any], start: int) -> tuple[list[JsonDict], int]: + tool_calls: list[JsonDict] = [] + index = start + while index < len(items): + next_item = items[index] + if isinstance(next_item, dict) and next_item.get('type') == 'function_call': + tool_calls.append(_build_cc_tool_call(next_item)) + index += 1 + else: + break + return tool_calls, index - start + + +def _build_cc_tool_call(item: JsonDict) -> JsonDict: + return { + 'id': item.get('call_id') or gen_id('call_'), + 'type': 'function', + 'function': { + 'name': item.get('name', ''), + 'arguments': item.get('arguments', '{}'), + }, + } + + +# ═══════════════════════════════════════════════════════════ +# 非流式响应转换辅助 +# ═══════════════════════════════════════════════════════════ + + +def _build_responses_output(message: JsonDict) -> list[JsonDict]: + output: list[JsonDict] = [] + + if message.get('reasoning_content'): + output.append(_make_reasoning_output_item(message['reasoning_content'])) + if message.get('content'): + output.append(_make_message_output_item(message['content'])) + for tool_call in message.get('tool_calls') or []: + output.append(_make_function_call_output_item(tool_call)) + + return output + + +def _make_reasoning_output_item(text: str) -> JsonDict: + return { + 'type': 'reasoning', + 'id': gen_id('rs_'), + 'summary': [{'type': 'summary_text', 'text': text}], + } + + +def _make_message_output_item(text: str) -> JsonDict: + return { + 'type': 'message', + 'id': gen_id('msg_'), + 'status': 'completed', + 'role': 'assistant', + 'content': [{'type': 'output_text', 'text': text}], + } + + +def _make_function_call_output_item(tool_call: JsonDict) -> JsonDict: + function_data = tool_call.get('function') or {} + return { + 'type': 'function_call', + 'id': 
gen_id('fc_'), + 'status': 'completed', + 'call_id': tool_call.get('id', gen_id('call_')), + 'name': function_data.get('name', ''), + 'arguments': function_data.get('arguments', '{}'), + } + + +def _build_responses_usage(usage: JsonDict) -> JsonDict: + return { + 'input_tokens': usage.get('prompt_tokens', 0), + 'output_tokens': usage.get('completion_tokens', 0), + 'total_tokens': usage.get('total_tokens', 0), + } + + +def _collect_cc_parts_from_responses_output(output_items: Any) -> tuple[str, str, list[JsonDict]]: + content_text = '' + reasoning_text = '' + tool_calls: list[JsonDict] = [] + + if not isinstance(output_items, list): + return content_text, reasoning_text, tool_calls + + for item in output_items: + if not isinstance(item, dict): + continue + item_type = item.get('type', '') + if item_type == 'message': + content_text += _extract_text(item.get('content', [])) + elif item_type == 'reasoning': + reasoning_text += _extract_reasoning_text(item) + elif item_type == 'function_call': + tool_calls.append(_build_cc_tool_call_from_responses_output(item, index=len(tool_calls))) + + return content_text, reasoning_text, tool_calls + + +def _extract_reasoning_text(item: JsonDict) -> str: + summary = item.get('summary', []) + if not isinstance(summary, list): + return '' + texts: list[str] = [] + for part in summary: + if isinstance(part, dict) and part.get('type') == 'summary_text': + texts.append(part.get('text', '')) + return ''.join(texts) + + +def _build_cc_tool_call_from_responses_output(item: JsonDict, *, index: int) -> JsonDict: + return { + 'index': index, + 'id': item.get('call_id') or gen_id('call_'), + 'type': 'function', + 'function': { + 'name': item.get('name', ''), + 'arguments': item.get('arguments', '{}'), + }, + } + + +def _cc_finish_reason_from_responses(response_data: JsonDict, tool_calls: list[JsonDict]) -> str: + if tool_calls: + return 'tool_calls' + if response_data.get('status') == 'incomplete': + return 'length' + return 'stop' + + +def 
_response_status_from_finish_reason(finish_reason: str) -> str: + return 'incomplete' if finish_reason == 'length' else 'completed' + + +def _map_anthropic_stop_reason(stop_reason: str) -> str: + return {'tool_use': 'tool_calls', 'max_tokens': 'length'}.get(stop_reason, 'stop') + + +# ═══════════════════════════════════════════════════════════ +# 通用辅助 +# ═══════════════════════════════════════════════════════════ + + +def _extract_text(content: Any) -> str: + if isinstance(content, str): + return content + if not isinstance(content, list): + return str(content) if content else '' + + texts: list[str] = [] + for part in content: + if isinstance(part, str): + texts.append(part) + elif isinstance(part, dict): + part_type = part.get('type', '') + if part_type in ('output_text', 'input_text', 'text'): + texts.append(part.get('text', '')) + elif part_type == 'refusal': + texts.append(part.get('refusal', '')) + return '\n'.join(texts) if texts else '' + + +def _content_to_text(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + return _extract_text(content) + return str(content) if content is not None else '' + + +def _content_to_responses_parts(content: Any) -> list[JsonDict]: + if isinstance(content, list): + text = _extract_text(content) + else: + text = _content_to_text(content) + return [{'type': 'input_text', 'text': text}] if text else [] + + +def _stringify_output(content: Any) -> str: + if isinstance(content, str): + return content + if content is None: + return '' + return json.dumps(content, ensure_ascii=False) if not isinstance(content, str) else content + + +def _build_responses_function_call_item(tool_call: JsonDict) -> JsonDict: + function_data = tool_call.get('function') or {} + return { + 'type': 'function_call', + 'call_id': tool_call.get('id', gen_id('call_')), + 'name': function_data.get('name', ''), + 'arguments': function_data.get('arguments', '{}'), + } + + +def 
_convert_cc_tools_to_responses(tools: Any) -> list[JsonDict]: + if not isinstance(tools, list): + return [] + + result: list[JsonDict] = [] + for tool in tools: + if not isinstance(tool, dict): + continue + if tool.get('type') == 'function' and 'function' in tool: + function_data = tool['function'] + result.append({ + 'type': 'function', + 'name': function_data.get('name', ''), + 'description': function_data.get('description', ''), + 'parameters': function_data.get('parameters', {'type': 'object', 'properties': {}}), + }) + elif tool.get('type') == 'function': + result.append(tool) + return result + + +def _convert_tools(tools: Any) -> list[JsonDict]: + if not isinstance(tools, list): + return [] + + result: list[JsonDict] = [] + for tool in tools: + converted = _convert_tool_definition(tool) + if converted is not None: + result.append(converted) + return result + + +def _convert_tool_definition(tool: Any) -> JsonDict | None: + if not isinstance(tool, dict): + return None + if tool.get('type') != 'function': + return None + if 'function' in tool: + return tool + return { + 'type': 'function', + 'function': { + 'name': tool.get('name', ''), + 'description': tool.get('description', ''), + 'parameters': tool.get('parameters', {'type': 'object', 'properties': {}}), + }, + } diff --git a/routes/chat.py b/routes/chat.py index 8e106da..89382e0 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -1,216 +1,388 @@ """路由: /v1/chat/completions 处理 Cursor 发来的 OpenAI Chat Completions 格式请求。 -根据模型映射的 backend 字段分发到 OpenAI 或 Anthropic 后端。 +根据模型映射的后端类型,转发到 OpenAI 兼容接口、Anthropic Messages 接口, +或原生 OpenAI Responses 接口。 """ +from __future__ import annotations + import json import logging +from typing import Any -from flask import Blueprint, request, jsonify +from flask import Blueprint, jsonify, request -import settings -from config import Config -from adapters.openai_fixer import normalize_request, fix_response, fix_stream_chunk -from adapters.openai_anthropic import ( - 
cc_to_messages_request, messages_to_cc_response, AnthropicStreamConverter, +from adapters.cc_anthropic_adapter import ( + AnthropicStreamConverter, + cc_to_messages_request, + messages_to_cc_response, +) +from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request +from adapters.responses_cc_adapter import ( + ResponsesToCCStreamConverter, + cc_to_responses_request, + responses_to_cc, + responses_to_cc_response, +) +from config import Config +from routes.common import ( + RouteContext, + build_anthropic_target, + build_openai_target, + build_responses_target, + build_route_context, + chat_error_chunk, + log_route_context, + log_usage, + sse_data_message, ) -from adapters.responses_adapter import responses_to_cc from utils.http import ( - build_openai_headers, build_anthropic_headers, - forward_request, sse_response, - iter_openai_sse, iter_anthropic_sse, + forward_request, + iter_anthropic_sse, + iter_openai_sse, + iter_responses_sse, + sse_response, ) from utils.think_tag import ThinkTagExtractor + logger = logging.getLogger(__name__) - -def _dbg(msg): - """DEBUG 模式下输出详细日志""" - if Config.DEBUG: - logger.info(f'[调试] {msg}') - bp = Blueprint('chat', __name__) +def _dbg(message: str) -> None: + """仅在调试模式下输出详细日志。""" + if Config.DEBUG: + logger.info('[聊天补全调试] %s', message) + + @bp.route('/v1/chat/completions', methods=['POST']) def chat_completions(): + """处理聊天补全请求并按模型映射分发到不同后端。""" payload = request.get_json(force=True) + payload, message_count = _normalize_chat_payload(payload) + + client_model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) - # 保留 Cursor 发送的原始模型名,响应时需要回填 - cursor_model = payload.get('model', 'unknown') - msg_count = len(payload.get('messages', [])) + ctx = build_route_context(client_model, is_stream) - # 容错:Responses 格式误入 CC 端点 - if msg_count == 0 and 'input' in payload: - logger.info('检测到 Responses 格式(有 input 无 messages),自动转换') - payload = responses_to_cc(payload) - msg_count = 
len(payload.get('messages', [])) - elif msg_count == 0: - logger.warning(f'messages 为空, payload keys: {list(payload.keys())}') - - mapping = settings.resolve_model(cursor_model) - backend = mapping['backend'] - upstream = mapping['upstream_model'] - url_base = mapping['target_url'] - api_key = mapping['api_key'] - - logger.info( - f'[CC] {cursor_model} → {upstream} ' - f'后端={backend} 流式={is_stream} 消息数={msg_count}' - ) + log_route_context('聊天补全', ctx, extra=f'消息数={message_count}') _log_messages(payload) - if backend == 'openai': - return _via_openai(payload, upstream, url_base, api_key, is_stream, cursor_model) - else: - return _via_anthropic(payload, upstream, url_base, api_key, is_stream, cursor_model) + if ctx.backend == 'openai': + return _handle_openai_backend(ctx, payload) + if ctx.backend == 'responses': + return _handle_responses_backend(ctx, payload) + return _handle_anthropic_backend(ctx, payload) -# ─── OpenAI 后端 ────────────────────────────────── +def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], int]: + """整理聊天补全入口的请求体。 + + 这里保留了一层兼容逻辑:当 Cursor 或调用方把 Responses 格式误发到 + `/v1/chat/completions` 时,先降级转换成 Chat Completions,再进入统一主流程。 + """ + message_count = len(payload.get('messages', [])) + + if message_count == 0 and 'input' in payload: + logger.info('检测到 Responses 格式误入聊天补全接口,已自动转换为 Chat Completions 格式') + payload = responses_to_cc(payload) + message_count = len(payload.get('messages', [])) + elif message_count == 0: + logger.warning('消息列表为空,请求字段=%s', list(payload.keys())) + + return payload, message_count -def _via_openai(payload, upstream, url_base, api_key, is_stream, cursor_model): - """通过 OpenAI 兼容后端转发""" - _dbg(f'Cursor 原始请求 keys={list(payload.keys())} ' - f'其他字段={json.dumps({k: v for k, v in payload.items() if k != "messages"}, ensure_ascii=False, default=str)[:500]}') +def _handle_openai_backend(ctx: RouteContext, payload: dict[str, Any]): + """处理走 OpenAI 兼容后端的聊天补全请求。""" + _dbg( + '原始请求字段=' + str(list(payload.keys())) + ' 
' + + '附加字段=' + + json.dumps( + {k: v for k, v in payload.items() if k != 'messages'}, + ensure_ascii=False, + default=str, + )[:500] + ) - payload = normalize_request(payload, upstream) - _dbg(f'normalize 后 model={payload.get("model")} tools数={len(payload.get("tools", []))}') + payload = normalize_request(payload, ctx.upstream_model) + _dbg( + f'标准化完成:模型={payload.get("model")} ' + f'工具数={len(payload.get("tools", []))}' + ) - headers = build_openai_headers(api_key) - url = f'{url_base.rstrip("/")}/v1/chat/completions' + url, headers = build_openai_target(ctx) - if not is_stream: - payload['stream'] = False - resp, err = forward_request(url, headers, payload) - if err: - return err - raw = resp.json() - _dbg(f'上游原始响应={json.dumps(raw, ensure_ascii=False, default=str)[:1000]}') - data = fix_response(raw) - data['model'] = cursor_model - _dbg(f'修复后响应={json.dumps(data, ensure_ascii=False, default=str)[:1000]}') - usage = data.get('usage', {}) - logger.info( - f'[CC] 完成 prompt={usage.get("prompt_tokens", 0)} ' - f'completion={usage.get("completion_tokens", 0)}' - ) - return jsonify(data) + if ctx.is_stream: + return _handle_openai_stream(ctx, payload, url, headers) + return _handle_openai_non_stream(ctx, payload, url, headers) - # 流式处理 + +def _handle_openai_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理 OpenAI 兼容后端的非流式返回。""" + payload['stream'] = False + resp, err = forward_request(url, headers, payload) + if err: + return err + + raw = resp.json() + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + data = fix_response(raw) + return _finalize_chat_response(ctx, data, debug_label='修复后响应') + + +def _handle_openai_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理 OpenAI 兼容后端的流式返回。""" payload['stream'] = True - _n = [0] def generate(): resp, err = forward_request(url, headers, payload, stream=True) if err: - yield f'data: 
{json.dumps({"error": {"message": err, "type": "upstream_error"}})}\n\n' + yield chat_error_chunk(str(err)) return - think_ext = ThinkTagExtractor() + think_extractor = ThinkTagExtractor() + chunk_count = 0 for chunk in iter_openai_sse(resp): - if chunk is None: # [DONE] - _dbg(f'流结束,共 {_n[0]} 个 chunk') - yield 'data: [DONE]\n\n' + if chunk is None: + _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') + yield sse_data_message('[DONE]') return - if _n[0] < 10: - _dbg(f'上游原始 chunk#{_n[0]}={json.dumps(chunk, ensure_ascii=False, default=str)[:500]}') + if chunk_count < 10: + _dbg( + f'上游原始片段#{chunk_count}=' + + json.dumps(chunk, ensure_ascii=False, default=str)[:500] + ) chunk = fix_stream_chunk(chunk) - chunk['model'] = cursor_model + chunk['model'] = ctx.client_model - for out in think_ext.process_chunk(chunk): - if _n[0] < 10: - _dbg(f'发给Cursor chunk#{_n[0]}={json.dumps(out, ensure_ascii=False, default=str)[:500]}') - yield f'data: {json.dumps(out)}\n\n' + for out in think_extractor.process_chunk(chunk): + if chunk_count < 10: + _dbg( + f'返回片段#{chunk_count}=' + + json.dumps(out, ensure_ascii=False, default=str)[:500] + ) + yield sse_data_message(out) - _n[0] += 1 + chunk_count += 1 return sse_response(generate()) -# ─── Anthropic 后端 ─────────────────────────────── +def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any]): + """处理走原生 Responses 后端的聊天补全请求。 + + 当上游只支持 `/v1/responses` 时,需要先把聊天补全请求转换为 Responses 请求, + 返回时再转换回聊天补全协议。 + """ + responses_payload = cc_to_responses_request(payload) + responses_payload['model'] = ctx.upstream_model + _dbg( + '已转换为 Responses 请求:字段=' + str(list(responses_payload.keys())) + + f' 输入项数={len(responses_payload.get("input", []))}' + ) + + url, headers = build_responses_target(ctx) + + if ctx.is_stream: + return _handle_responses_stream(ctx, responses_payload, url, headers) + return _handle_responses_non_stream(ctx, responses_payload, url, headers) -def _via_anthropic(payload, upstream, url_base, api_key, is_stream, cursor_model): - 
"""通过 Anthropic 后端转发(CC → Messages → CC)""" - payload['model'] = upstream - anthropic_payload = cc_to_messages_request(payload) - _dbg(f'CC→Messages 转换后 keys={list(anthropic_payload.keys())} ' - f'messages数={len(anthropic_payload.get("messages", []))}') +def _handle_responses_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理原生 Responses 后端的非流式返回。""" + payload['stream'] = False + resp, err = forward_request(url, headers, payload) + if err: + return err - headers = build_anthropic_headers(api_key) - url = f'{url_base.rstrip("/")}/v1/messages' + raw = resp.json() + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - if not is_stream: - anthropic_payload['stream'] = False - resp, err = forward_request(url, headers, anthropic_payload) - if err: - return err - raw = resp.json() - _dbg(f'上游原始响应={json.dumps(raw, ensure_ascii=False, default=str)[:1000]}') - data = messages_to_cc_response(raw) - data['model'] = cursor_model - _dbg(f'Messages→CC 转换后={json.dumps(data, ensure_ascii=False, default=str)[:1000]}') - usage = data.get('usage', {}) - logger.info( - f'[CC] 完成 prompt={usage.get("prompt_tokens", 0)} ' - f'completion={usage.get("completion_tokens", 0)}' - ) - return jsonify(data) + data = responses_to_cc_response(raw, ctx.client_model) + return _finalize_chat_response(ctx, data, debug_label='Responses 转回聊天补全后') - # 流式处理 - anthropic_payload['stream'] = True - converter = AnthropicStreamConverter() - _n = [0] + +def _handle_responses_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理原生 Responses 后端的流式返回。""" + payload['stream'] = True + converter = ResponsesToCCStreamConverter(model=ctx.client_model) def generate(): - resp, err = forward_request(url, headers, anthropic_payload, stream=True) + resp, err = forward_request(url, headers, payload, stream=True) if err: - yield f'data: {json.dumps({"error": {"message": err, "type": 
"upstream_error"}})}\n\n' + yield chat_error_chunk(str(err)) return + event_count = 0 + for event_type, event_data in iter_responses_sse(resp): + if event_count < 10: + _dbg( + f'上游事件#{event_count} 类型={event_type} 数据=' + + json.dumps(event_data, ensure_ascii=False, default=str)[:500] + ) + + for chunk in converter.process_event(event_type, event_data): + if event_count < 10: + _dbg( + f'返回片段#{event_count}=' + + json.dumps(chunk, ensure_ascii=False, default=str)[:500] + ) + yield sse_data_message(chunk) + + event_count += 1 + + _dbg(f'流式响应结束,共 {event_count} 个事件') + yield sse_data_message('[DONE]') + + return sse_response(generate()) + + +def _handle_anthropic_backend(ctx: RouteContext, payload: dict[str, Any]): + """处理走 Anthropic Messages 后端的聊天补全请求。""" + payload['model'] = ctx.upstream_model + anthropic_payload = cc_to_messages_request(payload) + _dbg( + '已转换为 Messages 请求:字段=' + str(list(anthropic_payload.keys())) + + f' 消息数={len(anthropic_payload.get("messages", []))}' + ) + + url, headers = build_anthropic_target(ctx) + + if ctx.is_stream: + return _handle_anthropic_stream(ctx, anthropic_payload, url, headers) + return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers) + + +def _handle_anthropic_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理 Anthropic 后端的非流式返回。""" + payload['stream'] = False + resp, err = forward_request(url, headers, payload) + if err: + return err + + raw = resp.json() + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + data = messages_to_cc_response(raw) + return _finalize_chat_response(ctx, data, debug_label='Messages 转回聊天补全后') + + +def _handle_anthropic_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理 Anthropic 后端的流式返回。 + + 这里仍然保留独立的事件级转换器,而不是先落成完整响应再回放, + 是为了尽量保持 Cursor 端的流式体验和工具调用时序。 + """ + payload['stream'] = True + converter = AnthropicStreamConverter() + + def generate(): + 
resp, err = forward_request(url, headers, payload, stream=True) + if err: + yield chat_error_chunk(str(err)) + return + + event_count = 0 for event_type, event_data in iter_anthropic_sse(resp): - if _n[0] < 10: - _dbg(f'上游事件#{_n[0]} {event_type}={json.dumps(event_data, ensure_ascii=False, default=str)[:500]}') + if event_count < 10: + _dbg( + f'上游事件#{event_count} 类型={event_type} 数据=' + + json.dumps(event_data, ensure_ascii=False, default=str)[:500] + ) for chunk_str in converter.process_event(event_type, event_data): try: chunk_obj = json.loads(chunk_str) - chunk_obj['model'] = cursor_model - chunk_str = json.dumps(chunk_obj) + chunk_obj['model'] = ctx.client_model + chunk_str = json.dumps(chunk_obj, ensure_ascii=False) except (json.JSONDecodeError, TypeError): pass - if _n[0] < 10: - _dbg(f'发给Cursor chunk#{_n[0]}={chunk_str[:500]}') - yield f'data: {chunk_str}\n\n' - _n[0] += 1 + if event_count < 10: + _dbg(f'返回片段#{event_count}={chunk_str[:500]}') + yield sse_data_message(chunk_str) - _dbg(f'流结束,共 {_n[0]} 个事件') - yield 'data: [DONE]\n\n' + event_count += 1 + + _dbg(f'流式响应结束,共 {event_count} 个事件') + yield sse_data_message('[DONE]') return sse_response(generate()) -def _log_messages(payload): - """记录请求中的消息摘要""" - for i, msg in enumerate(payload.get('messages', [])): - role = msg.get('role', '?') - content = msg.get('content') +def _finalize_chat_response( + ctx: RouteContext, + data: dict[str, Any], + *, + debug_label: str, +): + """统一收尾非流式聊天补全响应。 + + 三条后端链路最终都会回到 Chat Completions 格式,因此这里集中做: + - 回填给 Cursor 展示的模型名 + - 输出统一调试日志 + - 输出统一令牌统计日志 + """ + data['model'] = ctx.client_model + _dbg(debug_label + '=' + json.dumps(data, ensure_ascii=False, default=str)[:1000]) + log_usage('聊天补全', data.get('usage', {}), input_key='prompt_tokens', output_key='completion_tokens') + return jsonify(data) + + +def _log_messages(payload: dict[str, Any]) -> None: + """记录消息摘要,方便排查请求形态是否符合预期。""" + for index, message in enumerate(payload.get('messages', [])): + role = message.get('role', 
'?') + content = message.get('content') extra = '' - if 'tool_calls' in msg: - extra += f' tool_calls={len(msg["tool_calls"])}' - if msg.get('tool_call_id'): - extra += f' tool_call_id={msg["tool_call_id"]}' + + if 'tool_calls' in message: + extra += f' 工具调用数={len(message["tool_calls"])}' + if message.get('tool_call_id'): + extra += f' 工具调用ID={message["tool_call_id"]}' if isinstance(content, list): - info = f'list[{len(content)}]' + content_info = f'列表[{len(content)}]' elif isinstance(content, str): - info = f'str[{len(content)}]' + content_info = f'文本[{len(content)}]' else: - info = type(content).__name__ - logger.info(f' 消息[{i}] {role} {info}{extra}') + content_info = type(content).__name__ + + logger.info(' 消息[%s] 角色=%s 内容=%s%s', index, role, content_info, extra) diff --git a/routes/common.py b/routes/common.py new file mode 100644 index 0000000..6964095 --- /dev/null +++ b/routes/common.py @@ -0,0 +1,118 @@ +"""路由层公共辅助 + +收敛多个数据面路由都会用到的上下文解析、上游目标构造、日志输出和 +SSE 消息拼装逻辑,避免 `chat.py` 和 `responses.py` 各自维护重复实现。 +""" + +from __future__ import annotations + +from dataclasses import dataclass +import json +import logging +from typing import Any + +import settings +from utils.http import build_anthropic_headers, build_openai_headers + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class RouteContext: + """数据面路由使用的标准请求上下文。""" + + client_model: str + upstream_model: str + backend: str + target_url: str + api_key: str + is_stream: bool + + +def build_route_context(client_model: str, is_stream: bool) -> RouteContext: + """解析模型映射,得到当前请求的统一路由上下文。""" + mapping = settings.resolve_model(client_model) + return RouteContext( + client_model=client_model, + upstream_model=mapping['upstream_model'], + backend=mapping['backend'], + target_url=mapping['target_url'], + api_key=mapping['api_key'], + is_stream=is_stream, + ) + + +def build_openai_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: + """根据路由上下文生成 OpenAI 兼容后端的地址和请求头。""" + url = 
f'{ctx.target_url.rstrip("/")}/v1/chat/completions' + headers = build_openai_headers(ctx.api_key) + return url, headers + + +def build_responses_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: + """根据路由上下文生成 OpenAI Responses 后端的地址和请求头。""" + url = f'{ctx.target_url.rstrip("/")}/v1/responses' + headers = build_openai_headers(ctx.api_key) + return url, headers + + +def build_anthropic_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: + """根据路由上下文生成 Anthropic 后端的地址和请求头。""" + url = f'{ctx.target_url.rstrip("/")}/v1/messages' + headers = build_anthropic_headers(ctx.api_key) + return url, headers + + +def log_route_context(route_name: str, ctx: RouteContext, *, extra: str = '') -> None: + """统一输出路由级日志,避免不同入口的日志格式逐渐漂移。""" + parts = [ + f'[{route_name}]', + f'模型={ctx.client_model}', + f'上游模型={ctx.upstream_model}', + f'后端={ctx.backend}', + f'流式={ctx.is_stream}', + ] + if extra: + parts.append(extra) + logger.info(' '.join(parts)) + + +def log_usage( + route_name: str, + usage: dict[str, Any], + *, + input_key: str, + output_key: str, +) -> None: + """统一输出令牌统计日志。 + + 不同协议对 usage 字段命名不一致,这里只接收字段名,不在调用方重复拼接日志文案。 + """ + logger.info( + '[%s] 请求完成 输入令牌=%s 输出令牌=%s', + route_name, + usage.get(input_key, 0), + usage.get(output_key, 0), + ) + + +def sse_data_message(data: Any) -> str: + """构造仅包含 data 的 SSE 消息。""" + payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False) + return f'data: {payload}\n\n' + + +def sse_event_message(event_type: str, data: Any) -> str: + """构造带 event 名称的 SSE 消息。""" + payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False) + return f'event: {event_type}\ndata: {payload}\n\n' + + +def chat_error_chunk(message: str, error_type: str = 'upstream_error') -> str: + """构造聊天补全流式接口使用的错误消息。""" + return sse_data_message({'error': {'message': message, 'type': error_type}}) + + +def responses_error_event(message: str) -> str: + """构造 Responses 流式接口使用的错误事件。""" + return sse_event_message('error', 
{'error': message}) diff --git a/routes/responses.py b/routes/responses.py index 12cbd2a..617814a 100644 --- a/routes/responses.py +++ b/routes/responses.py @@ -1,22 +1,37 @@ """路由: /v1/responses -处理 Cursor 对 GPT/Claude-Opus 等模型发出的 Responses API 格式请求。 -转换为 CC 格式后分发到对应后端,响应再转回 Responses 格式。 +处理 Cursor 对 GPT、Claude-Opus 等模型发出的 Responses API 请求。 +请求会先转换为 Chat Completions 中间表示,再按后端类型分发,最后转换回 Responses 格式。 """ +from __future__ import annotations + import json import logging +from typing import Any -from flask import Blueprint, request, jsonify +from flask import Blueprint, jsonify, request -import settings -from adapters.responses_adapter import responses_to_cc, cc_to_responses, ResponsesStreamConverter -from adapters.openai_fixer import normalize_request, fix_response, fix_stream_chunk -from adapters.openai_anthropic import cc_to_messages_request, messages_to_cc_response +from adapters.cc_anthropic_adapter import cc_to_messages_request, messages_to_cc_response +from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request +from adapters.responses_cc_adapter import ResponsesStreamConverter, cc_to_responses, responses_to_cc +from config import Config +from routes.common import ( + RouteContext, + build_anthropic_target, + build_openai_target, + build_responses_target, + build_route_context, + log_route_context, + log_usage, + responses_error_event, +) from utils.http import ( - build_openai_headers, build_anthropic_headers, - forward_request, sse_response, - iter_openai_sse, iter_anthropic_sse, + forward_request, + iter_anthropic_sse, + iter_openai_sse, + iter_responses_sse, + sse_response, ) from utils.think_tag import ThinkTagExtractor @@ -25,102 +40,272 @@ logger = logging.getLogger(__name__) bp = Blueprint('responses', __name__) +def _dbg(message: str) -> None: + """仅在调试模式下输出详细日志。""" + if Config.DEBUG: + logger.info('[响应生成调试] %s', message) + + @bp.route('/v1/responses', methods=['POST']) def responses_endpoint(): + """处理 Responses 
请求并按模型映射分发。""" payload = request.get_json(force=True) - model = payload.get('model', 'unknown') + client_model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) - mapping = settings.resolve_model(model) - backend = mapping['backend'] - upstream = mapping['upstream_model'] - url_base = mapping['target_url'] - api_key = mapping['api_key'] + ctx = build_route_context(client_model, is_stream) + log_route_context('响应生成', ctx) - logger.info(f'[Responses] {model} → {upstream} 后端={backend} 流式={is_stream}') + cc_payload = _build_cc_payload(payload, ctx) - # Responses → CC + if ctx.backend == 'openai': + return _handle_openai_backend(ctx, cc_payload) + if ctx.backend == 'responses': + return _handle_responses_backend(ctx, payload) + return _handle_anthropic_backend(ctx, cc_payload) + + +def _build_cc_payload(payload: dict[str, Any], ctx: RouteContext) -> dict[str, Any]: + """将 Responses 请求统一降级为 Chat Completions 中间表示。 + + 这样后续无论走 OpenAI 兼容后端还是 Anthropic 后端,都能复用一套 + 中间协议,避免在路由层同时维护两套完全不同的请求编排逻辑。 + """ cc_payload = responses_to_cc(payload) - cc_payload['model'] = upstream - - if backend == 'openai': - return _via_openai(cc_payload, url_base, api_key, is_stream, model) - else: - return _via_anthropic(cc_payload, url_base, api_key, is_stream, model) + cc_payload['model'] = ctx.upstream_model + _dbg( + '已转换为聊天补全中间表示:字段=' + str(list(cc_payload.keys())) + + f' 消息数={len(cc_payload.get("messages", []))}' + ) + return cc_payload -# ─── OpenAI 后端 ────────────────────────────────── - - -def _via_openai(cc_payload, url_base, api_key, is_stream, display_model): - """通过 OpenAI 后端处理""" +def _handle_openai_backend(ctx: RouteContext, cc_payload: dict[str, Any]): + """处理走 OpenAI 兼容后端的 Responses 请求。""" cc_payload = normalize_request(cc_payload) - headers = build_openai_headers(api_key) - url = f'{url_base.rstrip("/")}/v1/chat/completions' + _dbg( + f'标准化完成:模型={cc_payload.get("model")} ' + f'工具数={len(cc_payload.get("tools", []))}' + ) - if not is_stream: - 
cc_payload['stream'] = False - resp, err = forward_request(url, headers, cc_payload) - if err: - return err - return jsonify(cc_to_responses(fix_response(resp.json()), display_model)) + url, headers = build_openai_target(ctx) - # 流式处理 + if ctx.is_stream: + return _handle_openai_stream(ctx, cc_payload, url, headers) + return _handle_openai_non_stream(ctx, cc_payload, url, headers) + + +def _handle_openai_non_stream( + ctx: RouteContext, + cc_payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理 OpenAI 兼容后端的非流式 Responses 返回。""" + cc_payload['stream'] = False + resp, err = forward_request(url, headers, cc_payload) + if err: + return err + + raw = resp.json() + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + fixed = fix_response(raw) + response_data = cc_to_responses(fixed, ctx.client_model) + return _finalize_responses_response(response_data, debug_label='转换为 Responses 后') + + +def _handle_openai_stream( + ctx: RouteContext, + cc_payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理 OpenAI 兼容后端的流式 Responses 返回。""" cc_payload['stream'] = True - converter = ResponsesStreamConverter(model=display_model) + converter = ResponsesStreamConverter(model=ctx.client_model) def generate(): yield from converter.start_events() resp, err = forward_request(url, headers, cc_payload, stream=True) if err: - yield f'event: error\ndata: {json.dumps({"error": err})}\n\n' + yield responses_error_event(str(err)) return - think_ext = ThinkTagExtractor() + think_extractor = ThinkTagExtractor() + chunk_count = 0 + for chunk in iter_openai_sse(resp): if chunk is None: + _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') yield from converter.finalize() return + + if chunk_count < 10: + _dbg( + f'上游原始片段#{chunk_count}=' + + json.dumps(chunk, ensure_ascii=False, default=str)[:500] + ) + chunk = fix_stream_chunk(chunk) - for out in think_ext.process_chunk(chunk): + for out in think_extractor.process_chunk(chunk): + if chunk_count < 10: + 
_dbg( + f'转换后片段#{chunk_count}=' + + json.dumps(out, ensure_ascii=False, default=str)[:500] + ) yield from converter.process_cc_chunk(out) + chunk_count += 1 + return sse_response(generate()) -# ─── Anthropic 后端 ─────────────────────────────── +def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any]): + """处理走原生 Responses 后端的请求。 + + 当中转站本身就只支持 `/v1/responses` 时,不需要再绕到聊天补全中间协议, + 直接转发原生 Responses 请求即可。 + """ + payload = dict(payload) + payload['model'] = ctx.upstream_model + url, headers = build_responses_target(ctx) + + if ctx.is_stream: + return _handle_responses_stream(ctx, payload, url, headers) + return _handle_responses_non_stream(ctx, payload, url, headers) -def _via_anthropic(cc_payload, url_base, api_key, is_stream, display_model): - """通过 Anthropic 后端处理""" - anthropic_payload = cc_to_messages_request(cc_payload) - headers = build_anthropic_headers(api_key) - url = f'{url_base.rstrip("/")}/v1/messages' +def _handle_responses_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理原生 Responses 后端的非流式返回。""" + payload['stream'] = False + resp, err = forward_request(url, headers, payload) + if err: + return err - if not is_stream: - anthropic_payload['stream'] = False - resp, err = forward_request(url, headers, anthropic_payload) + response_data = resp.json() + response_data['model'] = ctx.client_model + return _finalize_responses_response(response_data, debug_label='原生 Responses 返回后') + + +def _handle_responses_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理原生 Responses 后端的流式返回。""" + payload['stream'] = True + converter = ResponsesStreamConverter(model=ctx.client_model) + + def generate(): + resp, err = forward_request(url, headers, payload, stream=True) if err: - return err - cc_data = messages_to_cc_response(resp.json()) - return jsonify(cc_to_responses(cc_data, display_model)) + yield responses_error_event(str(err)) + return - # 
流式处理:Anthropic SSE → Responses SSE(跳过 CC 中间态) + event_count = 0 + for event_type, event_data in iter_responses_sse(resp): + if event_count < 10: + _dbg( + f'上游事件#{event_count} 类型={event_type} 数据=' + + json.dumps(event_data, ensure_ascii=False, default=str)[:500] + ) + yield from converter.process_responses_event(event_type, event_data) + event_count += 1 + + _dbg(f'流式响应结束,共 {event_count} 个事件') + + return sse_response(generate()) + + +def _handle_anthropic_backend(ctx: RouteContext, cc_payload: dict[str, Any]): + """处理走 Anthropic 后端的 Responses 请求。""" + anthropic_payload = cc_to_messages_request(cc_payload) + _dbg( + '已转换为 Messages 请求:字段=' + str(list(anthropic_payload.keys())) + + f' 消息数={len(anthropic_payload.get("messages", []))}' + ) + + url, headers = build_anthropic_target(ctx) + + if ctx.is_stream: + return _handle_anthropic_stream(ctx, anthropic_payload, url, headers) + return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers) + + +def _handle_anthropic_non_stream( + ctx: RouteContext, + anthropic_payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理 Anthropic 后端的非流式 Responses 返回。""" + anthropic_payload['stream'] = False + resp, err = forward_request(url, headers, anthropic_payload) + if err: + return err + + raw = resp.json() + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + cc_data = messages_to_cc_response(raw) + response_data = cc_to_responses(cc_data, ctx.client_model) + return _finalize_responses_response(response_data, debug_label='Messages 转回 Responses 后') + + +def _handle_anthropic_stream( + ctx: RouteContext, + anthropic_payload: dict[str, Any], + url: str, + headers: dict[str, str], +): + """处理 Anthropic 后端的流式 Responses 返回。 + + 这里直接将 Anthropic SSE 事件映射到 Responses SSE,故意跳过 CC 流式中间态, + 这样可以减少一次事件重组,降低流式转换复杂度,也更容易保留原始时序。 + """ anthropic_payload['stream'] = True - converter = ResponsesStreamConverter(model=display_model) + converter = ResponsesStreamConverter(model=ctx.client_model) def 
generate(): yield from converter.start_events() resp, err = forward_request(url, headers, anthropic_payload, stream=True) if err: - yield f'event: error\ndata: {json.dumps({"error": err})}\n\n' + yield responses_error_event(str(err)) return + event_count = 0 for event_type, event_data in iter_anthropic_sse(resp): - yield from converter.process_anthropic_event(event_type, event_data) + if event_count < 10: + _dbg( + f'上游事件#{event_count} 类型={event_type} 数据=' + + json.dumps(event_data, ensure_ascii=False, default=str)[:500] + ) + yield from converter.process_anthropic_event(event_type, event_data) + event_count += 1 + + _dbg(f'流式响应结束,共 {event_count} 个事件') yield from converter.finalize() return sse_response(generate()) + + +def _finalize_responses_response(response_data: dict[str, Any], *, debug_label: str): + """统一收尾非流式 Responses 响应。 + + 两条转换链路和一条原生 Responses 链路最终都会回到 Responses 对象,因此这里集中 + 处理调试日志、回填展示模型名以及 usage 日志。 + """ + response_data['model'] = response_data.get('model') or '' + _dbg(debug_label + '=' + json.dumps(response_data, ensure_ascii=False, default=str)[:1000]) + log_usage('响应生成', response_data.get('usage', {}), input_key='input_tokens', output_key='output_tokens') + return jsonify(response_data) diff --git a/settings.py b/settings.py index d7d6d61..83ac318 100644 --- a/settings.py +++ b/settings.py @@ -76,9 +76,12 @@ def resolve_model(model_name): if model_name in mappings: m = mappings[model_name] + backend = m.get('backend') + if backend in ('', None, 'auto'): + backend = _auto_detect(model_name) return { 'upstream_model': m.get('upstream_model') or model_name, - 'backend': m.get('backend') or _auto_detect(model_name), + 'backend': backend, 'target_url': m.get('target_url') or base_url, 'api_key': m.get('api_key') or base_key, } diff --git a/static/admin.css b/static/admin.css index 1844c47..9b1b197 100644 --- a/static/admin.css +++ b/static/admin.css @@ -58,6 +58,7 @@ main{padding:28px 0 60px} .tag{font-size:11px;padding:2px 
8px;border-radius:4px;font-weight:500} .tag-anthropic{background:rgba(249,115,22,.15);color:#fb923c} .tag-openai{background:rgba(16,185,129,.15);color:#34d399} +.tag-responses{background:rgba(59,130,246,.15);color:#60a5fa} .tag-auto{background:rgba(139,92,246,.15);color:#a78bfa} .tag-override{background:rgba(59,130,246,.1);color:var(--primary)} .mapping-actions{margin-left:auto;display:flex;gap:6px} diff --git a/static/admin.html b/static/admin.html index 1a9c28b..fd245f0 100644 --- a/static/admin.html +++ b/static/admin.html @@ -94,11 +94,13 @@ +
anthropic:转换为 Anthropic Messages 格式 — 适用于中转站通过 /v1/messages 提供 Claude 模型
openai:保持 OpenAI Chat Completions 格式 — 适用于 GPT、DeepSeek、Codex 或通过 /v1/chat/completions 提供所有模型的中转站
+ responses:以 OpenAI Responses 格式转发(Chat Completions 请求会先转换为 Responses,Responses 请求直接透传)— 适用于中转站仅通过 /v1/responses 提供模型能力
自动检测:根据上游模型名判断(含 claude → anthropic,其他 → openai)
diff --git a/static/admin.js b/static/admin.js index 9b7b10c..d69b1fd 100644 --- a/static/admin.js +++ b/static/admin.js @@ -124,8 +124,18 @@ async function loadMappings() { el.innerHTML = '
' + keys.map(name => { const m = mappings[name]; const backend = m.backend || 'auto'; - const tagClass = backend === 'anthropic' ? 'tag-anthropic' : backend === 'openai' ? 'tag-openai' : 'tag-auto'; - const tagLabel = backend === 'auto' ? '自动' : backend; + const tagClass = backend === 'anthropic' + ? 'tag-anthropic' + : backend === 'responses' + ? 'tag-responses' + : backend === 'openai' + ? 'tag-openai' + : 'tag-auto'; + const tagLabel = backend === 'auto' + ? '自动' + : backend === 'responses' + ? 'responses' + : backend; const hasOverride = m.target_url || m.api_key; return `
diff --git a/utils/http.py b/utils/http.py index 9793ed8..c31db5a 100644 --- a/utils/http.py +++ b/utils/http.py @@ -1,8 +1,11 @@ """HTTP 工具 - 请求头构建、上游转发、SSE 流解析、响应构建""" +from __future__ import annotations + import json -import uuid import logging +import uuid +from typing import Any, Iterator import requests from flask import Response, jsonify @@ -12,7 +15,7 @@ from config import Config logger = logging.getLogger(__name__) -def gen_id(prefix=''): +def gen_id(prefix: str = '') -> str: """生成唯一 ID""" return f'{prefix}{uuid.uuid4().hex[:24]}' @@ -20,7 +23,7 @@ def gen_id(prefix=''): # ─── 请求头构建 ──────────────────────────────────── -def build_openai_headers(api_key): +def build_openai_headers(api_key: str) -> dict[str, str]: """构建 OpenAI 兼容请求头""" return { 'Authorization': f'Bearer {api_key}', @@ -28,7 +31,7 @@ def build_openai_headers(api_key): } -def build_anthropic_headers(api_key): +def build_anthropic_headers(api_key: str) -> dict[str, str]: """构建 Anthropic 请求头,根据密钥前缀自动选择鉴权方式""" headers = { 'anthropic-version': '2023-06-01', @@ -94,7 +97,7 @@ def forward_request(url, headers, payload, stream=False): # ─── SSE 流解析 ─────────────────────────────────── -def iter_openai_sse(response): +def iter_openai_sse(response) -> Iterator[dict[str, Any] | None]: """解析 OpenAI SSE 流,yield chunk 字典;yield None 表示 [DONE]""" for line in response.iter_lines(): if not line: @@ -112,8 +115,18 @@ def iter_openai_sse(response): continue -def iter_anthropic_sse(response): +def iter_anthropic_sse(response) -> Iterator[tuple[str, dict[str, Any]]]: """解析 Anthropic SSE 流,yield (event_type, data_dict) 元组""" + yield from _iter_event_sse(response) + + +def iter_responses_sse(response) -> Iterator[tuple[str, dict[str, Any]]]: + """解析 OpenAI Responses SSE 流,yield (event_type, data_dict) 元组""" + yield from _iter_event_sse(response) + + +def _iter_event_sse(response) -> Iterator[tuple[str, dict[str, Any]]]: + """解析带 event/data 的通用 SSE 流。""" event_type = '' for line in response.iter_lines(): if not line: