From 49c69c4cc0afce5353c47611af368f0c01cf85d4 Mon Sep 17 00:00:00 2001 From: h88782481 <54714341+h88782481@users.noreply.github.com> Date: Sat, 14 Mar 2026 11:04:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- routes/chat.py | 61 +++++++++++++++++++++++++++--- routes/responses.py | 91 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 136 insertions(+), 16 deletions(-) diff --git a/routes/chat.py b/routes/chat.py index 60eedf0..ebf329b 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -354,8 +354,16 @@ def _handle_responses_stream( event_count = 0 client_chunks: list[Any] = [] + last_usage: dict[str, Any] | None = None for event_type, event_data in iter_responses_sse(resp): append_upstream_event(turn, {'type': event_type, 'data': event_data}) + extracted_usage = _extract_responses_usage(event_data) + if extracted_usage: + last_usage = { + 'prompt_tokens': extracted_usage.get('input_tokens', 0), + 'completion_tokens': extracted_usage.get('output_tokens', 0), + 'total_tokens': extracted_usage.get('total_tokens', 0), + } if event_count < 10: _dbg( f'上游事件#{event_count} 类型={event_type} 数据=' @@ -365,6 +373,8 @@ def _handle_responses_stream( for chunk in converter.process_event(event_type, event_data): client_chunks.append(chunk) append_client_event(turn, {'type': 'chat_chunk', 'data': chunk}) + if isinstance(chunk, dict) and isinstance(chunk.get('usage'), dict): + last_usage = chunk['usage'] if event_count < 10: _dbg( f'返回片段#{event_count}=' @@ -377,17 +387,19 @@ def _handle_responses_stream( _dbg(f'流式响应结束,共 {event_count} 个事件') append_client_event(turn, {'type': 'done'}) yield sse_data_message('[DONE]') - usage_tracker.record(ctx.client_model) + usage_tracker.record(ctx.client_model, last_usage) set_stream_summary(turn, { 'event_count': event_count, 'client_chunk_count': len(client_chunks), + 'usage': last_usage, }) attach_client_response(turn, { 'type': 'chat.completion.stream.summary', 'model': ctx.client_model, 'chunk_count': len(client_chunks), + 'usage': last_usage, }) - finalize_turn(turn) + finalize_turn(turn, usage=last_usage) return sse_response(generate()) @@ -455,8 +467,16 @@ def _handle_gemini_stream( chunk_count = 0 client_chunks: list[Any] = [] + last_usage: dict[str, Any] | None = None for gemini_chunk in iter_gemini_sse(resp): append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) + usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None + if isinstance(usage_meta, dict): + last_usage = { + 'prompt_tokens': usage_meta.get('promptTokenCount', 0), + 'completion_tokens': usage_meta.get('candidatesTokenCount', 0), + 'total_tokens': usage_meta.get('totalTokenCount', 0), + } if chunk_count < 10: _dbg( f'上游 Gemini 片段#{chunk_count}=' @@ -467,6 +487,8 @@ def _handle_gemini_stream( cc_chunk['model'] = ctx.client_model client_chunks.append(cc_chunk) append_client_event(turn, {'type': 'chat_chunk', 'data': cc_chunk}) + if isinstance(cc_chunk, dict) and isinstance(cc_chunk.get('usage'), dict): + last_usage = cc_chunk['usage'] if chunk_count < 10: _dbg( f'返回片段#{chunk_count}=' @@ -479,17 +501,19 @@ def _handle_gemini_stream( _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') append_client_event(turn, {'type': 'done'}) yield sse_data_message('[DONE]') - usage_tracker.record(ctx.client_model) + usage_tracker.record(ctx.client_model, last_usage) set_stream_summary(turn, { 'chunk_count': chunk_count, 'client_chunk_count': len(client_chunks), + 'usage': last_usage, }) attach_client_response(turn, { 'type': 'chat.completion.stream.summary', 'model': ctx.client_model, 'chunk_count': len(client_chunks), + 'usage': last_usage, }) - finalize_turn(turn) + finalize_turn(turn, usage=last_usage) return sse_response(generate()) @@ -565,8 +589,29 @@ def _handle_anthropic_stream( event_count = 0 client_chunks: list[Any] = [] + last_usage: dict[str, Any] | None = None for event_type, event_data in iter_anthropic_sse(resp): append_upstream_event(turn, {'type': event_type, 'data': event_data}) + if event_type == 'message_start': + message_usage = event_data.get('message', {}).get('usage', {}) + if isinstance(message_usage, dict): + last_usage = { + 'prompt_tokens': message_usage.get('input_tokens', 0), + 'completion_tokens': 0, + 'total_tokens': message_usage.get('input_tokens', 0), + } + elif event_type == 'message_delta': + delta_usage = event_data.get('usage', {}) + if isinstance(delta_usage, dict): + prompt_tokens = 0 + if isinstance(last_usage, dict): + prompt_tokens = last_usage.get('prompt_tokens', 0) + completion_tokens = delta_usage.get('output_tokens', 0) + last_usage = { + 'prompt_tokens': prompt_tokens, + 'completion_tokens': completion_tokens, + 'total_tokens': prompt_tokens + completion_tokens, + } if event_count < 10: _dbg( f'上游事件#{event_count} 类型={event_type} 数据=' @@ -577,6 +622,8 @@ def _handle_anthropic_stream( try: chunk_obj = json.loads(chunk_str) chunk_obj['model'] = ctx.client_model + if isinstance(chunk_obj.get('usage'), dict): + last_usage = chunk_obj['usage'] chunk_str = json.dumps(chunk_obj, ensure_ascii=False) except (json.JSONDecodeError, TypeError): pass @@ -592,17 +639,19 @@ def _handle_anthropic_stream( _dbg(f'流式响应结束,共 {event_count} 个事件') append_client_event(turn, {'type': 'done'}) yield sse_data_message('[DONE]') - usage_tracker.record(ctx.client_model) + usage_tracker.record(ctx.client_model, last_usage) set_stream_summary(turn, { 'event_count': event_count, 'client_chunk_count': len(client_chunks), + 'usage': last_usage, }) attach_client_response(turn, { 'type': 'chat.completion.stream.summary', 'model': ctx.client_model, 'chunk_count': len(client_chunks), + 'usage': last_usage, }) - finalize_turn(turn) + finalize_turn(turn, usage=last_usage) return sse_response(generate()) diff --git a/routes/responses.py b/routes/responses.py index aa61f06..4889a40 100644 --- a/routes/responses.py +++ b/routes/responses.py @@ -157,7 +157,12 @@ def _handle_openai_non_stream( fixed = fix_response(raw) response_data = cc_to_responses(fixed, ctx.client_model) - return _finalize_responses_response(response_data, turn=turn, debug_label='转换为 Responses 后') + return _finalize_responses_response( + response_data, + client_model=ctx.client_model, + turn=turn, + debug_label='转换为 Responses 后', + ) def _handle_openai_stream( @@ -270,7 +275,12 @@ def _handle_responses_non_stream( response_data = resp.json() attach_upstream_response(turn, response_data) response_data['model'] = ctx.client_model - return _finalize_responses_response(response_data, turn=turn, debug_label='原生 Responses 返回后') + return _finalize_responses_response( + response_data, + client_model=ctx.client_model, + turn=turn, + debug_label='原生 Responses 返回后', + ) def _handle_responses_stream( @@ -297,8 +307,12 @@ def _handle_responses_stream( event_count = 0 client_events: list[str] = [] + last_usage: dict[str, Any] | None = None for event_type, event_data in iter_responses_sse(resp): append_upstream_event(turn, {'type': event_type, 'data': event_data}) + extracted_usage = _extract_responses_usage(event_data) + if extracted_usage: + last_usage = extracted_usage if event_count < 10: _dbg( f'上游事件#{event_count} 类型={event_type} 数据=' @@ -312,21 +326,47 @@ def _handle_responses_stream( event_count += 1 _dbg(f'流式响应结束,共 {event_count} 个事件') - usage_tracker.record(ctx.client_model) + usage_tracker.record( + ctx.client_model, + last_usage, + input_key='input_tokens', + output_key='output_tokens', + ) set_stream_summary(turn, { 'event_count': event_count, 'client_event_count': len(client_events), + 'usage': last_usage, }) attach_client_response(turn, { 'type': 'responses.stream.summary', 'model': ctx.client_model, 'event_count': len(client_events), + 'usage': last_usage, }) - finalize_turn(turn) + finalize_turn(turn, usage=last_usage) return sse_response(generate()) +def _extract_responses_usage(event_data: dict[str, Any]) -> dict[str, Any] | None: + """从原生 Responses 事件中提取 usage。 + + 原生 `/v1/responses` 流式通常会在 `response.completed` 事件里携带 usage, + 也可能直接挂在顶层 `usage` 字段。这里统一做兼容提取,供统计与日志复用。 + """ + if not isinstance(event_data, dict): + return None + usage = event_data.get('usage') + if isinstance(usage, dict): + return usage + response_obj = event_data.get('response') + if isinstance(response_obj, dict): + nested_usage = response_obj.get('usage') + if isinstance(nested_usage, dict): + return nested_usage + return None + + def _handle_gemini_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): """处理走 Gemini Contents 后端的 Responses 请求。""" gemini_payload = cc_to_gemini_request(cc_payload) @@ -365,7 +405,12 @@ def _handle_gemini_non_stream( cc_data = gemini_to_cc_response(raw) response_data = cc_to_responses(cc_data, ctx.client_model) - return _finalize_responses_response(response_data, turn=turn, debug_label='Gemini 转回 Responses 后') + return _finalize_responses_response( + response_data, + client_model=ctx.client_model, + turn=turn, + debug_label='Gemini 转回 Responses 后', + ) def _handle_gemini_stream( @@ -393,8 +438,16 @@ def _handle_gemini_stream( chunk_count = 0 client_events: list[str] = [] + last_usage: dict[str, Any] | None = None for gemini_chunk in iter_gemini_sse(resp): append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) + usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None + if isinstance(usage_meta, dict): + last_usage = { + 'input_tokens': usage_meta.get('promptTokenCount', 0), + 'output_tokens': usage_meta.get('candidatesTokenCount', 0), + 'total_tokens': usage_meta.get('totalTokenCount', 0), + } if chunk_count < 10: _dbg( f'上游 Gemini 片段#{chunk_count}=' @@ -415,17 +468,24 @@ def _handle_gemini_stream( client_events.append(evt) append_client_event(turn, {'type': 'responses_event', 'data': evt}) yield evt - usage_tracker.record(ctx.client_model) + usage_tracker.record( + ctx.client_model, + last_usage, + input_key='input_tokens', + output_key='output_tokens', + ) set_stream_summary(turn, { 'chunk_count': chunk_count, 'client_event_count': len(client_events), + 'usage': last_usage, }) attach_client_response(turn, { 'type': 'responses.stream.summary', 'model': ctx.client_model, 'event_count': len(client_events), + 'usage': last_usage, }) - finalize_turn(turn) + finalize_turn(turn, usage=last_usage) return sse_response(generate()) @@ -469,7 +529,12 @@ def _handle_anthropic_non_stream( cc_data = messages_to_cc_response(raw) response_data = cc_to_responses(cc_data, ctx.client_model) - return _finalize_responses_response(response_data, turn=turn, debug_label='Messages 转回 Responses 后') + return _finalize_responses_response( + response_data, + client_model=ctx.client_model, + turn=turn, + debug_label='Messages 转回 Responses 后', + ) def _handle_anthropic_stream( @@ -538,7 +603,13 @@ def _handle_anthropic_stream( return sse_response(generate()) -def _finalize_responses_response(response_data: dict[str, Any], *, turn: dict[str, Any], debug_label: str): +def _finalize_responses_response( + response_data: dict[str, Any], + *, + client_model: str, + turn: dict[str, Any], + debug_label: str, +): """统一收尾非流式 Responses 响应。 两条转换链路和一条原生 Responses 链路最终都会回到 Responses 对象,因此这里集中 @@ -549,7 +620,7 @@ def _finalize_responses_response(response_data: dict[str, Any], *, turn: dict[st log_usage('响应生成', response_data.get('usage', {}), input_key='input_tokens', output_key='output_tokens') usage_tracker.record( - response_data.get('model', ''), + client_model, response_data.get('usage'), input_key='input_tokens', output_key='output_tokens',