修复日志

This commit is contained in:
h88782481 2026-03-14 11:04:55 +08:00
parent 3c4a50dc05
commit 49c69c4cc0
2 changed files with 136 additions and 16 deletions

View file

@ -354,8 +354,16 @@ def _handle_responses_stream(
event_count = 0 event_count = 0
client_chunks: list[Any] = [] client_chunks: list[Any] = []
last_usage: dict[str, Any] | None = None
for event_type, event_data in iter_responses_sse(resp): for event_type, event_data in iter_responses_sse(resp):
append_upstream_event(turn, {'type': event_type, 'data': event_data}) append_upstream_event(turn, {'type': event_type, 'data': event_data})
extracted_usage = _extract_responses_usage(event_data)
if extracted_usage:
last_usage = {
'prompt_tokens': extracted_usage.get('input_tokens', 0),
'completion_tokens': extracted_usage.get('output_tokens', 0),
'total_tokens': extracted_usage.get('total_tokens', 0),
}
if event_count < 10: if event_count < 10:
_dbg( _dbg(
f'上游事件#{event_count} 类型={event_type} 数据=' f'上游事件#{event_count} 类型={event_type} 数据='
@ -365,6 +373,8 @@ def _handle_responses_stream(
for chunk in converter.process_event(event_type, event_data): for chunk in converter.process_event(event_type, event_data):
client_chunks.append(chunk) client_chunks.append(chunk)
append_client_event(turn, {'type': 'chat_chunk', 'data': chunk}) append_client_event(turn, {'type': 'chat_chunk', 'data': chunk})
if isinstance(chunk, dict) and isinstance(chunk.get('usage'), dict):
last_usage = chunk['usage']
if event_count < 10: if event_count < 10:
_dbg( _dbg(
f'返回片段#{event_count}=' f'返回片段#{event_count}='
@ -377,17 +387,19 @@ def _handle_responses_stream(
_dbg(f'流式响应结束,共 {event_count} 个事件') _dbg(f'流式响应结束,共 {event_count} 个事件')
append_client_event(turn, {'type': 'done'}) append_client_event(turn, {'type': 'done'})
yield sse_data_message('[DONE]') yield sse_data_message('[DONE]')
usage_tracker.record(ctx.client_model) usage_tracker.record(ctx.client_model, last_usage)
set_stream_summary(turn, { set_stream_summary(turn, {
'event_count': event_count, 'event_count': event_count,
'client_chunk_count': len(client_chunks), 'client_chunk_count': len(client_chunks),
'usage': last_usage,
}) })
attach_client_response(turn, { attach_client_response(turn, {
'type': 'chat.completion.stream.summary', 'type': 'chat.completion.stream.summary',
'model': ctx.client_model, 'model': ctx.client_model,
'chunk_count': len(client_chunks), 'chunk_count': len(client_chunks),
'usage': last_usage,
}) })
finalize_turn(turn) finalize_turn(turn, usage=last_usage)
return sse_response(generate()) return sse_response(generate())
@ -455,8 +467,16 @@ def _handle_gemini_stream(
chunk_count = 0 chunk_count = 0
client_chunks: list[Any] = [] client_chunks: list[Any] = []
last_usage: dict[str, Any] | None = None
for gemini_chunk in iter_gemini_sse(resp): for gemini_chunk in iter_gemini_sse(resp):
append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk})
usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None
if isinstance(usage_meta, dict):
last_usage = {
'prompt_tokens': usage_meta.get('promptTokenCount', 0),
'completion_tokens': usage_meta.get('candidatesTokenCount', 0),
'total_tokens': usage_meta.get('totalTokenCount', 0),
}
if chunk_count < 10: if chunk_count < 10:
_dbg( _dbg(
f'上游 Gemini 片段#{chunk_count}=' f'上游 Gemini 片段#{chunk_count}='
@ -467,6 +487,8 @@ def _handle_gemini_stream(
cc_chunk['model'] = ctx.client_model cc_chunk['model'] = ctx.client_model
client_chunks.append(cc_chunk) client_chunks.append(cc_chunk)
append_client_event(turn, {'type': 'chat_chunk', 'data': cc_chunk}) append_client_event(turn, {'type': 'chat_chunk', 'data': cc_chunk})
if isinstance(cc_chunk, dict) and isinstance(cc_chunk.get('usage'), dict):
last_usage = cc_chunk['usage']
if chunk_count < 10: if chunk_count < 10:
_dbg( _dbg(
f'返回片段#{chunk_count}=' f'返回片段#{chunk_count}='
@ -479,17 +501,19 @@ def _handle_gemini_stream(
_dbg(f'流式响应结束,共 {chunk_count} 个数据片段') _dbg(f'流式响应结束,共 {chunk_count} 个数据片段')
append_client_event(turn, {'type': 'done'}) append_client_event(turn, {'type': 'done'})
yield sse_data_message('[DONE]') yield sse_data_message('[DONE]')
usage_tracker.record(ctx.client_model) usage_tracker.record(ctx.client_model, last_usage)
set_stream_summary(turn, { set_stream_summary(turn, {
'chunk_count': chunk_count, 'chunk_count': chunk_count,
'client_chunk_count': len(client_chunks), 'client_chunk_count': len(client_chunks),
'usage': last_usage,
}) })
attach_client_response(turn, { attach_client_response(turn, {
'type': 'chat.completion.stream.summary', 'type': 'chat.completion.stream.summary',
'model': ctx.client_model, 'model': ctx.client_model,
'chunk_count': len(client_chunks), 'chunk_count': len(client_chunks),
'usage': last_usage,
}) })
finalize_turn(turn) finalize_turn(turn, usage=last_usage)
return sse_response(generate()) return sse_response(generate())
@ -565,8 +589,29 @@ def _handle_anthropic_stream(
event_count = 0 event_count = 0
client_chunks: list[Any] = [] client_chunks: list[Any] = []
last_usage: dict[str, Any] | None = None
for event_type, event_data in iter_anthropic_sse(resp): for event_type, event_data in iter_anthropic_sse(resp):
append_upstream_event(turn, {'type': event_type, 'data': event_data}) append_upstream_event(turn, {'type': event_type, 'data': event_data})
if event_type == 'message_start':
message_usage = event_data.get('message', {}).get('usage', {})
if isinstance(message_usage, dict):
last_usage = {
'prompt_tokens': message_usage.get('input_tokens', 0),
'completion_tokens': 0,
'total_tokens': message_usage.get('input_tokens', 0),
}
elif event_type == 'message_delta':
delta_usage = event_data.get('usage', {})
if isinstance(delta_usage, dict):
prompt_tokens = 0
if isinstance(last_usage, dict):
prompt_tokens = last_usage.get('prompt_tokens', 0)
completion_tokens = delta_usage.get('output_tokens', 0)
last_usage = {
'prompt_tokens': prompt_tokens,
'completion_tokens': completion_tokens,
'total_tokens': prompt_tokens + completion_tokens,
}
if event_count < 10: if event_count < 10:
_dbg( _dbg(
f'上游事件#{event_count} 类型={event_type} 数据=' f'上游事件#{event_count} 类型={event_type} 数据='
@ -577,6 +622,8 @@ def _handle_anthropic_stream(
try: try:
chunk_obj = json.loads(chunk_str) chunk_obj = json.loads(chunk_str)
chunk_obj['model'] = ctx.client_model chunk_obj['model'] = ctx.client_model
if isinstance(chunk_obj.get('usage'), dict):
last_usage = chunk_obj['usage']
chunk_str = json.dumps(chunk_obj, ensure_ascii=False) chunk_str = json.dumps(chunk_obj, ensure_ascii=False)
except (json.JSONDecodeError, TypeError): except (json.JSONDecodeError, TypeError):
pass pass
@ -592,17 +639,19 @@ def _handle_anthropic_stream(
_dbg(f'流式响应结束,共 {event_count} 个事件') _dbg(f'流式响应结束,共 {event_count} 个事件')
append_client_event(turn, {'type': 'done'}) append_client_event(turn, {'type': 'done'})
yield sse_data_message('[DONE]') yield sse_data_message('[DONE]')
usage_tracker.record(ctx.client_model) usage_tracker.record(ctx.client_model, last_usage)
set_stream_summary(turn, { set_stream_summary(turn, {
'event_count': event_count, 'event_count': event_count,
'client_chunk_count': len(client_chunks), 'client_chunk_count': len(client_chunks),
'usage': last_usage,
}) })
attach_client_response(turn, { attach_client_response(turn, {
'type': 'chat.completion.stream.summary', 'type': 'chat.completion.stream.summary',
'model': ctx.client_model, 'model': ctx.client_model,
'chunk_count': len(client_chunks), 'chunk_count': len(client_chunks),
'usage': last_usage,
}) })
finalize_turn(turn) finalize_turn(turn, usage=last_usage)
return sse_response(generate()) return sse_response(generate())

View file

@ -157,7 +157,12 @@ def _handle_openai_non_stream(
fixed = fix_response(raw) fixed = fix_response(raw)
response_data = cc_to_responses(fixed, ctx.client_model) response_data = cc_to_responses(fixed, ctx.client_model)
return _finalize_responses_response(response_data, turn=turn, debug_label='转换为 Responses 后') return _finalize_responses_response(
response_data,
client_model=ctx.client_model,
turn=turn,
debug_label='转换为 Responses 后',
)
def _handle_openai_stream( def _handle_openai_stream(
@ -270,7 +275,12 @@ def _handle_responses_non_stream(
response_data = resp.json() response_data = resp.json()
attach_upstream_response(turn, response_data) attach_upstream_response(turn, response_data)
response_data['model'] = ctx.client_model response_data['model'] = ctx.client_model
return _finalize_responses_response(response_data, turn=turn, debug_label='原生 Responses 返回后') return _finalize_responses_response(
response_data,
client_model=ctx.client_model,
turn=turn,
debug_label='原生 Responses 返回后',
)
def _handle_responses_stream( def _handle_responses_stream(
@ -297,8 +307,12 @@ def _handle_responses_stream(
event_count = 0 event_count = 0
client_events: list[str] = [] client_events: list[str] = []
last_usage: dict[str, Any] | None = None
for event_type, event_data in iter_responses_sse(resp): for event_type, event_data in iter_responses_sse(resp):
append_upstream_event(turn, {'type': event_type, 'data': event_data}) append_upstream_event(turn, {'type': event_type, 'data': event_data})
extracted_usage = _extract_responses_usage(event_data)
if extracted_usage:
last_usage = extracted_usage
if event_count < 10: if event_count < 10:
_dbg( _dbg(
f'上游事件#{event_count} 类型={event_type} 数据=' f'上游事件#{event_count} 类型={event_type} 数据='
@ -312,21 +326,47 @@ def _handle_responses_stream(
event_count += 1 event_count += 1
_dbg(f'流式响应结束,共 {event_count} 个事件') _dbg(f'流式响应结束,共 {event_count} 个事件')
usage_tracker.record(ctx.client_model) usage_tracker.record(
ctx.client_model,
last_usage,
input_key='input_tokens',
output_key='output_tokens',
)
set_stream_summary(turn, { set_stream_summary(turn, {
'event_count': event_count, 'event_count': event_count,
'client_event_count': len(client_events), 'client_event_count': len(client_events),
'usage': last_usage,
}) })
attach_client_response(turn, { attach_client_response(turn, {
'type': 'responses.stream.summary', 'type': 'responses.stream.summary',
'model': ctx.client_model, 'model': ctx.client_model,
'event_count': len(client_events), 'event_count': len(client_events),
'usage': last_usage,
}) })
finalize_turn(turn) finalize_turn(turn, usage=last_usage)
return sse_response(generate()) return sse_response(generate())
def _extract_responses_usage(event_data: dict[str, Any]) -> dict[str, Any] | None:
"""从原生 Responses 事件中提取 usage。
原生 `/v1/responses` 流式通常会在 `response.completed` 事件里携带 usage
也可能直接挂在顶层 `usage` 字段这里统一做兼容提取供统计与日志复用
"""
if not isinstance(event_data, dict):
return None
usage = event_data.get('usage')
if isinstance(usage, dict):
return usage
response_obj = event_data.get('response')
if isinstance(response_obj, dict):
nested_usage = response_obj.get('usage')
if isinstance(nested_usage, dict):
return nested_usage
return None
def _handle_gemini_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): def _handle_gemini_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None):
"""处理走 Gemini Contents 后端的 Responses 请求。""" """处理走 Gemini Contents 后端的 Responses 请求。"""
gemini_payload = cc_to_gemini_request(cc_payload) gemini_payload = cc_to_gemini_request(cc_payload)
@ -365,7 +405,12 @@ def _handle_gemini_non_stream(
cc_data = gemini_to_cc_response(raw) cc_data = gemini_to_cc_response(raw)
response_data = cc_to_responses(cc_data, ctx.client_model) response_data = cc_to_responses(cc_data, ctx.client_model)
return _finalize_responses_response(response_data, turn=turn, debug_label='Gemini 转回 Responses 后') return _finalize_responses_response(
response_data,
client_model=ctx.client_model,
turn=turn,
debug_label='Gemini 转回 Responses 后',
)
def _handle_gemini_stream( def _handle_gemini_stream(
@ -393,8 +438,16 @@ def _handle_gemini_stream(
chunk_count = 0 chunk_count = 0
client_events: list[str] = [] client_events: list[str] = []
last_usage: dict[str, Any] | None = None
for gemini_chunk in iter_gemini_sse(resp): for gemini_chunk in iter_gemini_sse(resp):
append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk}) append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk})
usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None
if isinstance(usage_meta, dict):
last_usage = {
'input_tokens': usage_meta.get('promptTokenCount', 0),
'output_tokens': usage_meta.get('candidatesTokenCount', 0),
'total_tokens': usage_meta.get('totalTokenCount', 0),
}
if chunk_count < 10: if chunk_count < 10:
_dbg( _dbg(
f'上游 Gemini 片段#{chunk_count}=' f'上游 Gemini 片段#{chunk_count}='
@ -415,17 +468,24 @@ def _handle_gemini_stream(
client_events.append(evt) client_events.append(evt)
append_client_event(turn, {'type': 'responses_event', 'data': evt}) append_client_event(turn, {'type': 'responses_event', 'data': evt})
yield evt yield evt
usage_tracker.record(ctx.client_model) usage_tracker.record(
ctx.client_model,
last_usage,
input_key='input_tokens',
output_key='output_tokens',
)
set_stream_summary(turn, { set_stream_summary(turn, {
'chunk_count': chunk_count, 'chunk_count': chunk_count,
'client_event_count': len(client_events), 'client_event_count': len(client_events),
'usage': last_usage,
}) })
attach_client_response(turn, { attach_client_response(turn, {
'type': 'responses.stream.summary', 'type': 'responses.stream.summary',
'model': ctx.client_model, 'model': ctx.client_model,
'event_count': len(client_events), 'event_count': len(client_events),
'usage': last_usage,
}) })
finalize_turn(turn) finalize_turn(turn, usage=last_usage)
return sse_response(generate()) return sse_response(generate())
@ -469,7 +529,12 @@ def _handle_anthropic_non_stream(
cc_data = messages_to_cc_response(raw) cc_data = messages_to_cc_response(raw)
response_data = cc_to_responses(cc_data, ctx.client_model) response_data = cc_to_responses(cc_data, ctx.client_model)
return _finalize_responses_response(response_data, turn=turn, debug_label='Messages 转回 Responses 后') return _finalize_responses_response(
response_data,
client_model=ctx.client_model,
turn=turn,
debug_label='Messages 转回 Responses 后',
)
def _handle_anthropic_stream( def _handle_anthropic_stream(
@ -538,7 +603,13 @@ def _handle_anthropic_stream(
return sse_response(generate()) return sse_response(generate())
def _finalize_responses_response(response_data: dict[str, Any], *, turn: dict[str, Any], debug_label: str): def _finalize_responses_response(
response_data: dict[str, Any],
*,
client_model: str,
turn: dict[str, Any],
debug_label: str,
):
"""统一收尾非流式 Responses 响应。 """统一收尾非流式 Responses 响应。
两条转换链路和一条原生 Responses 链路最终都会回到 Responses 对象因此这里集中 两条转换链路和一条原生 Responses 链路最终都会回到 Responses 对象因此这里集中
@ -549,7 +620,7 @@ def _finalize_responses_response(response_data: dict[str, Any], *, turn: dict[st
log_usage('响应生成', response_data.get('usage', {}), input_key='input_tokens', output_key='output_tokens') log_usage('响应生成', response_data.get('usage', {}), input_key='input_tokens', output_key='output_tokens')
usage_tracker.record( usage_tracker.record(
response_data.get('model', ''), client_model,
response_data.get('usage'), response_data.get('usage'),
input_key='input_tokens', input_key='input_tokens',
output_key='output_tokens', output_key='output_tokens',