diff --git a/adapters/cc_anthropic_adapter.py b/adapters/cc_anthropic_adapter.py index 4f42cad..512b3de 100644 --- a/adapters/cc_anthropic_adapter.py +++ b/adapters/cc_anthropic_adapter.py @@ -62,7 +62,9 @@ def cc_to_messages_request(payload: JsonDict) -> JsonDict: anthropic_messages.append(converted) anthropic_messages = _merge_same_role(anthropic_messages) - return _build_messages_request(payload, anthropic_messages, system_parts) + result = _build_messages_request(payload, anthropic_messages, system_parts) + optimize_cache_control(result) + return result # ═══════════════════════════════════════════════════════════ @@ -560,3 +562,141 @@ def _merge_same_role(messages: list[JsonDict]) -> list[JsonDict]: else: merged.append(message) return merged + + +# ═══════════════════════════════════════════════════════════ +# Anthropic cache_control 优化 +# ═══════════════════════════════════════════════════════════ + +_MAX_BREAKPOINTS = 4 +_BLOCK_WINDOW = 20 +_EPHEMERAL = {'type': 'ephemeral'} + + +def optimize_cache_control(request: JsonDict) -> None: + """自动设置最优的 Anthropic cache_control 断点。 + + 算法移植自 CursorProxy 的 ensure_cache_control.go: + 1. 归一化所有消息 content 为数组格式 + 2. 清空所有已有 cache_control + 3. 注入结构锚点(tools 末尾 + system 末尾) + 4. 注入消息锚点(最后一个可缓存块 + 窗口边界) + 5. 总断点数不超过 4 个 + """ + _normalize_message_contents(request) + _clear_all_cache_controls(request) + + structural = _inject_structural_anchors(request) + remaining = _MAX_BREAKPOINTS - structural + if remaining <= 0: + return + + refs = _collect_cacheable_block_refs(request) + if not refs: + return + + desired = 1 if len(refs) < _BLOCK_WINDOW else 2 + anchors = min(desired, remaining) + + if anchors >= 1 and refs: + refs[-1]['cache_control'] = _EPHEMERAL + + if anchors >= 2 and len(refs) > 1: + target = len(refs) - _BLOCK_WINDOW + idx = _pick_window_anchor(refs, target) + if idx is not None and idx != len(refs) - 1: + refs[idx]['cache_control'] = _EPHEMERAL + + +def _normalize_message_contents(request: JsonDict) -> None: + """将所有消息的 content 统一转为数组格式。""" + for msg in request.get('messages', []): + content = msg.get('content') + if isinstance(content, str): + msg['content'] = [{'type': 'text', 'text': content}] + elif content is None: + msg['content'] = [] + + +def _clear_all_cache_controls(request: JsonDict) -> None: + """清空所有已有的 cache_control 字段。""" + for tool in request.get('tools', []): + tool.pop('cache_control', None) + + system = request.get('system') + if isinstance(system, list): + for block in system: + if isinstance(block, dict): + block.pop('cache_control', None) + + for msg in request.get('messages', []): + content = msg.get('content') + if isinstance(content, list): + for block in content: + if isinstance(block, dict): + block.pop('cache_control', None) + + +def _inject_structural_anchors(request: JsonDict) -> int: + """在 tools 末尾和 system 末尾注入结构锚点,返回注入数量。""" + count = 0 + + tools = request.get('tools') + if tools and isinstance(tools, list): + tools[-1]['cache_control'] = _EPHEMERAL + count += 1 + + system = request.get('system') + if isinstance(system, list) and system: + last = system[-1] + if isinstance(last, dict): + last['cache_control'] = _EPHEMERAL + count += 1 + elif isinstance(system, str) and system: + request['system'] = [ + {'type': 'text', 'text': system, 'cache_control': _EPHEMERAL} + ] + count += 1 + + return count + + +def _is_cacheable_block(block: Any) -> bool: + """判断一个内容块是否可以设置 cache_control。""" + if not isinstance(block, dict): + return False + block_type = block.get('type', '') + if block_type in ('thinking', 'redacted_thinking'): + return False + if block_type == 'text' and not block.get('text'): + return False + return True + + +def _collect_cacheable_block_refs(request: JsonDict) -> list[JsonDict]: + """收集所有消息中可缓存块的引用列表。""" + refs: list[JsonDict] = [] + for msg in request.get('messages', []): + content = msg.get('content') + if not isinstance(content, list): + continue + for block in content: + if _is_cacheable_block(block): + refs.append(block) + return refs + + +def _pick_window_anchor(refs: list[JsonDict], target: int) -> int | None: + """在目标位置附近选择一个窗口锚点,优先左侧。""" + if target < 0: + target = 0 + if target >= len(refs): + return None + + for i in range(target, -1, -1): + if 'cache_control' not in refs[i]: + return i + for i in range(target + 1, len(refs)): + if 'cache_control' not in refs[i]: + return i + return None diff --git a/adapters/cc_gemini_adapter.py b/adapters/cc_gemini_adapter.py new file mode 100644 index 0000000..5e8aad0 --- /dev/null +++ b/adapters/cc_gemini_adapter.py @@ -0,0 +1,363 @@ +"""OpenAI Chat Completions ↔ Gemini Contents 格式转换 + +将 CC 格式请求转换为 Gemini generateContent 格式, +并将 Gemini 响应转换回 CC 格式。仅支持出站方向(CC → Gemini → CC)。 +""" + +from __future__ import annotations + +import json +import logging +from typing import Any + +from utils.http import gen_id + +JsonDict = dict[str, Any] + +logger = logging.getLogger(__name__) + +_FINISH_REASON_MAP = { + 'STOP': 'stop', + 'MAX_TOKENS': 'length', + 'SAFETY': 'content_filter', + 'RECITATION': 'content_filter', +} + + +# ═══════════════════════════════════════════════════════════ +# 请求转换: CC → Gemini generateContent +# ═══════════════════════════════════════════════════════════ + + +def cc_to_gemini_request(payload: JsonDict) -> JsonDict: + """将 CC 请求转换为 Gemini generateContent 请求。""" + messages = payload.get('messages', []) + system_parts: list[str] = [] + contents: list[JsonDict] = [] + + for msg in messages: + role = msg.get('role', '') + if role in ('system', 'developer'): + system_parts.append(_flatten_text(msg.get('content', ''))) + continue + converted = _convert_message(msg) + if converted: + contents.append(converted) + + contents = _merge_same_role(contents) + + result: JsonDict = { + 'contents': contents, + 'generationConfig': _build_generation_config(payload), + } + + if system_parts: + result['systemInstruction'] = { + 'parts': [{'text': '\n\n'.join(system_parts)}], + } + + tools = _convert_tools(payload.get('tools')) + if tools: + result['tools'] = tools + + return result + + +# ═══════════════════════════════════════════════════════════ +# 非流式响应转换: Gemini → CC +# ═══════════════════════════════════════════════════════════ + + +def gemini_to_cc_response(data: JsonDict, request_id: str | None = None) -> JsonDict: + """将 Gemini generateContent 响应转换为 CC 响应。""" + request_id = request_id or gen_id('chatcmpl-') + candidates = data.get('candidates', []) + candidate = candidates[0] if candidates else {} + + content_text, reasoning_text, tool_calls = _extract_parts( + candidate.get('content', {}).get('parts', []) + ) + + finish = candidate.get('finishReason', 'STOP') + if tool_calls and finish == 'STOP': + finish_reason = 'tool_calls' + else: + finish_reason = _FINISH_REASON_MAP.get(finish, 'stop') + + message: JsonDict = {'role': 'assistant', 'content': content_text or None} + if reasoning_text: + message['reasoning_content'] = reasoning_text + if tool_calls: + message['tool_calls'] = tool_calls + + usage = _convert_usage(data.get('usageMetadata', {})) + + return { + 'id': request_id, + 'object': 'chat.completion', + 'model': data.get('modelVersion', 'gemini'), + 'choices': [{'index': 0, 'message': message, 'finish_reason': finish_reason}], + 'usage': usage, + } + + +# ═══════════════════════════════════════════════════════════ +# 流式转换: Gemini SSE → CC chunks +# ═══════════════════════════════════════════════════════════ + + +class GeminiStreamConverter: + """将 Gemini SSE chunk 逐个转换为 CC chunk。 + + Gemini 流式每个 SSE data 是一个完整的 GenerateContentResponse, + 包含 candidates[0].content.parts。 + """ + + def __init__(self, request_id: str | None = None): + self._id = request_id or gen_id('chatcmpl-') + self._tool_call_index = 0 + self._started = False + + def process_chunk(self, data: JsonDict) -> list[JsonDict]: + """处理一个 Gemini SSE chunk,返回 CC chunk 列表。""" + results: list[JsonDict] = [] + candidates = data.get('candidates', []) + if not candidates: + return results + + candidate = candidates[0] + parts = candidate.get('content', {}).get('parts', []) + + if not self._started: + self._started = True + results.append(self._make_chunk({'role': 'assistant', 'content': ''})) + + for part in parts: + if part.get('thought') and part.get('text'): + results.append(self._make_chunk({'reasoning_content': part['text']})) + elif 'text' in part and not part.get('thought'): + results.append(self._make_chunk({'content': part['text']})) + elif 'functionCall' in part: + fc = part['functionCall'] + results.append(self._make_chunk({'tool_calls': [{ + 'index': self._tool_call_index, + 'id': fc.get('id') or gen_id('call_'), + 'type': 'function', + 'function': { + 'name': fc.get('name', ''), + 'arguments': json.dumps(fc.get('args', {}), ensure_ascii=False), + }, + }]})) + self._tool_call_index += 1 + + finish = candidate.get('finishReason') + if finish: + has_tools = self._tool_call_index > 0 + if has_tools and finish == 'STOP': + fr = 'tool_calls' + else: + fr = _FINISH_REASON_MAP.get(finish, 'stop') + chunk = self._make_chunk({}, finish_reason=fr) + usage_meta = data.get('usageMetadata') + if usage_meta: + chunk['usage'] = _convert_usage(usage_meta) + results.append(chunk) + + return results + + def _make_chunk(self, delta: JsonDict, finish_reason: str | None = None) -> JsonDict: + choice: JsonDict = {'index': 0, 'delta': delta} + if finish_reason: + choice['finish_reason'] = finish_reason + return { + 'id': self._id, + 'object': 'chat.completion.chunk', + 'model': 'gemini', + 'choices': [choice], + } + + +# ═══════════════════════════════════════════════════════════ +# 请求转换辅助 +# ═══════════════════════════════════════════════════════════ + + +def _convert_message(msg: JsonDict) -> JsonDict | None: + """将单条 CC 消息转为 Gemini Content。""" + role = msg.get('role', '') + gemini_role = 'model' if role == 'assistant' else 'user' + parts: list[JsonDict] = [] + + if role == 'tool': + return { + 'role': 'user', + 'parts': [{ + 'functionResponse': { + 'name': msg.get('name', msg.get('tool_call_id', '')), + 'response': _parse_json_safe(msg.get('content', '')), + }, + }], + } + + if msg.get('reasoning_content'): + parts.append({'text': msg['reasoning_content'], 'thought': True}) + + content = msg.get('content') + if isinstance(content, str) and content: + parts.append({'text': content}) + elif isinstance(content, list): + for block in content: + if not isinstance(block, dict): + continue + if block.get('type') == 'text': + parts.append({'text': block.get('text', '')}) + elif block.get('type') == 'image_url': + img = _convert_image_part(block) + if img: + parts.append(img) + + for tc in msg.get('tool_calls', []): + func = tc.get('function', {}) + parts.append({ + 'functionCall': { + 'name': func.get('name', ''), + 'args': _parse_json_safe(func.get('arguments', '{}')), + }, + }) + + if not parts: + return None + return {'role': gemini_role, 'parts': parts} + + +def _convert_image_part(block: JsonDict) -> JsonDict | None: + """将 OpenAI image_url 转为 Gemini inlineData。""" + url_data = block.get('image_url', {}) + url = url_data.get('url', '') if isinstance(url_data, dict) else str(url_data) + if url.startswith('data:'): + media_type, _, b64 = url.partition(';base64,') + return {'inlineData': { + 'mimeType': media_type.replace('data:', '') or 'image/png', + 'data': b64, + }} + return None + + +def _build_generation_config(payload: JsonDict) -> JsonDict: + """从 CC payload 构建 Gemini generationConfig。""" + config: JsonDict = {} + if 'max_tokens' in payload: + config['maxOutputTokens'] = payload['max_tokens'] + elif 'max_completion_tokens' in payload: + config['maxOutputTokens'] = payload['max_completion_tokens'] + if 'temperature' in payload: + config['temperature'] = payload['temperature'] + if 'top_p' in payload: + config['topP'] = payload['top_p'] + stop = payload.get('stop') + if stop: + config['stopSequences'] = stop if isinstance(stop, list) else [stop] + return config + + +def _convert_tools(tools: Any) -> list[JsonDict] | None: + """将 CC tools 转为 Gemini functionDeclarations。""" + if not isinstance(tools, list) or not tools: + return None + declarations: list[JsonDict] = [] + for tool in tools: + if not isinstance(tool, dict): + continue + func = tool.get('function', tool) if tool.get('type') == 'function' else tool + if 'name' not in func: + continue + decl: JsonDict = { + 'name': func.get('name', ''), + 'description': func.get('description', ''), + } + params = func.get('parameters') + if params: + decl['parameters'] = params + declarations.append(decl) + if not declarations: + return None + return [{'functionDeclarations': declarations}] + + +# ═══════════════════════════════════════════════════════════ +# 响应转换辅助 +# ═══════════════════════════════════════════════════════════ + + +def _extract_parts(parts: list[Any]) -> tuple[str, str, list[JsonDict]]: + """从 Gemini parts 中提取文本、思考内容和工具调用。""" + text = '' + reasoning = '' + tool_calls: list[JsonDict] = [] + + for part in parts: + if not isinstance(part, dict): + continue + if part.get('thought') and 'text' in part: + reasoning += part['text'] + elif 'text' in part: + text += part['text'] + elif 'functionCall' in part: + fc = part['functionCall'] + tool_calls.append({ + 'index': len(tool_calls), + 'id': fc.get('id') or gen_id('call_'), + 'type': 'function', + 'function': { + 'name': fc.get('name', ''), + 'arguments': json.dumps(fc.get('args', {}), ensure_ascii=False), + }, + }) + + return text, reasoning, tool_calls + + +def _convert_usage(meta: JsonDict) -> JsonDict: + """将 Gemini usageMetadata 转为 CC usage。""" + prompt = meta.get('promptTokenCount', 0) + candidates = meta.get('candidatesTokenCount', 0) + thoughts = meta.get('thoughtsTokenCount', 0) + completion = candidates + thoughts + return { + 'prompt_tokens': prompt, + 'completion_tokens': completion, + 'total_tokens': prompt + completion, + } + + +def _merge_same_role(contents: list[JsonDict]) -> list[JsonDict]: + """合并相邻同角色的 Gemini contents。""" + if not contents: + return contents + merged = [contents[0]] + for c in contents[1:]: + if c['role'] == merged[-1]['role']: + merged[-1]['parts'].extend(c['parts']) + else: + merged.append(c) + return merged + + +def _flatten_text(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + return '\n'.join( + p.get('text', '') if isinstance(p, dict) else str(p) + for p in content + ) + return str(content) + + +def _parse_json_safe(text: Any) -> Any: + if not isinstance(text, str): + return text if text is not None else {} + try: + return json.loads(text) + except (json.JSONDecodeError, ValueError): + return {'result': text} if text else {} diff --git a/adapters/openai_compat_fixer.py b/adapters/openai_compat_fixer.py index e73d7d1..8a2d252 100644 --- a/adapters/openai_compat_fixer.py +++ b/adapters/openai_compat_fixer.py @@ -311,6 +311,7 @@ def _fix_stream_choice(choice: Any) -> None: _promote_reasoning_field(delta) _convert_legacy_delta_function_call(delta, choice) + _sanitize_tool_call_deltas(delta) _ensure_stream_tool_calls(delta) _rewrite_function_call_finish_reason(choice) @@ -332,6 +333,25 @@ def _convert_legacy_delta_function_call(delta: JsonDict, choice: JsonDict) -> No _rewrite_function_call_finish_reason(choice) +def _sanitize_tool_call_deltas(delta: JsonDict) -> None: + """清理流式 tool_calls 中的空白字段。 + + 某些 OpenAI 兼容提供商在后续 tool_calls chunk 中错误地发送空字符串的 + id/type/function.name,导致 Cursor 用空值覆盖真实值。 + 不处理 function.arguments,因为空字符串是合法的增量拼接值。 + """ + for tc in delta.get('tool_calls') or []: + if not isinstance(tc, dict): + continue + if 'id' in tc and not str(tc['id']).strip(): + del tc['id'] + if 'type' in tc and not str(tc['type']).strip(): + del tc['type'] + func = tc.get('function') + if isinstance(func, dict) and 'name' in func and not str(func['name']).strip(): + del func['name'] + + def _ensure_stream_tool_calls(delta: JsonDict) -> None: """补全流式 tool_calls 的最小必需字段。 diff --git a/routes/admin.py b/routes/admin.py index e9241b0..fbb62a5 100644 --- a/routes/admin.py +++ b/routes/admin.py @@ -188,6 +188,19 @@ def delete_mapping(name): return jsonify({'ok': True}) +# ─── 用量统计 ───────────────────────────────────── + + +@bp.route('/api/admin/stats', methods=['GET']) +def get_stats(): + """返回运行时用量统计数据。""" + err = _check_auth() + if err: + return err + from utils.usage_tracker import usage_tracker + return jsonify(usage_tracker.get_stats()) + + # ─── 内部辅助 ───────────────────────────────────── diff --git a/routes/chat.py b/routes/chat.py index 6ef1f7a..63eb18c 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -18,6 +18,11 @@ from adapters.cc_anthropic_adapter import ( cc_to_messages_request, messages_to_cc_response, ) +from adapters.cc_gemini_adapter import ( + GeminiStreamConverter, + cc_to_gemini_request, + gemini_to_cc_response, +) from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request from adapters.responses_cc_adapter import ( ResponsesToCCStreamConverter, @@ -31,6 +36,7 @@ from routes.common import ( apply_body_modifications, apply_header_modifications, build_anthropic_target, + build_gemini_target, build_openai_target, build_responses_target, build_route_context, @@ -44,12 +50,27 @@ from routes.common import ( ) from utils.http import ( forward_request, + gen_id, iter_anthropic_sse, + iter_gemini_sse, iter_openai_sse, iter_responses_sse, sse_response, ) +from utils.request_logger import ( + append_client_event, + append_upstream_event, + attach_client_response, + attach_error, + attach_upstream_request, + attach_upstream_response, + finalize_turn, + set_stream_summary, + start_turn, +) from utils.think_tag import ThinkTagExtractor +from utils.thinking_cache import thinking_cache +from utils.usage_tracker import usage_tracker logger = logging.getLogger(__name__) @@ -65,21 +86,36 @@ def _dbg(message: str) -> None: @bp.route('/v1/chat/completions', methods=['POST']) def chat_completions(): """处理聊天补全请求并按模型映射分发到不同后端。""" - payload = request.get_json(force=True) - payload, message_count = _normalize_chat_payload(payload) + original_payload = request.get_json(force=True) + payload, message_count = _normalize_chat_payload(json.loads(json.dumps(original_payload, ensure_ascii=False, default=str))) client_model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) ctx = build_route_context(client_model, is_stream) + turn = start_turn( + route='chat', + client_model=client_model, + backend=ctx.backend, + stream=is_stream, + client_request=original_payload, + request_headers=dict(request.headers), + target_url=ctx.target_url, + upstream_model=ctx.upstream_model, + metadata={'message_count': message_count}, + ) log_route_context('聊天补全', ctx, extra=f'消息数={message_count}') _log_messages(payload) + payload['messages'] = thinking_cache.inject(payload.get('messages', [])) + if ctx.backend == 'openai': - return _handle_openai_backend(ctx, payload) + return _handle_openai_backend(ctx, payload, turn) if ctx.backend == 'responses': - return _handle_responses_backend(ctx, payload) - return _handle_anthropic_backend(ctx, payload) + return _handle_responses_backend(ctx, payload, turn) + if ctx.backend == 'gemini': + return _handle_gemini_backend(ctx, payload, turn) + return _handle_anthropic_backend(ctx, payload, turn) def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], int]: @@ -100,7 +136,7 @@ def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], in return payload, message_count -def _handle_openai_backend(ctx: RouteContext, payload: dict[str, Any]): +def _handle_openai_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any]): """处理走 OpenAI 兼容后端的聊天补全请求。""" _dbg( '原始请求字段=' + str(list(payload.keys())) + ' ' @@ -124,8 +160,8 @@ def _handle_openai_backend(ctx: RouteContext, payload: dict[str, Any]): headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_openai_stream(ctx, payload, url, headers) - return _handle_openai_non_stream(ctx, payload, url, headers) + return _handle_openai_stream(ctx, payload, url, headers, turn) + return _handle_openai_non_stream(ctx, payload, url, headers, turn) def _handle_openai_non_stream( @@ -133,18 +169,23 @@ def _handle_openai_non_stream( payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any], ): """处理 OpenAI 兼容后端的非流式返回。""" payload['stream'] = False + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload) if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) return err raw = resp.json() + attach_upstream_response(turn, raw) _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) data = fix_response(raw) - return _finalize_chat_response(ctx, data, debug_label='修复后响应') + return _finalize_chat_response(ctx, data, turn=turn, debug_label='修复后响应') def _handle_openai_stream( @@ -152,29 +193,56 @@ def _handle_openai_stream( payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any], ): """处理 OpenAI 兼容后端的流式返回。""" payload['stream'] = True def generate(): """消费上游 OpenAI SSE,并逐段产出给 Cursor 的聊天补全流。""" + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload, stream=True) if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) yield chat_error_chunk(str(err)) return think_extractor = ThinkTagExtractor() chunk_count = 0 + last_usage = None + client_chunks: list[dict[str, Any]] = [] for chunk in iter_openai_sse(resp): if chunk is None: _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') close_chunk = think_extractor.finalize() if close_chunk: + client_chunks.append(close_chunk) + append_client_event(turn, {'type': 'chat_chunk', 'data': close_chunk}) yield sse_data_message(close_chunk) + append_client_event(turn, {'type': 'done'}) yield sse_data_message('[DONE]') + usage_tracker.record(ctx.client_model, last_usage) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_chunk_count': len(client_chunks), + 'usage': last_usage, + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunks': client_chunks, + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) return + append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk}) + if chunk.get('usage'): + last_usage = chunk['usage'] + if chunk_count < 10: _dbg( f'上游原始片段#{chunk_count}=' @@ -185,6 +253,8 @@ def _handle_openai_stream( chunk['model'] = ctx.client_model for out in think_extractor.process_chunk(chunk): + client_chunks.append(out) + append_client_event(turn, {'type': 'chat_chunk', 'data': out}) if chunk_count < 10: _dbg( f'返回片段#{chunk_count}=' @@ -194,10 +264,25 @@ def _handle_openai_stream( chunk_count += 1 + usage_tracker.record(ctx.client_model, last_usage) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_chunk_count': len(client_chunks), + 'usage': last_usage, + 'ended_without_done': True, + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunks': client_chunks, + 'usage': last_usage, + }) + finalize_turn(turn, usage=last_usage) + return sse_response(generate()) -def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any]): +def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): """处理走原生 Responses 后端的聊天补全请求。 当上游只支持 `/v1/responses` 时,需要先把聊天补全请求转换为 Responses 请求, @@ -216,8 +301,8 @@ def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any]): headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_responses_stream(ctx, responses_payload, url, headers) - return _handle_responses_non_stream(ctx, responses_payload, url, headers) + return _handle_responses_stream(ctx, responses_payload, url, headers, turn) + return _handle_responses_non_stream(ctx, responses_payload, url, headers, turn) def _handle_responses_non_stream( @@ -225,18 +310,23 @@ def _handle_responses_non_stream( payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理原生 Responses 后端的非流式返回。""" payload['stream'] = False + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload) if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) return err raw = resp.json() + attach_upstream_response(turn, raw) _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) data = responses_to_cc_response(raw, ctx.client_model) - return _finalize_chat_response(ctx, data, debug_label='Responses 转回聊天补全后') + return _finalize_chat_response(ctx, data, turn=turn, debug_label='Responses 转回聊天补全后') def _handle_responses_stream( @@ -244,6 +334,7 @@ def _handle_responses_stream( payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理原生 Responses 后端的流式返回。""" payload['stream'] = True @@ -251,13 +342,19 @@ def _handle_responses_stream( def generate(): """消费上游 Responses 事件,并实时转换成聊天补全 chunk。""" + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload, stream=True) if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) yield chat_error_chunk(str(err)) return event_count = 0 + client_chunks: list[Any] = [] for event_type, event_data in iter_responses_sse(resp): + append_upstream_event(turn, {'type': event_type, 'data': event_data}) if event_count < 10: _dbg( f'上游事件#{event_count} 类型={event_type} 数据=' @@ -265,6 +362,8 @@ def _handle_responses_stream( ) for chunk in converter.process_event(event_type, event_data): + client_chunks.append(chunk) + append_client_event(turn, {'type': 'chat_chunk', 'data': chunk}) if event_count < 10: _dbg( f'返回片段#{event_count}=' @@ -275,12 +374,126 @@ def _handle_responses_stream( event_count += 1 _dbg(f'流式响应结束,共 {event_count} 个事件') + append_client_event(turn, {'type': 'done'}) yield sse_data_message('[DONE]') + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_chunk_count': len(client_chunks), + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunks': client_chunks, + }) + finalize_turn(turn) return sse_response(generate()) -def _handle_anthropic_backend(ctx: RouteContext, payload: dict[str, Any]): +def _handle_gemini_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): + """处理走 Gemini Contents 后端的聊天补全请求。""" + payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position) + gemini_payload = cc_to_gemini_request(payload) + _dbg( + '已转换为 Gemini 请求:字段=' + str(list(gemini_payload.keys())) + + f' 内容数={len(gemini_payload.get("contents", []))}' + ) + + url, headers = build_gemini_target(ctx, stream=ctx.is_stream) + gemini_payload = apply_body_modifications(gemini_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + if ctx.is_stream: + return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn) + return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn) + + +def _handle_gemini_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Gemini 后端的非流式返回。""" + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + data = gemini_to_cc_response(raw) + return _finalize_chat_response(ctx, data, turn=turn, debug_label='Gemini 转回聊天补全后') + + +def _handle_gemini_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Gemini 后端的流式返回。""" + converter = GeminiStreamConverter() + + def generate(): + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield chat_error_chunk(str(err)) + return + + chunk_count = 0 + client_chunks: list[Any] = [] + for gemini_chunk in iter_gemini_sse(resp): + append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) + if chunk_count < 10: + _dbg( + f'上游 Gemini 片段#{chunk_count}=' + + json.dumps(gemini_chunk, ensure_ascii=False, default=str)[:500] + ) + + for cc_chunk in converter.process_chunk(gemini_chunk): + cc_chunk['model'] = ctx.client_model + client_chunks.append(cc_chunk) + append_client_event(turn, {'type': 'chat_chunk', 'data': cc_chunk}) + if chunk_count < 10: + _dbg( + f'返回片段#{chunk_count}=' + + json.dumps(cc_chunk, ensure_ascii=False, default=str)[:500] + ) + yield sse_data_message(cc_chunk) + + chunk_count += 1 + + _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') + append_client_event(turn, {'type': 'done'}) + yield sse_data_message('[DONE]') + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_chunk_count': len(client_chunks), + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunks': client_chunks, + }) + finalize_turn(turn) + + return sse_response(generate()) + + +def _handle_anthropic_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): """处理走 Anthropic Messages 后端的聊天补全请求。""" payload['model'] = ctx.upstream_model anthropic_payload = cc_to_messages_request(payload) @@ -295,8 +508,8 @@ def _handle_anthropic_backend(ctx: RouteContext, payload: dict[str, Any]): headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_anthropic_stream(ctx, anthropic_payload, url, headers) - return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers) + return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn) + return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn) def _handle_anthropic_non_stream( @@ -304,18 +517,23 @@ def _handle_anthropic_non_stream( payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理 Anthropic 后端的非流式返回。""" payload['stream'] = False + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload) if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) return err raw = resp.json() + attach_upstream_response(turn, raw) _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) data = messages_to_cc_response(raw) - return _finalize_chat_response(ctx, data, debug_label='Messages 转回聊天补全后') + return _finalize_chat_response(ctx, data, turn=turn, debug_label='Messages 转回聊天补全后') def _handle_anthropic_stream( @@ -323,6 +541,7 @@ def _handle_anthropic_stream( payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理 Anthropic 后端的流式返回。 @@ -334,13 +553,19 @@ def _handle_anthropic_stream( def generate(): """消费上游 Anthropic 事件流,并逐步映射为聊天补全 SSE。""" + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload, stream=True) if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) yield chat_error_chunk(str(err)) return event_count = 0 + client_chunks: list[Any] = [] for event_type, event_data in iter_anthropic_sse(resp): + append_upstream_event(turn, {'type': event_type, 'data': event_data}) if event_count < 10: _dbg( f'上游事件#{event_count} 类型={event_type} 数据=' @@ -355,6 +580,8 @@ def _handle_anthropic_stream( except (json.JSONDecodeError, TypeError): pass + client_chunks.append(chunk_str) + append_client_event(turn, {'type': 'chat_chunk', 'data': chunk_str}) if event_count < 10: _dbg(f'返回片段#{event_count}={chunk_str[:500]}') yield sse_data_message(chunk_str) @@ -362,7 +589,19 @@ def _handle_anthropic_stream( event_count += 1 _dbg(f'流式响应结束,共 {event_count} 个事件') + append_client_event(turn, {'type': 'done'}) yield sse_data_message('[DONE]') + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_chunk_count': len(client_chunks), + }) + attach_client_response(turn, { + 'type': 'chat.completion.stream.summary', + 'model': ctx.client_model, + 'chunks': client_chunks, + }) + finalize_turn(turn) return sse_response(generate()) @@ -371,6 +610,7 @@ def _finalize_chat_response( ctx: RouteContext, data: dict[str, Any], *, + turn: dict[str, Any] | None, debug_label: str, ): """统一收尾非流式聊天补全响应。 @@ -383,6 +623,20 @@ def _finalize_chat_response( data['model'] = ctx.client_model _dbg(debug_label + '=' + json.dumps(data, ensure_ascii=False, default=str)[:1000]) log_usage('聊天补全', data.get('usage', {}), input_key='prompt_tokens', output_key='completion_tokens') + + usage_tracker.record(ctx.client_model, data.get('usage')) + attach_client_response(turn, data) + finalize_turn(turn, usage=data.get('usage')) + + for choice in data.get('choices', []): + msg = choice.get('message', {}) + if msg.get('reasoning_content'): + thinking_cache.store_from_response( + request.get_json(silent=True, force=True).get('messages', []), + msg['reasoning_content'], + ) + break + return jsonify(data) diff --git a/routes/common.py b/routes/common.py index 3ddcb72..0ad7518 100644 --- a/routes/common.py +++ b/routes/common.py @@ -12,7 +12,7 @@ import logging from typing import Any import settings -from utils.http import build_anthropic_headers, build_openai_headers +from utils.http import build_anthropic_headers, build_gemini_headers, build_openai_headers logger = logging.getLogger(__name__) @@ -76,6 +76,22 @@ def build_anthropic_target(ctx: RouteContext) -> tuple[str, dict[str, str]]: return url, headers +def build_gemini_target(ctx: RouteContext, stream: bool = False) -> tuple[str, dict[str, str]]: + """根据路由上下文生成 Gemini 后端的地址和请求头。 + + Gemini URL 格式: {base}/v1/models/{model}:generateContent + 流式: {base}/v1/models/{model}:streamGenerateContent?alt=sse + """ + base = ctx.target_url.rstrip('/') + model = ctx.upstream_model + if stream: + url = f'{base}/v1/models/{model}:streamGenerateContent?alt=sse' + else: + url = f'{base}/v1/models/{model}:generateContent' + headers = build_gemini_headers(ctx.api_key) + return url, headers + + def log_route_context(route_name: str, ctx: RouteContext, *, extra: str = '') -> None: """统一输出路由级日志,避免不同入口的日志格式逐渐漂移。""" parts = [ diff --git a/routes/messages.py b/routes/messages.py index 73637a2..f25da8b 100644 --- a/routes/messages.py +++ b/routes/messages.py @@ -15,6 +15,17 @@ import settings from config import Config from routes.common import apply_body_modifications, apply_header_modifications, inject_instructions_anthropic from utils.http import build_anthropic_headers, forward_request, sse_response +from utils.request_logger import ( + append_client_event, + append_upstream_event, + attach_client_response, + attach_error, + attach_upstream_request, + attach_upstream_response, + finalize_turn, + set_stream_summary, + start_turn, +) logger = logging.getLogger(__name__) @@ -24,7 +35,8 @@ bp = Blueprint('messages', __name__) @bp.route('/v1/messages', methods=['POST']) def messages_passthrough(): """透传 Anthropic Messages 请求,并在必要时补齐 thinking 兼容层。""" - payload = request.get_json(force=True) + original_payload = request.get_json(force=True) + payload = json.loads(json.dumps(original_payload, ensure_ascii=False, default=str)) model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) @@ -41,20 +53,37 @@ def messages_passthrough(): headers = apply_header_modifications(headers, header_mods) url = f'{url_base.rstrip("/")}/v1/messages' + turn = start_turn( + route='messages', + client_model=model, + backend='anthropic', + stream=is_stream, + client_request=original_payload, + request_headers=dict(request.headers), + target_url=url_base, + upstream_model=model, + ) + payload = inject_instructions_anthropic(payload, custom_instructions, instructions_position) payload = apply_body_modifications(payload, body_mods) if not is_stream: + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload) if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) return err data = resp.json() + attach_upstream_response(turn, data) _inject_thinking(data) + attach_client_response(turn, data) + finalize_turn(turn) return jsonify(data) - # 流式透传 def generate(): """建立上游流式连接并逐段回传处理后的 SSE 数据。""" + attach_upstream_request(turn, payload, headers) try: resp = req_lib.post( url, headers=headers, json=payload, @@ -63,12 +92,28 @@ def messages_passthrough(): if resp.status_code != 200: body = resp.content.decode('utf-8', errors='replace') logger.warning(f'上游返回 {resp.status_code}: {body[:300]}') + attach_error(turn, {'stage': 'upstream_status', 'status_code': resp.status_code, 'message': body}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) yield f'data: {json.dumps({"error": {"message": body, "type": "upstream_error"}})}\n\n' return - yield from _process_stream(resp) + summary = {'upstream_event_count': 0, 'client_event_count': 0} + client_events = [] + for out in _process_stream(resp, turn=turn, summary=summary): + client_events.append(out) + yield out + set_stream_summary(turn, summary) + attach_client_response(turn, { + 'type': 'messages.stream.summary', + 'events': client_events, + }) + finalize_turn(turn) except req_lib.RequestException as e: logger.error(f'请求上游失败: {e}') + attach_error(turn, {'stage': 'request_exception', 'message': str(e)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) yield f'data: {json.dumps({"error": {"message": str(e), "type": "proxy_error"}})}\n\n' return sse_response(generate()) @@ -96,7 +141,7 @@ def _inject_thinking(data): logger.info(f'已注入 thinking block ({len(rc)} 字符)') -def _process_stream(resp): +def _process_stream(resp, *, turn=None, summary: dict[str, int] | None = None): """处理 /v1/messages 流式响应,检测并注入 thinking 事件 追踪上游 content block 的 index,在注入 thinking blocks 时使用独立的 index, @@ -105,11 +150,15 @@ def _process_stream(resp): reasoning_buf = '' injected = False index_offset = 0 + if summary is None: + summary = {'upstream_event_count': 0, 'client_event_count': 0} for line in resp.iter_lines(): if not line: continue decoded = line.decode('utf-8', errors='replace') + append_upstream_event(turn, {'raw': decoded}) + summary['upstream_event_count'] += 1 if not decoded.startswith('data:'): yield decoded + '\n\n' @@ -140,7 +189,10 @@ def _process_stream(resp): if reasoning_buf and not injected: if event_data.get('delta', {}).get('type') == 'text_delta': injected = True - yield from _emit_thinking_blocks(reasoning_buf) + for injected_event in _emit_thinking_blocks(reasoning_buf): + append_client_event(turn, {'raw': injected_event}) + summary['client_event_count'] += 1 + yield injected_event index_offset = 1 reasoning_buf = '' @@ -148,7 +200,10 @@ def _process_stream(resp): event_data['index'] = event_data['index'] + index_offset modified = True - yield f'data: {json.dumps(event_data)}\n\n' if modified else decoded + '\n\n' + output = f'data: {json.dumps(event_data)}\n\n' if modified else decoded + '\n\n' + append_client_event(turn, {'raw': output}) + summary['client_event_count'] += 1 + yield output def _emit_thinking_blocks(text): diff --git a/routes/responses.py b/routes/responses.py index e1b4b62..6df5166 100644 --- a/routes/responses.py +++ b/routes/responses.py @@ -13,6 +13,7 @@ from typing import Any from flask import Blueprint, jsonify, request from adapters.cc_anthropic_adapter import cc_to_messages_request, messages_to_cc_response +from adapters.cc_gemini_adapter import GeminiStreamConverter, cc_to_gemini_request, gemini_to_cc_response from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request from adapters.responses_cc_adapter import ResponsesStreamConverter, cc_to_responses, responses_to_cc from config import Config @@ -21,6 +22,7 @@ from routes.common import ( apply_body_modifications, apply_header_modifications, build_anthropic_target, + build_gemini_target, build_openai_target, build_responses_target, build_route_context, @@ -33,12 +35,27 @@ from routes.common import ( ) from utils.http import ( forward_request, + gen_id, iter_anthropic_sse, + iter_gemini_sse, iter_openai_sse, iter_responses_sse, sse_response, ) +from utils.request_logger import ( + append_client_event, + append_upstream_event, + attach_client_response, + attach_error, + attach_upstream_request, + attach_upstream_response, + finalize_turn, + set_stream_summary, + start_turn, +) from utils.think_tag import ThinkTagExtractor +from utils.thinking_cache import thinking_cache +from utils.usage_tracker import usage_tracker logger = logging.getLogger(__name__) @@ -54,20 +71,33 @@ def _dbg(message: str) -> None: @bp.route('/v1/responses', methods=['POST']) def responses_endpoint(): """处理 Responses 请求并按模型映射分发。""" - payload = request.get_json(force=True) + original_payload = request.get_json(force=True) + payload = json.loads(json.dumps(original_payload, ensure_ascii=False, default=str)) client_model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) ctx = build_route_context(client_model, is_stream) + turn = start_turn( + route='responses', + client_model=client_model, + backend=ctx.backend, + stream=is_stream, + client_request=original_payload, + request_headers=dict(request.headers), + target_url=ctx.target_url, + upstream_model=ctx.upstream_model, + ) log_route_context('响应生成', ctx) cc_payload = _build_cc_payload(payload, ctx) if ctx.backend == 'openai': - return _handle_openai_backend(ctx, cc_payload) + return _handle_openai_backend(ctx, cc_payload, turn) if ctx.backend == 'responses': - return _handle_responses_backend(ctx, payload) - return _handle_anthropic_backend(ctx, cc_payload) + return _handle_responses_backend(ctx, payload, turn) + if ctx.backend == 'gemini': + return _handle_gemini_backend(ctx, cc_payload, turn) + return _handle_anthropic_backend(ctx, cc_payload, turn) def _build_cc_payload(payload: dict[str, Any], ctx: RouteContext) -> dict[str, Any]: @@ -78,6 +108,7 @@ def _build_cc_payload(payload: dict[str, Any], ctx: RouteContext) -> dict[str, A """ cc_payload = responses_to_cc(payload) cc_payload['model'] = ctx.upstream_model + cc_payload['messages'] = thinking_cache.inject(cc_payload.get('messages', [])) cc_payload = inject_instructions_cc(cc_payload, ctx.custom_instructions, ctx.instructions_position) _dbg( '已转换为聊天补全中间表示:字段=' + str(list(cc_payload.keys())) @@ -86,7 +117,7 @@ def _build_cc_payload(payload: dict[str, Any], ctx: RouteContext) -> dict[str, A return cc_payload -def _handle_openai_backend(ctx: RouteContext, cc_payload: dict[str, Any]): +def _handle_openai_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any]): """处理走 OpenAI 兼容后端的 Responses 请求。""" cc_payload = normalize_request(cc_payload) _dbg( @@ -99,8 +130,8 @@ def _handle_openai_backend(ctx: RouteContext, cc_payload: dict[str, Any]): headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_openai_stream(ctx, cc_payload, url, headers) - return _handle_openai_non_stream(ctx, cc_payload, url, headers) + return _handle_openai_stream(ctx, cc_payload, url, headers, turn) + return _handle_openai_non_stream(ctx, cc_payload, url, headers, turn) def _handle_openai_non_stream( @@ -108,19 +139,24 @@ def _handle_openai_non_stream( cc_payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any], ): """处理 OpenAI 兼容后端的非流式 Responses 返回。""" cc_payload['stream'] = False + attach_upstream_request(turn, cc_payload, headers) resp, err = forward_request(url, headers, cc_payload) if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) return err raw = resp.json() + attach_upstream_response(turn, raw) _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) fixed = fix_response(raw) response_data = cc_to_responses(fixed, ctx.client_model) - return _finalize_responses_response(response_data, debug_label='转换为 Responses 后') + return _finalize_responses_response(response_data, turn=turn, debug_label='转换为 Responses 后') def _handle_openai_stream( @@ -128,6 +164,7 @@ def _handle_openai_stream( cc_payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理 OpenAI 兼容后端的流式 Responses 返回。""" cc_payload['stream'] = True @@ -137,20 +174,41 @@ def _handle_openai_stream( """消费 OpenAI 聊天补全流,并实时改写为 Responses SSE。""" yield from converter.start_events() + attach_upstream_request(turn, cc_payload, headers) resp, err = forward_request(url, headers, cc_payload, stream=True) if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) yield responses_error_event(str(err)) return think_extractor = ThinkTagExtractor() chunk_count = 0 + client_events: list[str] = [] for chunk in iter_openai_sse(resp): if chunk is None: _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') - yield from converter.finalize() + finalized_events = converter.finalize() + for item in finalized_events: + client_events.append(item) + append_client_event(turn, {'type': 'responses_event', 'data': item}) + yield item + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_event_count': len(client_events), + }) + attach_client_response(turn, { + 'type': 'responses.stream.summary', + 'model': ctx.client_model, + 'events': client_events, + }) + finalize_turn(turn) return + append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk}) if chunk_count < 10: _dbg( f'上游原始片段#{chunk_count}=' @@ -159,19 +217,22 @@ def _handle_openai_stream( chunk = fix_stream_chunk(chunk) for out in think_extractor.process_chunk(chunk): - if chunk_count < 10: - _dbg( - f'转换后片段#{chunk_count}=' - + json.dumps(out, ensure_ascii=False, default=str)[:500] - ) - yield from converter.process_cc_chunk(out) + for evt in converter.process_cc_chunk(out): + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + if chunk_count < 10: + _dbg( + f'转换后片段#{chunk_count}=' + + json.dumps(out, ensure_ascii=False, default=str)[:500] + ) + yield evt chunk_count += 1 return sse_response(generate()) -def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any]): +def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): """处理走原生 Responses 后端的请求。 当中转站本身就只支持 `/v1/responses` 时,不需要再绕到聊天补全中间协议, @@ -185,8 +246,8 @@ def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any]): headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_responses_stream(ctx, payload, url, headers) - return _handle_responses_non_stream(ctx, payload, url, headers) + return _handle_responses_stream(ctx, payload, url, headers, turn) + return _handle_responses_non_stream(ctx, payload, url, headers, turn) def _handle_responses_non_stream( @@ -194,16 +255,21 @@ def _handle_responses_non_stream( payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理原生 Responses 后端的非流式返回。""" payload['stream'] = False + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload) if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) return err response_data = resp.json() + attach_upstream_response(turn, response_data) response_data['model'] = ctx.client_model - return _finalize_responses_response(response_data, debug_label='原生 Responses 返回后') + return _finalize_responses_response(response_data, turn=turn, debug_label='原生 Responses 返回后') def _handle_responses_stream( @@ -211,6 +277,7 @@ def _handle_responses_stream( payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理原生 Responses 后端的流式返回。""" payload['stream'] = True @@ -218,27 +285,151 @@ def _handle_responses_stream( def generate(): """透传上游原生 Responses 流,并做轻量模型名改写。""" + attach_upstream_request(turn, payload, headers) resp, err = forward_request(url, headers, payload, stream=True) if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) yield responses_error_event(str(err)) return event_count = 0 + client_events: list[str] = [] for event_type, event_data in iter_responses_sse(resp): + append_upstream_event(turn, {'type': event_type, 'data': event_data}) if event_count < 10: _dbg( f'上游事件#{event_count} 类型={event_type} 数据=' + json.dumps(event_data, ensure_ascii=False, default=str)[:500] ) - yield from converter.process_responses_event(event_type, event_data) + produced = converter.process_responses_event(event_type, event_data) + for evt in produced: + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt event_count += 1 _dbg(f'流式响应结束,共 {event_count} 个事件') + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_event_count': len(client_events), + }) + attach_client_response(turn, { + 'type': 'responses.stream.summary', + 'model': ctx.client_model, + 'events': client_events, + }) + finalize_turn(turn) return sse_response(generate()) -def _handle_anthropic_backend(ctx: RouteContext, cc_payload: dict[str, Any]): +def _handle_gemini_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): + """处理走 Gemini Contents 后端的 Responses 请求。""" + gemini_payload = cc_to_gemini_request(cc_payload) + _dbg( + '已转换为 Gemini 请求:字段=' + str(list(gemini_payload.keys())) + + f' 内容数={len(gemini_payload.get("contents", []))}' + ) + + url, headers = build_gemini_target(ctx, stream=ctx.is_stream) + gemini_payload = apply_body_modifications(gemini_payload, ctx.body_modifications) + headers = apply_header_modifications(headers, ctx.header_modifications) + + if ctx.is_stream: + return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn) + return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn) + + +def _handle_gemini_non_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Gemini 后端的非流式 Responses 返回。""" + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) + return err + + raw = resp.json() + attach_upstream_response(turn, raw) + _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) + + cc_data = gemini_to_cc_response(raw) + response_data = cc_to_responses(cc_data, ctx.client_model) + return _finalize_responses_response(response_data, turn=turn, debug_label='Gemini 转回 Responses 后') + + +def _handle_gemini_stream( + ctx: RouteContext, + payload: dict[str, Any], + url: str, + headers: dict[str, str], + turn: dict[str, Any] | None, +): + """处理 Gemini 后端的流式 Responses 返回。""" + converter = ResponsesStreamConverter(model=ctx.client_model) + gemini_converter = GeminiStreamConverter() + + def generate(): + yield from converter.start_events() + + attach_upstream_request(turn, payload, headers) + resp, err = forward_request(url, headers, payload, stream=True) + if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) + yield responses_error_event(str(err)) + return + + chunk_count = 0 + client_events: list[str] = [] + for gemini_chunk in iter_gemini_sse(resp): + append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) + if chunk_count < 10: + _dbg( + f'上游 Gemini 片段#{chunk_count}=' + + json.dumps(gemini_chunk, ensure_ascii=False, default=str)[:500] + ) + + for cc_chunk in gemini_converter.process_chunk(gemini_chunk): + for evt in converter.process_cc_chunk(cc_chunk): + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt + + chunk_count += 1 + + _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') + finalized_events = converter.finalize() + for evt in finalized_events: + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'chunk_count': chunk_count, + 'client_event_count': len(client_events), + }) + attach_client_response(turn, { + 'type': 'responses.stream.summary', + 'model': ctx.client_model, + 'events': client_events, + }) + finalize_turn(turn) + + return sse_response(generate()) + + +def _handle_anthropic_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): """处理走 Anthropic 后端的 Responses 请求。""" anthropic_payload = cc_to_messages_request(cc_payload) _dbg( @@ -251,8 +442,8 @@ def _handle_anthropic_backend(ctx: RouteContext, cc_payload: dict[str, Any]): headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_anthropic_stream(ctx, anthropic_payload, url, headers) - return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers) + return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn) + return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn) def _handle_anthropic_non_stream( @@ -260,19 +451,24 @@ def _handle_anthropic_non_stream( anthropic_payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理 Anthropic 后端的非流式 Responses 返回。""" anthropic_payload['stream'] = False + attach_upstream_request(turn, anthropic_payload, headers) resp, err = forward_request(url, headers, anthropic_payload) if err: + attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'}) + finalize_turn(turn) return err raw = resp.json() + attach_upstream_response(turn, raw) _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) cc_data = messages_to_cc_response(raw) response_data = cc_to_responses(cc_data, ctx.client_model) - return _finalize_responses_response(response_data, debug_label='Messages 转回 Responses 后') + return _finalize_responses_response(response_data, turn=turn, debug_label='Messages 转回 Responses 后') def _handle_anthropic_stream( @@ -280,6 +476,7 @@ def _handle_anthropic_stream( anthropic_payload: dict[str, Any], url: str, headers: dict[str, str], + turn: dict[str, Any] | None, ): """处理 Anthropic 后端的流式 Responses 返回。 @@ -293,29 +490,54 @@ def _handle_anthropic_stream( """消费 Anthropic SSE,并直接映射为 Responses 事件序列。""" yield from converter.start_events() + attach_upstream_request(turn, anthropic_payload, headers) resp, err = forward_request(url, headers, anthropic_payload, stream=True) if err: + attach_error(turn, {'stage': 'forward_request', 'message': str(err)}) + set_stream_summary(turn, {'status': 'error'}) + finalize_turn(turn) yield responses_error_event(str(err)) return event_count = 0 + client_events: list[str] = [] for event_type, event_data in iter_anthropic_sse(resp): + append_upstream_event(turn, {'type': event_type, 'data': event_data}) if event_count < 10: _dbg( f'上游事件#{event_count} 类型={event_type} 数据=' + json.dumps(event_data, ensure_ascii=False, default=str)[:500] ) - yield from converter.process_anthropic_event(event_type, event_data) + produced = converter.process_anthropic_event(event_type, event_data) + for evt in produced: + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt event_count += 1 _dbg(f'流式响应结束,共 {event_count} 个事件') - yield from converter.finalize() + finalized_events = converter.finalize() + for evt in finalized_events: + client_events.append(evt) + append_client_event(turn, {'type': 'responses_event', 'data': evt}) + yield evt + usage_tracker.record(ctx.client_model) + set_stream_summary(turn, { + 'event_count': event_count, + 'client_event_count': len(client_events), + }) + attach_client_response(turn, { + 'type': 'responses.stream.summary', + 'model': ctx.client_model, + 'events': client_events, + }) + finalize_turn(turn) return sse_response(generate()) -def _finalize_responses_response(response_data: dict[str, Any], *, debug_label: str): +def _finalize_responses_response(response_data: dict[str, Any], *, turn: dict[str, Any], debug_label: str): """统一收尾非流式 Responses 响应。 两条转换链路和一条原生 Responses 链路最终都会回到 Responses 对象,因此这里集中 @@ -324,4 +546,15 @@ def _finalize_responses_response(response_data: dict[str, Any], *, debug_label: response_data['model'] = response_data.get('model') or '' _dbg(debug_label + '=' + json.dumps(response_data, ensure_ascii=False, default=str)[:1000]) log_usage('响应生成', response_data.get('usage', {}), input_key='input_tokens', output_key='output_tokens') + + usage_tracker.record( + response_data.get('model', ''), + response_data.get('usage'), + input_key='input_tokens', + output_key='output_tokens', + ) + + attach_client_response(turn, response_data) + finalize_turn(turn, usage=response_data.get('usage')) + return jsonify(response_data) diff --git a/settings.py b/settings.py index 0ae6939..e8f788f 100644 --- a/settings.py +++ b/settings.py @@ -118,4 +118,8 @@ def _auto_detect(name): 其余模型默认视为 OpenAI 兼容后端。 """ lower = (name or '').lower() - return 'anthropic' if ('claude' in lower or 'anthropic' in lower) else 'openai' + if 'claude' in lower or 'anthropic' in lower: + return 'anthropic' + if 'gemini' in lower: + return 'gemini' + return 'openai' diff --git a/static/admin.css b/static/admin.css index b68fc96..875bbcb 100644 --- a/static/admin.css +++ b/static/admin.css @@ -59,6 +59,7 @@ main{padding:28px 0 60px} .tag-anthropic{background:rgba(249,115,22,.15);color:#fb923c} .tag-openai{background:rgba(16,185,129,.15);color:#34d399} .tag-responses{background:rgba(59,130,246,.15);color:#60a5fa} +.tag-gemini{background:rgba(66,133,244,.15);color:#4285f4} .tag-auto{background:rgba(139,92,246,.15);color:#a78bfa} .tag-override{background:rgba(59,130,246,.1);color:var(--primary)} .tag-instructions{background:rgba(234,179,8,.15);color:var(--yellow)} @@ -66,6 +67,11 @@ main{padding:28px 0 60px} .mapping-actions{margin-left:auto;display:flex;gap:6px} .empty{text-align:center;padding:40px;color:var(--muted)} +.stats-table{width:100%;border-collapse:collapse;font-size:13px} +.stats-table th{text-align:left;color:var(--muted);font-weight:500;padding:8px 12px;border-bottom:1px solid var(--border)} +.stats-table td{padding:8px 12px;border-bottom:1px solid var(--border)} +.stats-table tr:hover{background:var(--surface)} + .modal-overlay{display:none;position:fixed;inset:0;background:rgba(0,0,0,.6);z-index:100;align-items:center;justify-content:center} .modal-overlay.active{display:flex} .modal{background:var(--card);border:1px solid var(--border);border-radius:14px;padding:28px;width:520px;max-width:90vw;max-height:85vh;overflow-y:auto;box-shadow:0 20px 60px rgba(0,0,0,.5)} diff --git a/static/admin.html b/static/admin.html index 3c2f62e..76e7653 100644 --- a/static/admin.html +++ b/static/admin.html @@ -70,6 +70,15 @@
+ + +
+
+

