From e373295cf5d44715e829dc8e4d872afb189243a8 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 5 May 2026 13:42:35 +0800 Subject: [PATCH] add admin log --- routes/admin.py | 10 +++ routes/chat.py | 182 +++++++++++++++++++++++++++++++++------ routes/messages.py | 29 ++++++- routes/responses.py | 146 ++++++++++++++++++++++++++----- static/admin.css | 8 ++ static/admin.html | 10 +++ static/admin.js | 50 +++++++++++ utils/request_history.py | 111 ++++++++++++++++++++++++ 8 files changed, 495 insertions(+), 51 deletions(-) create mode 100644 utils/request_history.py diff --git a/routes/admin.py b/routes/admin.py index e8a9e77..612ee89 100644 --- a/routes/admin.py +++ b/routes/admin.py @@ -13,6 +13,7 @@ from flask import Blueprint, request, jsonify, send_from_directory import settings from config import Config +from utils.request_history import request_history logger = logging.getLogger(__name__) @@ -202,6 +203,15 @@ def get_stats(): return jsonify(usage_tracker.get_stats()) +@bp.route('/api/admin/request-logs', methods=['GET']) +def get_request_logs(): + """返回最近 500 条请求日志。""" + err = _check_auth() + if err: + return err + return jsonify({'items': request_history.get_recent(500)}) + + # ─── 内部辅助 ───────────────────────────────────── diff --git a/routes/chat.py b/routes/chat.py index 66e1e67..9532ed0 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -9,6 +9,7 @@ from __future__ import annotations import json import logging +from time import perf_counter from typing import Any import settings @@ -59,6 +60,7 @@ from utils.http import ( iter_responses_sse, sse_response, ) +from utils.request_history import request_history from utils.request_logger import ( append_client_event, append_upstream_event, @@ -113,6 +115,7 @@ def chat_completions(): client_model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) ctx = build_route_context(client_model, is_stream) + request_started_at = perf_counter() turn = start_turn( route='chat', client_model=client_model, @@ -132,12 +135,12 @@ def chat_completions(): payload['messages'] = thinking_cache.inject(payload.get('messages', [])) if ctx.backend == 'openai': - return _handle_openai_backend(ctx, payload, turn) + return _handle_openai_backend(ctx, payload, turn, request_started_at) if ctx.backend == 'responses': - return _handle_responses_backend(ctx, payload, turn) + return _handle_responses_backend(ctx, payload, turn, request_started_at) if ctx.backend == 'gemini': - return _handle_gemini_backend(ctx, payload, turn) - return _handle_anthropic_backend(ctx, payload, turn) + return _handle_gemini_backend(ctx, payload, turn, request_started_at) + return _handle_anthropic_backend(ctx, payload, turn, request_started_at) def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], int]: @@ -158,7 +161,12 @@ def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], in return payload, message_count -def _handle_openai_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any]): +def _handle_openai_backend( + ctx: RouteContext, + payload: dict[str, Any], + turn: dict[str, Any], + request_started_at: float, +): """处理走 OpenAI 兼容后端的聊天补全请求。""" _dbg( '原始请求字段=' + str(list(payload.keys())) + ' ' @@ -182,8 +190,8 @@ def _handle_openai_backend(ctx: RouteContext, payload: dict[str, Any], turn: dic headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_openai_stream(ctx, payload, url, headers, turn) - return _handle_openai_non_stream(ctx, payload, url, headers, turn) + return _handle_openai_stream(ctx, payload, url, headers, turn, request_started_at) + return _handle_openai_non_stream(ctx, payload, url, headers, turn, request_started_at) def _handle_openai_non_stream( @@ -192,6 +200,7 @@ def _handle_openai_non_stream( url: str, headers: dict[str, str], turn: dict[str, Any], + request_started_at: float, ): """处理 OpenAI 兼容后端的非流式返回。""" payload['stream'] = False @@ -207,7 +216,14 @@ def _handle_openai_non_stream( _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) data = fix_response(raw) - return _finalize_chat_response(ctx, data, turn=turn, debug_label='修复后响应') + return _finalize_chat_response( + ctx, + data, + turn=turn, + debug_label='修复后响应', + request_started_at=request_started_at, + upstream_url=url, + ) def _handle_openai_stream( @@ -216,6 +232,7 @@ def _handle_openai_stream( url: str, headers: dict[str, str], turn: dict[str, Any], + request_started_at: float, ): """处理 OpenAI 兼容后端的流式返回。""" payload['stream'] = True @@ -258,7 +275,18 @@ def _handle_openai_stream( 'chunk_count': len(client_chunks), 'usage': last_usage, }) - finalize_turn(turn, usage=last_usage) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='chat', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=last_usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, usage=last_usage, duration_ms=duration_ms) return append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk}) @@ -299,12 +327,28 @@ def _handle_openai_stream( 'chunk_count': len(client_chunks), 'usage': last_usage, }) - finalize_turn(turn, usage=last_usage) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='chat', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=last_usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, usage=last_usage, duration_ms=duration_ms) return sse_response(generate()) -def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): +def _handle_responses_backend( + ctx: RouteContext, + payload: dict[str, Any], + turn: dict[str, Any] | None, + request_started_at: float, +): """处理走原生 Responses 后端的聊天补全请求。 当上游只支持 `/v1/responses` 时,需要先把聊天补全请求转换为 Responses 请求, @@ -324,8 +368,8 @@ def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_responses_stream(ctx, responses_payload, url, headers, turn) - return _handle_responses_non_stream(ctx, responses_payload, url, headers, turn) + return _handle_responses_stream(ctx, responses_payload, url, headers, turn, request_started_at) + return _handle_responses_non_stream(ctx, responses_payload, url, headers, turn, request_started_at) def _handle_responses_non_stream( @@ -334,6 +378,7 @@ def _handle_responses_non_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理原生 Responses 后端的非流式返回。""" payload['stream'] = False @@ -349,7 +394,14 @@ def _handle_responses_non_stream( _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) data = responses_to_cc_response(raw, ctx.client_model) - return _finalize_chat_response(ctx, data, turn=turn, debug_label='Responses 转回聊天补全后') + return _finalize_chat_response( + ctx, + data, + turn=turn, + debug_label='Responses 转回聊天补全后', + request_started_at=request_started_at, + upstream_url=url, + ) def _handle_responses_stream( @@ -358,6 +410,7 @@ def _handle_responses_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理原生 Responses 后端的流式返回。""" payload['stream'] = True @@ -421,12 +474,28 @@ def _handle_responses_stream( 'chunk_count': len(client_chunks), 'usage': last_usage, }) - finalize_turn(turn, usage=last_usage) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='chat', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=last_usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, usage=last_usage, duration_ms=duration_ms) return sse_response(generate()) -def _handle_gemini_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): +def _handle_gemini_backend( + ctx: RouteContext, + payload: dict[str, Any], + turn: dict[str, Any] | None, + request_started_at: float, +): """处理走 Gemini Contents 后端的聊天补全请求。""" payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position) gemini_payload = cc_to_gemini_request(payload) @@ -440,8 +509,8 @@ def _handle_gemini_backend(ctx: RouteContext, payload: dict[str, Any], turn: dic headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn) - return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn) + return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn, request_started_at) + return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn, request_started_at) def _handle_gemini_non_stream( @@ -450,6 +519,7 @@ def _handle_gemini_non_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 Gemini 后端的非流式返回。""" attach_upstream_request(turn, payload, headers) @@ -464,7 +534,14 @@ def _handle_gemini_non_stream( _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) data = gemini_to_cc_response(raw) - return _finalize_chat_response(ctx, data, turn=turn, debug_label='Gemini 转回聊天补全后') + return _finalize_chat_response( + ctx, + data, + turn=turn, + debug_label='Gemini 转回聊天补全后', + request_started_at=request_started_at, + upstream_url=url, + ) def _handle_gemini_stream( @@ -473,6 +550,7 @@ def _handle_gemini_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 Gemini 后端的流式返回。""" converter = GeminiStreamConverter() @@ -535,12 +613,28 @@ def _handle_gemini_stream( 'chunk_count': len(client_chunks), 'usage': last_usage, }) - finalize_turn(turn, usage=last_usage) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='chat', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=last_usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, usage=last_usage, duration_ms=duration_ms) return sse_response(generate()) -def _handle_anthropic_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): +def _handle_anthropic_backend( + ctx: RouteContext, + payload: dict[str, Any], + turn: dict[str, Any] | None, + request_started_at: float, +): """处理走 Anthropic Messages 后端的聊天补全请求。""" payload['model'] = ctx.upstream_model anthropic_payload = cc_to_messages_request(payload) @@ -555,8 +649,8 @@ def _handle_anthropic_backend(ctx: RouteContext, payload: dict[str, Any], turn: headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn) - return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn) + return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn, request_started_at) + return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn, request_started_at) def _handle_anthropic_non_stream( @@ -565,6 +659,7 @@ def _handle_anthropic_non_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 Anthropic 后端的非流式返回。""" payload['stream'] = False @@ -580,7 +675,14 @@ def _handle_anthropic_non_stream( _dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000]) data = messages_to_cc_response(raw) - return _finalize_chat_response(ctx, data, turn=turn, debug_label='Messages 转回聊天补全后') + return _finalize_chat_response( + ctx, + data, + turn=turn, + debug_label='Messages 转回聊天补全后', + request_started_at=request_started_at, + upstream_url=url, + ) def _handle_anthropic_stream( @@ -589,6 +691,7 @@ def _handle_anthropic_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 Anthropic 后端的流式返回。 @@ -673,7 +776,18 @@ def _handle_anthropic_stream( 'chunk_count': len(client_chunks), 'usage': last_usage, }) - finalize_turn(turn, usage=last_usage) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='chat', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=last_usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, usage=last_usage, duration_ms=duration_ms) return sse_response(generate()) @@ -684,6 +798,8 @@ def _finalize_chat_response( *, turn: dict[str, Any] | None, debug_label: str, + request_started_at: float, + upstream_url: str, ): """统一收尾非流式聊天补全响应。 @@ -696,9 +812,21 @@ def _finalize_chat_response( _dbg(debug_label + '=' + json.dumps(data, ensure_ascii=False, default=str)[:1000]) log_usage('聊天补全', data.get('usage', {}), input_key='prompt_tokens', output_key='completion_tokens') - usage_tracker.record(ctx.client_model, data.get('usage')) + usage = data.get('usage') + duration_ms = int((perf_counter() - request_started_at) * 1000) + usage_tracker.record(ctx.client_model, usage) + request_history.record( + route='chat', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=upstream_url, + usage=usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) attach_client_response(turn, data) - finalize_turn(turn, usage=data.get('usage')) + finalize_turn(turn, usage=usage, duration_ms=duration_ms) for choice in data.get('choices', []): msg = choice.get('message', {}) diff --git a/routes/messages.py b/routes/messages.py index 0d9faa5..a320081 100644 --- a/routes/messages.py +++ b/routes/messages.py @@ -7,6 +7,7 @@ Anthropic Messages API 透传。当 Cursor 直接发送 Anthropic 格式请求 import json import logging +from time import perf_counter import requests as req_lib from flask import Blueprint, request, jsonify @@ -15,6 +16,7 @@ import settings from config import Config from routes.common import apply_body_modifications, apply_header_modifications, inject_instructions_anthropic from utils.http import build_anthropic_headers, forward_request, sse_response +from utils.request_history import request_history from utils.request_logger import ( append_client_event, append_upstream_event, @@ -40,6 +42,7 @@ def messages_passthrough(): model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) + request_started_at = perf_counter() logger.info(f'[透传] model={model} 流式={is_stream}') mapping = settings.resolve_model(model) @@ -78,7 +81,18 @@ def messages_passthrough(): attach_upstream_response(turn, data) _inject_thinking(data) attach_client_response(turn, data) - finalize_turn(turn) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='messages', + client_model=model, + actual_model=model, + backend='anthropic', + upstream_url=url, + usage=data.get('usage'), + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, usage=data.get('usage'), duration_ms=duration_ms) return jsonify(data) def generate(): @@ -108,7 +122,18 @@ def messages_passthrough(): 'type': 'messages.stream.summary', 'event_count': len(client_events), }) - finalize_turn(turn) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='messages', + client_model=model, + actual_model=model, + backend='anthropic', + upstream_url=url, + usage=None, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, duration_ms=duration_ms) except req_lib.RequestException as e: logger.error(f'请求上游失败: {e}') attach_error(turn, {'stage': 'request_exception', 'message': str(e)}) diff --git a/routes/responses.py b/routes/responses.py index 2496a4b..271c30f 100644 --- a/routes/responses.py +++ b/routes/responses.py @@ -8,6 +8,7 @@ from __future__ import annotations import json import logging +from time import perf_counter from typing import Any import settings @@ -44,6 +45,7 @@ from utils.http import ( iter_responses_sse, sse_response, ) +from utils.request_history import request_history from utils.request_logger import ( append_client_event, append_upstream_event, @@ -78,6 +80,7 @@ def responses_endpoint(): client_model = payload.get('model', 'unknown') is_stream = payload.get('stream', False) + request_started_at = perf_counter() ctx = build_route_context(client_model, is_stream) turn = start_turn( route='responses', @@ -94,12 +97,12 @@ def responses_endpoint(): cc_payload = _build_cc_payload(payload, ctx) if ctx.backend == 'openai': - return _handle_openai_backend(ctx, cc_payload, turn) + return _handle_openai_backend(ctx, cc_payload, turn, request_started_at) if ctx.backend == 'responses': - return _handle_responses_backend(ctx, payload, turn) + return _handle_responses_backend(ctx, payload, turn, request_started_at) if ctx.backend == 'gemini': - return _handle_gemini_backend(ctx, cc_payload, turn) - return _handle_anthropic_backend(ctx, cc_payload, turn) + return _handle_gemini_backend(ctx, cc_payload, turn, request_started_at) + return _handle_anthropic_backend(ctx, cc_payload, turn, request_started_at) def _build_cc_payload(payload: dict[str, Any], ctx: RouteContext) -> dict[str, Any]: @@ -119,7 +122,12 @@ def _build_cc_payload(payload: dict[str, Any], ctx: RouteContext) -> dict[str, A return cc_payload -def _handle_openai_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any]): +def _handle_openai_backend( + ctx: RouteContext, + cc_payload: dict[str, Any], + turn: dict[str, Any], + request_started_at: float, +): """处理走 OpenAI 兼容后端的 Responses 请求。""" cc_payload = normalize_request(cc_payload) _dbg( @@ -132,8 +140,8 @@ def _handle_openai_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_openai_stream(ctx, cc_payload, url, headers, turn) - return _handle_openai_non_stream(ctx, cc_payload, url, headers, turn) + return _handle_openai_stream(ctx, cc_payload, url, headers, turn, request_started_at) + return _handle_openai_non_stream(ctx, cc_payload, url, headers, turn, request_started_at) def _handle_openai_non_stream( @@ -142,6 +150,7 @@ def _handle_openai_non_stream( url: str, headers: dict[str, str], turn: dict[str, Any], + request_started_at: float, ): """处理 OpenAI 兼容后端的非流式 Responses 返回。""" cc_payload['stream'] = False @@ -163,6 +172,9 @@ def _handle_openai_non_stream( client_model=ctx.client_model, turn=turn, debug_label='转换为 Responses 后', + ctx=ctx, + request_started_at=request_started_at, + upstream_url=url, ) @@ -172,6 +184,7 @@ def _handle_openai_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 OpenAI 兼容后端的流式 Responses 返回。""" cc_payload['stream'] = True @@ -212,7 +225,18 @@ def _handle_openai_stream( 'model': ctx.client_model, 'event_count': len(client_events), }) - finalize_turn(turn) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='responses', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=None, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, duration_ms=duration_ms) return append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk}) @@ -239,7 +263,12 @@ def _handle_openai_stream( return sse_response(generate()) -def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: dict[str, Any] | None): +def _handle_responses_backend( + ctx: RouteContext, + payload: dict[str, Any], + turn: dict[str, Any] | None, + request_started_at: float, +): """处理走原生 Responses 后端的请求。 当中转站本身就只支持 `/v1/responses` 时,不需要再绕到聊天补全中间协议, @@ -254,8 +283,8 @@ def _handle_responses_backend(ctx: RouteContext, payload: dict[str, Any], turn: headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_responses_stream(ctx, payload, url, headers, turn) - return _handle_responses_non_stream(ctx, payload, url, headers, turn) + return _handle_responses_stream(ctx, payload, url, headers, turn, request_started_at) + return _handle_responses_non_stream(ctx, payload, url, headers, turn, request_started_at) def _handle_responses_non_stream( @@ -264,6 +293,7 @@ def _handle_responses_non_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理原生 Responses 后端的非流式返回。""" payload['stream'] = False @@ -282,6 +312,9 @@ def _handle_responses_non_stream( client_model=ctx.client_model, turn=turn, debug_label='原生 Responses 返回后', + ctx=ctx, + request_started_at=request_started_at, + upstream_url=url, ) @@ -291,6 +324,7 @@ def _handle_responses_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理原生 Responses 后端的流式返回。""" payload['stream'] = True @@ -345,7 +379,18 @@ def _handle_responses_stream( 'event_count': len(client_events), 'usage': last_usage, }) - finalize_turn(turn, usage=last_usage) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='responses', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=last_usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, usage=last_usage, duration_ms=duration_ms) return sse_response(generate()) @@ -369,7 +414,12 @@ def _extract_responses_usage(event_data: dict[str, Any]) -> dict[str, Any] | Non return None -def _handle_gemini_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): +def _handle_gemini_backend( + ctx: RouteContext, + cc_payload: dict[str, Any], + turn: dict[str, Any] | None, + request_started_at: float, +): """处理走 Gemini Contents 后端的 Responses 请求。""" gemini_payload = cc_to_gemini_request(cc_payload) _dbg( @@ -382,8 +432,8 @@ def _handle_gemini_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn) - return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn) + return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn, request_started_at) + return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn, request_started_at) def _handle_gemini_non_stream( @@ -392,6 +442,7 @@ def _handle_gemini_non_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 Gemini 后端的非流式 Responses 返回。""" attach_upstream_request(turn, payload, headers) @@ -412,6 +463,9 @@ def _handle_gemini_non_stream( client_model=ctx.client_model, turn=turn, debug_label='Gemini 转回 Responses 后', + ctx=ctx, + request_started_at=request_started_at, + upstream_url=url, ) @@ -421,6 +475,7 @@ def _handle_gemini_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 Gemini 后端的流式 Responses 返回。""" converter = ResponsesStreamConverter(model=ctx.client_model) @@ -487,12 +542,28 @@ def _handle_gemini_stream( 'event_count': len(client_events), 'usage': last_usage, }) - finalize_turn(turn, usage=last_usage) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='responses', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=last_usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, usage=last_usage, duration_ms=duration_ms) return sse_response(generate()) -def _handle_anthropic_backend(ctx: RouteContext, cc_payload: dict[str, Any], turn: dict[str, Any] | None): +def _handle_anthropic_backend( + ctx: RouteContext, + cc_payload: dict[str, Any], + turn: dict[str, Any] | None, + request_started_at: float, +): """处理走 Anthropic 后端的 Responses 请求。""" anthropic_payload = cc_to_messages_request(cc_payload) _dbg( @@ -505,8 +576,8 @@ def _handle_anthropic_backend(ctx: RouteContext, cc_payload: dict[str, Any], tur headers = apply_header_modifications(headers, ctx.header_modifications) if ctx.is_stream: - return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn) - return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn) + return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn, request_started_at) + return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn, request_started_at) def _handle_anthropic_non_stream( @@ -515,6 +586,7 @@ def _handle_anthropic_non_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 Anthropic 后端的非流式 Responses 返回。""" anthropic_payload['stream'] = False @@ -536,6 +608,9 @@ def _handle_anthropic_non_stream( client_model=ctx.client_model, turn=turn, debug_label='Messages 转回 Responses 后', + ctx=ctx, + request_started_at=request_started_at, + upstream_url=url, ) @@ -545,6 +620,7 @@ def _handle_anthropic_stream( url: str, headers: dict[str, str], turn: dict[str, Any] | None, + request_started_at: float, ): """处理 Anthropic 后端的流式 Responses 返回。 @@ -600,7 +676,18 @@ def _handle_anthropic_stream( 'model': ctx.client_model, 'event_count': len(client_events), }) - finalize_turn(turn) + duration_ms = int((perf_counter() - request_started_at) * 1000) + request_history.record( + route='responses', + client_model=ctx.client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=url, + usage=None, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) + finalize_turn(turn, duration_ms=duration_ms) return sse_response(generate()) @@ -611,6 +698,9 @@ def _finalize_responses_response( client_model: str, turn: dict[str, Any], debug_label: str, + ctx: RouteContext, + request_started_at: float, + upstream_url: str, ): """统一收尾非流式 Responses 响应。 @@ -621,14 +711,26 @@ def _finalize_responses_response( _dbg(debug_label + '=' + json.dumps(response_data, ensure_ascii=False, default=str)[:1000]) log_usage('响应生成', response_data.get('usage', {}), input_key='input_tokens', output_key='output_tokens') + usage = response_data.get('usage') + duration_ms = int((perf_counter() - request_started_at) * 1000) usage_tracker.record( client_model, - response_data.get('usage'), + usage, input_key='input_tokens', output_key='output_tokens', ) + request_history.record( + route='responses', + client_model=client_model, + actual_model=ctx.upstream_model, + backend=ctx.backend, + upstream_url=upstream_url, + usage=usage, + duration_ms=duration_ms, + started_at=(turn or {}).get('started_at'), + ) attach_client_response(turn, response_data) - finalize_turn(turn, usage=response_data.get('usage')) + finalize_turn(turn, usage=usage, duration_ms=duration_ms) return jsonify(response_data) diff --git a/static/admin.css b/static/admin.css index 875bbcb..1824b17 100644 --- a/static/admin.css +++ b/static/admin.css @@ -83,3 +83,11 @@ main{padding:28px 0 60px} .toast-ok{background:#065f46;color:#a7f3d0} .toast-err{background:#7f1d1d;color:#fca5a5} @keyframes slideIn{from{transform:translateX(100px);opacity:0}to{transform:none;opacity:1}} + +.request-logs-wrap{overflow:auto} +.request-logs-table{min-width:1100px} +.request-logs-table td{vertical-align:top} +.log-url{max-width:320px;word-break:break-all;color:var(--muted)} +.log-status{display:inline-flex;align-items:center;padding:2px 8px;border-radius:999px;font-size:12px;font-weight:600} +.status-ok{background:rgba(34,197,94,.15);color:var(--green)} +.status-error{background:rgba(239,68,68,.15);color:var(--red)} diff --git a/static/admin.html b/static/admin.html index 5d382ad..bf79a2e 100644 --- a/static/admin.html +++ b/static/admin.html @@ -90,6 +90,16 @@
加载中…
+ + +
+
+

