diff --git a/adapters/cc_anthropic_adapter.py b/adapters/cc_anthropic_adapter.py index 7848d05..b70fff0 100644 --- a/adapters/cc_anthropic_adapter.py +++ b/adapters/cc_anthropic_adapter.py @@ -18,13 +18,21 @@ from __future__ import annotations import json from typing import Any +from adapters.helpers import ( + build_cc_message, + build_cc_response, + build_cc_tool_call, + build_cc_usage, + extract_text, + make_cc_chunk, + parse_json_safe, + stringify_content, +) from utils.http import gen_id from utils.tool_fixer import fix_anthropic_tool_use, normalize_args, repair_str_replace_args JsonDict = dict[str, Any] - -# Anthropic stop_reason → OpenAI finish_reason _STOP_REASON_MAP = { 'end_turn': 'stop', 'max_tokens': 'length', @@ -78,23 +86,18 @@ def messages_to_cc_response(data: JsonDict, request_id: str | None = None) -> Js data = fix_anthropic_tool_use(data) content_text, reasoning_text, tool_calls = _collect_response_parts(data.get('content', [])) - message = _build_cc_message(content_text, reasoning_text, tool_calls) usage = data.get('usage', {}) - return { - 'id': request_id, - 'object': 'chat.completion', - 'model': data.get('model', 'claude'), - 'choices': [{ - 'index': 0, - 'message': message, - 'finish_reason': _STOP_REASON_MAP.get(data.get('stop_reason', 'end_turn'), 'stop'), - }], - 'usage': _build_cc_usage( + return build_cc_response( + response_id=request_id, + message=build_cc_message(content_text, reasoning_text, tool_calls), + finish_reason=_STOP_REASON_MAP.get(data.get('stop_reason', 'end_turn'), 'stop'), + usage=build_cc_usage( input_tokens=usage.get('input_tokens', 0), output_tokens=usage.get('output_tokens', 0), ), - } + model=data.get('model', 'claude'), + ) # ═══════════════════════════════════════════════════════════ @@ -124,12 +127,8 @@ class AnthropicStreamConverter: self._input_tokens = 0 self._output_tokens = 0 - def process_event(self, event_type: str, event_data: JsonDict) -> list[str]: - """处理单个 Anthropic SSE 事件。 - - 调用方会按事件顺序不断喂入 event/data,这里根据事件类型拆成一个或多个 CC chunk - 字符串,交给上层直接作为 SSE data 发送给 Cursor。 - """ + def process_event(self, event_type: str, event_data: JsonDict) -> list[JsonDict]: + """处理单个 Anthropic SSE 事件,返回 CC chunk dict 列表。""" if event_type == 'message_start': return self._handle_message_start(event_data) if event_type == 'content_block_start': @@ -140,104 +139,64 @@ class AnthropicStreamConverter: return self._handle_message_delta(event_data) return [] - def _handle_message_start(self, event_data: JsonDict) -> list[str]: - """处理消息开始事件,产出 assistant 角色起始 chunk。 - - 这个起始 chunk 很重要,因为 Cursor 侧通常会依赖首个带 role 的 chunk 来初始化 - 当前 assistant 消息。 - """ + def _handle_message_start(self, event_data: JsonDict) -> list[JsonDict]: message = event_data.get('message', {}) self._input_tokens = message.get('usage', {}).get('input_tokens', 0) - chunk = self._make_chunk(delta={'role': 'assistant', 'content': ''}) if message.get('model'): chunk['model'] = message['model'] - return [self._dump_chunk(chunk)] + return [chunk] - def _handle_content_block_start(self, event_data: JsonDict) -> list[str]: - """处理内容块开始事件。 - - 目前这里只需要显式处理 `tool_use`,因为文本和 thinking 的真正内容都在后续 delta - 事件里;而 tool_use 需要先开一个空 arguments 的 tool_call 槽位。 - """ + def _handle_content_block_start(self, event_data: JsonDict) -> list[JsonDict]: block = event_data.get('content_block', {}) if block.get('type') != 'tool_use': return [] - self._tool_index += 1 - return [self._dump_chunk(self._make_chunk(delta={ + return [self._make_chunk(delta={ 'tool_calls': [{ 'index': self._tool_index, 'id': block.get('id', gen_id('toolu_')), 'type': 'function', - 'function': { - 'name': block.get('name', ''), - 'arguments': '', - }, + 'function': {'name': block.get('name', ''), 'arguments': ''}, }] - }))] + })] - def _handle_content_block_delta(self, event_data: JsonDict) -> list[str]: - """处理内容块增量事件。 - - Anthropic 会把文本、思考内容、工具参数拆成不同 delta 类型,这里要分别映射成 - OpenAI chunk 里的 `content`、`reasoning_content` 和 `tool_calls.function.arguments`。 - """ + def _handle_content_block_delta(self, event_data: JsonDict) -> list[JsonDict]: delta = event_data.get('delta', {}) delta_type = delta.get('type', '') if delta_type == 'text_delta' and delta.get('text'): - return [self._dump_chunk(self._make_chunk(delta={'content': delta['text']}))] - + return [self._make_chunk(delta={'content': delta['text']})] if delta_type == 'thinking_delta' and delta.get('thinking'): - return [self._dump_chunk(self._make_chunk(delta={'reasoning_content': delta['thinking']}))] - + return [self._make_chunk(delta={'reasoning_content': delta['thinking']})] if delta_type == 'input_json_delta' and delta.get('partial_json'): - return [self._dump_chunk(self._make_chunk(delta={ + return [self._make_chunk(delta={ 'tool_calls': [{ 'index': self._tool_index, 'function': {'arguments': delta['partial_json']}, }] - }))] - + })] return [] - def _handle_message_delta(self, event_data: JsonDict) -> list[str]: - """处理消息收尾事件,补出 finish_reason 和 usage。 - - 当 Anthropic 发出 `message_delta` 时,说明这一轮 assistant 输出已经收束, - 这里会统一生成最后一个带 usage 的收尾 chunk。 - """ + def _handle_message_delta(self, event_data: JsonDict) -> list[JsonDict]: delta = event_data.get('delta', {}) usage = event_data.get('usage', {}) self._output_tokens = usage.get('output_tokens', 0) - - chunk = self._make_chunk( + chunk = make_cc_chunk( + self._id, delta={}, finish_reason=_STOP_REASON_MAP.get(delta.get('stop_reason', ''), 'stop'), + model='claude', ) - chunk['usage'] = _build_cc_usage( + chunk['usage'] = build_cc_usage( input_tokens=self._input_tokens, output_tokens=self._output_tokens, ) - return [self._dump_chunk(chunk)] + return [chunk] def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: """构造标准 OpenAI Chat Completions chunk 对象。""" - choice: JsonDict = {'index': 0, 'delta': delta} - if finish_reason: - choice['finish_reason'] = finish_reason - return { - 'id': self._id, - 'object': 'chat.completion.chunk', - 'model': 'claude', - 'choices': [choice], - } - - @staticmethod - def _dump_chunk(chunk: JsonDict) -> str: - """统一序列化 chunk,方便上层直接写入 SSE data。""" - return json.dumps(chunk) + return make_cc_chunk(self._id, delta, finish_reason, model='claude') # ═══════════════════════════════════════════════════════════ @@ -254,7 +213,7 @@ def _convert_request_message(message: Any) -> tuple[JsonDict | None, str | None] content = message.get('content', '') if role == 'system': - return None, _flatten_text(content) + return None, extract_text(content) if role == 'tool': return _convert_tool_role_message(message), None @@ -301,7 +260,7 @@ def _append_tool_use_blocks(content: Any, tool_calls: list[Any]) -> list[JsonDic 'type': 'tool_use', 'id': tool_call.get('id', gen_id('toolu_')), 'name': function_data.get('name', ''), - 'input': _parse_tool_arguments(function_data.get('arguments', '{}')), + 'input': parse_json_safe(function_data.get('arguments', '{}')), }) return blocks @@ -372,37 +331,12 @@ def _convert_tool_use_block(block: JsonDict, *, index: int) -> JsonDict: else: arguments_text = str(input_data) - return { - 'index': index, - 'id': block.get('id', gen_id('toolu_')), - 'type': 'function', - 'function': { - 'name': tool_name, - 'arguments': arguments_text, - }, - } - - -def _build_cc_message(content_text: str, reasoning_text: str, tool_calls: list[JsonDict]) -> JsonDict: - """构造 OpenAI CC 响应中的 assistant message。""" - message: JsonDict = { - 'role': 'assistant', - 'content': content_text or None, - } - if reasoning_text: - message['reasoning_content'] = reasoning_text - if tool_calls: - message['tool_calls'] = tool_calls - return message - - -def _build_cc_usage(*, input_tokens: int, output_tokens: int) -> JsonDict: - """将 Anthropic usage 字段映射为 OpenAI usage。""" - return { - 'prompt_tokens': input_tokens, - 'completion_tokens': output_tokens, - 'total_tokens': input_tokens + output_tokens, - } + return build_cc_tool_call( + call_id=block.get('id', gen_id('toolu_')), + name=tool_name, + arguments=arguments_text, + index=index, + ) # ═══════════════════════════════════════════════════════════ @@ -410,35 +344,6 @@ def _build_cc_usage(*, input_tokens: int, output_tokens: int) -> JsonDict: # ═══════════════════════════════════════════════════════════ -def _parse_tool_arguments(arguments: Any) -> Any: - """将 tool_call.arguments 尽量解析为对象,供 Anthropic tool_use.input 使用。 - - Anthropic 的 `tool_use.input` 天然期望对象结构;如果这里直接保留原始字符串, - 后续上游会把它当普通文本而不是工具参数对象。 - """ - if not isinstance(arguments, str): - return arguments if arguments is not None else {} - try: - return json.loads(arguments) - except json.JSONDecodeError: - return {} - - -def _flatten_text(content: Any) -> str: - """将 content 扁平化为纯文本,主要用于 system 消息上提。""" - if isinstance(content, str): - return content - if isinstance(content, list): - parts: list[str] = [] - for part in content: - if isinstance(part, str): - parts.append(part) - elif isinstance(part, dict) and part.get('type') == 'text': - parts.append(part.get('text', '')) - return '\n'.join(parts) - return str(content) - - def _convert_content(message: JsonDict) -> Any: """将 OpenAI 消息的 content 字段转换为 Anthropic 内容格式。""" content = message.get('content', '') @@ -708,3 +613,78 @@ def _pick_window_anchor(refs: list[JsonDict], target: int) -> int | None: if 'cache_control' not in refs[i]: return i return None + + +# ═══════════════════════════════════════════════════════════ +# OutboundTransformer 实现: Anthropic Messages +# ═══════════════════════════════════════════════════════════ + + +class AnthropicOutbound: + """Anthropic Messages 后端的出站转换器。 + + 将 CC 格式转换为 Anthropic Messages 格式并处理响应。 + """ + + def build_request(self, payload: JsonDict) -> JsonDict: + return cc_to_messages_request(payload) + + def build_url(self, ctx) -> str: + return f'{ctx.target_url.rstrip("/")}/v1/messages' + + def build_headers(self, ctx) -> dict[str, str]: + from utils.http import build_anthropic_headers + return build_anthropic_headers(ctx.api_key) + + def parse_response(self, raw: JsonDict) -> JsonDict: + return messages_to_cc_response(raw) + + def create_stream_processor(self) -> AnthropicStreamProcessor: + return AnthropicStreamProcessor() + + +class AnthropicStreamProcessor: + """Anthropic SSE 流式处理器。 + + 包装 iter_anthropic_sse + AnthropicStreamConverter, + 将 Anthropic 事件流转换为 CC chunk。 + """ + + def __init__(self): + self._converter = AnthropicStreamConverter() + self._input_tokens = 0 + self._output_tokens = 0 + + def iter_events(self, response) -> Iterator: + from utils.http import iter_anthropic_sse + yield from iter_anthropic_sse(response) + + def process_event(self, event: tuple) -> list[JsonDict]: + event_type, event_data = event + return self._converter.process_event(event_type, event_data) + + def extract_usage(self, event: tuple) -> JsonDict | None: + event_type, event_data = event + if event_type == 'message_start': + message_usage = event_data.get('message', {}).get('usage', {}) + if isinstance(message_usage, dict): + self._input_tokens = message_usage.get('input_tokens', 0) + return { + 'prompt_tokens': self._input_tokens, + 'completion_tokens': 0, + 'total_tokens': self._input_tokens, + } + elif event_type == 'message_delta': + delta_usage = event_data.get('usage', {}) + if isinstance(delta_usage, dict): + completion = delta_usage.get('output_tokens', 0) + self._output_tokens = completion + return { + 'prompt_tokens': self._input_tokens, + 'completion_tokens': completion, + 'total_tokens': self._input_tokens + completion, + } + return None + + def finalize(self) -> list[JsonDict]: + return [] diff --git a/adapters/cc_gemini_adapter.py b/adapters/cc_gemini_adapter.py index 5e8aad0..60336c8 100644 --- a/adapters/cc_gemini_adapter.py +++ b/adapters/cc_gemini_adapter.py @@ -8,8 +8,17 @@ from __future__ import annotations import json import logging -from typing import Any +from typing import Any, Iterator +from adapters.helpers import ( + build_cc_message, + build_cc_response, + build_cc_tool_call, + build_cc_usage, + extract_text, + make_cc_chunk, + parse_json_safe, +) from utils.http import gen_id JsonDict = dict[str, Any] @@ -38,7 +47,7 @@ def cc_to_gemini_request(payload: JsonDict) -> JsonDict: for msg in messages: role = msg.get('role', '') if role in ('system', 'developer'): - system_parts.append(_flatten_text(msg.get('content', ''))) + system_parts.append(extract_text(msg.get('content', ''))) continue converted = _convert_message(msg) if converted: @@ -84,21 +93,13 @@ def gemini_to_cc_response(data: JsonDict, request_id: str | None = None) -> Json else: finish_reason = _FINISH_REASON_MAP.get(finish, 'stop') - message: JsonDict = {'role': 'assistant', 'content': content_text or None} - if reasoning_text: - message['reasoning_content'] = reasoning_text - if tool_calls: - message['tool_calls'] = tool_calls - - usage = _convert_usage(data.get('usageMetadata', {})) - - return { - 'id': request_id, - 'object': 'chat.completion', - 'model': data.get('modelVersion', 'gemini'), - 'choices': [{'index': 0, 'message': message, 'finish_reason': finish_reason}], - 'usage': usage, - } + return build_cc_response( + response_id=request_id, + message=build_cc_message(content_text, reasoning_text, tool_calls), + finish_reason=finish_reason, + usage=_convert_usage(data.get('usageMetadata', {})), + model=data.get('modelVersion', 'gemini'), + ) # ═══════════════════════════════════════════════════════════ @@ -166,15 +167,7 @@ class GeminiStreamConverter: return results def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: - choice: JsonDict = {'index': 0, 'delta': delta} - if finish_reason: - choice['finish_reason'] = finish_reason - return { - 'id': self._id, - 'object': 'chat.completion.chunk', - 'model': 'gemini', - 'choices': [choice], - } + return make_cc_chunk(self._id, delta, finish_reason, model='gemini') # ═══════════════════════════════════════════════════════════ @@ -194,7 +187,7 @@ def _convert_message(msg: JsonDict) -> JsonDict | None: 'parts': [{ 'functionResponse': { 'name': msg.get('name', msg.get('tool_call_id', '')), - 'response': _parse_json_safe(msg.get('content', '')), + 'response': parse_json_safe(msg.get('content', ''), fallback={'result': msg.get('content', '')} if msg.get('content', '') else {}), }, }], } @@ -221,7 +214,7 @@ def _convert_message(msg: JsonDict) -> JsonDict | None: parts.append({ 'functionCall': { 'name': func.get('name', ''), - 'args': _parse_json_safe(func.get('arguments', '{}')), + 'args': parse_json_safe(func.get('arguments', '{}'), fallback={}), }, }) @@ -304,15 +297,12 @@ def _extract_parts(parts: list[Any]) -> tuple[str, str, list[JsonDict]]: text += part['text'] elif 'functionCall' in part: fc = part['functionCall'] - tool_calls.append({ - 'index': len(tool_calls), - 'id': fc.get('id') or gen_id('call_'), - 'type': 'function', - 'function': { - 'name': fc.get('name', ''), - 'arguments': json.dumps(fc.get('args', {}), ensure_ascii=False), - }, - }) + tool_calls.append(build_cc_tool_call( + call_id=fc.get('id') or gen_id('call_'), + name=fc.get('name', ''), + arguments=json.dumps(fc.get('args', {}), ensure_ascii=False), + index=len(tool_calls), + )) return text, reasoning, tool_calls @@ -322,12 +312,7 @@ def _convert_usage(meta: JsonDict) -> JsonDict: prompt = meta.get('promptTokenCount', 0) candidates = meta.get('candidatesTokenCount', 0) thoughts = meta.get('thoughtsTokenCount', 0) - completion = candidates + thoughts - return { - 'prompt_tokens': prompt, - 'completion_tokens': completion, - 'total_tokens': prompt + completion, - } + return build_cc_usage(prompt, candidates + thoughts) def _merge_same_role(contents: list[JsonDict]) -> list[JsonDict]: @@ -343,21 +328,65 @@ def _merge_same_role(contents: list[JsonDict]) -> list[JsonDict]: return merged -def _flatten_text(content: Any) -> str: - if isinstance(content, str): - return content - if isinstance(content, list): - return '\n'.join( - p.get('text', '') if isinstance(p, dict) else str(p) - for p in content - ) - return str(content) -def _parse_json_safe(text: Any) -> Any: - if not isinstance(text, str): - return text if text is not None else {} - try: - return json.loads(text) - except (json.JSONDecodeError, ValueError): - return {'result': text} if text else {} +# ═══════════════════════════════════════════════════════════ +# OutboundTransformer 实现: Gemini Contents +# ═══════════════════════════════════════════════════════════ + + +class GeminiOutbound: + """Gemini Contents 后端的出站转换器。 + + 将 CC 格式转换为 Gemini generateContent 格式并处理响应。 + """ + + def build_request(self, payload: JsonDict) -> JsonDict: + return cc_to_gemini_request(payload) + + def build_url(self, ctx) -> str: + base = ctx.target_url.rstrip('/') + model = ctx.upstream_model + if ctx.is_stream: + return f'{base}/v1/models/{model}:streamGenerateContent?alt=sse' + return f'{base}/v1/models/{model}:generateContent' + + def build_headers(self, ctx) -> dict[str, str]: + from utils.http import build_gemini_headers + return build_gemini_headers(ctx.api_key) + + def parse_response(self, raw: JsonDict) -> JsonDict: + return gemini_to_cc_response(raw) + + def create_stream_processor(self) -> GeminiStreamProcessor: + return GeminiStreamProcessor() + + +class GeminiStreamProcessor: + """Gemini SSE 流式处理器。 + + 包装 iter_gemini_sse + GeminiStreamConverter。 + """ + + def __init__(self): + self._converter = GeminiStreamConverter() + + def iter_events(self, response) -> Iterator: + from utils.http import iter_gemini_sse + yield from iter_gemini_sse(response) + + def process_event(self, event: JsonDict) -> list[JsonDict]: + return self._converter.process_chunk(event) + + def extract_usage(self, event: JsonDict) -> JsonDict | None: + usage_meta = event.get('usageMetadata') if isinstance(event, dict) else None + if isinstance(usage_meta, dict): + return { + 'prompt_tokens': usage_meta.get('promptTokenCount', 0), + 'completion_tokens': usage_meta.get('candidatesTokenCount', 0), + 'total_tokens': usage_meta.get('totalTokenCount', 0), + } + return None + + def finalize(self) -> list[JsonDict]: + return [] diff --git a/adapters/helpers.py b/adapters/helpers.py new file mode 100644 index 0000000..563902b --- /dev/null +++ b/adapters/helpers.py @@ -0,0 +1,155 @@ +"""适配器公共辅助函数 + +收敛多个适配器都在重复实现的 CC 格式构建逻辑: +- CC 消息/Usage/Tool Call/Stream Chunk 的标准构造 +- 内容扁平化、JSON 安全解析、工具输出序列化 +""" + +from __future__ import annotations + +import json +from typing import Any + +from utils.http import gen_id + +JsonDict = dict[str, Any] + + +# ═══════════════════════════════════════════════════════════ +# CC 格式标准构造 +# ═══════════════════════════════════════════════════════════ + + +def build_cc_message( + content_text: str, + reasoning_text: str = '', + tool_calls: list[JsonDict] | None = None, +) -> JsonDict: + """构造标准的 CC assistant 消息。""" + message: JsonDict = { + 'role': 'assistant', + 'content': content_text or None, + } + if reasoning_text: + message['reasoning_content'] = reasoning_text + if tool_calls: + message['tool_calls'] = tool_calls + return message + + +def build_cc_usage(input_tokens: int, output_tokens: int) -> JsonDict: + """构造标准的 CC usage 字典。""" + return { + 'prompt_tokens': input_tokens, + 'completion_tokens': output_tokens, + 'total_tokens': input_tokens + output_tokens, + } + + +def build_cc_tool_call( + call_id: str, + name: str, + arguments: str, + *, + index: int | None = None, +) -> JsonDict: + """构造标准的 CC tool_call 结构。""" + tc: JsonDict = { + 'id': call_id or gen_id('call_'), + 'type': 'function', + 'function': { + 'name': name, + 'arguments': arguments, + }, + } + if index is not None: + tc['index'] = index + return tc + + +def make_cc_chunk( + chunk_id: str, + delta: JsonDict, + finish_reason: str | None = None, + model: str = '', +) -> JsonDict: + """构造标准的 CC 流式 chunk。""" + choice: JsonDict = {'index': 0, 'delta': delta} + if finish_reason: + choice['finish_reason'] = finish_reason + return { + 'id': chunk_id, + 'object': 'chat.completion.chunk', + 'model': model, + 'choices': [choice], + } + + +def build_cc_response( + response_id: str, + message: JsonDict, + finish_reason: str, + usage: JsonDict, + model: str = '', +) -> JsonDict: + """构造标准的 CC 非流式响应。""" + return { + 'id': response_id, + 'object': 'chat.completion', + 'model': model, + 'choices': [{ + 'index': 0, + 'message': message, + 'finish_reason': finish_reason, + }], + 'usage': usage, + } + + +# ═══════════════════════════════════════════════════════════ +# 通用文本/JSON 处理 +# ═══════════════════════════════════════════════════════════ + + +def extract_text(content: Any) -> str: + """从多种内容格式中提取并拼接纯文本。 + + 支持字符串、内容块列表(OpenAI/Anthropic/Responses 风格)。 + """ + if isinstance(content, str): + return content + if not isinstance(content, list): + return str(content) if content is not None else '' + + parts: list[str] = [] + for part in content: + if isinstance(part, str): + parts.append(part) + elif isinstance(part, dict): + part_type = part.get('type', '') + if part_type in ('text', 'output_text', 'input_text'): + parts.append(part.get('text', '')) + elif part_type == 'refusal': + parts.append(part.get('refusal', '')) + elif 'text' in part and not part_type: + parts.append(part['text']) + return '\n'.join(parts) if parts else '' + + +def parse_json_safe(text: Any, fallback: Any = None) -> Any: + """安全解析 JSON,失败时返回 fallback。""" + if not isinstance(text, str): + return text if text is not None else (fallback if fallback is not None else {}) + try: + return json.loads(text) + except (json.JSONDecodeError, ValueError): + return fallback if fallback is not None else {} + + +def stringify_content(content: Any) -> str: + """将任意内容序列化为字符串。""" + if isinstance(content, str): + return content + if content is None: + return '' + return json.dumps(content, ensure_ascii=False) diff --git a/adapters/openai_compat_fixer.py b/adapters/openai_compat_fixer.py index 8a2d252..50348cf 100644 --- a/adapters/openai_compat_fixer.py +++ b/adapters/openai_compat_fixer.py @@ -13,7 +13,7 @@ from __future__ import annotations import json import logging -from typing import Any +from typing import Any, Iterator from utils.http import gen_id from utils.think_tag import extract_from_text @@ -423,3 +423,60 @@ def _rewrite_function_call_finish_reason(choice: JsonDict) -> None: """将旧版 finish_reason=function_call 升级为 tool_calls。""" if choice.get('finish_reason') == 'function_call': choice['finish_reason'] = 'tool_calls' + + +# ═══════════════════════════════════════════════════════════ +# OutboundTransformer 实现: OpenAI Chat +# ═══════════════════════════════════════════════════════════ + + +class OpenAIChatOutbound: + """OpenAI Chat Completions 后端的出站转换器。 + + 由于 CC 本身就是 OpenAI Chat 格式,请求/响应转换主要做兼容性修复。 + """ + + def build_request(self, payload: JsonDict) -> JsonDict: + return normalize_request(payload) + + def build_url(self, ctx) -> str: + return f'{ctx.target_url.rstrip("/")}/v1/chat/completions' + + def build_headers(self, ctx) -> dict[str, str]: + from utils.http import build_openai_headers + return build_openai_headers(ctx.api_key) + + def parse_response(self, raw: JsonDict) -> JsonDict: + return fix_response(raw) + + def create_stream_processor(self) -> OpenAIChatStreamProcessor: + return OpenAIChatStreamProcessor() + + +class OpenAIChatStreamProcessor: + """OpenAI Chat SSE 流式处理器。 + + 包装 iter_openai_sse + fix_stream_chunk + ThinkTagExtractor。 + """ + + def __init__(self): + from utils.think_tag import ThinkTagExtractor + self._think_extractor = ThinkTagExtractor() + + def iter_events(self, response) -> Iterator: + from utils.http import iter_openai_sse + for chunk in iter_openai_sse(response): + if chunk is None: + return + yield chunk + + def process_event(self, event: JsonDict) -> list[JsonDict]: + chunk = fix_stream_chunk(event) + return list(self._think_extractor.process_chunk(chunk)) + + def extract_usage(self, event: JsonDict) -> JsonDict | None: + return event.get('usage') + + def finalize(self) -> list[JsonDict]: + close_chunk = self._think_extractor.finalize() + return [close_chunk] if close_chunk else [] diff --git a/adapters/responses_cc_adapter.py b/adapters/responses_cc_adapter.py index e6c864a..68acedc 100644 --- a/adapters/responses_cc_adapter.py +++ b/adapters/responses_cc_adapter.py @@ -15,8 +15,18 @@ from __future__ import annotations import json from dataclasses import dataclass -from typing import Any +from typing import Any, Iterator +from adapters.helpers import ( + build_cc_message, + build_cc_response, + build_cc_tool_call, + build_cc_usage, + extract_text, + make_cc_chunk, + stringify_content, +) +from adapters.unified import UnifiedUsage from utils.http import gen_id JsonDict = dict[str, Any] @@ -85,7 +95,7 @@ def cc_to_responses(cc_resp: JsonDict, model: str = '') -> JsonDict: 'status': _response_status_from_finish_reason(finish_reason), 'model': model or cc_resp.get('model', ''), 'output': _build_responses_output(message), - 'usage': _build_responses_usage(cc_resp.get('usage', {})), + 'usage': UnifiedUsage.from_cc_dict(cc_resp.get('usage', {})).to_responses_dict(), } @@ -94,31 +104,18 @@ def responses_to_cc_response(response_data: JsonDict, model: str = '') -> JsonDi output_items = response_data.get('output', []) content_text, reasoning_text, tool_calls = _collect_cc_parts_from_responses_output(output_items) finish_reason = _cc_finish_reason_from_responses(response_data, tool_calls) - message = { - 'role': 'assistant', - 'content': content_text or None, - } - if reasoning_text: - message['reasoning_content'] = reasoning_text - if tool_calls: - message['tool_calls'] = tool_calls - usage = response_data.get('usage', {}) - return { - 'id': response_data.get('id', gen_id('chatcmpl-')), - 'object': 'chat.completion', - 'model': model or response_data.get('model', ''), - 'choices': [{ - 'index': 0, - 'message': message, - 'finish_reason': finish_reason, - }], - 'usage': { - 'prompt_tokens': usage.get('input_tokens', 0), - 'completion_tokens': usage.get('output_tokens', 0), - 'total_tokens': usage.get('total_tokens', 0), - }, - } + + return build_cc_response( + response_id=response_data.get('id', gen_id('chatcmpl-')), + message=build_cc_message(content_text, reasoning_text, tool_calls), + finish_reason=finish_reason, + usage=build_cc_usage( + input_tokens=usage.get('input_tokens', 0), + output_tokens=usage.get('output_tokens', 0), + ), + model=model or response_data.get('model', ''), + ) # ═══════════════════════════════════════════════════════════ @@ -658,15 +655,7 @@ class ResponsesToCCStreamConverter: def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: """构造标准 Chat Completions chunk。""" - choice: JsonDict = {'index': 0, 'delta': delta} - if finish_reason: - choice['finish_reason'] = finish_reason - return { - 'id': self._id, - 'object': 'chat.completion.chunk', - 'model': self._model, - 'choices': [choice], - } + return make_cc_chunk(self._id, delta, finish_reason, model=self._model) # ═══════════════════════════════════════════════════════════ @@ -715,7 +704,7 @@ def _append_responses_input_item( content = message.get('content') if role == 'system': - text = _content_to_text(content) + text = extract_text(content) if text: instructions.append(text) return @@ -724,11 +713,11 @@ def _append_responses_input_item( input_items.append({ 'type': 'function_call_output', 'call_id': message.get('tool_call_id', ''), - 'output': _stringify_output(content), + 'output': stringify_content(content), }) return - text = _content_to_text(content) + text = extract_text(content) has_tool_calls = bool(message.get('tool_calls')) if role == 'assistant' and has_tool_calls: @@ -771,7 +760,7 @@ def _convert_input_items(items: list[Any], messages: list[JsonDict]) -> None: if role and not item_type: msg: JsonDict = { 'role': role, - 'content': _normalize_simple_content(item.get('content', '')), + 'content': extract_text(item.get('content', '')), } if role == 'assistant' and pending_reasoning: msg['reasoning_content'] = pending_reasoning @@ -810,7 +799,7 @@ def _append_message_item(items: list[Any], *, start: int, messages: list[JsonDic """将一个 message 项及其后续连续 function_call 项合并成一条消息。""" item = items[start] role = item.get('role', 'assistant') - content = _extract_text(item.get('content', [])) + content = extract_text(item.get('content', [])) message: JsonDict = {'role': role, 'content': content or ''} if role == 'assistant': @@ -828,7 +817,11 @@ def _append_message_item(items: list[Any], *, start: int, messages: list[JsonDic def _append_function_call_item(item: JsonDict, messages: list[JsonDict]) -> None: """将独立的 Responses `function_call` 项挂接到最近的 assistant 消息上。""" - tool_call = _build_cc_tool_call(item) + tool_call = build_cc_tool_call( + call_id=item.get('call_id') or gen_id('call_'), + name=item.get('name', ''), + arguments=item.get('arguments', '{}'), + ) if messages and messages[-1]['role'] == 'assistant': messages[-1].setdefault('tool_calls', []).append(tool_call) @@ -851,12 +844,6 @@ def _convert_function_call_output_item(item: JsonDict) -> JsonDict: } -def _normalize_simple_content(content: Any) -> str: - """将简单 content 载荷规范化为纯文本字符串。""" - if isinstance(content, list): - return _extract_text(content) or '' - return str(content) if content is not None else '' - def _collect_function_calls(items: list[Any], start: int) -> tuple[list[JsonDict], int]: """收集从指定位置开始连续出现的 `function_call` 项。""" @@ -865,24 +852,17 @@ def _collect_function_calls(items: list[Any], start: int) -> tuple[list[JsonDict while index < len(items): next_item = items[index] if isinstance(next_item, dict) and next_item.get('type') == 'function_call': - tool_calls.append(_build_cc_tool_call(next_item)) + tool_calls.append(build_cc_tool_call( + call_id=next_item.get('call_id') or gen_id('call_'), + name=next_item.get('name', ''), + arguments=next_item.get('arguments', '{}'), + )) index += 1 else: break return tool_calls, index - start -def _build_cc_tool_call(item: JsonDict) -> JsonDict: - """将单个 Responses `function_call` 项转换为 CC `tool_call` 结构。""" - return { - 'id': item.get('call_id') or gen_id('call_'), - 'type': 'function', - 'function': { - 'name': item.get('name', ''), - 'arguments': item.get('arguments', '{}'), - }, - } - # ═══════════════════════════════════════════════════════════ # 非流式响应转换辅助 @@ -936,14 +916,6 @@ def _make_function_call_output_item(tool_call: JsonDict) -> JsonDict: } -def _build_responses_usage(usage: JsonDict) -> JsonDict: - """将 Chat Completions 的 usage 字段映射为 Responses usage 结构。""" - return { - 'input_tokens': usage.get('prompt_tokens', 0), - 'output_tokens': usage.get('completion_tokens', 0), - 'total_tokens': usage.get('total_tokens', 0), - } - def _collect_cc_parts_from_responses_output(output_items: Any) -> tuple[str, str, list[JsonDict]]: """从 Responses `output` 中提取文本、思考摘要和工具调用。""" @@ -959,11 +931,16 @@ def _collect_cc_parts_from_responses_output(output_items: Any) -> tuple[str, str continue item_type = item.get('type', '') if item_type == 'message': - content_text += _extract_text(item.get('content', [])) + content_text += extract_text(item.get('content', [])) elif item_type == 'reasoning': reasoning_text += _extract_reasoning_text(item) elif item_type == 'function_call': - tool_calls.append(_build_cc_tool_call_from_responses_output(item, index=len(tool_calls))) + tool_calls.append(build_cc_tool_call( + call_id=item.get('call_id') or gen_id('call_'), + name=item.get('name', ''), + arguments=item.get('arguments', '{}'), + index=len(tool_calls), + )) return content_text, reasoning_text, tool_calls @@ -980,18 +957,6 @@ def _extract_reasoning_text(item: JsonDict) -> str: return ''.join(texts) -def _build_cc_tool_call_from_responses_output(item: JsonDict, *, index: int) -> JsonDict: - """将 Responses `function_call` 输出项转换为 CC `tool_call`。""" - return { - 'index': index, - 'id': item.get('call_id') or gen_id('call_'), - 'type': 'function', - 'function': { - 'name': item.get('name', ''), - 'arguments': item.get('arguments', '{}'), - }, - } - def _cc_finish_reason_from_responses(response_data: JsonDict, tool_calls: list[JsonDict]) -> str: """根据 Responses 完成状态推断聊天补全的 finish_reason。""" @@ -1017,57 +982,7 @@ def _map_anthropic_stop_reason(stop_reason: str) -> str: # ═══════════════════════════════════════════════════════════ -def _extract_text(content: Any) -> str: - """从多种内容块结构中提取并拼接纯文本。""" - if isinstance(content, str): - return content - if not isinstance(content, list): - return str(content) if content else '' - texts: list[str] = [] - for part in content: - if isinstance(part, str): - texts.append(part) - elif isinstance(part, dict): - part_type = part.get('type', '') - if part_type in ('output_text', 'input_text', 'text'): - texts.append(part.get('text', '')) - elif part_type == 'refusal': - texts.append(part.get('refusal', '')) - return '\n'.join(texts) if texts else '' - - -def _content_to_text(content: Any) -> str: - """将任意 content 载荷转换为单个字符串。""" - if isinstance(content, str): - return content - if isinstance(content, list): - return _extract_text(content) - return str(content) if content is not None else '' - - -def _content_to_responses_parts(content: Any, role: str = 'user') -> list[JsonDict]: - """将普通消息内容转换为 Responses 内容块数组。 - - assistant 消息使用 output_text,其他角色使用 input_text。 - """ - if isinstance(content, list): - text = _extract_text(content) - else: - text = _content_to_text(content) - if not text: - return [] - part_type = 'output_text' if role == 'assistant' else 'input_text' - return [{'type': part_type, 'text': text}] - - -def _stringify_output(content: Any) -> str: - """将工具输出统一序列化为字符串,便于放入 `function_call_output`。""" - if isinstance(content, str): - return content - if content is None: - return '' - return json.dumps(content, ensure_ascii=False) if not isinstance(content, str) else content def _build_responses_function_call_item(tool_call: JsonDict) -> JsonDict: @@ -1081,6 +996,165 @@ def _build_responses_function_call_item(tool_call: JsonDict) -> JsonDict: } +# ═══════════════════════════════════════════════════════════ +# OutboundTransformer 实现: Responses +# ═══════════════════════════════════════════════════════════ + + +class ResponsesOutbound: + """OpenAI Responses 后端的出站转换器。 + + 将 CC 格式转换为 Responses 格式并处理响应。 + """ + + def build_request(self, payload: JsonDict) -> JsonDict: + return cc_to_responses_request(payload) + + def build_url(self, ctx) -> str: + return f'{ctx.target_url.rstrip("/")}/v1/responses' + + def build_headers(self, ctx) -> dict[str, str]: + from utils.http import build_openai_headers + return build_openai_headers(ctx.api_key) + + def parse_response(self, raw: JsonDict) -> JsonDict: + return responses_to_cc_response(raw) + + def create_stream_processor(self) -> ResponsesStreamProcessorForCC: + return ResponsesStreamProcessorForCC() + + +class ResponsesStreamProcessorForCC: + """Responses SSE → CC chunk 流式处理器。 + + 用于 /v1/chat/completions -> /v1/responses 的桥接路径。 + """ + + def __init__(self): + self._converter = ResponsesToCCStreamConverter() + + def iter_events(self, response) -> Iterator: + from utils.http import iter_responses_sse + yield from iter_responses_sse(response) + + def process_event(self, event: tuple) -> list[JsonDict]: + event_type, event_data = event + return self._converter.process_event(event_type, event_data) + + def extract_usage(self, event: tuple) -> JsonDict | None: + from adapters.unified import extract_responses_usage + event_type, event_data = event + extracted = extract_responses_usage(event_data) + if extracted: + return { + 'prompt_tokens': extracted.get('input_tokens', 0), + 'completion_tokens': extracted.get('output_tokens', 0), + 'total_tokens': extracted.get('total_tokens', 0), + } + return None + + def finalize(self) -> list[JsonDict]: + return [] + + +class ResponsesNativeOutbound: + """Responses 后端原生透传的出站转换器。 + + 当 /v1/responses → /v1/responses 时直接透传,不经过 CC 中间格式。 + """ + + def build_request(self, payload: JsonDict) -> JsonDict: + return payload + + def build_url(self, ctx) -> str: + return f'{ctx.target_url.rstrip("/")}/v1/responses' + + def build_headers(self, ctx) -> dict[str, str]: + from utils.http import build_openai_headers + return build_openai_headers(ctx.api_key) + + def parse_response(self, raw: JsonDict) -> JsonDict: + return raw + + def create_stream_processor(self) -> ResponsesNativeStreamProcessor: + return ResponsesNativeStreamProcessor() + + +class ResponsesNativeStreamProcessor: + """Responses 原生 SSE 透传流式处理器。 + + 上游就是 Responses 格式,只需透传事件并做轻量模型名改写。 + 每个事件作为 SSE 字符串直接返回。 + """ + + def iter_events(self, response) -> Iterator: + from utils.http import iter_responses_sse + yield from iter_responses_sse(response) + + def process_event(self, event: tuple) -> list[JsonDict]: + event_type, event_data = event + return [{'_sse_event_type': event_type, **event_data}] + + def extract_usage(self, event: tuple) -> JsonDict | None: + from adapters.unified import extract_responses_usage + _, event_data = event + return extract_responses_usage(event_data) + + def finalize(self) -> list[JsonDict]: + return [] + + +class AnthropicOutboundForResponses: + """Anthropic 后端的出站转换器(用于 /v1/responses 路由)。 + + 流式处理直接将 Anthropic SSE → Responses SSE, + 跳过 CC 中间态以保留原始时序。 + """ + + def build_request(self, payload: JsonDict) -> JsonDict: + from adapters.cc_anthropic_adapter import cc_to_messages_request + return cc_to_messages_request(payload) + + def build_url(self, ctx) -> str: + return f'{ctx.target_url.rstrip("/")}/v1/messages' + + def build_headers(self, ctx) -> dict[str, str]: + from utils.http import build_anthropic_headers + return build_anthropic_headers(ctx.api_key) + + def parse_response(self, raw: JsonDict) -> JsonDict: + from adapters.cc_anthropic_adapter import messages_to_cc_response + return messages_to_cc_response(raw) + + def create_stream_processor(self) -> AnthropicToResponsesStreamProcessor: + return AnthropicToResponsesStreamProcessor() + + +class AnthropicToResponsesStreamProcessor: + """Anthropic SSE → Responses SSE 直接转换的流式处理器。 + + 跳过 CC 中间态,直接将 Anthropic 事件映射为 Responses 事件。 + 返回的 chunk 是 SSE 字符串。 + """ + + def __init__(self): + self._converter = ResponsesStreamConverter() + + def iter_events(self, response) -> Iterator: + from utils.http import iter_anthropic_sse + yield from iter_anthropic_sse(response) + + def process_event(self, event: tuple) -> list[str]: + event_type, event_data = event + return self._converter.process_anthropic_event(event_type, event_data) + + def extract_usage(self, event: tuple) -> JsonDict | None: + return None + + def finalize(self) -> list[str]: + return self._converter.finalize() + + def _convert_cc_tools_to_responses(tools: Any) -> list[JsonDict]: """将聊天补全风格的工具定义转换为 Responses `tools` 列表。""" if not isinstance(tools, list): diff --git a/adapters/unified.py b/adapters/unified.py new file mode 100644 index 0000000..db2e087 --- /dev/null +++ b/adapters/unified.py @@ -0,0 +1,354 @@ +"""统一中间格式与转换器接口 + +定义项目中所有 API 格式共用的中间表示和转换器协议: +- UnifiedRequest / UnifiedResponse: 统一的请求/响应数据结构 +- InboundTransformer / OutboundTransformer: 入站/出站转换器接口 +- StreamProcessor: 流式事件处理器接口 +- ClientFormatter: 客户端响应格式化接口 +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from typing import Any, Iterator, Protocol + +from flask import Response, jsonify + +import settings +from utils.http import forward_request, gen_id, sse_response +from utils.request_logger import ( + append_client_event, + append_upstream_event, + attach_client_response, + attach_error, + attach_upstream_request, + attach_upstream_response, + finalize_turn, + set_stream_summary, +) +from utils.usage_tracker import usage_tracker + +logger = logging.getLogger(__name__) + +JsonDict = dict[str, Any] + + +# ═══════════════════════════════════════════════════════════ +# 统一数据模型 +# ═══════════════════════════════════════════════════════════ + + +@dataclass +class UnifiedUsage: + """标准化的令牌用量统计。""" + + input_tokens: int = 0 + output_tokens: int = 0 + total_tokens: int = 0 + + def to_cc_dict(self) -> JsonDict: + return { + 'prompt_tokens': self.input_tokens, + 'completion_tokens': self.output_tokens, + 'total_tokens': self.total_tokens, + } + + def to_responses_dict(self) -> JsonDict: + return { + 'input_tokens': self.input_tokens, + 'output_tokens': self.output_tokens, + 'total_tokens': self.total_tokens, + } + + @classmethod + def from_cc_dict(cls, d: JsonDict) -> UnifiedUsage: + return cls( + input_tokens=d.get('prompt_tokens', 0), + output_tokens=d.get('completion_tokens', 0), + total_tokens=d.get('total_tokens', 0), + ) + + @classmethod + def from_responses_dict(cls, d: JsonDict) -> UnifiedUsage: + return cls( + input_tokens=d.get('input_tokens', 0), + output_tokens=d.get('output_tokens', 0), + total_tokens=d.get('total_tokens', 0), + ) + + +# ═══════════════════════════════════════════════════════════ +# 转换器接口 +# ═══════════════════════════════════════════════════════════ + + +class OutboundTransformer(Protocol): + """出站转换器:将 CC 中间格式转换为上游后端格式。 + + 所有后端(OpenAI Chat / Responses / Anthropic / Gemini)各实现一套, + 内部复用各自现有的适配器函数。 + """ + + def build_request(self, payload: JsonDict) -> JsonDict: + """将 CC 格式请求体转换为上游格式请求体。""" + ... + + def build_url(self, ctx: Any) -> str: + """根据路由上下文构建上游请求 URL。""" + ... + + def build_headers(self, ctx: Any) -> JsonDict: + """根据路由上下文构建上游请求头。""" + ... + + def parse_response(self, raw: JsonDict) -> JsonDict: + """将上游非流式响应转换回 CC 格式。""" + ... + + def create_stream_processor(self) -> StreamProcessor: + """创建该后端对应的流式事件处理器。""" + ... + + +class StreamProcessor(Protocol): + """流式事件处理器接口。 + + 每个后端的 SSE 格式不同,StreamProcessor 封装了具体的迭代与转换逻辑, + 让通用流式处理器不必关心后端差异。 + """ + + def iter_events(self, response: Any) -> Iterator: + """从上游 HTTP 响应中迭代原始事件。""" + ... + + def process_event(self, event: Any) -> list: + """将单个上游事件转换为输出项列表。 + + 返回值通常是 list[JsonDict](CC chunk), + 但 Anthropic→Responses 路径返回 list[str](SSE 字符串)。 + """ + ... + + def extract_usage(self, event: Any) -> JsonDict | None: + """从上游事件中提取用量信息(如果有的话)。""" + ... + + def finalize(self) -> list: + """流结束时产出的收尾项。""" + ... + + +class ClientFormatter(Protocol): + """客户端响应格式化器。 + + 根据客户端期望的 API 格式(CC 或 Responses),将通用的处理结果 + 格式化为最终返回给客户端的形态。 + """ + + def format_response(self, cc_response: JsonDict, model: str) -> JsonDict: + """格式化非流式响应。""" + ... + + def wrap_stream_item(self, item: Any) -> str: + """将单个流式输出项包装为 SSE 字符串。""" + ... + + def format_error(self, message: str) -> str: + """构造流式错误消息。""" + ... + + def format_done(self) -> str | None: + """构造流结束标记(CC 返回 [DONE],Responses 返回 None)。""" + ... + + def start_events(self) -> list[str]: + """流开始前的初始事件(Responses 返回 response.created)。""" + ... + + @property + def usage_input_key(self) -> str: + """usage 中输入令牌的字段名。""" + ... + + @property + def usage_output_key(self) -> str: + """usage 中输出令牌的字段名。""" + ... + + +# ═══════════════════════════════════════════════════════════ +# 通用请求/响应处理器 +# ═══════════════════════════════════════════════════════════ + + +def _dbg(message: str) -> None: + if settings.get_debug_mode() in ('simple', 'verbose'): + logger.info('[通用调试] %s', message) + + +def extract_responses_usage(event_data: JsonDict) -> JsonDict | None: + """从原生 Responses 事件中提取 usage(公共辅助)。""" + if not isinstance(event_data, dict): + return None + usage = event_data.get('usage') + if isinstance(usage, dict): + return usage + response_obj = event_data.get('response') + if isinstance(response_obj, dict): + nested_usage = response_obj.get('usage') + if isinstance(nested_usage, dict): + return nested_usage + return None + + +def handle_non_stream( + ctx: Any, + outbound: OutboundTransformer, + client_fmt: ClientFormatter, + payload: JsonDict, + turn: JsonDict | None, +) -> Response: + """通用非流式处理器。 + + 替代 chat.py 和 responses.py 中的 8 个 _handle_xxx_non_stream 函数。 + """ + from routes.common import apply_body_modifications, apply_header_modifications, log_usage + + upstream_payload = outbound.build_request(payload) + url = outbound.build_url(ctx) + headers = outbound.build_headers(ctx) + upstream_payload = apply_body_modifications(upstream_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + upstream_payload['stream'] = False + attach_upstream_request(turn, upstream_payload, headers) + resp, err = forward_request(url, headers, upstream_payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + cc_response = outbound.parse_response(raw) + result = client_fmt.format_response(cc_response, ctx.client_model) + + _dbg('格式化后响应=' + json.dumps(result, ensure_ascii=False, default=str)[:1000]) + usage_data = result.get('usage', {}) + log_usage('通用', usage_data, input_key=client_fmt.usage_input_key, output_key=client_fmt.usage_output_key) + usage_tracker.record( + ctx.client_model, + usage_data, + input_key=client_fmt.usage_input_key, + output_key=client_fmt.usage_output_key, + ) + attach_client_response(turn, result) + finalize_turn(turn, usage=usage_data) + return jsonify(result) + + +def handle_stream( + ctx: Any, + outbound: OutboundTransformer, + client_fmt: ClientFormatter, + payload: JsonDict, + turn: JsonDict | None, +) -> Response: + """通用流式处理器。 + + 替代 chat.py 和 responses.py 中的 8 个 _handle_xxx_stream 函数。 + """ + from routes.common import apply_body_modifications, apply_header_modifications + + upstream_payload = outbound.build_request(payload) + url = outbound.build_url(ctx) + headers = outbound.build_headers(ctx) + upstream_payload = apply_body_modifications(upstream_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + upstream_payload['stream'] = True + processor = outbound.create_stream_processor() + + def generate(): + for start_evt in client_fmt.start_events(): + yield start_evt + + attach_upstream_request(turn, upstream_payload, headers) + resp, err = forward_request(url, headers, upstream_payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield client_fmt.format_error(str(err)) + return + + event_count = 0 + client_items: list[str] = [] + last_usage: JsonDict | None = None + + for event in processor.iter_events(resp): + append_upstream_event(turn, {'type': 'upstream_event', 'data': event}) + + extracted = processor.extract_usage(event) + if extracted is not None: + last_usage = extracted + + if event_count < 10: + _dbg( + f'上游事件#{event_count}=' + + json.dumps(event, ensure_ascii=False, default=str)[:500] + ) + + for chunk in processor.process_event(event): + if isinstance(chunk, dict): + chunk['model'] = ctx.client_model + wrapped = client_fmt.wrap_stream_item(chunk) + client_items.append(wrapped) + append_client_event(turn, {'type': 'stream_item', 'data': chunk}) + if event_count < 10: + _dbg( + f'返回片段#{event_count}=' + + json.dumps(chunk, ensure_ascii=False, default=str)[:500] + ) + yield wrapped + + event_count += 1 + + for chunk in processor.finalize(): + if isinstance(chunk, dict): + chunk['model'] = ctx.client_model + wrapped = client_fmt.wrap_stream_item(chunk) + client_items.append(wrapped) + append_client_event(turn, {'type': 'stream_item', 'data': chunk}) + yield wrapped + + done = client_fmt.format_done() + if done: + append_client_event(turn, {'type': 'done'}) + yield done + + _dbg(f'流式响应结束,共 {event_count} 个事件') + usage_tracker.record( + ctx.client_model, + last_usage, + input_key=client_fmt.usage_input_key, + output_key=client_fmt.usage_output_key, + ) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_item_count': len(client_items), + 'usage': last_usage, + }) + attach_client_response(turn, { + 'type': 'stream.summary', + 'model': ctx.client_model, + 'event_count': len(client_items), + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + + return sse_response(generate()) diff --git a/routes/chat.py b/routes/chat.py index be4f775..7e01346 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -1,8 +1,7 @@ """路由: /v1/chat/completions 处理 Cursor 发来的 OpenAI Chat Completions 格式请求。 -根据模型映射的后端类型,转发到 OpenAI 兼容接口、Anthropic Messages 接口, -或原生 OpenAI Responses 接口。 +根据模型映射的后端类型,通过统一的出站转换器转发到不同后端。 """ from __future__ import annotations @@ -11,103 +10,33 @@ import json import logging from typing import Any -import settings from flask import Blueprint, jsonify, request -from adapters.cc_anthropic_adapter import ( - AnthropicStreamConverter, - cc_to_messages_request, - messages_to_cc_response, -) -from adapters.cc_gemini_adapter import ( - GeminiStreamConverter, - cc_to_gemini_request, - gemini_to_cc_response, -) -from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request -from adapters.responses_cc_adapter import ( - ResponsesToCCStreamConverter, - cc_to_responses_request, - responses_to_cc, - responses_to_cc_response, -) -from config import Config +from adapters.openai_compat_fixer import normalize_request +from adapters.responses_cc_adapter import responses_to_cc +from adapters.unified import handle_non_stream, handle_stream from routes.common import ( - RouteContext, - apply_body_modifications, - apply_header_modifications, - build_anthropic_target, - build_gemini_target, - build_openai_target, - build_responses_target, + CCClientFormatter, build_route_context, - chat_error_chunk, - inject_instructions_anthropic, + get_outbound, inject_instructions_cc, - inject_instructions_responses, log_route_context, - log_usage, - sse_data_message, ) -from utils.http import ( - forward_request, - gen_id, - iter_anthropic_sse, - iter_gemini_sse, - iter_openai_sse, - iter_responses_sse, - sse_response, -) -from utils.request_logger import ( - append_client_event, - append_upstream_event, - attach_client_response, - attach_error, - attach_upstream_request, - attach_upstream_response, - finalize_turn, - set_stream_summary, - start_turn, -) -from utils.think_tag import ThinkTagExtractor +from utils.request_logger import start_turn from utils.thinking_cache import thinking_cache -from utils.usage_tracker import usage_tracker logger = logging.getLogger(__name__) bp = Blueprint('chat', __name__) -def _dbg(message: str) -> None: - """仅在调试模式下输出详细日志。""" - if settings.get_debug_mode() in ('simple', 'verbose'): - logger.info('[聊天补全调试] %s', message) - - -def _extract_responses_usage(event_data: dict[str, Any]) -> dict[str, Any] | None: - """从原生 Responses 事件中提取 usage。 - - `/v1/chat/completions -> /v1/responses` 的桥接流式路径也需要读取 usage, - 因此在本文件保留一个本地辅助函数,避免依赖其他路由模块的私有实现。 - """ - if not isinstance(event_data, dict): - return None - usage = event_data.get('usage') - if isinstance(usage, dict): - return usage - response_obj = event_data.get('response') - if isinstance(response_obj, dict): - nested_usage = response_obj.get('usage') - if isinstance(nested_usage, dict): - return nested_usage - return None - - @bp.route('/v1/chat/completions', methods=['POST']) def chat_completions(): """处理聊天补全请求并按模型映射分发到不同后端。""" original_payload = request.get_json(force=True) - payload, message_count = _normalize_chat_payload(json.loads(json.dumps(original_payload, ensure_ascii=False, default=str))) + payload, message_count = _normalize_chat_payload( + json.loads(json.dumps(original_payload, ensure_ascii=False, default=str)) + ) client_model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) @@ -127,23 +56,38 @@ def chat_completions(): log_route_context('聊天补全', ctx, extra=f'消息数={message_count}') _log_messages(payload) - if ctx.backend != 'responses': - payload['messages'] = thinking_cache.inject(payload.get('messages', [])) + payload['model'] = ctx.upstream_model + payload = normalize_request(payload) + payload['messages'] = thinking_cache.inject(payload.get('messages', [])) + payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position) - if ctx.backend == 'openai': - return _handle_openai_backend(ctx, payload, turn) - if ctx.backend == 'responses': - return _handle_responses_backend(ctx, payload, turn) - if ctx.backend == 'gemini': - return _handle_gemini_backend(ctx, payload, turn) - return _handle_anthropic_backend(ctx, payload, turn) + outbound = get_outbound(ctx.backend) + client_fmt = CCClientFormatter() + + if ctx.is_stream: + result = handle_stream(ctx, outbound, client_fmt, payload, turn) + else: + result = handle_non_stream(ctx, outbound, client_fmt, payload, turn) + + if not ctx.is_stream and isinstance(result, tuple): + response_data = result + elif hasattr(result, 'json'): + try: + response_data = result.get_json(silent=True) or {} + except Exception: + response_data = {} + else: + response_data = {} + + _try_cache_thinking(response_data) + return result def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], int]: """整理聊天补全入口的请求体。 - 这里保留了一层兼容逻辑:当 Cursor 或调用方把 Responses 格式误发到 - `/v1/chat/completions` 时,先降级转换成 Chat Completions,再进入统一主流程。 + 当 Cursor 或调用方把 Responses 格式误发到 `/v1/chat/completions` 时, + 先降级转换成 Chat Completions,再进入统一主流程。 """ message_count = len(payload.get('messages', [])) @@ -157,548 +101,11 @@ def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], in return payload, message_count -def _handle_openai_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any]): - """处理走 OpenAI 兼容后端的聊天补全请求。""" - _dbg( - '原始请求字段=' + str(list(payload.keys())) + ' ' - + '附加字段=' - + json.dumps( - {k: v for k, v in payload.items() if k != 'messages'}, - ensure_ascii=False, - default=str, - )[:500] - ) - - payload = normalize_request(payload, ctx.upstream_model) - payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position) - _dbg( - f'标准化完成:模型={payload.get("model")} ' - f'工具数={len(payload.get("tools", []))}' - ) - - url, headers = build_openai_target(ctx) - payload = apply_body_modifications(payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) - - if ctx.is_stream: - return _handle_openai_stream(ctx, payload, url, headers, turn) - return _handle_openai_non_stream(ctx, payload, url, headers, turn) - - -def _handle_openai_non_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any], -): - """处理 OpenAI 兼容后端的非流式返回。""" - payload['stream'] = False - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - raw = resp.json() - attach_upstream_response(turn, raw) - _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - - data = fix_response(raw) - return _finalize_chat_response(ctx, data, turn=turn, debug_label='修复后响应') - - -def _handle_openai_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any], -): - """处理 OpenAI 兼容后端的流式返回。""" - payload['stream'] = True - - def generate(): - """消费上游 OpenAI SSE,并逐段产出给 Cursor 的聊天补全流。""" - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield chat_error_chunk(str(err)) - return - - think_extractor = ThinkTagExtractor() - chunk_count = 0 - last_usage = None - client_chunks: list[dict[str, Any]] = [] - - for chunk in iter_openai_sse(resp): - if chunk is None: - _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') - close_chunk = think_extractor.finalize() - if close_chunk: - client_chunks.append(close_chunk) - append_client_event(turn, {'type': 'chat_chunk', 'data': close_chunk}) - yield sse_data_message(close_chunk) - append_client_event(turn, {'type': 'done'}) - yield sse_data_message('[DONE]') - usage_tracker.record(ctx.client_model, last_usage) - set_stream_summary(turn, { - 'chunk_count': chunk_count, - 'client_chunk_count': len(client_chunks), - 'usage': last_usage, - }) - attach_client_response(turn, { - 'type': 'chat.completion.stream.summary', - 'model': ctx.client_model, - 'chunk_count': len(client_chunks), - 'usage': last_usage, - }) - finalize_turn(turn, usage=last_usage) - return - - append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk}) - if chunk.get('usage'): - last_usage = chunk['usage'] - - if chunk_count < 10: - _dbg( - f'上游原始片段#{chunk_count}=' - + json.dumps(chunk, ensure_ascii=False, default=str)[:500] - ) - - chunk = fix_stream_chunk(chunk) - chunk['model'] = ctx.client_model - - for out in think_extractor.process_chunk(chunk): - client_chunks.append(out) - append_client_event(turn, {'type': 'chat_chunk', 'data': out}) - if chunk_count < 10: - _dbg( - f'返回片段#{chunk_count}=' - + json.dumps(out, ensure_ascii=False, default=str)[:500] - ) - yield sse_data_message(out) - - chunk_count += 1 - - usage_tracker.record(ctx.client_model, last_usage) - set_stream_summary(turn, { - 'chunk_count': chunk_count, - 'client_chunk_count': len(client_chunks), - 'usage': last_usage, - 'ended_without_done': True, - }) - attach_client_response(turn, { - 'type': 'chat.completion.stream.summary', - 'model': ctx.client_model, - 'chunk_count': len(client_chunks), - 'usage': last_usage, - }) - finalize_turn(turn, usage=last_usage) - - return sse_response(generate()) - - -def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): - """处理走原生 Responses 后端的聊天补全请求。 - - 当上游只支持 `/v1/responses` 时,需要先把聊天补全请求转换为 Responses 请求, - 返回时再转换回聊天补全协议。 - """ - responses_payload = cc_to_responses_request(payload) - responses_payload['model'] = ctx.upstream_model - responses_payload = inject_instructions_responses(responses_payload, ctx.custom_instructions, ctx.instructions_position) - _dbg( - '已转换为 Responses 请求:字段=' + str(list(responses_payload.keys())) - + f' 输入项数={len(responses_payload.get("input", []))}' - ) - - url, headers = build_responses_target(ctx) - responses_payload = apply_body_modifications(responses_payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) - - if ctx.is_stream: - return _handle_responses_stream(ctx, responses_payload, url, headers, turn) - return _handle_responses_non_stream(ctx, responses_payload, url, headers, turn) - - -def _handle_responses_non_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理原生 Responses 后端的非流式返回。""" - payload['stream'] = False - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - raw = resp.json() - attach_upstream_response(turn, raw) - _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - - data = responses_to_cc_response(raw, ctx.client_model) - return _finalize_chat_response(ctx, data, turn=turn, debug_label='Responses 转回聊天补全后') - - -def _handle_responses_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理原生 Responses 后端的流式返回。""" - payload['stream'] = True - converter = ResponsesToCCStreamConverter(model=ctx.client_model) - - def generate(): - """消费上游 Responses 事件,并实时转换成聊天补全 chunk。""" - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield chat_error_chunk(str(err)) - return - - event_count = 0 - client_chunks: list[Any] = [] - last_usage: dict[str, Any] | None = None - for event_type, event_data in iter_responses_sse(resp): - append_upstream_event(turn, {'type': event_type, 'data': event_data}) - extracted_usage = _extract_responses_usage(event_data) - if extracted_usage: - last_usage = { - 'prompt_tokens': extracted_usage.get('input_tokens', 0), - 'completion_tokens': extracted_usage.get('output_tokens', 0), - 'total_tokens': extracted_usage.get('total_tokens', 0), - } - if event_count < 10: - _dbg( - f'上游事件#{event_count} 类型={event_type} 数据=' - + json.dumps(event_data, ensure_ascii=False, default=str)[:500] - ) - - for chunk in converter.process_event(event_type, event_data): - client_chunks.append(chunk) - append_client_event(turn, {'type': 'chat_chunk', 'data': chunk}) - if isinstance(chunk, dict) and isinstance(chunk.get('usage'), dict): - last_usage = chunk['usage'] - if event_count < 10: - _dbg( - f'返回片段#{event_count}=' - + json.dumps(chunk, ensure_ascii=False, default=str)[:500] - ) - yield sse_data_message(chunk) - - event_count += 1 - - _dbg(f'流式响应结束,共 {event_count} 个事件') - append_client_event(turn, {'type': 'done'}) - yield sse_data_message('[DONE]') - usage_tracker.record(ctx.client_model, last_usage) - set_stream_summary(turn, { - 'event_count': event_count, - 'client_chunk_count': len(client_chunks), - 'usage': last_usage, - }) - attach_client_response(turn, { - 'type': 'chat.completion.stream.summary', - 'model': ctx.client_model, - 'chunk_count': len(client_chunks), - 'usage': last_usage, - }) - finalize_turn(turn, usage=last_usage) - - return sse_response(generate()) - - -def _handle_gemini_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): - """处理走 Gemini Contents 后端的聊天补全请求。""" - payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position) - gemini_payload = cc_to_gemini_request(payload) - _dbg( - '已转换为 Gemini 请求:字段=' + str(list(gemini_payload.keys())) - + f' 内容数={len(gemini_payload.get("contents", []))}' - ) - - url, headers = build_gemini_target(ctx, stream=ctx.is_stream) - gemini_payload = apply_body_modifications(gemini_payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) - - if ctx.is_stream: - return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn) - return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn) - - -def _handle_gemini_non_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 Gemini 后端的非流式返回。""" - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - raw = resp.json() - attach_upstream_response(turn, raw) - _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - - data = gemini_to_cc_response(raw) - return _finalize_chat_response(ctx, data, turn=turn, debug_label='Gemini 转回聊天补全后') - - -def _handle_gemini_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 Gemini 后端的流式返回。""" - converter = GeminiStreamConverter() - - def generate(): - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield chat_error_chunk(str(err)) - return - - chunk_count = 0 - client_chunks: list[Any] = [] - last_usage: dict[str, Any] | None = None - for gemini_chunk in iter_gemini_sse(resp): - append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) - usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None - if isinstance(usage_meta, dict): - last_usage = { - 'prompt_tokens': usage_meta.get('promptTokenCount', 0), - 'completion_tokens': usage_meta.get('candidatesTokenCount', 0), - 'total_tokens': usage_meta.get('totalTokenCount', 0), - } - if chunk_count < 10: - _dbg( - f'上游 Gemini 片段#{chunk_count}=' - + json.dumps(gemini_chunk, ensure_ascii=False, default=str)[:500] - ) - - for cc_chunk in converter.process_chunk(gemini_chunk): - cc_chunk['model'] = ctx.client_model - client_chunks.append(cc_chunk) - append_client_event(turn, {'type': 'chat_chunk', 'data': cc_chunk}) - if isinstance(cc_chunk, dict) and isinstance(cc_chunk.get('usage'), dict): - last_usage = cc_chunk['usage'] - if chunk_count < 10: - _dbg( - f'返回片段#{chunk_count}=' - + json.dumps(cc_chunk, ensure_ascii=False, default=str)[:500] - ) - yield sse_data_message(cc_chunk) - - chunk_count += 1 - - _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') - append_client_event(turn, {'type': 'done'}) - yield sse_data_message('[DONE]') - usage_tracker.record(ctx.client_model, last_usage) - set_stream_summary(turn, { - 'chunk_count': chunk_count, - 'client_chunk_count': len(client_chunks), - 'usage': last_usage, - }) - attach_client_response(turn, { - 'type': 'chat.completion.stream.summary', - 'model': ctx.client_model, - 'chunk_count': len(client_chunks), - 'usage': last_usage, - }) - finalize_turn(turn, usage=last_usage) - - return sse_response(generate()) - - -def _handle_anthropic_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): - """处理走 Anthropic Messages 后端的聊天补全请求。""" - payload['model'] = ctx.upstream_model - anthropic_payload = cc_to_messages_request(payload) - anthropic_payload = inject_instructions_anthropic(anthropic_payload, ctx.custom_instructions, ctx.instructions_position) - _dbg( - '已转换为 Messages 请求:字段=' + str(list(anthropic_payload.keys())) - + f' 消息数={len(anthropic_payload.get("messages", []))}' - ) - - url, headers = build_anthropic_target(ctx) - anthropic_payload = apply_body_modifications(anthropic_payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) - - if ctx.is_stream: - return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn) - return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn) - - -def _handle_anthropic_non_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 Anthropic 后端的非流式返回。""" - payload['stream'] = False - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - raw = resp.json() - attach_upstream_response(turn, raw) - _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - - data = messages_to_cc_response(raw) - return _finalize_chat_response(ctx, data, turn=turn, debug_label='Messages 转回聊天补全后') - - -def _handle_anthropic_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 Anthropic 后端的流式返回。 - - 这里仍然保留独立的事件级转换器,而不是先落成完整响应再回放, - 是为了尽量保持 Cursor 端的流式体验和工具调用时序。 - """ - payload['stream'] = True - converter = AnthropicStreamConverter() - - def generate(): - """消费上游 Anthropic 事件流,并逐步映射为聊天补全 SSE。""" - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield chat_error_chunk(str(err)) - return - - event_count = 0 - client_chunks: list[Any] = [] - last_usage: dict[str, Any] | None = None - for event_type, event_data in iter_anthropic_sse(resp): - append_upstream_event(turn, {'type': event_type, 'data': event_data}) - if event_type == 'message_start': - message_usage = event_data.get('message', {}).get('usage', {}) - if isinstance(message_usage, dict): - last_usage = { - 'prompt_tokens': message_usage.get('input_tokens', 0), - 'completion_tokens': 0, - 'total_tokens': message_usage.get('input_tokens', 0), - } - elif event_type == 'message_delta': - delta_usage = event_data.get('usage', {}) - if isinstance(delta_usage, dict): - prompt_tokens = 0 - if isinstance(last_usage, dict): - prompt_tokens = last_usage.get('prompt_tokens', 0) - completion_tokens = delta_usage.get('output_tokens', 0) - last_usage = { - 'prompt_tokens': prompt_tokens, - 'completion_tokens': completion_tokens, - 'total_tokens': prompt_tokens + completion_tokens, - } - if event_count < 10: - _dbg( - f'上游事件#{event_count} 类型={event_type} 数据=' - + json.dumps(event_data, ensure_ascii=False, default=str)[:500] - ) - - for chunk_str in converter.process_event(event_type, event_data): - try: - chunk_obj = json.loads(chunk_str) - chunk_obj['model'] = ctx.client_model - if isinstance(chunk_obj.get('usage'), dict): - last_usage = chunk_obj['usage'] - chunk_str = json.dumps(chunk_obj, ensure_ascii=False) - except (json.JSONDecodeError, TypeError): - pass - - client_chunks.append(chunk_str) - append_client_event(turn, {'type': 'chat_chunk', 'data': chunk_str}) - if event_count < 10: - _dbg(f'返回片段#{event_count}={chunk_str[:500]}') - yield sse_data_message(chunk_str) - - event_count += 1 - - _dbg(f'流式响应结束,共 {event_count} 个事件') - append_client_event(turn, {'type': 'done'}) - yield sse_data_message('[DONE]') - usage_tracker.record(ctx.client_model, last_usage) - set_stream_summary(turn, { - 'event_count': event_count, - 'client_chunk_count': len(client_chunks), - 'usage': last_usage, - }) - attach_client_response(turn, { - 'type': 'chat.completion.stream.summary', - 'model': ctx.client_model, - 'chunk_count': len(client_chunks), - 'usage': last_usage, - }) - finalize_turn(turn, usage=last_usage) - - return sse_response(generate()) - - -def _finalize_chat_response( - ctx: RouteContext, - data: dict[str, Any], - *, - turn: dict[str, Any] | None, - debug_label: str, -): - """统一收尾非流式聊天补全响应。 - - 三条后端链路最终都会回到 Chat Completions 格式,因此这里集中做: - - 回填给 Cursor 展示的模型名 - - 输出统一调试日志 - - 输出统一令牌统计日志 - """ - data['model'] = ctx.client_model - _dbg(debug_label + '=' + json.dumps(data, ensure_ascii=False, default=str)[:1000]) - log_usage('聊天补全', data.get('usage', {}), input_key='prompt_tokens', output_key='completion_tokens') - - usage_tracker.record(ctx.client_model, data.get('usage')) - attach_client_response(turn, data) - finalize_turn(turn, usage=data.get('usage')) - - for choice in data.get('choices', []): +def _try_cache_thinking(response_data: dict[str, Any]) -> None: + """尝试从非流式响应中缓存思维链内容。""" + if not isinstance(response_data, dict): + return + for choice in response_data.get('choices', []): msg = choice.get('message', {}) if msg.get('reasoning_content'): thinking_cache.store_from_response( @@ -707,8 +114,6 @@ def _finalize_chat_response( ) break - return jsonify(data) - def _log_messages(payload: dict[str, Any]) -> None: """记录消息摘要,方便排查请求形态是否符合预期。""" diff --git a/routes/common.py b/routes/common.py index 0ad7518..eba89b2 100644 --- a/routes/common.py +++ b/routes/common.py @@ -12,7 +12,6 @@ import logging from typing import Any import settings -from utils.http import build_anthropic_headers, build_gemini_headers, build_openai_headers logger = logging.getLogger(__name__) @@ -55,42 +54,6 @@ def build_route_context(client_model: str, is_stream: bool) -> RouteContext: ) -def build_openai_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: - """根据路由上下文生成 OpenAI 兼容后端的地址和请求头。""" - url = f'{ctx.target_url.rstrip("/")}/v1/chat/completions' - headers = build_openai_headers(ctx.api_key) - return url, headers - - -def build_responses_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: - """根据路由上下文生成 OpenAI Responses 后端的地址和请求头。""" - url = f'{ctx.target_url.rstrip("/")}/v1/responses' - headers = build_openai_headers(ctx.api_key) - return url, headers - - -def build_anthropic_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: - """根据路由上下文生成 Anthropic 后端的地址和请求头。""" - url = f'{ctx.target_url.rstrip("/")}/v1/messages' - headers = build_anthropic_headers(ctx.api_key) - return url, headers - - -def build_gemini_target(ctx: RouteContext, stream: bool = False) -> tuple[str, dict[str, str]]: - """根据路由上下文生成 Gemini 后端的地址和请求头。 - - Gemini URL 格式: {base}/v1/models/{model}:generateContent - 流式: {base}/v1/models/{model}:streamGenerateContent?alt=sse - """ - base = ctx.target_url.rstrip('/') - model = ctx.upstream_model - if stream: - url = f'{base}/v1/models/{model}:streamGenerateContent?alt=sse' - else: - url = f'{base}/v1/models/{model}:generateContent' - headers = build_gemini_headers(ctx.api_key) - return url, headers - def log_route_context(route_name: str, ctx: RouteContext, *, extra: str = '') -> None: """统一输出路由级日志,避免不同入口的日志格式逐渐漂移。""" @@ -137,11 +100,6 @@ def sse_event_message(event_type: str, data: Any) -> str: return f'event: {event_type}\ndata: {payload}\n\n' -def chat_error_chunk(message: str, error_type: str = 'upstream_error') -> str: - """构造聊天补全流式接口使用的错误消息。""" - return sse_data_message({'error': {'message': message, 'type': error_type}}) - - def responses_error_event(message: str) -> str: """构造 Responses 流式接口使用的错误事件。""" return sse_event_message('error', {'error': message}) @@ -248,3 +206,140 @@ def apply_header_modifications(headers: dict[str, str], modifications: dict[str, headers[key] = str(value) logger.info('已应用 header_modifications: %s', list(modifications.keys())) return headers + + +# ═══════════════════════════════════════════════════════════ +# 后端注册表 + ClientFormatter 实现 +# ═══════════════════════════════════════════════════════════ + + +def get_outbound(backend: str): + """根据后端类型获取对应的 OutboundTransformer 实例。""" + from adapters.cc_anthropic_adapter import AnthropicOutbound + from adapters.cc_gemini_adapter import GeminiOutbound + from adapters.openai_compat_fixer import OpenAIChatOutbound + from adapters.responses_cc_adapter import ResponsesOutbound + + registry = { + 'openai': OpenAIChatOutbound, + 'anthropic': AnthropicOutbound, + 'gemini': GeminiOutbound, + 'responses': ResponsesOutbound, + } + cls = registry.get(backend, OpenAIChatOutbound) + return cls() + + +class CCClientFormatter: + """Chat Completions 客户端格式化器。 + + 将通用处理结果格式化为 OpenAI Chat Completions 格式, + 供 /v1/chat/completions 端点使用。 + """ + + def format_response(self, cc_response: dict[str, Any], model: str) -> dict[str, Any]: + cc_response['model'] = model + return cc_response + + def wrap_stream_item(self, item: Any) -> str: + payload = item if isinstance(item, str) else json.dumps(item, ensure_ascii=False) + return f'data: {payload}\n\n' + + def format_error(self, message: str) -> str: + return sse_data_message({'error': {'message': message, 'type': 'upstream_error'}}) + + def format_done(self) -> str | None: + return sse_data_message('[DONE]') + + def start_events(self) -> list[str]: + return [] + + @property + def usage_input_key(self) -> str: + return 'prompt_tokens' + + @property + def usage_output_key(self) -> str: + return 'completion_tokens' + + +class ResponsesClientFormatter: + """Responses API 客户端格式化器。 + + 将通用处理结果格式化为 OpenAI Responses 格式, + 供 /v1/responses 端点使用。 + + 流式场景使用 ResponsesStreamConverter 做 CC chunk → Responses SSE 转换。 + """ + + def __init__(self, model: str = ''): + from adapters.responses_cc_adapter import ResponsesStreamConverter, cc_to_responses + self._model = model + self._converter = ResponsesStreamConverter(model=model) + self._cc_to_responses = cc_to_responses + + def format_response(self, cc_response: dict[str, Any], model: str) -> dict[str, Any]: + return self._cc_to_responses(cc_response, model) + + def wrap_stream_item(self, item: Any) -> str: + if isinstance(item, str): + return item + events = self._converter.process_cc_chunk(item) + return ''.join(events) + + def format_error(self, message: str) -> str: + return responses_error_event(message) + + def format_done(self) -> str | None: + events = self._converter.finalize() + return ''.join(events) if events else None + + def start_events(self) -> list[str]: + return self._converter.start_events() + + @property + def usage_input_key(self) -> str: + return 'input_tokens' + + @property + def usage_output_key(self) -> str: + return 'output_tokens' + + +class ResponsesPassthroughFormatter: + """Responses 透传格式化器。 + + 当后端本身就是 Responses 格式时使用,做轻量模型名改写。 + """ + + def __init__(self, model: str = ''): + self._model = model + + def format_response(self, response_data: dict[str, Any], model: str) -> dict[str, Any]: + response_data['model'] = model + return response_data + + def wrap_stream_item(self, item: Any) -> str: + if isinstance(item, str): + return item + event_type = item.pop('_sse_event_type', None) + if event_type: + return f'event: {event_type}\ndata: {json.dumps(item, ensure_ascii=False)}\n\n' + return f'data: {json.dumps(item, ensure_ascii=False)}\n\n' + + def format_error(self, message: str) -> str: + return responses_error_event(message) + + def format_done(self) -> str | None: + return None + + def start_events(self) -> list[str]: + return [] + + @property + def usage_input_key(self) -> str: + return 'input_tokens' + + @property + def usage_output_key(self) -> str: + return 'output_tokens' diff --git a/routes/responses.py b/routes/responses.py index 4889a40..6732660 100644 --- a/routes/responses.py +++ b/routes/responses.py @@ -1,7 +1,7 @@ """路由: /v1/responses 处理 Cursor 对 GPT、Claude-Opus 等模型发出的 Responses API 请求。 -请求会先转换为 Chat Completions 中间表示,再按后端类型分发,最后转换回 Responses 格式。 +请求先转换为 Chat Completions 中间表示,再通过统一出站转换器分发。 """ from __future__ import annotations @@ -13,62 +13,30 @@ from typing import Any import settings from flask import Blueprint, jsonify, request -from adapters.cc_anthropic_adapter import cc_to_messages_request, messages_to_cc_response -from adapters.cc_gemini_adapter import GeminiStreamConverter, cc_to_gemini_request, gemini_to_cc_response -from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request -from adapters.responses_cc_adapter import ResponsesStreamConverter, cc_to_responses, responses_to_cc -from config import Config +from adapters.openai_compat_fixer import normalize_request +from adapters.responses_cc_adapter import ( + AnthropicOutboundForResponses, + ResponsesNativeOutbound, + responses_to_cc, +) +from adapters.unified import handle_non_stream, handle_stream from routes.common import ( - RouteContext, - apply_body_modifications, - apply_header_modifications, - build_anthropic_target, - build_gemini_target, - build_openai_target, - build_responses_target, + ResponsesClientFormatter, + ResponsesPassthroughFormatter, build_route_context, - inject_instructions_anthropic, + get_outbound, inject_instructions_cc, inject_instructions_responses, log_route_context, - log_usage, - responses_error_event, ) -from utils.http import ( - forward_request, - gen_id, - iter_anthropic_sse, - iter_gemini_sse, - iter_openai_sse, - iter_responses_sse, - sse_response, -) -from utils.request_logger import ( - append_client_event, - append_upstream_event, - attach_client_response, - attach_error, - attach_upstream_request, - attach_upstream_response, - finalize_turn, - set_stream_summary, - start_turn, -) -from utils.think_tag import ThinkTagExtractor +from utils.request_logger import start_turn from utils.thinking_cache import thinking_cache -from utils.usage_tracker import usage_tracker logger = logging.getLogger(__name__) bp = Blueprint('responses', __name__) -def _dbg(message: str) -> None: - """仅在调试模式下输出详细日志。""" - if settings.get_debug_mode() in ('simple', 'verbose'): - logger.info('[响应生成调试] %s', message) - - @bp.route('/v1/responses', methods=['POST']) def responses_endpoint(): """处理 Responses 请求并按模型映射分发。""" @@ -90,543 +58,42 @@ def responses_endpoint(): ) log_route_context('响应生成', ctx) + if ctx.backend == 'responses': + return _handle_native_responses(ctx, payload, turn) + cc_payload = _build_cc_payload(payload, ctx) - if ctx.backend == 'openai': - return _handle_openai_backend(ctx, cc_payload, turn) - if ctx.backend == 'responses': - return _handle_responses_backend(ctx, payload, turn) - if ctx.backend == 'gemini': - return _handle_gemini_backend(ctx, cc_payload, turn) - return _handle_anthropic_backend(ctx, cc_payload, turn) + if ctx.backend == 'anthropic': + outbound = AnthropicOutboundForResponses() + else: + outbound = get_outbound(ctx.backend) - -def _build_cc_payload(payload: dict[str, Any], ctx: RouteContext) -> dict[str, Any]: - """将 Responses 请求统一降级为 Chat Completions 中间表示。 - - 这样后续无论走 OpenAI 兼容后端还是 Anthropic 后端,都能复用一套 - 中间协议,避免在路由层同时维护两套完全不同的请求编排逻辑。 - """ - cc_payload = responses_to_cc(payload) - cc_payload['model'] = ctx.upstream_model - cc_payload['messages'] = thinking_cache.inject(cc_payload.get('messages', [])) - cc_payload = inject_instructions_cc(cc_payload, ctx.custom_instructions, ctx.instructions_position) - _dbg( - '已转换为聊天补全中间表示:字段=' + str(list(cc_payload.keys())) - + f' 消息数={len(cc_payload.get("messages", []))}' - ) - return cc_payload - - -def _handle_openai_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any]): - """处理走 OpenAI 兼容后端的 Responses 请求。""" - cc_payload = normalize_request(cc_payload) - _dbg( - f'标准化完成:模型={cc_payload.get("model")} ' - f'工具数={len(cc_payload.get("tools", []))}' - ) - - url, headers = build_openai_target(ctx) - cc_payload = apply_body_modifications(cc_payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) + client_fmt = ResponsesClientFormatter(model=ctx.client_model) if ctx.is_stream: - return _handle_openai_stream(ctx, cc_payload, url, headers, turn) - return _handle_openai_non_stream(ctx, cc_payload, url, headers, turn) + return handle_stream(ctx, outbound, client_fmt, cc_payload, turn) + return handle_non_stream(ctx, outbound, client_fmt, cc_payload, turn) -def _handle_openai_non_stream( - ctx: RouteContext, - cc_payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any], -): - """处理 OpenAI 兼容后端的非流式 Responses 返回。""" - cc_payload['stream'] = False - attach_upstream_request(turn, cc_payload, headers) - resp, err = forward_request(url, headers, cc_payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - raw = resp.json() - attach_upstream_response(turn, raw) - _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - - fixed = fix_response(raw) - response_data = cc_to_responses(fixed, ctx.client_model) - return _finalize_responses_response( - response_data, - client_model=ctx.client_model, - turn=turn, - debug_label='转换为 Responses 后', - ) - - -def _handle_openai_stream( - ctx: RouteContext, - cc_payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 OpenAI 兼容后端的流式 Responses 返回。""" - cc_payload['stream'] = True - converter = ResponsesStreamConverter(model=ctx.client_model) - - def generate(): - """消费 OpenAI 聊天补全流,并实时改写为 Responses SSE。""" - yield from converter.start_events() - - attach_upstream_request(turn, cc_payload, headers) - resp, err = forward_request(url, headers, cc_payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield responses_error_event(str(err)) - return - - think_extractor = ThinkTagExtractor() - chunk_count = 0 - client_events: list[str] = [] - - for chunk in iter_openai_sse(resp): - if chunk is None: - _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') - finalized_events = converter.finalize() - for item in finalized_events: - client_events.append(item) - append_client_event(turn, {'type': 'responses_event', 'data': item}) - yield item - usage_tracker.record(ctx.client_model) - set_stream_summary(turn, { - 'chunk_count': chunk_count, - 'client_event_count': len(client_events), - }) - attach_client_response(turn, { - 'type': 'responses.stream.summary', - 'model': ctx.client_model, - 'event_count': len(client_events), - }) - finalize_turn(turn) - return - - append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk}) - if chunk_count < 10: - _dbg( - f'上游原始片段#{chunk_count}=' - + json.dumps(chunk, ensure_ascii=False, default=str)[:500] - ) - - chunk = fix_stream_chunk(chunk) - for out in think_extractor.process_chunk(chunk): - for evt in converter.process_cc_chunk(out): - client_events.append(evt) - append_client_event(turn, {'type': 'responses_event', 'data': evt}) - if chunk_count < 10: - _dbg( - f'转换后片段#{chunk_count}=' - + json.dumps(out, ensure_ascii=False, default=str)[:500] - ) - yield evt - - chunk_count += 1 - - return sse_response(generate()) - - -def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): - """处理走原生 Responses 后端的请求。 - - 当中转站本身就只支持 `/v1/responses` 时,不需要再绕到聊天补全中间协议, - 直接转发原生 Responses 请求即可。 - """ +def _handle_native_responses(ctx, payload: dict[str, Any], turn: dict[str, Any]): + """处理走原生 Responses 后端的请求(直接透传)。""" payload = dict(payload) payload['model'] = ctx.upstream_model payload = inject_instructions_responses(payload, ctx.custom_instructions, ctx.instructions_position) - url, headers = build_responses_target(ctx) - payload = apply_body_modifications(payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) + + outbound = ResponsesNativeOutbound() + client_fmt = ResponsesPassthroughFormatter(model=ctx.client_model) if ctx.is_stream: - return _handle_responses_stream(ctx, payload, url, headers, turn) - return _handle_responses_non_stream(ctx, payload, url, headers, turn) + return handle_stream(ctx, outbound, client_fmt, payload, turn) + return handle_non_stream(ctx, outbound, client_fmt, payload, turn) -def _handle_responses_non_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理原生 Responses 后端的非流式返回。""" - payload['stream'] = False - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - response_data = resp.json() - attach_upstream_response(turn, response_data) - response_data['model'] = ctx.client_model - return _finalize_responses_response( - response_data, - client_model=ctx.client_model, - turn=turn, - debug_label='原生 Responses 返回后', - ) - - -def _handle_responses_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理原生 Responses 后端的流式返回。""" - payload['stream'] = True - converter = ResponsesStreamConverter(model=ctx.client_model) - - def generate(): - """透传上游原生 Responses 流,并做轻量模型名改写。""" - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield responses_error_event(str(err)) - return - - event_count = 0 - client_events: list[str] = [] - last_usage: dict[str, Any] | None = None - for event_type, event_data in iter_responses_sse(resp): - append_upstream_event(turn, {'type': event_type, 'data': event_data}) - extracted_usage = _extract_responses_usage(event_data) - if extracted_usage: - last_usage = extracted_usage - if event_count < 10: - _dbg( - f'上游事件#{event_count} 类型={event_type} 数据=' - + json.dumps(event_data, ensure_ascii=False, default=str)[:500] - ) - produced = converter.process_responses_event(event_type, event_data) - for evt in produced: - client_events.append(evt) - append_client_event(turn, {'type': 'responses_event', 'data': evt}) - yield evt - event_count += 1 - - _dbg(f'流式响应结束,共 {event_count} 个事件') - usage_tracker.record( - ctx.client_model, - last_usage, - input_key='input_tokens', - output_key='output_tokens', - ) - set_stream_summary(turn, { - 'event_count': event_count, - 'client_event_count': len(client_events), - 'usage': last_usage, - }) - attach_client_response(turn, { - 'type': 'responses.stream.summary', - 'model': ctx.client_model, - 'event_count': len(client_events), - 'usage': last_usage, - }) - finalize_turn(turn, usage=last_usage) - - return sse_response(generate()) - - -def _extract_responses_usage(event_data: dict[str, Any]) -> dict[str, Any] | None: - """从原生 Responses 事件中提取 usage。 - - 原生 `/v1/responses` 流式通常会在 `response.completed` 事件里携带 usage, - 也可能直接挂在顶层 `usage` 字段。这里统一做兼容提取,供统计与日志复用。 - """ - if not isinstance(event_data, dict): - return None - usage = event_data.get('usage') - if isinstance(usage, dict): - return usage - response_obj = event_data.get('response') - if isinstance(response_obj, dict): - nested_usage = response_obj.get('usage') - if isinstance(nested_usage, dict): - return nested_usage - return None - - -def _handle_gemini_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): - """处理走 Gemini Contents 后端的 Responses 请求。""" - gemini_payload = cc_to_gemini_request(cc_payload) - _dbg( - '已转换为 Gemini 请求:字段=' + str(list(gemini_payload.keys())) - + f' 内容数={len(gemini_payload.get("contents", []))}' - ) - - url, headers = build_gemini_target(ctx, stream=ctx.is_stream) - gemini_payload = apply_body_modifications(gemini_payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) - - if ctx.is_stream: - return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn) - return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn) - - -def _handle_gemini_non_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 Gemini 后端的非流式 Responses 返回。""" - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - raw = resp.json() - attach_upstream_response(turn, raw) - _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - - cc_data = gemini_to_cc_response(raw) - response_data = cc_to_responses(cc_data, ctx.client_model) - return _finalize_responses_response( - response_data, - client_model=ctx.client_model, - turn=turn, - debug_label='Gemini 转回 Responses 后', - ) - - -def _handle_gemini_stream( - ctx: RouteContext, - payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 Gemini 后端的流式 Responses 返回。""" - converter = ResponsesStreamConverter(model=ctx.client_model) - gemini_converter = GeminiStreamConverter() - - def generate(): - yield from converter.start_events() - - attach_upstream_request(turn, payload, headers) - resp, err = forward_request(url, headers, payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield responses_error_event(str(err)) - return - - chunk_count = 0 - client_events: list[str] = [] - last_usage: dict[str, Any] | None = None - for gemini_chunk in iter_gemini_sse(resp): - append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) - usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None - if isinstance(usage_meta, dict): - last_usage = { - 'input_tokens': usage_meta.get('promptTokenCount', 0), - 'output_tokens': usage_meta.get('candidatesTokenCount', 0), - 'total_tokens': usage_meta.get('totalTokenCount', 0), - } - if chunk_count < 10: - _dbg( - f'上游 Gemini 片段#{chunk_count}=' - + json.dumps(gemini_chunk, ensure_ascii=False, default=str)[:500] - ) - - for cc_chunk in gemini_converter.process_chunk(gemini_chunk): - for evt in converter.process_cc_chunk(cc_chunk): - client_events.append(evt) - append_client_event(turn, {'type': 'responses_event', 'data': evt}) - yield evt - - chunk_count += 1 - - _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') - finalized_events = converter.finalize() - for evt in finalized_events: - client_events.append(evt) - append_client_event(turn, {'type': 'responses_event', 'data': evt}) - yield evt - usage_tracker.record( - ctx.client_model, - last_usage, - input_key='input_tokens', - output_key='output_tokens', - ) - set_stream_summary(turn, { - 'chunk_count': chunk_count, - 'client_event_count': len(client_events), - 'usage': last_usage, - }) - attach_client_response(turn, { - 'type': 'responses.stream.summary', - 'model': ctx.client_model, - 'event_count': len(client_events), - 'usage': last_usage, - }) - finalize_turn(turn, usage=last_usage) - - return sse_response(generate()) - - -def _handle_anthropic_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): - """处理走 Anthropic 后端的 Responses 请求。""" - anthropic_payload = cc_to_messages_request(cc_payload) - _dbg( - '已转换为 Messages 请求:字段=' + str(list(anthropic_payload.keys())) - + f' 消息数={len(anthropic_payload.get("messages", []))}' - ) - - url, headers = build_anthropic_target(ctx) - anthropic_payload = apply_body_modifications(anthropic_payload, ctx.body_modifications) - headers = apply_header_modifications(headers, ctx.header_modifications) - - if ctx.is_stream: - return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn) - return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn) - - -def _handle_anthropic_non_stream( - ctx: RouteContext, - anthropic_payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 Anthropic 后端的非流式 Responses 返回。""" - anthropic_payload['stream'] = False - attach_upstream_request(turn, anthropic_payload, headers) - resp, err = forward_request(url, headers, anthropic_payload) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) - finalize_turn(turn) - return err - - raw = resp.json() - attach_upstream_response(turn, raw) - _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) - - cc_data = messages_to_cc_response(raw) - response_data = cc_to_responses(cc_data, ctx.client_model) - return _finalize_responses_response( - response_data, - client_model=ctx.client_model, - turn=turn, - debug_label='Messages 转回 Responses 后', - ) - - -def _handle_anthropic_stream( - ctx: RouteContext, - anthropic_payload: dict[str, Any], - url: str, - headers: dict[str, str], - turn: dict[str, Any] | None, -): - """处理 Anthropic 后端的流式 Responses 返回。 - - 这里直接将 Anthropic SSE 事件映射到 Responses SSE,故意跳过 CC 流式中间态, - 这样可以减少一次事件重组,降低流式转换复杂度,也更容易保留原始时序。 - """ - anthropic_payload['stream'] = True - converter = ResponsesStreamConverter(model=ctx.client_model) - - def generate(): - """消费 Anthropic SSE,并直接映射为 Responses 事件序列。""" - yield from converter.start_events() - - attach_upstream_request(turn, anthropic_payload, headers) - resp, err = forward_request(url, headers, anthropic_payload, stream=True) - if err: - attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) - set_stream_summary(turn, {'status': 'error'}) - finalize_turn(turn) - yield responses_error_event(str(err)) - return - - event_count = 0 - client_events: list[str] = [] - for event_type, event_data in iter_anthropic_sse(resp): - append_upstream_event(turn, {'type': event_type, 'data': event_data}) - if event_count < 10: - _dbg( - f'上游事件#{event_count} 类型={event_type} 数据=' - + json.dumps(event_data, ensure_ascii=False, default=str)[:500] - ) - - produced = converter.process_anthropic_event(event_type, event_data) - for evt in produced: - client_events.append(evt) - append_client_event(turn, {'type': 'responses_event', 'data': evt}) - yield evt - event_count += 1 - - _dbg(f'流式响应结束,共 {event_count} 个事件') - finalized_events = converter.finalize() - for evt in finalized_events: - client_events.append(evt) - append_client_event(turn, {'type': 'responses_event', 'data': evt}) - yield evt - usage_tracker.record(ctx.client_model) - set_stream_summary(turn, { - 'event_count': event_count, - 'client_event_count': len(client_events), - }) - attach_client_response(turn, { - 'type': 'responses.stream.summary', - 'model': ctx.client_model, - 'event_count': len(client_events), - }) - finalize_turn(turn) - - return sse_response(generate()) - - -def _finalize_responses_response( - response_data: dict[str, Any], - *, - client_model: str, - turn: dict[str, Any], - debug_label: str, -): - """统一收尾非流式 Responses 响应。 - - 两条转换链路和一条原生 Responses 链路最终都会回到 Responses 对象,因此这里集中 - 处理调试日志、回填展示模型名以及 usage 日志。 - """ - response_data['model'] = response_data.get('model') or '' - _dbg(debug_label + '=' + json.dumps(response_data, ensure_ascii=False, default=str)[:1000]) - log_usage('响应生成', response_data.get('usage', {}), input_key='input_tokens', output_key='output_tokens') - - usage_tracker.record( - client_model, - response_data.get('usage'), - input_key='input_tokens', - output_key='output_tokens', - ) - - attach_client_response(turn, response_data) - finalize_turn(turn, usage=response_data.get('usage')) - - return jsonify(response_data) +def _build_cc_payload(payload: dict[str, Any], ctx) -> dict[str, Any]: + """将 Responses 请求统一降级为 Chat Completions 中间表示。""" + cc_payload = responses_to_cc(payload) + cc_payload['model'] = ctx.upstream_model + cc_payload = normalize_request(cc_payload) + cc_payload['messages'] = thinking_cache.inject(cc_payload.get('messages', [])) + cc_payload = inject_instructions_cc(cc_payload, ctx.custom_instructions, ctx.instructions_position) + return cc_payload