用量统计

+ +
+
加载中…
+
@@ -95,13 +104,15 @@ +
anthropic:转换为 Anthropic Messages 格式 — 适用于中转站通过 /v1/messages 提供 Claude 模型
openai:保持 OpenAI Chat Completions 格式 — 适用于 GPT、DeepSeek、Codex 或通过 /v1/chat/completions 提供所有模型的中转站
responses:保持 OpenAI Responses 格式 — 适用于中转站仅通过 /v1/responses 提供模型能力
- 自动检测:根据上游模型名判断(含 claude → anthropic,其他 → openai) + gemini:转换为 Gemini Contents 格式 — 适用于 Google Gemini API 或兼容的中转站
+ 自动检测:根据上游模型名判断(含 claude → anthropic,含 gemini → gemini,其他 → openai)
diff --git a/static/admin.js b/static/admin.js index df636c3..e4d8899 100644 --- a/static/admin.js +++ b/static/admin.js @@ -70,11 +70,39 @@ async function loadDashboard() { document.getElementById('envKey').textContent = s.env_api_key ? '环境变量: (已配置)' : '环境变量: (未设置)'; await loadMappings(); checkHealth(); + loadStats(); } catch (e) { toast('加载设置失败: ' + e.message, false); } } +async function loadStats() { + const el = document.getElementById('statsContent'); + try { + const data = await api('/api/admin/stats'); + const models = data.models || {}; + const keys = Object.keys(models); + if (!keys.length) { + el.innerHTML = '
暂无请求统计数据
'; + return; + } + const uptime = data.uptime_seconds || 0; + const h = Math.floor(uptime / 3600); + const m = Math.floor((uptime % 3600) / 60); + let html = '
运行时长: ' + h + '小时' + m + '分钟
'; + html += ''; + keys.sort((a, b) => models[b].request_count - models[a].request_count); + for (const name of keys) { + const s = models[name]; + html += ''; + } + html += '
模型请求数输入 Tokens输出 Tokens总 Tokens
' + esc(name) + '' + s.request_count + '' + s.input_tokens.toLocaleString() + '' + s.output_tokens.toLocaleString() + '' + s.total_tokens.toLocaleString() + '
'; + el.innerHTML = html; + } catch (e) { + el.innerHTML = '
加载统计失败
'; + } +} + async function checkHealth() { try { const r = await fetch(API + '/health'); @@ -130,7 +158,9 @@ async function loadMappings() { ? 'tag-responses' : backend === 'openai' ? 'tag-openai' - : 'tag-auto'; + : backend === 'gemini' + ? 'tag-gemini' + : 'tag-auto'; const tagLabel = backend === 'auto' ? '自动' : backend === 'responses' diff --git a/utils/http.py b/utils/http.py index c0f4506..60e14c6 100644 --- a/utils/http.py +++ b/utils/http.py @@ -44,6 +44,16 @@ def build_anthropic_headers(api_key: str) -> dict[str, str]: return headers +def build_gemini_headers(api_key: str) -> dict[str, str]: + """构建 Gemini 请求头,根据密钥前缀选择鉴权方式""" + headers = {'Content-Type': 'application/json'} + if api_key.startswith('AIza'): + headers['x-goog-api-key'] = api_key + else: + headers['Authorization'] = f'Bearer {api_key}' + return headers + + # ─── 响应构建 ────────────────────────────────────── @@ -125,6 +135,26 @@ def iter_responses_sse(response) -> Iterator[tuple[str, dict[str, Any]]]: yield from _iter_event_sse(response) +def iter_gemini_sse(response) -> Iterator[dict[str, Any]]: + """解析 Gemini SSE 流,yield 完整的 GenerateContentResponse 字典。 + + Gemini 流式使用 ?alt=sse,每个 data: 行是一个完整的 JSON 响应。 + """ + for line in response.iter_lines(): + if not line: + continue + decoded = line.decode('utf-8', errors='replace') + if not decoded.startswith('data:'): + continue + data_str = decoded[5:].strip() + if not data_str: + continue + try: + yield json.loads(data_str) + except json.JSONDecodeError: + continue + + def _iter_event_sse(response) -> Iterator[tuple[str, dict[str, Any]]]: """解析带 event/data 的通用 SSE 流。 diff --git a/utils/request_logger.py b/utils/request_logger.py new file mode 100644 index 0000000..67229c1 --- /dev/null +++ b/utils/request_logger.py @@ -0,0 +1,331 @@ +"""对话级文件日志 + +将同一段多轮对话聚合到一个 JSON 文件中,而不是按单次请求散落成多个文件。 +仅在 DEBUG 开启时记录。 +日志目录: data/conversations/YYYY-MM-DD/{conversation_id}.json +""" + +from __future__ import annotations + +import copy +import hashlib +import json +import logging +import os +import threading +from datetime import datetime +from typing import Any + +from config import Config +from settings import DATA_DIR +from utils.http import gen_id + +logger = logging.getLogger(__name__) + +_LOG_DIR = os.path.join(DATA_DIR, 'conversations') +_LOCKS: dict[str, threading.Lock] = {} +_LOCKS_GUARD = threading.Lock() + + +def start_turn( + *, + route: str, + client_model: str, + backend: str, + stream: bool, + client_request: dict[str, Any], + request_headers: dict[str, Any] | None = None, + target_url: str = '', + upstream_model: str = '', + metadata: dict[str, Any] | None = None, +) -> dict[str, Any] | None: + """创建一条新的对话 turn 上下文。""" + if not Config.DEBUG: + return None + + now = datetime.utcnow().isoformat() + 'Z' + conversation_id = get_conversation_id(route=route, payload=client_request) + turn_id = gen_id('turn_') + return { + 'conversation_id': conversation_id, + 'turn_id': turn_id, + 'route': route, + 'client_model': client_model, + 'backend': backend, + 'stream': stream, + 'target_url': target_url, + 'upstream_model': upstream_model, + 'started_at': now, + 'updated_at': now, + 'request_headers': sanitize_headers(request_headers or {}), + 'client_request': deep_copy_jsonable(client_request), + 'metadata': deep_copy_jsonable(metadata or {}), + 'upstream_request': None, + 'upstream_response': None, + 'client_response': None, + 'stream_trace': { + 'upstream_events': [], + 'client_events': [], + 'summary': {}, + }, + 'error': None, + } + + +def get_conversation_id(*, route: str, payload: dict[str, Any]) -> str: + """尽量为同一段多轮对话生成稳定的会话 ID。""" + explicit = _pick_explicit_conversation_id(payload) + if explicit: + return _safe_id(explicit) + + seed = _conversation_seed(route, payload) + digest = hashlib.sha256(seed.encode('utf-8')).hexdigest()[:24] + return f'conv_{digest}' + + +def attach_upstream_request(turn: dict[str, Any] | None, payload: dict[str, Any], headers: dict[str, Any] | None = None) -> None: + """记录最终发往上游的请求。""" + if turn is None: + return + turn['upstream_request'] = { + 'headers': sanitize_headers(headers or {}), + 'body': deep_copy_jsonable(payload), + } + _touch(turn) + + +def attach_upstream_response(turn: dict[str, Any] | None, response_data: Any) -> None: + """记录上游完整非流式响应。""" + if turn is None: + return + turn['upstream_response'] = deep_copy_jsonable(response_data) + _touch(turn) + + +def attach_client_response(turn: dict[str, Any] | None, response_data: Any) -> None: + """记录最终返回给客户端的完整响应。""" + if turn is None: + return + turn['client_response'] = deep_copy_jsonable(response_data) + _touch(turn) + + +def append_upstream_event(turn: dict[str, Any] | None, event: Any) -> None: + """记录一条上游流式事件。""" + if turn is None: + return + turn['stream_trace']['upstream_events'].append(deep_copy_jsonable(event)) + _touch(turn) + + +def append_client_event(turn: dict[str, Any] | None, event: Any) -> None: + """记录一条返回给客户端的流式事件。""" + if turn is None: + return + turn['stream_trace']['client_events'].append(deep_copy_jsonable(event)) + _touch(turn) + + +def set_stream_summary(turn: dict[str, Any] | None, summary: dict[str, Any]) -> None: + """记录流式摘要,例如累计文本、事件数、usage 等。""" + if turn is None: + return + turn['stream_trace']['summary'] = deep_copy_jsonable(summary) + _touch(turn) + + +def attach_error(turn: dict[str, Any] | None, error: Any) -> None: + """记录错误信息。""" + if turn is None: + return + turn['error'] = deep_copy_jsonable(error) + _touch(turn) + + +def finalize_turn( + turn: dict[str, Any] | None, + *, + usage: dict[str, Any] | None = None, + duration_ms: int = 0, +) -> None: + """将 turn 追加/更新到对应的会话日志文件。""" + if turn is None or not Config.DEBUG: + return + + turn['updated_at'] = datetime.utcnow().isoformat() + 'Z' + turn['duration_ms'] = duration_ms + if usage is not None: + turn['usage'] = deep_copy_jsonable(usage) + + threading.Thread(target=_write_turn, args=(deep_copy_jsonable(turn),), daemon=True).start() + + +def sanitize_headers(headers: dict[str, Any]) -> dict[str, Any]: + """对敏感请求头做脱敏。""" + sanitized: dict[str, Any] = {} + for key, value in headers.items(): + key_lower = str(key).lower() + if key_lower in {'authorization', 'x-api-key', 'api-key', 'x-goog-api-key'}: + sanitized[key] = _mask_secret(value) + else: + sanitized[key] = value + return sanitized + + +def deep_copy_jsonable(value: Any) -> Any: + """尽量深拷贝 JSON 兼容数据。""" + try: + return copy.deepcopy(value) + except Exception: + try: + return json.loads(json.dumps(value, ensure_ascii=False, default=str)) + except Exception: + return str(value) + + +def _write_turn(turn: dict[str, Any]) -> None: + conversation_id = turn['conversation_id'] + lock = _get_lock(conversation_id) + with lock: + try: + date_str = turn['started_at'][:10] + day_dir = os.path.join(_LOG_DIR, date_str) + os.makedirs(day_dir, exist_ok=True) + filepath = os.path.join(day_dir, f'{conversation_id}.json') + + if os.path.exists(filepath): + with open(filepath, 'r', encoding='utf-8') as f: + doc = json.load(f) + else: + doc = { + 'conversation_id': conversation_id, + 'route': turn.get('route', ''), + 'created_at': turn['started_at'], + 'updated_at': turn['updated_at'], + 'turns': [], + } + + turns = doc.setdefault('turns', []) + replaced = False + for index, existing in enumerate(turns): + if existing.get('turn_id') == turn.get('turn_id'): + turns[index] = turn + replaced = True + break + if not replaced: + turns.append(turn) + + doc['updated_at'] = turn['updated_at'] + doc['last_client_model'] = turn.get('client_model', '') + doc['last_backend'] = turn.get('backend', '') + doc['turn_count'] = len(turns) + + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(doc, f, ensure_ascii=False, indent=2, default=str) + except OSError as e: + logger.warning('写入对话日志失败: %s', e) + except json.JSONDecodeError as e: + logger.warning('解析对话日志失败: %s', e) + + +def _get_lock(conversation_id: str) -> threading.Lock: + with _LOCKS_GUARD: + if conversation_id not in _LOCKS: + _LOCKS[conversation_id] = threading.Lock() + return _LOCKS[conversation_id] + + +def _touch(turn: dict[str, Any] | None) -> None: + if turn is None: + return + turn['updated_at'] = datetime.utcnow().isoformat() + 'Z' + + +def _pick_explicit_conversation_id(payload: dict[str, Any]) -> str: + candidates = ( + payload.get('conversation_id'), + payload.get('conversationId'), + payload.get('session_id'), + payload.get('sessionId'), + payload.get('chat_id'), + payload.get('chatId'), + payload.get('metadata', {}).get('conversation_id') if isinstance(payload.get('metadata'), dict) else None, + payload.get('metadata', {}).get('session_id') if isinstance(payload.get('metadata'), dict) else None, + ) + for item in candidates: + if isinstance(item, str) and item.strip(): + return item.strip() + return '' + + +def _conversation_seed(route: str, payload: dict[str, Any]) -> str: + if route == 'chat': + messages = payload.get('messages', []) + return 'chat|' + _normalize_messages_seed(messages) + + if route == 'responses': + instructions = payload.get('instructions') or '' + input_data = payload.get('input', []) + if isinstance(input_data, str): + seed_input = input_data + else: + seed_input = json.dumps(input_data, ensure_ascii=False, default=str) + return 'responses|' + instructions + '|' + seed_input + + if route == 'messages': + messages = payload.get('messages', []) + system = payload.get('system', '') + return 'messages|' + str(system) + '|' + json.dumps(messages, ensure_ascii=False, default=str) + + return route + '|' + json.dumps(payload, ensure_ascii=False, default=str) + + +def _normalize_messages_seed(messages: Any) -> str: + if not isinstance(messages, list): + return '' + normalized: list[dict[str, Any]] = [] + for msg in messages: + if not isinstance(msg, dict): + continue + normalized.append({ + 'role': msg.get('role', ''), + 'content': _normalize_content(msg.get('content')), + 'tool_call_id': msg.get('tool_call_id', ''), + 'tool_calls': [ + { + 'id': tc.get('id', ''), + 'name': (tc.get('function') or {}).get('name', ''), + } + for tc in msg.get('tool_calls', []) + if isinstance(tc, dict) + ], + }) + return json.dumps(normalized, ensure_ascii=False, separators=(',', ':')) + + +def _normalize_content(content: Any) -> Any: + if isinstance(content, str): + return content + if isinstance(content, list): + result = [] + for item in content: + if isinstance(item, dict): + result.append(item) + else: + result.append(str(item)) + return result + if content is None: + return '' + return str(content) + + +def _safe_id(raw: str) -> str: + cleaned = ''.join(ch if ch.isalnum() or ch in ('-', '_', '.') else '_' for ch in raw.strip()) + return cleaned[:120] or gen_id('conv_') + + +def _mask_secret(value: Any) -> str: + text = str(value or '') + if len(text) <= 8: + return '***' + return text[:4] + '***' + text[-4:] diff --git a/utils/thinking_cache.py b/utils/thinking_cache.py new file mode 100644 index 0000000..0a0b797 --- /dev/null +++ b/utils/thinking_cache.py @@ -0,0 +1,150 @@ +"""轻量 Thinking 缓存 + +纯内存缓存,在多轮对话中保存和恢复 thinking/reasoning 内容。 +解决 Cursor 不会把 thinking 内容回传给 API 的问题, +某些模型(如推理模型)在缺少历史 thinking 时表现会下降。 +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import re +import time +from typing import Any + +logger = logging.getLogger(__name__) + +_THINK_RE = re.compile(r'.*?', re.DOTALL) +_UNCLOSED_THINK_RE = re.compile(r'.*$', re.DOTALL) +_TOOL_ID_RE = re.compile(r'[^a-zA-Z0-9_-]') +_TTL = 86400 # 24 hours + + +class ThinkingCache: + """纯内存 thinking 缓存,TTL 2 小时。""" + + def __init__(self): + self._store: dict[str, tuple[str, float]] = {} + + def inject(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]: + """遍历 assistant 消息,缺少 reasoning_content 时从缓存注入。""" + sid = self._session_id(messages) + if not sid: + return messages + + now = time.time() + for msg in messages: + if msg.get('role') != 'assistant': + continue + if msg.get('reasoning_content'): + continue + key = sid + ':' + self._message_hash(msg) + entry = self._store.get(key) + if entry and (now - entry[1]) < _TTL: + msg['reasoning_content'] = entry[0] + logger.debug('已从缓存注入 thinking (%d 字符)', len(entry[0])) + + return messages + + def store_from_response( + self, + messages: list[dict[str, Any]], + reasoning_content: str, + ) -> None: + """将响应中的 thinking 内容存入缓存。""" + if not reasoning_content: + return + sid = self._session_id(messages) + if not sid: + return + + fake_msg: dict[str, Any] = {'role': 'assistant', 'content': '', 'tool_calls': []} + key = sid + ':' + self._message_hash(fake_msg) + self._store[key] = (reasoning_content, time.time()) + self._cleanup() + + def store_assistant_thinking( + self, + messages: list[dict[str, Any]], + assistant_msg: dict[str, Any], + ) -> None: + """从完整的 assistant 消息中提取并缓存 thinking。""" + rc = assistant_msg.get('reasoning_content', '') + if not rc: + return + sid = self._session_id(messages) + if not sid: + return + key = sid + ':' + self._message_hash(assistant_msg) + self._store[key] = (rc, time.time()) + self._cleanup() + + def _session_id(self, messages: list[dict[str, Any]]) -> str: + first_user = '' + first_assistant = '' + for msg in messages: + role = msg.get('role', '') + if role in ('system', 'developer'): + continue + if role == 'user' and not first_user: + first_user = self._normalize_content( + msg.get('content', '') + ) + elif role == 'assistant' and not first_assistant: + first_assistant = self._normalize_content( + msg.get('content', '') + ) + if first_user and first_assistant: + break + + if not first_user or not first_assistant: + return '' + + raw = first_user + '|' + first_assistant + return hashlib.sha256(raw.encode()).hexdigest()[:16] + + def _message_hash(self, msg: dict[str, Any]) -> str: + content = self._normalize_content(msg.get('content', '')) + tool_ids = sorted( + self._normalize_tool_id(tc.get('id', '')) + for tc in msg.get('tool_calls', []) + if isinstance(tc, dict) + ) + raw = json.dumps({'c': content, 't': tool_ids}, ensure_ascii=False) + return hashlib.sha256(raw.encode()).hexdigest()[:16] + + @staticmethod + def _normalize_content(content: Any) -> str: + if isinstance(content, list): + parts = [] + for p in content: + if isinstance(p, dict) and p.get('type') == 'text': + parts.append(p.get('text', '')) + elif isinstance(p, str): + parts.append(p) + text = '\n'.join(parts) + elif isinstance(content, str): + text = content + else: + text = str(content) if content else '' + text = _THINK_RE.sub('', text) + text = _UNCLOSED_THINK_RE.sub('', text) + return text.strip() + + @staticmethod + def _normalize_tool_id(tid: str) -> str: + return _TOOL_ID_RE.sub('', tid) + + def _cleanup(self) -> None: + """惰性清理过期条目(每 100 次写入触发一次全量扫描)。""" + if len(self._store) < 100: + return + now = time.time() + expired = [k for k, (_, ts) in self._store.items() if (now - ts) >= _TTL] + for k in expired: + del self._store[k] + + +thinking_cache = ThinkingCache() diff --git a/utils/usage_tracker.py b/utils/usage_tracker.py new file mode 100644 index 0000000..bbe698d --- /dev/null +++ b/utils/usage_tracker.py @@ -0,0 +1,72 @@ +"""用量统计 — 内存聚合 + +按模型名聚合请求数、token 用量等统计数据。 +重启后重置,适合轻量监控场景。 +""" + +from __future__ import annotations + +import threading +import time +from typing import Any + + +class _ModelStats: + __slots__ = ('request_count', 'input_tokens', 'output_tokens', 'first_seen', 'last_seen') + + def __init__(self): + self.request_count = 0 + self.input_tokens = 0 + self.output_tokens = 0 + self.first_seen = time.time() + self.last_seen = time.time() + + +class UsageTracker: + def __init__(self): + self._lock = threading.Lock() + self._stats: dict[str, _ModelStats] = {} + self._start_time = time.time() + + def record( + self, + model: str, + usage: dict[str, Any] | None = None, + *, + input_key: str = 'prompt_tokens', + output_key: str = 'completion_tokens', + ) -> None: + """记录一次请求的用量。""" + with self._lock: + if model not in self._stats: + self._stats[model] = _ModelStats() + s = self._stats[model] + s.request_count += 1 + s.last_seen = time.time() + if usage: + s.input_tokens += usage.get(input_key, 0) or 0 + s.output_tokens += usage.get(output_key, 0) or 0 + + def get_stats(self) -> dict[str, Any]: + """返回所有模型的聚合统计。""" + with self._lock: + result = {} + for model, s in self._stats.items(): + result[model] = { + 'request_count': s.request_count, + 'input_tokens': s.input_tokens, + 'output_tokens': s.output_tokens, + 'total_tokens': s.input_tokens + s.output_tokens, + } + return { + 'uptime_seconds': int(time.time() - self._start_time), + 'models': result, + } + + def reset(self) -> None: + with self._lock: + self._stats.clear() + self._start_time = time.time() + + +usage_tracker = UsageTracker()