最近 500 条请求日志

+ +
+
显示请求时间、请求模型、实际上游模型、上游 URL、Token 统计、耗时和状态。
+
加载中…
+
diff --git a/static/admin.js b/static/admin.js index a5d5e6c..67a4679 100644 --- a/static/admin.js +++ b/static/admin.js @@ -72,6 +72,7 @@ async function loadDashboard() { await loadMappings(); checkHealth(); loadStats(); + loadRequestLogs(); } catch (e) { toast('加载设置失败: ' + e.message, false); } @@ -104,6 +105,55 @@ async function loadStats() { } } +async function loadRequestLogs() { + const el = document.getElementById('requestLogsContent'); + try { + const data = await api('/api/admin/request-logs'); + const items = data.items || []; + if (!items.length) { + el.innerHTML = '
暂无请求日志
'; + return; + } + let html = '
'; + for (const item of items) { + const usage = item.usage || {}; + const tokens = '输 ' + fmtNum(usage.input_tokens) + ' / 出 ' + fmtNum(usage.output_tokens) + ' / 总 ' + fmtNum(usage.total_tokens); + const statusClass = item.status === 'ok' ? 'status-ok' : 'status-error'; + const statusText = item.status === 'ok' ? '成功' : '异常'; + html += '' + + '' + + '' + + '' + + '' + + '' + + '' + + '' + + ''; + } + html += '
请求时间请求模型实际模型上游 URLTokens耗时状态
' + esc(fmtTime(item.requested_at)) + '' + esc(item.requested_model || '-') + '' + esc(item.actual_model || '-') + '' + esc(item.upstream_url || '-') + '' + esc(tokens) + '' + fmtNum(item.duration_ms) + ' ms' + statusText + '
'; + el.innerHTML = html; + } catch (e) { + el.innerHTML = '
加载请求日志失败
'; + } +} + +function fmtNum(value) { + return Number(value || 0).toLocaleString(); +} + +function fmtTime(value) { + if (!value) return '-'; + const d = new Date(value); + if (Number.isNaN(d.getTime())) return String(value); + const pad = n => String(n).padStart(2, '0'); + return d.getFullYear() + '-' + + pad(d.getMonth() + 1) + '-' + + pad(d.getDate()) + ' ' + + pad(d.getHours()) + ':' + + pad(d.getMinutes()) + ':' + + pad(d.getSeconds()); +} + async function checkHealth() { try { const r = await fetch(API + '/health'); diff --git a/utils/request_history.py b/utils/request_history.py new file mode 100644 index 0000000..e8dde7f --- /dev/null +++ b/utils/request_history.py @@ -0,0 +1,111 @@ +"""请求历史记录。 + +为管理后台提供最近请求查询能力,默认仅保留最近 500 条, +重启后会从磁盘恢复最近一次快照。 +""" + +from __future__ import annotations + +import json +import os +import threading +from collections import deque +from datetime import datetime, timezone +from typing import Any + +from settings import DATA_DIR + +_MAX_RECORDS = 500 +_FILE_PATH = os.path.join(DATA_DIR, 'request_logs.json') + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z') + + +def _safe_int(value: Any) -> int: + try: + return int(value or 0) + except (TypeError, ValueError): + return 0 + + +def _normalize_usage(usage: dict[str, Any] | None) -> dict[str, int]: + usage = usage or {} + input_tokens = _safe_int( + usage.get('prompt_tokens', usage.get('input_tokens', 0)) + ) + output_tokens = _safe_int( + usage.get('completion_tokens', usage.get('output_tokens', 0)) + ) + total_tokens = _safe_int(usage.get('total_tokens', input_tokens + output_tokens)) + return { + 'input_tokens': input_tokens, + 'output_tokens': output_tokens, + 'total_tokens': total_tokens, + } + + +class RequestHistory: + def __init__(self) -> None: + self._lock = threading.Lock() + self._records: deque[dict[str, Any]] = deque(maxlen=_MAX_RECORDS) + self._load() + + def record( + self, + *, + route: str, + client_model: str, + actual_model: str, + backend: str, + upstream_url: str, + usage: dict[str, Any] | None, + duration_ms: int, + started_at: str | None = None, + status: str = 'ok', + error_message: str = '', + ) -> None: + record = { + 'requested_at': started_at or _now_iso(), + 'route': route, + 'requested_model': client_model or '', + 'actual_model': actual_model or '', + 'backend': backend or '', + 'upstream_url': upstream_url or '', + 'duration_ms': max(_safe_int(duration_ms), 0), + 'status': status or 'ok', + 'error_message': error_message or '', + 'usage': _normalize_usage(usage), + 'recorded_at': _now_iso(), + } + with self._lock: + self._records.appendleft(record) + self._persist_locked() + + def get_recent(self, limit: int = _MAX_RECORDS) -> list[dict[str, Any]]: + size = max(1, min(_safe_int(limit), _MAX_RECORDS)) + with self._lock: + return list(self._records)[:size] + + def _load(self) -> None: + if not os.path.exists(_FILE_PATH): + return + try: + with open(_FILE_PATH, 'r', encoding='utf-8') as f: + data = json.load(f) + if not isinstance(data, list): + return + for item in data[:_MAX_RECORDS]: + if isinstance(item, dict): + self._records.append(item) + except (OSError, json.JSONDecodeError): + self._records.clear() + + def _persist_locked(self) -> None: + os.makedirs(DATA_DIR, exist_ok=True) + with open(_FILE_PATH, 'w', encoding='utf-8') as f: + json.dump(list(self._records), f, ensure_ascii=False, indent=2) + + +request_history = RequestHistory()