From a8f5ada8e1bfc86f655f4eb1398d9a22ea6d23d2 Mon Sep 17 00:00:00 2001 From: h88782481 <54714341+h88782481@users.noreply.github.com> Date: Thu, 26 Mar 2026 11:34:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=9E=E9=80=80=E6=97=A7=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adapters/cc_anthropic_adapter.py | 260 ++++++------ adapters/cc_gemini_adapter.py | 147 +++---- adapters/helpers.py | 155 ------- adapters/openai_compat_fixer.py | 59 +-- adapters/responses_cc_adapter.py | 336 ++++++--------- adapters/unified.py | 354 ---------------- routes/chat.py | 685 ++++++++++++++++++++++++++++--- routes/common.py | 193 ++------- routes/responses.py | 609 +++++++++++++++++++++++++-- 9 files changed, 1582 insertions(+), 1216 deletions(-) delete mode 100644 adapters/helpers.py delete mode 100644 adapters/unified.py diff --git a/adapters/cc_anthropic_adapter.py b/adapters/cc_anthropic_adapter.py index b70fff0..7848d05 100644 --- a/adapters/cc_anthropic_adapter.py +++ b/adapters/cc_anthropic_adapter.py @@ -18,21 +18,13 @@ from __future__ import annotations import json from typing import Any -from adapters.helpers import ( - build_cc_message, - build_cc_response, - build_cc_tool_call, - build_cc_usage, - extract_text, - make_cc_chunk, - parse_json_safe, - stringify_content, -) from utils.http import gen_id from utils.tool_fixer import fix_anthropic_tool_use, normalize_args, repair_str_replace_args JsonDict = dict[str, Any] + +# Anthropic stop_reason → OpenAI finish_reason _STOP_REASON_MAP = { 'end_turn': 'stop', 'max_tokens': 'length', @@ -86,18 +78,23 @@ def messages_to_cc_response(data: JsonDict, request_id: str | None = None) -> Js data = fix_anthropic_tool_use(data) content_text, reasoning_text, tool_calls = _collect_response_parts(data.get('content', [])) + message = _build_cc_message(content_text, reasoning_text, tool_calls) usage = data.get('usage', {}) - return build_cc_response( - response_id=request_id, - message=build_cc_message(content_text, reasoning_text, tool_calls), - finish_reason=_STOP_REASON_MAP.get(data.get('stop_reason', 'end_turn'), 'stop'), - usage=build_cc_usage( + return { + 'id': request_id, + 'object': 'chat.completion', + 'model': data.get('model', 'claude'), + 'choices': [{ + 'index': 0, + 'message': message, + 'finish_reason': _STOP_REASON_MAP.get(data.get('stop_reason', 'end_turn'), 'stop'), + }], + 'usage': _build_cc_usage( input_tokens=usage.get('input_tokens', 0), output_tokens=usage.get('output_tokens', 0), ), - model=data.get('model', 'claude'), - ) + } # ═══════════════════════════════════════════════════════════ @@ -127,8 +124,12 @@ class AnthropicStreamConverter: self._input_tokens = 0 self._output_tokens = 0 - def process_event(self, event_type: str, event_data: JsonDict) -> list[JsonDict]: - """处理单个 Anthropic SSE 事件,返回 CC chunk dict 列表。""" + def process_event(self, event_type: str, event_data: JsonDict) -> list[str]: + """处理单个 Anthropic SSE 事件。 + + 调用方会按事件顺序不断喂入 event/data,这里根据事件类型拆成一个或多个 CC chunk + 字符串,交给上层直接作为 SSE data 发送给 Cursor。 + """ if event_type == 'message_start': return self._handle_message_start(event_data) if event_type == 'content_block_start': @@ -139,64 +140,104 @@ class AnthropicStreamConverter: return self._handle_message_delta(event_data) return [] - def _handle_message_start(self, event_data: JsonDict) -> list[JsonDict]: + def _handle_message_start(self, event_data: JsonDict) -> list[str]: + """处理消息开始事件,产出 assistant 角色起始 chunk。 + + 这个起始 chunk 很重要,因为 Cursor 侧通常会依赖首个带 role 的 chunk 来初始化 + 当前 assistant 消息。 + """ message = event_data.get('message', {}) self._input_tokens = message.get('usage', {}).get('input_tokens', 0) + chunk = self._make_chunk(delta={'role': 'assistant', 'content': ''}) if message.get('model'): chunk['model'] = message['model'] - return [chunk] + return [self._dump_chunk(chunk)] - def _handle_content_block_start(self, event_data: JsonDict) -> list[JsonDict]: + def _handle_content_block_start(self, event_data: JsonDict) -> list[str]: + """处理内容块开始事件。 + + 目前这里只需要显式处理 `tool_use`,因为文本和 thinking 的真正内容都在后续 delta + 事件里;而 tool_use 需要先开一个空 arguments 的 tool_call 槽位。 + """ block = event_data.get('content_block', {}) if block.get('type') != 'tool_use': return [] + self._tool_index += 1 - return [self._make_chunk(delta={ + return [self._dump_chunk(self._make_chunk(delta={ 'tool_calls': [{ 'index': self._tool_index, 'id': block.get('id', gen_id('toolu_')), 'type': 'function', - 'function': {'name': block.get('name', ''), 'arguments': ''}, + 'function': { + 'name': block.get('name', ''), + 'arguments': '', + }, }] - })] + }))] - def _handle_content_block_delta(self, event_data: JsonDict) -> list[JsonDict]: + def _handle_content_block_delta(self, event_data: JsonDict) -> list[str]: + """处理内容块增量事件。 + + Anthropic 会把文本、思考内容、工具参数拆成不同 delta 类型,这里要分别映射成 + OpenAI chunk 里的 `content`、`reasoning_content` 和 `tool_calls.function.arguments`。 + """ delta = event_data.get('delta', {}) delta_type = delta.get('type', '') if delta_type == 'text_delta' and delta.get('text'): - return [self._make_chunk(delta={'content': delta['text']})] + return [self._dump_chunk(self._make_chunk(delta={'content': delta['text']}))] + if delta_type == 'thinking_delta' and delta.get('thinking'): - return [self._make_chunk(delta={'reasoning_content': delta['thinking']})] + return [self._dump_chunk(self._make_chunk(delta={'reasoning_content': delta['thinking']}))] + if delta_type == 'input_json_delta' and delta.get('partial_json'): - return [self._make_chunk(delta={ + return [self._dump_chunk(self._make_chunk(delta={ 'tool_calls': [{ 'index': self._tool_index, 'function': {'arguments': delta['partial_json']}, }] - })] + }))] + return [] - def _handle_message_delta(self, event_data: JsonDict) -> list[JsonDict]: + def _handle_message_delta(self, event_data: JsonDict) -> list[str]: + """处理消息收尾事件,补出 finish_reason 和 usage。 + + 当 Anthropic 发出 `message_delta` 时,说明这一轮 assistant 输出已经收束, + 这里会统一生成最后一个带 usage 的收尾 chunk。 + """ delta = event_data.get('delta', {}) usage = event_data.get('usage', {}) self._output_tokens = usage.get('output_tokens', 0) - chunk = make_cc_chunk( - self._id, + + chunk = self._make_chunk( delta={}, finish_reason=_STOP_REASON_MAP.get(delta.get('stop_reason', ''), 'stop'), - model='claude', ) - chunk['usage'] = build_cc_usage( + chunk['usage'] = _build_cc_usage( input_tokens=self._input_tokens, output_tokens=self._output_tokens, ) - return [chunk] + return [self._dump_chunk(chunk)] def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: """构造标准 OpenAI Chat Completions chunk 对象。""" - return make_cc_chunk(self._id, delta, finish_reason, model='claude') + choice: JsonDict = {'index': 0, 'delta': delta} + if finish_reason: + choice['finish_reason'] = finish_reason + return { + 'id': self._id, + 'object': 'chat.completion.chunk', + 'model': 'claude', + 'choices': [choice], + } + + @staticmethod + def _dump_chunk(chunk: JsonDict) -> str: + """统一序列化 chunk,方便上层直接写入 SSE data。""" + return json.dumps(chunk) # ═══════════════════════════════════════════════════════════ @@ -213,7 +254,7 @@ def _convert_request_message(message: Any) -> tuple[JsonDict | None, str | None] content = message.get('content', '') if role == 'system': - return None, extract_text(content) + return None, _flatten_text(content) if role == 'tool': return _convert_tool_role_message(message), None @@ -260,7 +301,7 @@ def _append_tool_use_blocks(content: Any, tool_calls: list[Any]) -> list[JsonDic 'type': 'tool_use', 'id': tool_call.get('id', gen_id('toolu_')), 'name': function_data.get('name', ''), - 'input': parse_json_safe(function_data.get('arguments', '{}')), + 'input': _parse_tool_arguments(function_data.get('arguments', '{}')), }) return blocks @@ -331,12 +372,37 @@ def _convert_tool_use_block(block: JsonDict, *, index: int) -> JsonDict: else: arguments_text = str(input_data) - return build_cc_tool_call( - call_id=block.get('id', gen_id('toolu_')), - name=tool_name, - arguments=arguments_text, - index=index, - ) + return { + 'index': index, + 'id': block.get('id', gen_id('toolu_')), + 'type': 'function', + 'function': { + 'name': tool_name, + 'arguments': arguments_text, + }, + } + + +def _build_cc_message(content_text: str, reasoning_text: str, tool_calls: list[JsonDict]) -> JsonDict: + """构造 OpenAI CC 响应中的 assistant message。""" + message: JsonDict = { + 'role': 'assistant', + 'content': content_text or None, + } + if reasoning_text: + message['reasoning_content'] = reasoning_text + if tool_calls: + message['tool_calls'] = tool_calls + return message + + +def _build_cc_usage(*, input_tokens: int, output_tokens: int) -> JsonDict: + """将 Anthropic usage 字段映射为 OpenAI usage。""" + return { + 'prompt_tokens': input_tokens, + 'completion_tokens': output_tokens, + 'total_tokens': input_tokens + output_tokens, + } # ═══════════════════════════════════════════════════════════ @@ -344,6 +410,35 @@ def _convert_tool_use_block(block: JsonDict, *, index: int) -> JsonDict: # ═══════════════════════════════════════════════════════════ +def _parse_tool_arguments(arguments: Any) -> Any: + """将 tool_call.arguments 尽量解析为对象,供 Anthropic tool_use.input 使用。 + + Anthropic 的 `tool_use.input` 天然期望对象结构;如果这里直接保留原始字符串, + 后续上游会把它当普通文本而不是工具参数对象。 + """ + if not isinstance(arguments, str): + return arguments if arguments is not None else {} + try: + return json.loads(arguments) + except json.JSONDecodeError: + return {} + + +def _flatten_text(content: Any) -> str: + """将 content 扁平化为纯文本,主要用于 system 消息上提。""" + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for part in content: + if isinstance(part, str): + parts.append(part) + elif isinstance(part, dict) and part.get('type') == 'text': + parts.append(part.get('text', '')) + return '\n'.join(parts) + return str(content) + + def _convert_content(message: JsonDict) -> Any: """将 OpenAI 消息的 content 字段转换为 Anthropic 内容格式。""" content = message.get('content', '') @@ -613,78 +708,3 @@ def _pick_window_anchor(refs: list[JsonDict], target: int) -> int | None: if 'cache_control' not in refs[i]: return i return None - - -# ═══════════════════════════════════════════════════════════ -# OutboundTransformer 实现: Anthropic Messages -# ═══════════════════════════════════════════════════════════ - - -class AnthropicOutbound: - """Anthropic Messages 后端的出站转换器。 - - 将 CC 格式转换为 Anthropic Messages 格式并处理响应。 - """ - - def build_request(self, payload: JsonDict) -> JsonDict: - return cc_to_messages_request(payload) - - def build_url(self, ctx) -> str: - return f'{ctx.target_url.rstrip("/")}/v1/messages' - - def build_headers(self, ctx) -> dict[str, str]: - from utils.http import build_anthropic_headers - return build_anthropic_headers(ctx.api_key) - - def parse_response(self, raw: JsonDict) -> JsonDict: - return messages_to_cc_response(raw) - - def create_stream_processor(self) -> AnthropicStreamProcessor: - return AnthropicStreamProcessor() - - -class AnthropicStreamProcessor: - """Anthropic SSE 流式处理器。 - - 包装 iter_anthropic_sse + AnthropicStreamConverter, - 将 Anthropic 事件流转换为 CC chunk。 - """ - - def __init__(self): - self._converter = AnthropicStreamConverter() - self._input_tokens = 0 - self._output_tokens = 0 - - def iter_events(self, response) -> Iterator: - from utils.http import iter_anthropic_sse - yield from iter_anthropic_sse(response) - - def process_event(self, event: tuple) -> list[JsonDict]: - event_type, event_data = event - return self._converter.process_event(event_type, event_data) - - def extract_usage(self, event: tuple) -> JsonDict | None: - event_type, event_data = event - if event_type == 'message_start': - message_usage = event_data.get('message', {}).get('usage', {}) - if isinstance(message_usage, dict): - self._input_tokens = message_usage.get('input_tokens', 0) - return { - 'prompt_tokens': self._input_tokens, - 'completion_tokens': 0, - 'total_tokens': self._input_tokens, - } - elif event_type == 'message_delta': - delta_usage = event_data.get('usage', {}) - if isinstance(delta_usage, dict): - completion = delta_usage.get('output_tokens', 0) - self._output_tokens = completion - return { - 'prompt_tokens': self._input_tokens, - 'completion_tokens': completion, - 'total_tokens': self._input_tokens + completion, - } - return None - - def finalize(self) -> list[JsonDict]: - return [] diff --git a/adapters/cc_gemini_adapter.py b/adapters/cc_gemini_adapter.py index 60336c8..5e8aad0 100644 --- a/adapters/cc_gemini_adapter.py +++ b/adapters/cc_gemini_adapter.py @@ -8,17 +8,8 @@ from __future__ import annotations import json import logging -from typing import Any, Iterator +from typing import Any -from adapters.helpers import ( - build_cc_message, - build_cc_response, - build_cc_tool_call, - build_cc_usage, - extract_text, - make_cc_chunk, - parse_json_safe, -) from utils.http import gen_id JsonDict = dict[str, Any] @@ -47,7 +38,7 @@ def cc_to_gemini_request(payload: JsonDict) -> JsonDict: for msg in messages: role = msg.get('role', '') if role in ('system', 'developer'): - system_parts.append(extract_text(msg.get('content', ''))) + system_parts.append(_flatten_text(msg.get('content', ''))) continue converted = _convert_message(msg) if converted: @@ -93,13 +84,21 @@ def gemini_to_cc_response(data: JsonDict, request_id: str | None = None) -> Json else: finish_reason = _FINISH_REASON_MAP.get(finish, 'stop') - return build_cc_response( - response_id=request_id, - message=build_cc_message(content_text, reasoning_text, tool_calls), - finish_reason=finish_reason, - usage=_convert_usage(data.get('usageMetadata', {})), - model=data.get('modelVersion', 'gemini'), - ) + message: JsonDict = {'role': 'assistant', 'content': content_text or None} + if reasoning_text: + message['reasoning_content'] = reasoning_text + if tool_calls: + message['tool_calls'] = tool_calls + + usage = _convert_usage(data.get('usageMetadata', {})) + + return { + 'id': request_id, + 'object': 'chat.completion', + 'model': data.get('modelVersion', 'gemini'), + 'choices': [{'index': 0, 'message': message, 'finish_reason': finish_reason}], + 'usage': usage, + } # ═══════════════════════════════════════════════════════════ @@ -167,7 +166,15 @@ class GeminiStreamConverter: return results def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: - return make_cc_chunk(self._id, delta, finish_reason, model='gemini') + choice: JsonDict = {'index': 0, 'delta': delta} + if finish_reason: + choice['finish_reason'] = finish_reason + return { + 'id': self._id, + 'object': 'chat.completion.chunk', + 'model': 'gemini', + 'choices': [choice], + } # ═══════════════════════════════════════════════════════════ @@ -187,7 +194,7 @@ def _convert_message(msg: JsonDict) -> JsonDict | None: 'parts': [{ 'functionResponse': { 'name': msg.get('name', msg.get('tool_call_id', '')), - 'response': parse_json_safe(msg.get('content', ''), fallback={'result': msg.get('content', '')} if msg.get('content', '') else {}), + 'response': _parse_json_safe(msg.get('content', '')), }, }], } @@ -214,7 +221,7 @@ def _convert_message(msg: JsonDict) -> JsonDict | None: parts.append({ 'functionCall': { 'name': func.get('name', ''), - 'args': parse_json_safe(func.get('arguments', '{}'), fallback={}), + 'args': _parse_json_safe(func.get('arguments', '{}')), }, }) @@ -297,12 +304,15 @@ def _extract_parts(parts: list[Any]) -> tuple[str, str, list[JsonDict]]: text += part['text'] elif 'functionCall' in part: fc = part['functionCall'] - tool_calls.append(build_cc_tool_call( - call_id=fc.get('id') or gen_id('call_'), - name=fc.get('name', ''), - arguments=json.dumps(fc.get('args', {}), ensure_ascii=False), - index=len(tool_calls), - )) + tool_calls.append({ + 'index': len(tool_calls), + 'id': fc.get('id') or gen_id('call_'), + 'type': 'function', + 'function': { + 'name': fc.get('name', ''), + 'arguments': json.dumps(fc.get('args', {}), ensure_ascii=False), + }, + }) return text, reasoning, tool_calls @@ -312,7 +322,12 @@ def _convert_usage(meta: JsonDict) -> JsonDict: prompt = meta.get('promptTokenCount', 0) candidates = meta.get('candidatesTokenCount', 0) thoughts = meta.get('thoughtsTokenCount', 0) - return build_cc_usage(prompt, candidates + thoughts) + completion = candidates + thoughts + return { + 'prompt_tokens': prompt, + 'completion_tokens': completion, + 'total_tokens': prompt + completion, + } def _merge_same_role(contents: list[JsonDict]) -> list[JsonDict]: @@ -328,65 +343,21 @@ def _merge_same_role(contents: list[JsonDict]) -> list[JsonDict]: return merged +def _flatten_text(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + return '\n'.join( + p.get('text', '') if isinstance(p, dict) else str(p) + for p in content + ) + return str(content) -# ═══════════════════════════════════════════════════════════ -# OutboundTransformer 实现: Gemini Contents -# ═══════════════════════════════════════════════════════════ - - -class GeminiOutbound: - """Gemini Contents 后端的出站转换器。 - - 将 CC 格式转换为 Gemini generateContent 格式并处理响应。 - """ - - def build_request(self, payload: JsonDict) -> JsonDict: - return cc_to_gemini_request(payload) - - def build_url(self, ctx) -> str: - base = ctx.target_url.rstrip('/') - model = ctx.upstream_model - if ctx.is_stream: - return f'{base}/v1/models/{model}:streamGenerateContent?alt=sse' - return f'{base}/v1/models/{model}:generateContent' - - def build_headers(self, ctx) -> dict[str, str]: - from utils.http import build_gemini_headers - return build_gemini_headers(ctx.api_key) - - def parse_response(self, raw: JsonDict) -> JsonDict: - return gemini_to_cc_response(raw) - - def create_stream_processor(self) -> GeminiStreamProcessor: - return GeminiStreamProcessor() - - -class GeminiStreamProcessor: - """Gemini SSE 流式处理器。 - - 包装 iter_gemini_sse + GeminiStreamConverter。 - """ - - def __init__(self): - self._converter = GeminiStreamConverter() - - def iter_events(self, response) -> Iterator: - from utils.http import iter_gemini_sse - yield from iter_gemini_sse(response) - - def process_event(self, event: JsonDict) -> list[JsonDict]: - return self._converter.process_chunk(event) - - def extract_usage(self, event: JsonDict) -> JsonDict | None: - usage_meta = event.get('usageMetadata') if isinstance(event, dict) else None - if isinstance(usage_meta, dict): - return { - 'prompt_tokens': usage_meta.get('promptTokenCount', 0), - 'completion_tokens': usage_meta.get('candidatesTokenCount', 0), - 'total_tokens': usage_meta.get('totalTokenCount', 0), - } - return None - - def finalize(self) -> list[JsonDict]: - return [] +def _parse_json_safe(text: Any) -> Any: + if not isinstance(text, str): + return text if text is not None else {} + try: + return json.loads(text) + except (json.JSONDecodeError, ValueError): + return {'result': text} if text else {} diff --git a/adapters/helpers.py b/adapters/helpers.py deleted file mode 100644 index 563902b..0000000 --- a/adapters/helpers.py +++ /dev/null @@ -1,155 +0,0 @@ -"""适配器公共辅助函数 - -收敛多个适配器都在重复实现的 CC 格式构建逻辑: -- CC 消息/Usage/Tool Call/Stream Chunk 的标准构造 -- 内容扁平化、JSON 安全解析、工具输出序列化 -""" - -from __future__ import annotations - -import json -from typing import Any - -from utils.http import gen_id - -JsonDict = dict[str, Any] - - -# ═══════════════════════════════════════════════════════════ -# CC 格式标准构造 -# ═══════════════════════════════════════════════════════════ - - -def build_cc_message( - content_text: str, - reasoning_text: str = '', - tool_calls: list[JsonDict] | None = None, -) -> JsonDict: - """构造标准的 CC assistant 消息。""" - message: JsonDict = { - 'role': 'assistant', - 'content': content_text or None, - } - if reasoning_text: - message['reasoning_content'] = reasoning_text - if tool_calls: - message['tool_calls'] = tool_calls - return message - - -def build_cc_usage(input_tokens: int, output_tokens: int) -> JsonDict: - """构造标准的 CC usage 字典。""" - return { - 'prompt_tokens': input_tokens, - 'completion_tokens': output_tokens, - 'total_tokens': input_tokens + output_tokens, - } - - -def build_cc_tool_call( - call_id: str, - name: str, - arguments: str, - *, - index: int | None = None, -) -> JsonDict: - """构造标准的 CC tool_call 结构。""" - tc: JsonDict = { - 'id': call_id or gen_id('call_'), - 'type': 'function', - 'function': { - 'name': name, - 'arguments': arguments, - }, - } - if index is not None: - tc['index'] = index - return tc - - -def make_cc_chunk( - chunk_id: str, - delta: JsonDict, - finish_reason: str | None = None, - model: str = '', -) -> JsonDict: - """构造标准的 CC 流式 chunk。""" - choice: JsonDict = {'index': 0, 'delta': delta} - if finish_reason: - choice['finish_reason'] = finish_reason - return { - 'id': chunk_id, - 'object': 'chat.completion.chunk', - 'model': model, - 'choices': [choice], - } - - -def build_cc_response( - response_id: str, - message: JsonDict, - finish_reason: str, - usage: JsonDict, - model: str = '', -) -> JsonDict: - """构造标准的 CC 非流式响应。""" - return { - 'id': response_id, - 'object': 'chat.completion', - 'model': model, - 'choices': [{ - 'index': 0, - 'message': message, - 'finish_reason': finish_reason, - }], - 'usage': usage, - } - - -# ═══════════════════════════════════════════════════════════ -# 通用文本/JSON 处理 -# ═══════════════════════════════════════════════════════════ - - -def extract_text(content: Any) -> str: - """从多种内容格式中提取并拼接纯文本。 - - 支持字符串、内容块列表(OpenAI/Anthropic/Responses 风格)。 - """ - if isinstance(content, str): - return content - if not isinstance(content, list): - return str(content) if content is not None else '' - - parts: list[str] = [] - for part in content: - if isinstance(part, str): - parts.append(part) - elif isinstance(part, dict): - part_type = part.get('type', '') - if part_type in ('text', 'output_text', 'input_text'): - parts.append(part.get('text', '')) - elif part_type == 'refusal': - parts.append(part.get('refusal', '')) - elif 'text' in part and not part_type: - parts.append(part['text']) - return '\n'.join(parts) if parts else '' - - -def parse_json_safe(text: Any, fallback: Any = None) -> Any: - """安全解析 JSON,失败时返回 fallback。""" - if not isinstance(text, str): - return text if text is not None else (fallback if fallback is not None else {}) - try: - return json.loads(text) - except (json.JSONDecodeError, ValueError): - return fallback if fallback is not None else {} - - -def stringify_content(content: Any) -> str: - """将任意内容序列化为字符串。""" - if isinstance(content, str): - return content - if content is None: - return '' - return json.dumps(content, ensure_ascii=False) diff --git a/adapters/openai_compat_fixer.py b/adapters/openai_compat_fixer.py index 50348cf..8a2d252 100644 --- a/adapters/openai_compat_fixer.py +++ b/adapters/openai_compat_fixer.py @@ -13,7 +13,7 @@ from __future__ import annotations import json import logging -from typing import Any, Iterator +from typing import Any from utils.http import gen_id from utils.think_tag import extract_from_text @@ -423,60 +423,3 @@ def _rewrite_function_call_finish_reason(choice: JsonDict) -> None: """将旧版 finish_reason=function_call 升级为 tool_calls。""" if choice.get('finish_reason') == 'function_call': choice['finish_reason'] = 'tool_calls' - - -# ═══════════════════════════════════════════════════════════ -# OutboundTransformer 实现: OpenAI Chat -# ═══════════════════════════════════════════════════════════ - - -class OpenAIChatOutbound: - """OpenAI Chat Completions 后端的出站转换器。 - - 由于 CC 本身就是 OpenAI Chat 格式,请求/响应转换主要做兼容性修复。 - """ - - def build_request(self, payload: JsonDict) -> JsonDict: - return normalize_request(payload) - - def build_url(self, ctx) -> str: - return f'{ctx.target_url.rstrip("/")}/v1/chat/completions' - - def build_headers(self, ctx) -> dict[str, str]: - from utils.http import build_openai_headers - return build_openai_headers(ctx.api_key) - - def parse_response(self, raw: JsonDict) -> JsonDict: - return fix_response(raw) - - def create_stream_processor(self) -> OpenAIChatStreamProcessor: - return OpenAIChatStreamProcessor() - - -class OpenAIChatStreamProcessor: - """OpenAI Chat SSE 流式处理器。 - - 包装 iter_openai_sse + fix_stream_chunk + ThinkTagExtractor。 - """ - - def __init__(self): - from utils.think_tag import ThinkTagExtractor - self._think_extractor = ThinkTagExtractor() - - def iter_events(self, response) -> Iterator: - from utils.http import iter_openai_sse - for chunk in iter_openai_sse(response): - if chunk is None: - return - yield chunk - - def process_event(self, event: JsonDict) -> list[JsonDict]: - chunk = fix_stream_chunk(event) - return list(self._think_extractor.process_chunk(chunk)) - - def extract_usage(self, event: JsonDict) -> JsonDict | None: - return event.get('usage') - - def finalize(self) -> list[JsonDict]: - close_chunk = self._think_extractor.finalize() - return [close_chunk] if close_chunk else [] diff --git a/adapters/responses_cc_adapter.py b/adapters/responses_cc_adapter.py index 68acedc..e6c864a 100644 --- a/adapters/responses_cc_adapter.py +++ b/adapters/responses_cc_adapter.py @@ -15,18 +15,8 @@ from __future__ import annotations import json from dataclasses import dataclass -from typing import Any, Iterator +from typing import Any -from adapters.helpers import ( - build_cc_message, - build_cc_response, - build_cc_tool_call, - build_cc_usage, - extract_text, - make_cc_chunk, - stringify_content, -) -from adapters.unified import UnifiedUsage from utils.http import gen_id JsonDict = dict[str, Any] @@ -95,7 +85,7 @@ def cc_to_responses(cc_resp: JsonDict, model: str = '') -> JsonDict: 'status': _response_status_from_finish_reason(finish_reason), 'model': model or cc_resp.get('model', ''), 'output': _build_responses_output(message), - 'usage': UnifiedUsage.from_cc_dict(cc_resp.get('usage', {})).to_responses_dict(), + 'usage': _build_responses_usage(cc_resp.get('usage', {})), } @@ -104,18 +94,31 @@ def responses_to_cc_response(response_data: JsonDict, model: str = '') -> JsonDi output_items = response_data.get('output', []) content_text, reasoning_text, tool_calls = _collect_cc_parts_from_responses_output(output_items) finish_reason = _cc_finish_reason_from_responses(response_data, tool_calls) - usage = response_data.get('usage', {}) + message = { + 'role': 'assistant', + 'content': content_text or None, + } + if reasoning_text: + message['reasoning_content'] = reasoning_text + if tool_calls: + message['tool_calls'] = tool_calls - return build_cc_response( - response_id=response_data.get('id', gen_id('chatcmpl-')), - message=build_cc_message(content_text, reasoning_text, tool_calls), - finish_reason=finish_reason, - usage=build_cc_usage( - input_tokens=usage.get('input_tokens', 0), - output_tokens=usage.get('output_tokens', 0), - ), - model=model or response_data.get('model', ''), - ) + usage = response_data.get('usage', {}) + return { + 'id': response_data.get('id', gen_id('chatcmpl-')), + 'object': 'chat.completion', + 'model': model or response_data.get('model', ''), + 'choices': [{ + 'index': 0, + 'message': message, + 'finish_reason': finish_reason, + }], + 'usage': { + 'prompt_tokens': usage.get('input_tokens', 0), + 'completion_tokens': usage.get('output_tokens', 0), + 'total_tokens': usage.get('total_tokens', 0), + }, + } # ═══════════════════════════════════════════════════════════ @@ -655,7 +658,15 @@ class ResponsesToCCStreamConverter: def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: """构造标准 Chat Completions chunk。""" - return make_cc_chunk(self._id, delta, finish_reason, model=self._model) + choice: JsonDict = {'index': 0, 'delta': delta} + if finish_reason: + choice['finish_reason'] = finish_reason + return { + 'id': self._id, + 'object': 'chat.completion.chunk', + 'model': self._model, + 'choices': [choice], + } # ═══════════════════════════════════════════════════════════ @@ -704,7 +715,7 @@ def _append_responses_input_item( content = message.get('content') if role == 'system': - text = extract_text(content) + text = _content_to_text(content) if text: instructions.append(text) return @@ -713,11 +724,11 @@ def _append_responses_input_item( input_items.append({ 'type': 'function_call_output', 'call_id': message.get('tool_call_id', ''), - 'output': stringify_content(content), + 'output': _stringify_output(content), }) return - text = extract_text(content) + text = _content_to_text(content) has_tool_calls = bool(message.get('tool_calls')) if role == 'assistant' and has_tool_calls: @@ -760,7 +771,7 @@ def _convert_input_items(items: list[Any], messages: list[JsonDict]) -> None: if role and not item_type: msg: JsonDict = { 'role': role, - 'content': extract_text(item.get('content', '')), + 'content': _normalize_simple_content(item.get('content', '')), } if role == 'assistant' and pending_reasoning: msg['reasoning_content'] = pending_reasoning @@ -799,7 +810,7 @@ def _append_message_item(items: list[Any], *, start: int, messages: list[JsonDic """将一个 message 项及其后续连续 function_call 项合并成一条消息。""" item = items[start] role = item.get('role', 'assistant') - content = extract_text(item.get('content', [])) + content = _extract_text(item.get('content', [])) message: JsonDict = {'role': role, 'content': content or ''} if role == 'assistant': @@ -817,11 +828,7 @@ def _append_message_item(items: list[Any], *, start: int, messages: list[JsonDic def _append_function_call_item(item: JsonDict, messages: list[JsonDict]) -> None: """将独立的 Responses `function_call` 项挂接到最近的 assistant 消息上。""" - tool_call = build_cc_tool_call( - call_id=item.get('call_id') or gen_id('call_'), - name=item.get('name', ''), - arguments=item.get('arguments', '{}'), - ) + tool_call = _build_cc_tool_call(item) if messages and messages[-1]['role'] == 'assistant': messages[-1].setdefault('tool_calls', []).append(tool_call) @@ -844,6 +851,12 @@ def _convert_function_call_output_item(item: JsonDict) -> JsonDict: } +def _normalize_simple_content(content: Any) -> str: + """将简单 content 载荷规范化为纯文本字符串。""" + if isinstance(content, list): + return _extract_text(content) or '' + return str(content) if content is not None else '' + def _collect_function_calls(items: list[Any], start: int) -> tuple[list[JsonDict], int]: """收集从指定位置开始连续出现的 `function_call` 项。""" @@ -852,17 +865,24 @@ def _collect_function_calls(items: list[Any], start: int) -> tuple[list[JsonDict while index < len(items): next_item = items[index] if isinstance(next_item, dict) and next_item.get('type') == 'function_call': - tool_calls.append(build_cc_tool_call( - call_id=next_item.get('call_id') or gen_id('call_'), - name=next_item.get('name', ''), - arguments=next_item.get('arguments', '{}'), - )) + tool_calls.append(_build_cc_tool_call(next_item)) index += 1 else: break return tool_calls, index - start +def _build_cc_tool_call(item: JsonDict) -> JsonDict: + """将单个 Responses `function_call` 项转换为 CC `tool_call` 结构。""" + return { + 'id': item.get('call_id') or gen_id('call_'), + 'type': 'function', + 'function': { + 'name': item.get('name', ''), + 'arguments': item.get('arguments', '{}'), + }, + } + # ═══════════════════════════════════════════════════════════ # 非流式响应转换辅助 @@ -916,6 +936,14 @@ def _make_function_call_output_item(tool_call: JsonDict) -> JsonDict: } +def _build_responses_usage(usage: JsonDict) -> JsonDict: + """将 Chat Completions 的 usage 字段映射为 Responses usage 结构。""" + return { + 'input_tokens': usage.get('prompt_tokens', 0), + 'output_tokens': usage.get('completion_tokens', 0), + 'total_tokens': usage.get('total_tokens', 0), + } + def _collect_cc_parts_from_responses_output(output_items: Any) -> tuple[str, str, list[JsonDict]]: """从 Responses `output` 中提取文本、思考摘要和工具调用。""" @@ -931,16 +959,11 @@ def _collect_cc_parts_from_responses_output(output_items: Any) -> tuple[str, str continue item_type = item.get('type', '') if item_type == 'message': - content_text += extract_text(item.get('content', [])) + content_text += _extract_text(item.get('content', [])) elif item_type == 'reasoning': reasoning_text += _extract_reasoning_text(item) elif item_type == 'function_call': - tool_calls.append(build_cc_tool_call( - call_id=item.get('call_id') or gen_id('call_'), - name=item.get('name', ''), - arguments=item.get('arguments', '{}'), - index=len(tool_calls), - )) + tool_calls.append(_build_cc_tool_call_from_responses_output(item, index=len(tool_calls))) return content_text, reasoning_text, tool_calls @@ -957,6 +980,18 @@ def _extract_reasoning_text(item: JsonDict) -> str: return ''.join(texts) +def _build_cc_tool_call_from_responses_output(item: JsonDict, *, index: int) -> JsonDict: + """将 Responses `function_call` 输出项转换为 CC `tool_call`。""" + return { + 'index': index, + 'id': item.get('call_id') or gen_id('call_'), + 'type': 'function', + 'function': { + 'name': item.get('name', ''), + 'arguments': item.get('arguments', '{}'), + }, + } + def _cc_finish_reason_from_responses(response_data: JsonDict, tool_calls: list[JsonDict]) -> str: """根据 Responses 完成状态推断聊天补全的 finish_reason。""" @@ -982,7 +1017,57 @@ def _map_anthropic_stop_reason(stop_reason: str) -> str: # ═══════════════════════════════════════════════════════════ +def _extract_text(content: Any) -> str: + """从多种内容块结构中提取并拼接纯文本。""" + if isinstance(content, str): + return content + if not isinstance(content, list): + return str(content) if content else '' + texts: list[str] = [] + for part in content: + if isinstance(part, str): + texts.append(part) + elif isinstance(part, dict): + part_type = part.get('type', '') + if part_type in ('output_text', 'input_text', 'text'): + texts.append(part.get('text', '')) + elif part_type == 'refusal': + texts.append(part.get('refusal', '')) + return '\n'.join(texts) if texts else '' + + +def _content_to_text(content: Any) -> str: + """将任意 content 载荷转换为单个字符串。""" + if isinstance(content, str): + return content + if isinstance(content, list): + return _extract_text(content) + return str(content) if content is not None else '' + + +def _content_to_responses_parts(content: Any, role: str = 'user') -> list[JsonDict]: + """将普通消息内容转换为 Responses 内容块数组。 + + assistant 消息使用 output_text,其他角色使用 input_text。 + """ + if isinstance(content, list): + text = _extract_text(content) + else: + text = _content_to_text(content) + if not text: + return [] + part_type = 'output_text' if role == 'assistant' else 'input_text' + return [{'type': part_type, 'text': text}] + + +def _stringify_output(content: Any) -> str: + """将工具输出统一序列化为字符串,便于放入 `function_call_output`。""" + if isinstance(content, str): + return content + if content is None: + return '' + return json.dumps(content, ensure_ascii=False) if not isinstance(content, str) else content def _build_responses_function_call_item(tool_call: JsonDict) -> JsonDict: @@ -996,165 +1081,6 @@ def _build_responses_function_call_item(tool_call: JsonDict) -> JsonDict: } -# ═══════════════════════════════════════════════════════════ -# OutboundTransformer 实现: Responses -# ═══════════════════════════════════════════════════════════ - - -class ResponsesOutbound: - """OpenAI Responses 后端的出站转换器。 - - 将 CC 格式转换为 Responses 格式并处理响应。 - """ - - def build_request(self, payload: JsonDict) -> JsonDict: - return cc_to_responses_request(payload) - - def build_url(self, ctx) -> str: - return f'{ctx.target_url.rstrip("/")}/v1/responses' - - def build_headers(self, ctx) -> dict[str, str]: - from utils.http import build_openai_headers - return build_openai_headers(ctx.api_key) - - def parse_response(self, raw: JsonDict) -> JsonDict: - return responses_to_cc_response(raw) - - def create_stream_processor(self) -> ResponsesStreamProcessorForCC: - return ResponsesStreamProcessorForCC() - - -class ResponsesStreamProcessorForCC: - """Responses SSE → CC chunk 流式处理器。 - - 用于 /v1/chat/completions -> /v1/responses 的桥接路径。 - """ - - def __init__(self): - self._converter = ResponsesToCCStreamConverter() - - def iter_events(self, response) -> Iterator: - from utils.http import iter_responses_sse - yield from iter_responses_sse(response) - - def process_event(self, event: tuple) -> list[JsonDict]: - event_type, event_data = event - return self._converter.process_event(event_type, event_data) - - def extract_usage(self, event: tuple) -> JsonDict | None: - from adapters.unified import extract_responses_usage - event_type, event_data = event - extracted = extract_responses_usage(event_data) - if extracted: - return { - 'prompt_tokens': extracted.get('input_tokens', 0), - 'completion_tokens': extracted.get('output_tokens', 0), - 'total_tokens': extracted.get('total_tokens', 0), - } - return None - - def finalize(self) -> list[JsonDict]: - return [] - - -class ResponsesNativeOutbound: - """Responses 后端原生透传的出站转换器。 - - 当 /v1/responses → /v1/responses 时直接透传,不经过 CC 中间格式。 - """ - - def build_request(self, payload: JsonDict) -> JsonDict: - return payload - - def build_url(self, ctx) -> str: - return f'{ctx.target_url.rstrip("/")}/v1/responses' - - def build_headers(self, ctx) -> dict[str, str]: - from utils.http import build_openai_headers - return build_openai_headers(ctx.api_key) - - def parse_response(self, raw: JsonDict) -> JsonDict: - return raw - - def create_stream_processor(self) -> ResponsesNativeStreamProcessor: - return ResponsesNativeStreamProcessor() - - -class ResponsesNativeStreamProcessor: - """Responses 原生 SSE 透传流式处理器。 - - 上游就是 Responses 格式,只需透传事件并做轻量模型名改写。 - 每个事件作为 SSE 字符串直接返回。 - """ - - def iter_events(self, response) -> Iterator: - from utils.http import iter_responses_sse - yield from iter_responses_sse(response) - - def process_event(self, event: tuple) -> list[JsonDict]: - event_type, event_data = event - return [{'_sse_event_type': event_type, **event_data}] - - def extract_usage(self, event: tuple) -> JsonDict | None: - from adapters.unified import extract_responses_usage - _, event_data = event - return extract_responses_usage(event_data) - - def finalize(self) -> list[JsonDict]: - return [] - - -class AnthropicOutboundForResponses: - """Anthropic 后端的出站转换器(用于 /v1/responses 路由)。 - - 流式处理直接将 Anthropic SSE → Responses SSE, - 跳过 CC 中间态以保留原始时序。 - """ - - def build_request(self, payload: JsonDict) -> JsonDict: - from adapters.cc_anthropic_adapter import cc_to_messages_request - return cc_to_messages_request(payload) - - def build_url(self, ctx) -> str: - return f'{ctx.target_url.rstrip("/")}/v1/messages' - - def build_headers(self, ctx) -> dict[str, str]: - from utils.http import build_anthropic_headers - return build_anthropic_headers(ctx.api_key) - - def parse_response(self, raw: JsonDict) -> JsonDict: - from adapters.cc_anthropic_adapter import messages_to_cc_response - return messages_to_cc_response(raw) - - def create_stream_processor(self) -> AnthropicToResponsesStreamProcessor: - return AnthropicToResponsesStreamProcessor() - - -class AnthropicToResponsesStreamProcessor: - """Anthropic SSE → Responses SSE 直接转换的流式处理器。 - - 跳过 CC 中间态,直接将 Anthropic 事件映射为 Responses 事件。 - 返回的 chunk 是 SSE 字符串。 - """ - - def __init__(self): - self._converter = ResponsesStreamConverter() - - def iter_events(self, response) -> Iterator: - from utils.http import iter_anthropic_sse - yield from iter_anthropic_sse(response) - - def process_event(self, event: tuple) -> list[str]: - event_type, event_data = event - return self._converter.process_anthropic_event(event_type, event_data) - - def extract_usage(self, event: tuple) -> JsonDict | None: - return None - - def finalize(self) -> list[str]: - return self._converter.finalize() - - def _convert_cc_tools_to_responses(tools: Any) -> list[JsonDict]: """将聊天补全风格的工具定义转换为 Responses `tools` 列表。""" if not isinstance(tools, list): diff --git a/adapters/unified.py b/adapters/unified.py deleted file mode 100644 index db2e087..0000000 --- a/adapters/unified.py +++ /dev/null @@ -1,354 +0,0 @@ -"""统一中间格式与转换器接口 - -定义项目中所有 API 格式共用的中间表示和转换器协议: -- UnifiedRequest / UnifiedResponse: 统一的请求/响应数据结构 -- InboundTransformer / OutboundTransformer: 入站/出站转换器接口 -- StreamProcessor: 流式事件处理器接口 -- ClientFormatter: 客户端响应格式化接口 -""" - -from __future__ import annotations - -import json -import logging -from dataclasses import dataclass, field -from typing import Any, Iterator, Protocol - -from flask import Response, jsonify - -import settings -from utils.http import forward_request, gen_id, sse_response -from utils.request_logger import ( - append_client_event, - append_upstream_event, - attach_client_response, - attach_error, - attach_upstream_request, - attach_upstream_response, - finalize_turn, - set_stream_summary, -) -from utils.usage_tracker import usage_tracker - -logger = logging.getLogger(__name__) - -JsonDict = dict[str, Any] - - -# ═══════════════════════════════════════════════════════════ -# 统一数据模型 -# ═══════════════════════════════════════════════════════════ - - -@dataclass -class UnifiedUsage: - """标准化的令牌用量统计。""" - - input_tokens: int = 0 - output_tokens: int = 0 - total_tokens: int = 0 - - def to_cc_dict(self) -> JsonDict: - return { - 'prompt_tokens': self.input_tokens, - 'completion_tokens': self.output_tokens, - 'total_tokens': self.total_tokens, - } - - def to_responses_dict(self) -> JsonDict: - return { - 'input_tokens': self.input_tokens, - 'output_tokens': self.output_tokens, - 'total_tokens': self.total_tokens, - } - - @classmethod - def from_cc_dict(cls, d: JsonDict) -> UnifiedUsage: - return cls( - input_tokens=d.get('prompt_tokens', 0), - output_tokens=d.get('completion_tokens', 0), - total_tokens=d.get('total_tokens', 0), - ) - - @classmethod - def from_responses_dict(cls, d: JsonDict) -> UnifiedUsage: - return cls( - input_tokens=d.get('input_tokens', 0), - output_tokens=d.get('output_tokens', 0), - total_tokens=d.get('total_tokens', 0), - ) - - -# ═══════════════════════════════════════════════════════════ -# 转换器接口 -# ═══════════════════════════════════════════════════════════ - - -class OutboundTransformer(Protocol): - """出站转换器:将 CC 中间格式转换为上游后端格式。 - - 所有后端(OpenAI Chat / Responses / Anthropic / Gemini)各实现一套, - 内部复用各自现有的适配器函数。 - """ - - def build_request(self, payload: JsonDict) -> JsonDict: - """将 CC 格式请求体转换为上游格式请求体。""" - ... - - def build_url(self, ctx: Any) -> str: - """根据路由上下文构建上游请求 URL。""" - ... - - def build_headers(self, ctx: Any) -> JsonDict: - """根据路由上下文构建上游请求头。""" - ... - - def parse_response(self, raw: JsonDict) -> JsonDict: - """将上游非流式响应转换回 CC 格式。""" - ... - - def create_stream_processor(self) -> StreamProcessor: - """创建该后端对应的流式事件处理器。""" - ... - - -class StreamProcessor(Protocol): - """流式事件处理器接口。 - - 每个后端的 SSE 格式不同,StreamProcessor 封装了具体的迭代与转换逻辑, - 让通用流式处理器不必关心后端差异。 - """ - - def iter_events(self, response: Any) -> Iterator: - """从上游 HTTP 响应中迭代原始事件。""" - ... - - def process_event(self, event: Any) -> list: - """将单个上游事件转换为输出项列表。 - - 返回值通常是 list[JsonDict](CC chunk), - 但 Anthropic→Responses 路径返回 list[str](SSE 字符串)。 - """ - ... - - def extract_usage(self, event: Any) -> JsonDict | None: - """从上游事件中提取用量信息(如果有的话)。""" - ... - - def finalize(self) -> list: - """流结束时产出的收尾项。""" - ... - - -class ClientFormatter(Protocol): - """客户端响应格式化器。 - - 根据客户端期望的 API 格式(CC 或 Responses),将通用的处理结果 - 格式化为最终返回给客户端的形态。 - """ - - def format_response(self, cc_response: JsonDict, model: str) -> JsonDict: - """格式化非流式响应。""" - ... - - def wrap_stream_item(self, item: Any) -> str: - """将单个流式输出项包装为 SSE 字符串。""" - ... - - def format_error(self, message: str) -> str: - """构造流式错误消息。""" - ... - - def format_done(self) -> str | None: - """构造流结束标记(CC 返回 [DONE],Responses 返回 None)。""" - ... - - def start_events(self) -> list[str]: - """流开始前的初始事件(Responses 返回 response.created)。""" - ... - - @property - def usage_input_key(self) -> str: - """usage 中输入令牌的字段名。""" - ... - - @property - def usage_output_key(self) -> str: - """usage 中输出令牌的字段名。""" - ... - - -# ═══════════════════════════════════════════════════════════ -# 通用请求/响应处理器 -# ═══════════════════════════════════════════════════════════ - - -def _dbg(message: str) -> None: - if settings.get_debug_mode() in ('simple', 'verbose'): - logger.info('[通用调试] %s', message) - - -def extract_responses_usage(event_data: JsonDict) -> JsonDict | None: - """从原生 Responses 事件中提取 usage(公共辅助)。""" - if not isinstance(event_data, dict): - return None - usage = event_data.get('usage') - if isinstance(usage, dict): - return usage - response_obj = event_data.get('response') - if isinstance(response_obj, dict): - nested_usage = response_obj.get('usage') - if isinstance(nested_usage, dict): - return nested_usage - return None - - -def handle_non_stream( - ctx: Any, - outbound: OutboundTransformer, - client_fmt: ClientFormatter, - payload: JsonDict, - turn: JsonDict | None, -) -> Response: - """通用非流式处理器。 - - 替代 chat.py 和 responses.py 中的 8 个 _handle_xxx_non_stream 函数。 - """ - from routes.common import apply_body_modifications, apply_header_modifications, log_usage - - upstream_payload = outbound.build_request(payload) - url = outbound.build_url(ctx) - headers = outbound.build_headers(ctx) - upstream_payload = apply_body_modifications(upstream_payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) - - upstream_payload['stream'] = False - attach_upstream_request(turn, upstream_payload, headers) - resp, err = forward_request(url, headers, upstream_payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - raw = resp.json() - attach_upstream_response(turn, raw) - _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - - cc_response = outbound.parse_response(raw) - result = client_fmt.format_response(cc_response, ctx.client_model) - - _dbg('格式化后响应=' + json.dumps(result, ensure_ascii=False, default=str)[:1000]) - usage_data = result.get('usage', {}) - log_usage('通用', usage_data, input_key=client_fmt.usage_input_key, output_key=client_fmt.usage_output_key) - usage_tracker.record( - ctx.client_model, - usage_data, - input_key=client_fmt.usage_input_key, - output_key=client_fmt.usage_output_key, - ) - attach_client_response(turn, result) - finalize_turn(turn, usage=usage_data) - return jsonify(result) - - -def handle_stream( - ctx: Any, - outbound: OutboundTransformer, - client_fmt: ClientFormatter, - payload: JsonDict, - turn: JsonDict | None, -) -> Response: - """通用流式处理器。 - - 替代 chat.py 和 responses.py 中的 8 个 _handle_xxx_stream 函数。 - """ - from routes.common import apply_body_modifications, apply_header_modifications - - upstream_payload = outbound.build_request(payload) - url = outbound.build_url(ctx) - headers = outbound.build_headers(ctx) - upstream_payload = apply_body_modifications(upstream_payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) - - upstream_payload['stream'] = True - processor = outbound.create_stream_processor() - - def generate(): - for start_evt in client_fmt.start_events(): - yield start_evt - - attach_upstream_request(turn, upstream_payload, headers) - resp, err = forward_request(url, headers, upstream_payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield client_fmt.format_error(str(err)) - return - - event_count = 0 - client_items: list[str] = [] - last_usage: JsonDict | None = None - - for event in processor.iter_events(resp): - append_upstream_event(turn, {'type': 'upstream_event', 'data': event}) - - extracted = processor.extract_usage(event) - if extracted is not None: - last_usage = extracted - - if event_count < 10: - _dbg( - f'上游事件#{event_count}=' - + json.dumps(event, ensure_ascii=False, default=str)[:500] - ) - - for chunk in processor.process_event(event): - if isinstance(chunk, dict): - chunk['model'] = ctx.client_model - wrapped = client_fmt.wrap_stream_item(chunk) - client_items.append(wrapped) - append_client_event(turn, {'type': 'stream_item', 'data': chunk}) - if event_count < 10: - _dbg( - f'返回片段#{event_count}=' - + json.dumps(chunk, ensure_ascii=False, default=str)[:500] - ) - yield wrapped - - event_count += 1 - - for chunk in processor.finalize(): - if isinstance(chunk, dict): - chunk['model'] = ctx.client_model - wrapped = client_fmt.wrap_stream_item(chunk) - client_items.append(wrapped) - append_client_event(turn, {'type': 'stream_item', 'data': chunk}) - yield wrapped - - done = client_fmt.format_done() - if done: - append_client_event(turn, {'type': 'done'}) - yield done - - _dbg(f'流式响应结束,共 {event_count} 个事件') - usage_tracker.record( - ctx.client_model, - last_usage, - input_key=client_fmt.usage_input_key, - output_key=client_fmt.usage_output_key, - ) - set_stream_summary(turn, { - 'event_count': event_count, - 'client_item_count': len(client_items), - 'usage': last_usage, - }) - attach_client_response(turn, { - 'type': 'stream.summary', - 'model': ctx.client_model, - 'event_count': len(client_items), - 'usage': last_usage, - }) - finalize_turn(turn, usage=last_usage) - - return sse_response(generate()) diff --git a/routes/chat.py b/routes/chat.py index 7c72413..be4f775 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -1,7 +1,8 @@ """路由: /v1/chat/completions 处理 Cursor 发来的 OpenAI Chat Completions 格式请求。 -根据模型映射的后端类型,通过统一的出站转换器转发到不同后端。 +根据模型映射的后端类型,转发到 OpenAI 兼容接口、Anthropic Messages 接口, +或原生 OpenAI Responses 接口。 """ from __future__ import annotations @@ -10,34 +11,103 @@ import json import logging from typing import Any +import settings from flask import Blueprint, jsonify, request -from adapters.openai_compat_fixer import normalize_request -from adapters.responses_cc_adapter import responses_to_cc -from adapters.unified import handle_non_stream, handle_stream -from routes.common import ( - CCClientFormatter, - build_route_context, - get_outbound, - inject_instructions_cc, - log_route_context, - should_inject_thinking, +from adapters.cc_anthropic_adapter import ( + AnthropicStreamConverter, + cc_to_messages_request, + messages_to_cc_response, ) -from utils.request_logger import start_turn +from adapters.cc_gemini_adapter import ( + GeminiStreamConverter, + cc_to_gemini_request, + gemini_to_cc_response, +) +from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request +from adapters.responses_cc_adapter import ( + ResponsesToCCStreamConverter, + cc_to_responses_request, + responses_to_cc, + responses_to_cc_response, +) +from config import Config +from routes.common import ( + RouteContext, + apply_body_modifications, + apply_header_modifications, + build_anthropic_target, + build_gemini_target, + build_openai_target, + build_responses_target, + build_route_context, + chat_error_chunk, + inject_instructions_anthropic, + inject_instructions_cc, + inject_instructions_responses, + log_route_context, + log_usage, + sse_data_message, +) +from utils.http import ( + forward_request, + gen_id, + iter_anthropic_sse, + iter_gemini_sse, + iter_openai_sse, + iter_responses_sse, + sse_response, +) +from utils.request_logger import ( + append_client_event, + append_upstream_event, + attach_client_response, + attach_error, + attach_upstream_request, + attach_upstream_response, + finalize_turn, + set_stream_summary, + start_turn, +) +from utils.think_tag import ThinkTagExtractor from utils.thinking_cache import thinking_cache +from utils.usage_tracker import usage_tracker logger = logging.getLogger(__name__) bp = Blueprint('chat', __name__) +def _dbg(message: str) -> None: + """仅在调试模式下输出详细日志。""" + if settings.get_debug_mode() in ('simple', 'verbose'): + logger.info('[聊天补全调试] %s', message) + + +def _extract_responses_usage(event_data: dict[str, Any]) -> dict[str, Any] | None: + """从原生 Responses 事件中提取 usage。 + + `/v1/chat/completions -> /v1/responses` 的桥接流式路径也需要读取 usage, + 因此在本文件保留一个本地辅助函数,避免依赖其他路由模块的私有实现。 + """ + if not isinstance(event_data, dict): + return None + usage = event_data.get('usage') + if isinstance(usage, dict): + return usage + response_obj = event_data.get('response') + if isinstance(response_obj, dict): + nested_usage = response_obj.get('usage') + if isinstance(nested_usage, dict): + return nested_usage + return None + + @bp.route('/v1/chat/completions', methods=['POST']) def chat_completions(): """处理聊天补全请求并按模型映射分发到不同后端。""" original_payload = request.get_json(force=True) - payload, message_count = _normalize_chat_payload( - json.loads(json.dumps(original_payload, ensure_ascii=False, default=str)) - ) + payload, message_count = _normalize_chat_payload(json.loads(json.dumps(original_payload, ensure_ascii=False, default=str))) client_model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) @@ -57,39 +127,23 @@ def chat_completions(): log_route_context('聊天补全', ctx, extra=f'消息数={message_count}') _log_messages(payload) - payload['model'] = ctx.upstream_model - payload = normalize_request(payload) - if should_inject_thinking(ctx.backend): + if ctx.backend != 'responses': payload['messages'] = thinking_cache.inject(payload.get('messages', [])) - payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position) - outbound = get_outbound(ctx.backend) - client_fmt = CCClientFormatter() - - if ctx.is_stream: - result = handle_stream(ctx, outbound, client_fmt, payload, turn) - else: - result = handle_non_stream(ctx, outbound, client_fmt, payload, turn) - - if not ctx.is_stream and isinstance(result, tuple): - response_data = result - elif hasattr(result, 'json'): - try: - response_data = result.get_json(silent=True) or {} - except Exception: - response_data = {} - else: - response_data = {} - - _try_cache_thinking(response_data) - return result + if ctx.backend == 'openai': + return _handle_openai_backend(ctx, payload, turn) + if ctx.backend == 'responses': + return _handle_responses_backend(ctx, payload, turn) + if ctx.backend == 'gemini': + return _handle_gemini_backend(ctx, payload, turn) + return _handle_anthropic_backend(ctx, payload, turn) def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], int]: """整理聊天补全入口的请求体。 - 当 Cursor 或调用方把 Responses 格式误发到 `/v1/chat/completions` 时, - 先降级转换成 Chat Completions,再进入统一主流程。 + 这里保留了一层兼容逻辑:当 Cursor 或调用方把 Responses 格式误发到 + `/v1/chat/completions` 时,先降级转换成 Chat Completions,再进入统一主流程。 """ message_count = len(payload.get('messages', [])) @@ -103,11 +157,548 @@ def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], in return payload, message_count -def _try_cache_thinking(response_data: dict[str, Any]) -> None: - """尝试从非流式响应中缓存思维链内容。""" - if not isinstance(response_data, dict): - return - for choice in response_data.get('choices', []): +def _handle_openai_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any]): + """处理走 OpenAI 兼容后端的聊天补全请求。""" + _dbg( + '原始请求字段=' + str(list(payload.keys())) + ' ' + + '附加字段=' + + json.dumps( + {k: v for k, v in payload.items() if k != 'messages'}, + ensure_ascii=False, + default=str, + )[:500] + ) + + payload = normalize_request(payload, ctx.upstream_model) + payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position) + _dbg( + f'标准化完成:模型={payload.get("model")} ' + f'工具数={len(payload.get("tools", []))}' + ) + + url, headers = build_openai_target(ctx) + payload = apply_body_modifications(payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + if ctx.is_stream: + return _handle_openai_stream(ctx, payload, url, headers, turn) + return _handle_openai_non_stream(ctx, payload, url, headers, turn) + + +def _handle_openai_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any], +): + """处理 OpenAI 兼容后端的非流式返回。""" + payload['stream'] = False + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + data = fix_response(raw) + return _finalize_chat_response(ctx, data, turn=turn, debug_label='修复后响应') + + +def _handle_openai_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any], +): + """处理 OpenAI 兼容后端的流式返回。""" + payload['stream'] = True + + def generate(): + """消费上游 OpenAI SSE,并逐段产出给 Cursor 的聊天补全流。""" + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield chat_error_chunk(str(err)) + return + + think_extractor = ThinkTagExtractor() + chunk_count = 0 + last_usage = None + client_chunks: list[dict[str, Any]] = [] + + for chunk in iter_openai_sse(resp): + if chunk is None: + _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') + close_chunk = think_extractor.finalize() + if close_chunk: + client_chunks.append(close_chunk) + append_client_event(turn, {'type': 'chat_chunk', 'data': close_chunk}) + yield sse_data_message(close_chunk) + append_client_event(turn, {'type': 'done'}) + yield sse_data_message('[DONE]') + usage_tracker.record(ctx.client_model, last_usage) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_chunk_count': len(client_chunks), + 'usage': last_usage, + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunk_count': len(client_chunks), + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + return + + append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk}) + if chunk.get('usage'): + last_usage = chunk['usage'] + + if chunk_count < 10: + _dbg( + f'上游原始片段#{chunk_count}=' + + json.dumps(chunk, ensure_ascii=False, default=str)[:500] + ) + + chunk = fix_stream_chunk(chunk) + chunk['model'] = ctx.client_model + + for out in think_extractor.process_chunk(chunk): + client_chunks.append(out) + append_client_event(turn, {'type': 'chat_chunk', 'data': out}) + if chunk_count < 10: + _dbg( + f'返回片段#{chunk_count}=' + + json.dumps(out, ensure_ascii=False, default=str)[:500] + ) + yield sse_data_message(out) + + chunk_count += 1 + + usage_tracker.record(ctx.client_model, last_usage) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_chunk_count': len(client_chunks), + 'usage': last_usage, + 'ended_without_done': True, + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunk_count': len(client_chunks), + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + + return sse_response(generate()) + + +def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): + """处理走原生 Responses 后端的聊天补全请求。 + + 当上游只支持 `/v1/responses` 时,需要先把聊天补全请求转换为 Responses 请求, + 返回时再转换回聊天补全协议。 + """ + responses_payload = cc_to_responses_request(payload) + responses_payload['model'] = ctx.upstream_model + responses_payload = inject_instructions_responses(responses_payload, ctx.custom_instructions, ctx.instructions_position) + _dbg( + '已转换为 Responses 请求:字段=' + str(list(responses_payload.keys())) + + f' 输入项数={len(responses_payload.get("input", []))}' + ) + + url, headers = build_responses_target(ctx) + responses_payload = apply_body_modifications(responses_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + if ctx.is_stream: + return _handle_responses_stream(ctx, responses_payload, url, headers, turn) + return _handle_responses_non_stream(ctx, responses_payload, url, headers, turn) + + +def _handle_responses_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理原生 Responses 后端的非流式返回。""" + payload['stream'] = False + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + data = responses_to_cc_response(raw, ctx.client_model) + return _finalize_chat_response(ctx, data, turn=turn, debug_label='Responses 转回聊天补全后') + + +def _handle_responses_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理原生 Responses 后端的流式返回。""" + payload['stream'] = True + converter = ResponsesToCCStreamConverter(model=ctx.client_model) + + def generate(): + """消费上游 Responses 事件,并实时转换成聊天补全 chunk。""" + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield chat_error_chunk(str(err)) + return + + event_count = 0 + client_chunks: list[Any] = [] + last_usage: dict[str, Any] | None = None + for event_type, event_data in iter_responses_sse(resp): + append_upstream_event(turn, {'type': event_type, 'data': event_data}) + extracted_usage = _extract_responses_usage(event_data) + if extracted_usage: + last_usage = { + 'prompt_tokens': extracted_usage.get('input_tokens', 0), + 'completion_tokens': extracted_usage.get('output_tokens', 0), + 'total_tokens': extracted_usage.get('total_tokens', 0), + } + if event_count < 10: + _dbg( + f'上游事件#{event_count} 类型={event_type} 数据=' + + json.dumps(event_data, ensure_ascii=False, default=str)[:500] + ) + + for chunk in converter.process_event(event_type, event_data): + client_chunks.append(chunk) + append_client_event(turn, {'type': 'chat_chunk', 'data': chunk}) + if isinstance(chunk, dict) and isinstance(chunk.get('usage'), dict): + last_usage = chunk['usage'] + if event_count < 10: + _dbg( + f'返回片段#{event_count}=' + + json.dumps(chunk, ensure_ascii=False, default=str)[:500] + ) + yield sse_data_message(chunk) + + event_count += 1 + + _dbg(f'流式响应结束,共 {event_count} 个事件') + append_client_event(turn, {'type': 'done'}) + yield sse_data_message('[DONE]') + usage_tracker.record(ctx.client_model, last_usage) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_chunk_count': len(client_chunks), + 'usage': last_usage, + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunk_count': len(client_chunks), + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + + return sse_response(generate()) + + +def _handle_gemini_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): + """处理走 Gemini Contents 后端的聊天补全请求。""" + payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position) + gemini_payload = cc_to_gemini_request(payload) + _dbg( + '已转换为 Gemini 请求:字段=' + str(list(gemini_payload.keys())) + + f' 内容数={len(gemini_payload.get("contents", []))}' + ) + + url, headers = build_gemini_target(ctx, stream=ctx.is_stream) + gemini_payload = apply_body_modifications(gemini_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + if ctx.is_stream: + return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn) + return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn) + + +def _handle_gemini_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Gemini 后端的非流式返回。""" + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + data = gemini_to_cc_response(raw) + return _finalize_chat_response(ctx, data, turn=turn, debug_label='Gemini 转回聊天补全后') + + +def _handle_gemini_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Gemini 后端的流式返回。""" + converter = GeminiStreamConverter() + + def generate(): + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield chat_error_chunk(str(err)) + return + + chunk_count = 0 + client_chunks: list[Any] = [] + last_usage: dict[str, Any] | None = None + for gemini_chunk in iter_gemini_sse(resp): + append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) + usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None + if isinstance(usage_meta, dict): + last_usage = { + 'prompt_tokens': usage_meta.get('promptTokenCount', 0), + 'completion_tokens': usage_meta.get('candidatesTokenCount', 0), + 'total_tokens': usage_meta.get('totalTokenCount', 0), + } + if chunk_count < 10: + _dbg( + f'上游 Gemini 片段#{chunk_count}=' + + json.dumps(gemini_chunk, ensure_ascii=False, default=str)[:500] + ) + + for cc_chunk in converter.process_chunk(gemini_chunk): + cc_chunk['model'] = ctx.client_model + client_chunks.append(cc_chunk) + append_client_event(turn, {'type': 'chat_chunk', 'data': cc_chunk}) + if isinstance(cc_chunk, dict) and isinstance(cc_chunk.get('usage'), dict): + last_usage = cc_chunk['usage'] + if chunk_count < 10: + _dbg( + f'返回片段#{chunk_count}=' + + json.dumps(cc_chunk, ensure_ascii=False, default=str)[:500] + ) + yield sse_data_message(cc_chunk) + + chunk_count += 1 + + _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') + append_client_event(turn, {'type': 'done'}) + yield sse_data_message('[DONE]') + usage_tracker.record(ctx.client_model, last_usage) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_chunk_count': len(client_chunks), + 'usage': last_usage, + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunk_count': len(client_chunks), + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + + return sse_response(generate()) + + +def _handle_anthropic_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): + """处理走 Anthropic Messages 后端的聊天补全请求。""" + payload['model'] = ctx.upstream_model + anthropic_payload = cc_to_messages_request(payload) + anthropic_payload = inject_instructions_anthropic(anthropic_payload, ctx.custom_instructions, ctx.instructions_position) + _dbg( + '已转换为 Messages 请求:字段=' + str(list(anthropic_payload.keys())) + + f' 消息数={len(anthropic_payload.get("messages", []))}' + ) + + url, headers = build_anthropic_target(ctx) + anthropic_payload = apply_body_modifications(anthropic_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + if ctx.is_stream: + return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn) + return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn) + + +def _handle_anthropic_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Anthropic 后端的非流式返回。""" + payload['stream'] = False + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + data = messages_to_cc_response(raw) + return _finalize_chat_response(ctx, data, turn=turn, debug_label='Messages 转回聊天补全后') + + +def _handle_anthropic_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Anthropic 后端的流式返回。 + + 这里仍然保留独立的事件级转换器,而不是先落成完整响应再回放, + 是为了尽量保持 Cursor 端的流式体验和工具调用时序。 + """ + payload['stream'] = True + converter = AnthropicStreamConverter() + + def generate(): + """消费上游 Anthropic 事件流,并逐步映射为聊天补全 SSE。""" + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield chat_error_chunk(str(err)) + return + + event_count = 0 + client_chunks: list[Any] = [] + last_usage: dict[str, Any] | None = None + for event_type, event_data in iter_anthropic_sse(resp): + append_upstream_event(turn, {'type': event_type, 'data': event_data}) + if event_type == 'message_start': + message_usage = event_data.get('message', {}).get('usage', {}) + if isinstance(message_usage, dict): + last_usage = { + 'prompt_tokens': message_usage.get('input_tokens', 0), + 'completion_tokens': 0, + 'total_tokens': message_usage.get('input_tokens', 0), + } + elif event_type == 'message_delta': + delta_usage = event_data.get('usage', {}) + if isinstance(delta_usage, dict): + prompt_tokens = 0 + if isinstance(last_usage, dict): + prompt_tokens = last_usage.get('prompt_tokens', 0) + completion_tokens = delta_usage.get('output_tokens', 0) + last_usage = { + 'prompt_tokens': prompt_tokens, + 'completion_tokens': completion_tokens, + 'total_tokens': prompt_tokens + completion_tokens, + } + if event_count < 10: + _dbg( + f'上游事件#{event_count} 类型={event_type} 数据=' + + json.dumps(event_data, ensure_ascii=False, default=str)[:500] + ) + + for chunk_str in converter.process_event(event_type, event_data): + try: + chunk_obj = json.loads(chunk_str) + chunk_obj['model'] = ctx.client_model + if isinstance(chunk_obj.get('usage'), dict): + last_usage = chunk_obj['usage'] + chunk_str = json.dumps(chunk_obj, ensure_ascii=False) + except (json.JSONDecodeError, TypeError): + pass + + client_chunks.append(chunk_str) + append_client_event(turn, {'type': 'chat_chunk', 'data': chunk_str}) + if event_count < 10: + _dbg(f'返回片段#{event_count}={chunk_str[:500]}') + yield sse_data_message(chunk_str) + + event_count += 1 + + _dbg(f'流式响应结束,共 {event_count} 个事件') + append_client_event(turn, {'type': 'done'}) + yield sse_data_message('[DONE]') + usage_tracker.record(ctx.client_model, last_usage) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_chunk_count': len(client_chunks), + 'usage': last_usage, + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunk_count': len(client_chunks), + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + + return sse_response(generate()) + + +def _finalize_chat_response( + ctx: RouteContext, + data: dict[str, Any], + *, + turn: dict[str, Any] | None, + debug_label: str, +): + """统一收尾非流式聊天补全响应。 + + 三条后端链路最终都会回到 Chat Completions 格式,因此这里集中做: + - 回填给 Cursor 展示的模型名 + - 输出统一调试日志 + - 输出统一令牌统计日志 + """ + data['model'] = ctx.client_model + _dbg(debug_label + '=' + json.dumps(data, ensure_ascii=False, default=str)[:1000]) + log_usage('聊天补全', data.get('usage', {}), input_key='prompt_tokens', output_key='completion_tokens') + + usage_tracker.record(ctx.client_model, data.get('usage')) + attach_client_response(turn, data) + finalize_turn(turn, usage=data.get('usage')) + + for choice in data.get('choices', []): msg = choice.get('message', {}) if msg.get('reasoning_content'): thinking_cache.store_from_response( @@ -116,6 +707,8 @@ def _try_cache_thinking(response_data: dict[str, Any]) -> None: ) break + return jsonify(data) + def _log_messages(payload: dict[str, Any]) -> None: """记录消息摘要,方便排查请求形态是否符合预期。""" diff --git a/routes/common.py b/routes/common.py index 654900e..0ad7518 100644 --- a/routes/common.py +++ b/routes/common.py @@ -12,6 +12,7 @@ import logging from typing import Any import settings +from utils.http import build_anthropic_headers, build_gemini_headers, build_openai_headers logger = logging.getLogger(__name__) @@ -54,6 +55,42 @@ def build_route_context(client_model: str, is_stream: bool) -> RouteContext: ) +def build_openai_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: + """根据路由上下文生成 OpenAI 兼容后端的地址和请求头。""" + url = f'{ctx.target_url.rstrip("/")}/v1/chat/completions' + headers = build_openai_headers(ctx.api_key) + return url, headers + + +def build_responses_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: + """根据路由上下文生成 OpenAI Responses 后端的地址和请求头。""" + url = f'{ctx.target_url.rstrip("/")}/v1/responses' + headers = build_openai_headers(ctx.api_key) + return url, headers + + +def build_anthropic_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: + """根据路由上下文生成 Anthropic 后端的地址和请求头。""" + url = f'{ctx.target_url.rstrip("/")}/v1/messages' + headers = build_anthropic_headers(ctx.api_key) + return url, headers + + +def build_gemini_target(ctx: RouteContext, stream: bool = False) -> tuple[str, dict[str, str]]: + """根据路由上下文生成 Gemini 后端的地址和请求头。 + + Gemini URL 格式: {base}/v1/models/{model}:generateContent + 流式: {base}/v1/models/{model}:streamGenerateContent?alt=sse + """ + base = ctx.target_url.rstrip('/') + model = ctx.upstream_model + if stream: + url = f'{base}/v1/models/{model}:streamGenerateContent?alt=sse' + else: + url = f'{base}/v1/models/{model}:generateContent' + headers = build_gemini_headers(ctx.api_key) + return url, headers + def log_route_context(route_name: str, ctx: RouteContext, *, extra: str = '') -> None: """统一输出路由级日志,避免不同入口的日志格式逐渐漂移。""" @@ -100,6 +137,11 @@ def sse_event_message(event_type: str, data: Any) -> str: return f'event: {event_type}\ndata: {payload}\n\n' +def chat_error_chunk(message: str, error_type: str = 'upstream_error') -> str: + """构造聊天补全流式接口使用的错误消息。""" + return sse_data_message({'error': {'message': message, 'type': error_type}}) + + def responses_error_event(message: str) -> str: """构造 Responses 流式接口使用的错误事件。""" return sse_event_message('error', {'error': message}) @@ -173,20 +215,6 @@ def inject_instructions_anthropic(payload: dict[str, Any], instructions: str, po return payload -def should_inject_thinking(backend: str) -> bool: - """判断当前后端是否需要注入历史 thinking。 - - 仅对明确能消费历史 reasoning/thinking 的后端启用: - - anthropic - - gemini - - responses - - OpenAI Chat 兼容后端通常不接受 `reasoning_content` 历史字段, - 若注入会导致上游报错,因此显式排除。 - """ - return backend in ('anthropic', 'gemini', 'responses') - - # ─── Body / Header 修改 ────────────────────────── @@ -220,140 +248,3 @@ def apply_header_modifications(headers: dict[str, str], modifications: dict[str, headers[key] = str(value) logger.info('已应用 header_modifications: %s', list(modifications.keys())) return headers - - -# ═══════════════════════════════════════════════════════════ -# 后端注册表 + ClientFormatter 实现 -# ═══════════════════════════════════════════════════════════ - - -def get_outbound(backend: str): - """根据后端类型获取对应的 OutboundTransformer 实例。""" - from adapters.cc_anthropic_adapter import AnthropicOutbound - from adapters.cc_gemini_adapter import GeminiOutbound - from adapters.openai_compat_fixer import OpenAIChatOutbound - from adapters.responses_cc_adapter import ResponsesOutbound - - registry = { - 'openai': OpenAIChatOutbound, - 'anthropic': AnthropicOutbound, - 'gemini': GeminiOutbound, - 'responses': ResponsesOutbound, - } - cls = registry.get(backend, OpenAIChatOutbound) - return cls() - - -class CCClientFormatter: - """Chat Completions 客户端格式化器。 - - 将通用处理结果格式化为 OpenAI Chat Completions 格式, - 供 /v1/chat/completions 端点使用。 - """ - - def format_response(self, cc_response: dict[str, Any], model: str) -> dict[str, Any]: - cc_response['model'] = model - return cc_response - - def wrap_stream_item(self, item: Any) -> str: - payload = item if isinstance(item, str) else json.dumps(item, ensure_ascii=False) - return f'data: {payload}\n\n' - - def format_error(self, message: str) -> str: - return sse_data_message({'error': {'message': message, 'type': 'upstream_error'}}) - - def format_done(self) -> str | None: - return sse_data_message('[DONE]') - - def start_events(self) -> list[str]: - return [] - - @property - def usage_input_key(self) -> str: - return 'prompt_tokens' - - @property - def usage_output_key(self) -> str: - return 'completion_tokens' - - -class ResponsesClientFormatter: - """Responses API 客户端格式化器。 - - 将通用处理结果格式化为 OpenAI Responses 格式, - 供 /v1/responses 端点使用。 - - 流式场景使用 ResponsesStreamConverter 做 CC chunk → Responses SSE 转换。 - """ - - def __init__(self, model: str = ''): - from adapters.responses_cc_adapter import ResponsesStreamConverter, cc_to_responses - self._model = model - self._converter = ResponsesStreamConverter(model=model) - self._cc_to_responses = cc_to_responses - - def format_response(self, cc_response: dict[str, Any], model: str) -> dict[str, Any]: - return self._cc_to_responses(cc_response, model) - - def wrap_stream_item(self, item: Any) -> str: - if isinstance(item, str): - return item - events = self._converter.process_cc_chunk(item) - return ''.join(events) - - def format_error(self, message: str) -> str: - return responses_error_event(message) - - def format_done(self) -> str | None: - events = self._converter.finalize() - return ''.join(events) if events else None - - def start_events(self) -> list[str]: - return self._converter.start_events() - - @property - def usage_input_key(self) -> str: - return 'input_tokens' - - @property - def usage_output_key(self) -> str: - return 'output_tokens' - - -class ResponsesPassthroughFormatter: - """Responses 透传格式化器。 - - 当后端本身就是 Responses 格式时使用,做轻量模型名改写。 - """ - - def __init__(self, model: str = ''): - self._model = model - - def format_response(self, response_data: dict[str, Any], model: str) -> dict[str, Any]: - response_data['model'] = model - return response_data - - def wrap_stream_item(self, item: Any) -> str: - if isinstance(item, str): - return item - event_type = item.pop('_sse_event_type', None) - if event_type: - return f'event: {event_type}\ndata: {json.dumps(item, ensure_ascii=False)}\n\n' - return f'data: {json.dumps(item, ensure_ascii=False)}\n\n' - - def format_error(self, message: str) -> str: - return responses_error_event(message) - - def format_done(self) -> str | None: - return None - - def start_events(self) -> list[str]: - return [] - - @property - def usage_input_key(self) -> str: - return 'input_tokens' - - @property - def usage_output_key(self) -> str: - return 'output_tokens' diff --git a/routes/responses.py b/routes/responses.py index ce1de4b..4889a40 100644 --- a/routes/responses.py +++ b/routes/responses.py @@ -1,7 +1,7 @@ """路由: /v1/responses 处理 Cursor 对 GPT、Claude-Opus 等模型发出的 Responses API 请求。 -请求先转换为 Chat Completions 中间表示,再通过统一出站转换器分发。 +请求会先转换为 Chat Completions 中间表示,再按后端类型分发,最后转换回 Responses 格式。 """ from __future__ import annotations @@ -13,31 +13,62 @@ from typing import Any import settings from flask import Blueprint, jsonify, request -from adapters.openai_compat_fixer import normalize_request -from adapters.responses_cc_adapter import ( - AnthropicOutboundForResponses, - ResponsesNativeOutbound, - responses_to_cc, -) -from adapters.unified import handle_non_stream, handle_stream +from adapters.cc_anthropic_adapter import cc_to_messages_request, messages_to_cc_response +from adapters.cc_gemini_adapter import GeminiStreamConverter, cc_to_gemini_request, gemini_to_cc_response +from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request +from adapters.responses_cc_adapter import ResponsesStreamConverter, cc_to_responses, responses_to_cc +from config import Config from routes.common import ( - ResponsesClientFormatter, - ResponsesPassthroughFormatter, + RouteContext, + apply_body_modifications, + apply_header_modifications, + build_anthropic_target, + build_gemini_target, + build_openai_target, + build_responses_target, build_route_context, - get_outbound, + inject_instructions_anthropic, inject_instructions_cc, inject_instructions_responses, log_route_context, - should_inject_thinking, + log_usage, + responses_error_event, ) -from utils.request_logger import start_turn +from utils.http import ( + forward_request, + gen_id, + iter_anthropic_sse, + iter_gemini_sse, + iter_openai_sse, + iter_responses_sse, + sse_response, +) +from utils.request_logger import ( + append_client_event, + append_upstream_event, + attach_client_response, + attach_error, + attach_upstream_request, + attach_upstream_response, + finalize_turn, + set_stream_summary, + start_turn, +) +from utils.think_tag import ThinkTagExtractor from utils.thinking_cache import thinking_cache +from utils.usage_tracker import usage_tracker logger = logging.getLogger(__name__) bp = Blueprint('responses', __name__) +def _dbg(message: str) -> None: + """仅在调试模式下输出详细日志。""" + if settings.get_debug_mode() in ('simple', 'verbose'): + logger.info('[响应生成调试] %s', message) + + @bp.route('/v1/responses', methods=['POST']) def responses_endpoint(): """处理 Responses 请求并按模型映射分发。""" @@ -59,43 +90,543 @@ def responses_endpoint(): ) log_route_context('响应生成', ctx) - if ctx.backend == 'responses': - return _handle_native_responses(ctx, payload, turn) - cc_payload = _build_cc_payload(payload, ctx) - if ctx.backend == 'anthropic': - outbound = AnthropicOutboundForResponses() - else: - outbound = get_outbound(ctx.backend) + if ctx.backend == 'openai': + return _handle_openai_backend(ctx, cc_payload, turn) + if ctx.backend == 'responses': + return _handle_responses_backend(ctx, payload, turn) + if ctx.backend == 'gemini': + return _handle_gemini_backend(ctx, cc_payload, turn) + return _handle_anthropic_backend(ctx, cc_payload, turn) - client_fmt = ResponsesClientFormatter(model=ctx.client_model) + +def _build_cc_payload(payload: dict[str, Any], ctx: RouteContext) -> dict[str, Any]: + """将 Responses 请求统一降级为 Chat Completions 中间表示。 + + 这样后续无论走 OpenAI 兼容后端还是 Anthropic 后端,都能复用一套 + 中间协议,避免在路由层同时维护两套完全不同的请求编排逻辑。 + """ + cc_payload = responses_to_cc(payload) + cc_payload['model'] = ctx.upstream_model + cc_payload['messages'] = thinking_cache.inject(cc_payload.get('messages', [])) + cc_payload = inject_instructions_cc(cc_payload, ctx.custom_instructions, ctx.instructions_position) + _dbg( + '已转换为聊天补全中间表示:字段=' + str(list(cc_payload.keys())) + + f' 消息数={len(cc_payload.get("messages", []))}' + ) + return cc_payload + + +def _handle_openai_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any]): + """处理走 OpenAI 兼容后端的 Responses 请求。""" + cc_payload = normalize_request(cc_payload) + _dbg( + f'标准化完成:模型={cc_payload.get("model")} ' + f'工具数={len(cc_payload.get("tools", []))}' + ) + + url, headers = build_openai_target(ctx) + cc_payload = apply_body_modifications(cc_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return handle_stream(ctx, outbound, client_fmt, cc_payload, turn) - return handle_non_stream(ctx, outbound, client_fmt, cc_payload, turn) + return _handle_openai_stream(ctx, cc_payload, url, headers, turn) + return _handle_openai_non_stream(ctx, cc_payload, url, headers, turn) -def _handle_native_responses(ctx, payload: dict[str, Any], turn: dict[str, Any]): - """处理走原生 Responses 后端的请求(直接透传)。""" +def _handle_openai_non_stream( + ctx: RouteContext, + cc_payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any], +): + """处理 OpenAI 兼容后端的非流式 Responses 返回。""" + cc_payload['stream'] = False + attach_upstream_request(turn, cc_payload, headers) + resp, err = forward_request(url, headers, cc_payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + fixed = fix_response(raw) + response_data = cc_to_responses(fixed, ctx.client_model) + return _finalize_responses_response( + response_data, + client_model=ctx.client_model, + turn=turn, + debug_label='转换为 Responses 后', + ) + + +def _handle_openai_stream( + ctx: RouteContext, + cc_payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 OpenAI 兼容后端的流式 Responses 返回。""" + cc_payload['stream'] = True + converter = ResponsesStreamConverter(model=ctx.client_model) + + def generate(): + """消费 OpenAI 聊天补全流,并实时改写为 Responses SSE。""" + yield from converter.start_events() + + attach_upstream_request(turn, cc_payload, headers) + resp, err = forward_request(url, headers, cc_payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield responses_error_event(str(err)) + return + + think_extractor = ThinkTagExtractor() + chunk_count = 0 + client_events: list[str] = [] + + for chunk in iter_openai_sse(resp): + if chunk is None: + _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') + finalized_events = converter.finalize() + for item in finalized_events: + client_events.append(item) + append_client_event(turn, {'type': 'responses_event', 'data': item}) + yield item + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_event_count': len(client_events), + }) + attach_client_response(turn, { + 'type': 'responses.stream.summary', + 'model': ctx.client_model, + 'event_count': len(client_events), + }) + finalize_turn(turn) + return + + append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk}) + if chunk_count < 10: + _dbg( + f'上游原始片段#{chunk_count}=' + + json.dumps(chunk, ensure_ascii=False, default=str)[:500] + ) + + chunk = fix_stream_chunk(chunk) + for out in think_extractor.process_chunk(chunk): + for evt in converter.process_cc_chunk(out): + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + if chunk_count < 10: + _dbg( + f'转换后片段#{chunk_count}=' + + json.dumps(out, ensure_ascii=False, default=str)[:500] + ) + yield evt + + chunk_count += 1 + + return sse_response(generate()) + + +def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): + """处理走原生 Responses 后端的请求。 + + 当中转站本身就只支持 `/v1/responses` 时,不需要再绕到聊天补全中间协议, + 直接转发原生 Responses 请求即可。 + """ payload = dict(payload) payload['model'] = ctx.upstream_model payload = inject_instructions_responses(payload, ctx.custom_instructions, ctx.instructions_position) - - outbound = ResponsesNativeOutbound() - client_fmt = ResponsesPassthroughFormatter(model=ctx.client_model) + url, headers = build_responses_target(ctx) + payload = apply_body_modifications(payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return handle_stream(ctx, outbound, client_fmt, payload, turn) - return handle_non_stream(ctx, outbound, client_fmt, payload, turn) + return _handle_responses_stream(ctx, payload, url, headers, turn) + return _handle_responses_non_stream(ctx, payload, url, headers, turn) -def _build_cc_payload(payload: dict[str, Any], ctx) -> dict[str, Any]: - """将 Responses 请求统一降级为 Chat Completions 中间表示。""" - cc_payload = responses_to_cc(payload) - cc_payload['model'] = ctx.upstream_model - cc_payload = normalize_request(cc_payload) - if should_inject_thinking(ctx.backend): - cc_payload['messages'] = thinking_cache.inject(cc_payload.get('messages', [])) - cc_payload = inject_instructions_cc(cc_payload, ctx.custom_instructions, ctx.instructions_position) - return cc_payload +def _handle_responses_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理原生 Responses 后端的非流式返回。""" + payload['stream'] = False + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + response_data = resp.json() + attach_upstream_response(turn, response_data) + response_data['model'] = ctx.client_model + return _finalize_responses_response( + response_data, + client_model=ctx.client_model, + turn=turn, + debug_label='原生 Responses 返回后', + ) + + +def _handle_responses_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理原生 Responses 后端的流式返回。""" + payload['stream'] = True + converter = ResponsesStreamConverter(model=ctx.client_model) + + def generate(): + """透传上游原生 Responses 流,并做轻量模型名改写。""" + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield responses_error_event(str(err)) + return + + event_count = 0 + client_events: list[str] = [] + last_usage: dict[str, Any] | None = None + for event_type, event_data in iter_responses_sse(resp): + append_upstream_event(turn, {'type': event_type, 'data': event_data}) + extracted_usage = _extract_responses_usage(event_data) + if extracted_usage: + last_usage = extracted_usage + if event_count < 10: + _dbg( + f'上游事件#{event_count} 类型={event_type} 数据=' + + json.dumps(event_data, ensure_ascii=False, default=str)[:500] + ) + produced = converter.process_responses_event(event_type, event_data) + for evt in produced: + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt + event_count += 1 + + _dbg(f'流式响应结束,共 {event_count} 个事件') + usage_tracker.record( + ctx.client_model, + last_usage, + input_key='input_tokens', + output_key='output_tokens', + ) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_event_count': len(client_events), + 'usage': last_usage, + }) + attach_client_response(turn, { + 'type': 'responses.stream.summary', + 'model': ctx.client_model, + 'event_count': len(client_events), + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + + return sse_response(generate()) + + +def _extract_responses_usage(event_data: dict[str, Any]) -> dict[str, Any] | None: + """从原生 Responses 事件中提取 usage。 + + 原生 `/v1/responses` 流式通常会在 `response.completed` 事件里携带 usage, + 也可能直接挂在顶层 `usage` 字段。这里统一做兼容提取,供统计与日志复用。 + """ + if not isinstance(event_data, dict): + return None + usage = event_data.get('usage') + if isinstance(usage, dict): + return usage + response_obj = event_data.get('response') + if isinstance(response_obj, dict): + nested_usage = response_obj.get('usage') + if isinstance(nested_usage, dict): + return nested_usage + return None + + +def _handle_gemini_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): + """处理走 Gemini Contents 后端的 Responses 请求。""" + gemini_payload = cc_to_gemini_request(cc_payload) + _dbg( + '已转换为 Gemini 请求:字段=' + str(list(gemini_payload.keys())) + + f' 内容数={len(gemini_payload.get("contents", []))}' + ) + + url, headers = build_gemini_target(ctx, stream=ctx.is_stream) + gemini_payload = apply_body_modifications(gemini_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + if ctx.is_stream: + return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn) + return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn) + + +def _handle_gemini_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Gemini 后端的非流式 Responses 返回。""" + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + cc_data = gemini_to_cc_response(raw) + response_data = cc_to_responses(cc_data, ctx.client_model) + return _finalize_responses_response( + response_data, + client_model=ctx.client_model, + turn=turn, + debug_label='Gemini 转回 Responses 后', + ) + + +def _handle_gemini_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Gemini 后端的流式 Responses 返回。""" + converter = ResponsesStreamConverter(model=ctx.client_model) + gemini_converter = GeminiStreamConverter() + + def generate(): + yield from converter.start_events() + + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield responses_error_event(str(err)) + return + + chunk_count = 0 + client_events: list[str] = [] + last_usage: dict[str, Any] | None = None + for gemini_chunk in iter_gemini_sse(resp): + append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) + usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None + if isinstance(usage_meta, dict): + last_usage = { + 'input_tokens': usage_meta.get('promptTokenCount', 0), + 'output_tokens': usage_meta.get('candidatesTokenCount', 0), + 'total_tokens': usage_meta.get('totalTokenCount', 0), + } + if chunk_count < 10: + _dbg( + f'上游 Gemini 片段#{chunk_count}=' + + json.dumps(gemini_chunk, ensure_ascii=False, default=str)[:500] + ) + + for cc_chunk in gemini_converter.process_chunk(gemini_chunk): + for evt in converter.process_cc_chunk(cc_chunk): + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt + + chunk_count += 1 + + _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') + finalized_events = converter.finalize() + for evt in finalized_events: + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt + usage_tracker.record( + ctx.client_model, + last_usage, + input_key='input_tokens', + output_key='output_tokens', + ) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_event_count': len(client_events), + 'usage': last_usage, + }) + attach_client_response(turn, { + 'type': 'responses.stream.summary', + 'model': ctx.client_model, + 'event_count': len(client_events), + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + + return sse_response(generate()) + + +def _handle_anthropic_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): + """处理走 Anthropic 后端的 Responses 请求。""" + anthropic_payload = cc_to_messages_request(cc_payload) + _dbg( + '已转换为 Messages 请求:字段=' + str(list(anthropic_payload.keys())) + + f' 消息数={len(anthropic_payload.get("messages", []))}' + ) + + url, headers = build_anthropic_target(ctx) + anthropic_payload = apply_body_modifications(anthropic_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + if ctx.is_stream: + return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn) + return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn) + + +def _handle_anthropic_non_stream( + ctx: RouteContext, + anthropic_payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Anthropic 后端的非流式 Responses 返回。""" + anthropic_payload['stream'] = False + attach_upstream_request(turn, anthropic_payload, headers) + resp, err = forward_request(url, headers, anthropic_payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + cc_data = messages_to_cc_response(raw) + response_data = cc_to_responses(cc_data, ctx.client_model) + return _finalize_responses_response( + response_data, + client_model=ctx.client_model, + turn=turn, + debug_label='Messages 转回 Responses 后', + ) + + +def _handle_anthropic_stream( + ctx: RouteContext, + anthropic_payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Anthropic 后端的流式 Responses 返回。 + + 这里直接将 Anthropic SSE 事件映射到 Responses SSE,故意跳过 CC 流式中间态, + 这样可以减少一次事件重组,降低流式转换复杂度,也更容易保留原始时序。 + """ + anthropic_payload['stream'] = True + converter = ResponsesStreamConverter(model=ctx.client_model) + + def generate(): + """消费 Anthropic SSE,并直接映射为 Responses 事件序列。""" + yield from converter.start_events() + + attach_upstream_request(turn, anthropic_payload, headers) + resp, err = forward_request(url, headers, anthropic_payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield responses_error_event(str(err)) + return + + event_count = 0 + client_events: list[str] = [] + for event_type, event_data in iter_anthropic_sse(resp): + append_upstream_event(turn, {'type': event_type, 'data': event_data}) + if event_count < 10: + _dbg( + f'上游事件#{event_count} 类型={event_type} 数据=' + + json.dumps(event_data, ensure_ascii=False, default=str)[:500] + ) + + produced = converter.process_anthropic_event(event_type, event_data) + for evt in produced: + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt + event_count += 1 + + _dbg(f'流式响应结束,共 {event_count} 个事件') + finalized_events = converter.finalize() + for evt in finalized_events: + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_event_count': len(client_events), + }) + attach_client_response(turn, { + 'type': 'responses.stream.summary', + 'model': ctx.client_model, + 'event_count': len(client_events), + }) + finalize_turn(turn) + + return sse_response(generate()) + + +def _finalize_responses_response( + response_data: dict[str, Any], + *, + client_model: str, + turn: dict[str, Any], + debug_label: str, +): + """统一收尾非流式 Responses 响应。 + + 两条转换链路和一条原生 Responses 链路最终都会回到 Responses 对象,因此这里集中 + 处理调试日志、回填展示模型名以及 usage 日志。 + """ + response_data['model'] = response_data.get('model') or '' + _dbg(debug_label + '=' + json.dumps(response_data, ensure_ascii=False, default=str)[:1000]) + log_usage('响应生成', response_data.get('usage', {}), input_key='input_tokens', output_key='output_tokens') + + usage_tracker.record( + client_model, + response_data.get('usage'), + input_key='input_tokens', + output_key='output_tokens', + ) + + attach_client_response(turn, response_data) + finalize_turn(turn, usage=response_data.get('usage')) + + return jsonify(response_data)