api2cursor/routes/chat.py
2026-05-05 13:42:35 +08:00

862 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""路由: /v1/chat/completions
处理 Cursor 发来的 OpenAI Chat Completions 格式请求。
根据模型映射的后端类型,转发到 OpenAI 兼容接口、Anthropic Messages 接口,
或原生 OpenAI Responses 接口。
"""
from __future__ import annotations
import json
import logging
from time import perf_counter
from typing import Any
import settings
from flask import Blueprint, jsonify, request
from adapters.cc_anthropic_adapter import (
AnthropicStreamConverter,
cc_to_messages_request,
messages_to_cc_response,
)
from adapters.cc_gemini_adapter import (
GeminiStreamConverter,
cc_to_gemini_request,
gemini_to_cc_response,
)
from adapters.openai_compat_fixer import fix_response, fix_stream_chunk, normalize_request
from adapters.responses_cc_adapter import (
ResponsesToCCStreamConverter,
cc_to_responses_request,
responses_to_cc,
responses_to_cc_response,
)
from config import Config
from routes.common import (
RouteContext,
apply_body_modifications,
apply_header_modifications,
build_anthropic_target,
build_gemini_target,
build_openai_target,
build_responses_target,
build_route_context,
chat_error_chunk,
ensure_prompt_cache_key,
inject_instructions_anthropic,
inject_instructions_cc,
inject_instructions_responses,
log_route_context,
log_usage,
sse_data_message,
)
from utils.http import (
forward_request,
gen_id,
iter_anthropic_sse,
iter_gemini_sse,
iter_openai_sse,
iter_responses_sse,
sse_response,
)
from utils.request_history import request_history
from utils.request_logger import (
append_client_event,
append_upstream_event,
attach_client_response,
attach_error,
attach_upstream_request,
attach_upstream_response,
finalize_turn,
set_stream_summary,
start_turn,
)
from utils.think_tag import ThinkTagExtractor
from utils.thinking_cache import thinking_cache
from utils.usage_tracker import usage_tracker
logger = logging.getLogger(__name__)
bp = Blueprint('chat', __name__)
def _dbg(message: str) -> None:
"""仅在调试模式下输出详细日志。"""
if settings.get_debug_mode() in ('simple', 'verbose'):
logger.info('[聊天补全调试] %s', message)
def _extract_responses_usage(event_data: dict[str, Any]) -> dict[str, Any] | None:
"""从原生 Responses 事件中提取 usage。
`/v1/chat/completions -> /v1/responses` 的桥接流式路径也需要读取 usage
因此在本文件保留一个本地辅助函数,避免依赖其他路由模块的私有实现。
"""
if not isinstance(event_data, dict):
return None
usage = event_data.get('usage')
if isinstance(usage, dict):
return usage
response_obj = event_data.get('response')
if isinstance(response_obj, dict):
nested_usage = response_obj.get('usage')
if isinstance(nested_usage, dict):
return nested_usage
return None
@bp.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
"""处理聊天补全请求并按模型映射分发到不同后端。"""
original_payload = request.get_json(force=True)
payload, message_count = _normalize_chat_payload(json.loads(json.dumps(original_payload, ensure_ascii=False, default=str)))
client_model = payload.get('model', 'unknown')
is_stream = payload.get('stream', False)
ctx = build_route_context(client_model, is_stream)
request_started_at = perf_counter()
turn = start_turn(
route='chat',
client_model=client_model,
backend=ctx.backend,
stream=is_stream,
client_request=original_payload,
request_headers=dict(request.headers),
target_url=ctx.target_url,
upstream_model=ctx.upstream_model,
metadata={'message_count': message_count},
)
log_route_context('聊天补全', ctx, extra=f'消息数={message_count}')
_log_messages(payload)
if ctx.backend != 'responses':
payload['messages'] = thinking_cache.inject(payload.get('messages', []))
if ctx.backend == 'openai':
return _handle_openai_backend(ctx, payload, turn, request_started_at)
if ctx.backend == 'responses':
return _handle_responses_backend(ctx, payload, turn, request_started_at)
if ctx.backend == 'gemini':
return _handle_gemini_backend(ctx, payload, turn, request_started_at)
return _handle_anthropic_backend(ctx, payload, turn, request_started_at)
def _normalize_chat_payload(payload: dict[str, Any]) -> tuple[dict[str, Any], int]:
"""整理聊天补全入口的请求体。
这里保留了一层兼容逻辑:当 Cursor 或调用方把 Responses 格式误发到
`/v1/chat/completions` 时,先降级转换成 Chat Completions再进入统一主流程。
"""
message_count = len(payload.get('messages', []))
if message_count == 0 and 'input' in payload:
logger.info('检测到 Responses 格式误入聊天补全接口,已自动转换为 Chat Completions 格式')
payload = responses_to_cc(payload)
message_count = len(payload.get('messages', []))
elif message_count == 0:
logger.warning('消息列表为空,请求字段=%s', list(payload.keys()))
return payload, message_count
def _handle_openai_backend(
ctx: RouteContext,
payload: dict[str, Any],
turn: dict[str, Any],
request_started_at: float,
):
"""处理走 OpenAI 兼容后端的聊天补全请求。"""
_dbg(
'原始请求字段=' + str(list(payload.keys())) + ' '
+ '附加字段='
+ json.dumps(
{k: v for k, v in payload.items() if k != 'messages'},
ensure_ascii=False,
default=str,
)[:500]
)
payload = normalize_request(payload, ctx.upstream_model)
payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position)
_dbg(
f'标准化完成:模型={payload.get("model")} '
f'工具数={len(payload.get("tools", []))}'
)
url, headers = build_openai_target(ctx)
payload = apply_body_modifications(payload, ctx.body_modifications)
headers = apply_header_modifications(headers, ctx.header_modifications)
if ctx.is_stream:
return _handle_openai_stream(ctx, payload, url, headers, turn, request_started_at)
return _handle_openai_non_stream(ctx, payload, url, headers, turn, request_started_at)
def _handle_openai_non_stream(
ctx: RouteContext,
payload: dict[str, Any],
url: str,
headers: dict[str, str],
turn: dict[str, Any],
request_started_at: float,
):
"""处理 OpenAI 兼容后端的非流式返回。"""
payload['stream'] = False
attach_upstream_request(turn, payload, headers)
resp, err = forward_request(url, headers, payload)
if err:
attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'})
finalize_turn(turn)
return err
raw = resp.json()
attach_upstream_response(turn, raw)
_dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000])
data = fix_response(raw)
return _finalize_chat_response(
ctx,
data,
turn=turn,
debug_label='修复后响应',
request_started_at=request_started_at,
upstream_url=url,
)
def _handle_openai_stream(
ctx: RouteContext,
payload: dict[str, Any],
url: str,
headers: dict[str, str],
turn: dict[str, Any],
request_started_at: float,
):
"""处理 OpenAI 兼容后端的流式返回。"""
payload['stream'] = True
def generate():
"""消费上游 OpenAI SSE并逐段产出给 Cursor 的聊天补全流。"""
attach_upstream_request(turn, payload, headers)
resp, err = forward_request(url, headers, payload, stream=True)
if err:
attach_error(turn, {'stage': 'forward_request', 'message': str(err)})
set_stream_summary(turn, {'status': 'error'})
finalize_turn(turn)
yield chat_error_chunk(str(err))
return
think_extractor = ThinkTagExtractor()
chunk_count = 0
last_usage = None
client_chunks: list[dict[str, Any]] = []
for chunk in iter_openai_sse(resp):
if chunk is None:
_dbg(f'流式响应结束,共 {chunk_count} 个数据片段')
close_chunk = think_extractor.finalize()
if close_chunk:
client_chunks.append(close_chunk)
append_client_event(turn, {'type': 'chat_chunk', 'data': close_chunk})
yield sse_data_message(close_chunk)
append_client_event(turn, {'type': 'done'})
yield sse_data_message('[DONE]')
usage_tracker.record(ctx.client_model, last_usage)
set_stream_summary(turn, {
'chunk_count': chunk_count,
'client_chunk_count': len(client_chunks),
'usage': last_usage,
})
attach_client_response(turn, {
'type': 'chat.completion.stream.summary',
'model': ctx.client_model,
'chunk_count': len(client_chunks),
'usage': last_usage,
})
duration_ms = int((perf_counter() - request_started_at) * 1000)
request_history.record(
route='chat',
client_model=ctx.client_model,
actual_model=ctx.upstream_model,
backend=ctx.backend,
upstream_url=url,
usage=last_usage,
duration_ms=duration_ms,
started_at=(turn or {}).get('started_at'),
)
finalize_turn(turn, usage=last_usage, duration_ms=duration_ms)
return
append_upstream_event(turn, {'type': 'openai_chunk', 'data': chunk})
if chunk.get('usage'):
last_usage = chunk['usage']
if chunk_count < 10:
_dbg(
f'上游原始片段#{chunk_count}='
+ json.dumps(chunk, ensure_ascii=False, default=str)[:500]
)
chunk = fix_stream_chunk(chunk)
chunk['model'] = ctx.client_model
for out in think_extractor.process_chunk(chunk):
client_chunks.append(out)
append_client_event(turn, {'type': 'chat_chunk', 'data': out})
if chunk_count < 10:
_dbg(
f'返回片段#{chunk_count}='
+ json.dumps(out, ensure_ascii=False, default=str)[:500]
)
yield sse_data_message(out)
chunk_count += 1
usage_tracker.record(ctx.client_model, last_usage)
set_stream_summary(turn, {
'chunk_count': chunk_count,
'client_chunk_count': len(client_chunks),
'usage': last_usage,
'ended_without_done': True,
})
attach_client_response(turn, {
'type': 'chat.completion.stream.summary',
'model': ctx.client_model,
'chunk_count': len(client_chunks),
'usage': last_usage,
})
duration_ms = int((perf_counter() - request_started_at) * 1000)
request_history.record(
route='chat',
client_model=ctx.client_model,
actual_model=ctx.upstream_model,
backend=ctx.backend,
upstream_url=url,
usage=last_usage,
duration_ms=duration_ms,
started_at=(turn or {}).get('started_at'),
)
finalize_turn(turn, usage=last_usage, duration_ms=duration_ms)
return sse_response(generate())
def _handle_responses_backend(
ctx: RouteContext,
payload: dict[str, Any],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理走原生 Responses 后端的聊天补全请求。
当上游只支持 `/v1/responses` 时,需要先把聊天补全请求转换为 Responses 请求,
返回时再转换回聊天补全协议。
"""
responses_payload = cc_to_responses_request(payload)
responses_payload['model'] = ctx.upstream_model
responses_payload = inject_instructions_responses(responses_payload, ctx.custom_instructions, ctx.instructions_position)
responses_payload = ensure_prompt_cache_key(responses_payload)
_dbg(
'已转换为 Responses 请求:字段=' + str(list(responses_payload.keys()))
+ f' 输入项数={len(responses_payload.get("input", []))}'
)
url, headers = build_responses_target(ctx)
responses_payload = apply_body_modifications(responses_payload, ctx.body_modifications)
headers = apply_header_modifications(headers, ctx.header_modifications)
if ctx.is_stream:
return _handle_responses_stream(ctx, responses_payload, url, headers, turn, request_started_at)
return _handle_responses_non_stream(ctx, responses_payload, url, headers, turn, request_started_at)
def _handle_responses_non_stream(
ctx: RouteContext,
payload: dict[str, Any],
url: str,
headers: dict[str, str],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理原生 Responses 后端的非流式返回。"""
payload['stream'] = False
attach_upstream_request(turn, payload, headers)
resp, err = forward_request(url, headers, payload)
if err:
attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'})
finalize_turn(turn)
return err
raw = resp.json()
attach_upstream_response(turn, raw)
_dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000])
data = responses_to_cc_response(raw, ctx.client_model)
return _finalize_chat_response(
ctx,
data,
turn=turn,
debug_label='Responses 转回聊天补全后',
request_started_at=request_started_at,
upstream_url=url,
)
def _handle_responses_stream(
ctx: RouteContext,
payload: dict[str, Any],
url: str,
headers: dict[str, str],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理原生 Responses 后端的流式返回。"""
payload['stream'] = True
converter = ResponsesToCCStreamConverter(model=ctx.client_model)
def generate():
"""消费上游 Responses 事件,并实时转换成聊天补全 chunk。"""
attach_upstream_request(turn, payload, headers)
resp, err = forward_request(url, headers, payload, stream=True)
if err:
attach_error(turn, {'stage': 'forward_request', 'message': str(err)})
set_stream_summary(turn, {'status': 'error'})
finalize_turn(turn)
yield chat_error_chunk(str(err))
return
event_count = 0
client_chunks: list[Any] = []
last_usage: dict[str, Any] | None = None
for event_type, event_data in iter_responses_sse(resp):
append_upstream_event(turn, {'type': event_type, 'data': event_data})
extracted_usage = _extract_responses_usage(event_data)
if extracted_usage:
last_usage = {
'prompt_tokens': extracted_usage.get('input_tokens', 0),
'completion_tokens': extracted_usage.get('output_tokens', 0),
'total_tokens': extracted_usage.get('total_tokens', 0),
}
if event_count < 10:
_dbg(
f'上游事件#{event_count} 类型={event_type} 数据='
+ json.dumps(event_data, ensure_ascii=False, default=str)[:500]
)
for chunk in converter.process_event(event_type, event_data):
client_chunks.append(chunk)
append_client_event(turn, {'type': 'chat_chunk', 'data': chunk})
if isinstance(chunk, dict) and isinstance(chunk.get('usage'), dict):
last_usage = chunk['usage']
if event_count < 10:
_dbg(
f'返回片段#{event_count}='
+ json.dumps(chunk, ensure_ascii=False, default=str)[:500]
)
yield sse_data_message(chunk)
event_count += 1
_dbg(f'流式响应结束,共 {event_count} 个事件')
append_client_event(turn, {'type': 'done'})
yield sse_data_message('[DONE]')
usage_tracker.record(ctx.client_model, last_usage)
set_stream_summary(turn, {
'event_count': event_count,
'client_chunk_count': len(client_chunks),
'usage': last_usage,
})
attach_client_response(turn, {
'type': 'chat.completion.stream.summary',
'model': ctx.client_model,
'chunk_count': len(client_chunks),
'usage': last_usage,
})
duration_ms = int((perf_counter() - request_started_at) * 1000)
request_history.record(
route='chat',
client_model=ctx.client_model,
actual_model=ctx.upstream_model,
backend=ctx.backend,
upstream_url=url,
usage=last_usage,
duration_ms=duration_ms,
started_at=(turn or {}).get('started_at'),
)
finalize_turn(turn, usage=last_usage, duration_ms=duration_ms)
return sse_response(generate())
def _handle_gemini_backend(
ctx: RouteContext,
payload: dict[str, Any],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理走 Gemini Contents 后端的聊天补全请求。"""
payload = inject_instructions_cc(payload, ctx.custom_instructions, ctx.instructions_position)
gemini_payload = cc_to_gemini_request(payload)
_dbg(
'已转换为 Gemini 请求:字段=' + str(list(gemini_payload.keys()))
+ f' 内容数={len(gemini_payload.get("contents", []))}'
)
url, headers = build_gemini_target(ctx, stream=ctx.is_stream)
gemini_payload = apply_body_modifications(gemini_payload, ctx.body_modifications)
headers = apply_header_modifications(headers, ctx.header_modifications)
if ctx.is_stream:
return _handle_gemini_stream(ctx, gemini_payload, url, headers, turn, request_started_at)
return _handle_gemini_non_stream(ctx, gemini_payload, url, headers, turn, request_started_at)
def _handle_gemini_non_stream(
ctx: RouteContext,
payload: dict[str, Any],
url: str,
headers: dict[str, str],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理 Gemini 后端的非流式返回。"""
attach_upstream_request(turn, payload, headers)
resp, err = forward_request(url, headers, payload)
if err:
attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'})
finalize_turn(turn)
return err
raw = resp.json()
attach_upstream_response(turn, raw)
_dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000])
data = gemini_to_cc_response(raw)
return _finalize_chat_response(
ctx,
data,
turn=turn,
debug_label='Gemini 转回聊天补全后',
request_started_at=request_started_at,
upstream_url=url,
)
def _handle_gemini_stream(
ctx: RouteContext,
payload: dict[str, Any],
url: str,
headers: dict[str, str],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理 Gemini 后端的流式返回。"""
converter = GeminiStreamConverter()
def generate():
attach_upstream_request(turn, payload, headers)
resp, err = forward_request(url, headers, payload, stream=True)
if err:
attach_error(turn, {'stage': 'forward_request', 'message': str(err)})
set_stream_summary(turn, {'status': 'error'})
finalize_turn(turn)
yield chat_error_chunk(str(err))
return
chunk_count = 0
client_chunks: list[Any] = []
last_usage: dict[str, Any] | None = None
for gemini_chunk in iter_gemini_sse(resp):
append_upstream_event(turn, {'type': 'gemini_chunk', 'data': gemini_chunk})
usage_meta = gemini_chunk.get('usageMetadata') if isinstance(gemini_chunk, dict) else None
if isinstance(usage_meta, dict):
last_usage = {
'prompt_tokens': usage_meta.get('promptTokenCount', 0),
'completion_tokens': usage_meta.get('candidatesTokenCount', 0),
'total_tokens': usage_meta.get('totalTokenCount', 0),
}
if chunk_count < 10:
_dbg(
f'上游 Gemini 片段#{chunk_count}='
+ json.dumps(gemini_chunk, ensure_ascii=False, default=str)[:500]
)
for cc_chunk in converter.process_chunk(gemini_chunk):
cc_chunk['model'] = ctx.client_model
client_chunks.append(cc_chunk)
append_client_event(turn, {'type': 'chat_chunk', 'data': cc_chunk})
if isinstance(cc_chunk, dict) and isinstance(cc_chunk.get('usage'), dict):
last_usage = cc_chunk['usage']
if chunk_count < 10:
_dbg(
f'返回片段#{chunk_count}='
+ json.dumps(cc_chunk, ensure_ascii=False, default=str)[:500]
)
yield sse_data_message(cc_chunk)
chunk_count += 1
_dbg(f'流式响应结束,共 {chunk_count} 个数据片段')
append_client_event(turn, {'type': 'done'})
yield sse_data_message('[DONE]')
usage_tracker.record(ctx.client_model, last_usage)
set_stream_summary(turn, {
'chunk_count': chunk_count,
'client_chunk_count': len(client_chunks),
'usage': last_usage,
})
attach_client_response(turn, {
'type': 'chat.completion.stream.summary',
'model': ctx.client_model,
'chunk_count': len(client_chunks),
'usage': last_usage,
})
duration_ms = int((perf_counter() - request_started_at) * 1000)
request_history.record(
route='chat',
client_model=ctx.client_model,
actual_model=ctx.upstream_model,
backend=ctx.backend,
upstream_url=url,
usage=last_usage,
duration_ms=duration_ms,
started_at=(turn or {}).get('started_at'),
)
finalize_turn(turn, usage=last_usage, duration_ms=duration_ms)
return sse_response(generate())
def _handle_anthropic_backend(
ctx: RouteContext,
payload: dict[str, Any],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理走 Anthropic Messages 后端的聊天补全请求。"""
payload['model'] = ctx.upstream_model
anthropic_payload = cc_to_messages_request(payload)
anthropic_payload = inject_instructions_anthropic(anthropic_payload, ctx.custom_instructions, ctx.instructions_position)
_dbg(
'已转换为 Messages 请求:字段=' + str(list(anthropic_payload.keys()))
+ f' 消息数={len(anthropic_payload.get("messages", []))}'
)
url, headers = build_anthropic_target(ctx)
anthropic_payload = apply_body_modifications(anthropic_payload, ctx.body_modifications)
headers = apply_header_modifications(headers, ctx.header_modifications)
if ctx.is_stream:
return _handle_anthropic_stream(ctx, anthropic_payload, url, headers, turn, request_started_at)
return _handle_anthropic_non_stream(ctx, anthropic_payload, url, headers, turn, request_started_at)
def _handle_anthropic_non_stream(
ctx: RouteContext,
payload: dict[str, Any],
url: str,
headers: dict[str, str],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理 Anthropic 后端的非流式返回。"""
payload['stream'] = False
attach_upstream_request(turn, payload, headers)
resp, err = forward_request(url, headers, payload)
if err:
attach_error(turn, {'stage': 'forward_request', 'message': 'upstream request failed'})
finalize_turn(turn)
return err
raw = resp.json()
attach_upstream_response(turn, raw)
_dbg('上游原始响应=' + json.dumps(raw, ensure_ascii=False, default=str)[:1000])
data = messages_to_cc_response(raw)
return _finalize_chat_response(
ctx,
data,
turn=turn,
debug_label='Messages 转回聊天补全后',
request_started_at=request_started_at,
upstream_url=url,
)
def _handle_anthropic_stream(
ctx: RouteContext,
payload: dict[str, Any],
url: str,
headers: dict[str, str],
turn: dict[str, Any] | None,
request_started_at: float,
):
"""处理 Anthropic 后端的流式返回。
这里仍然保留独立的事件级转换器,而不是先落成完整响应再回放,
是为了尽量保持 Cursor 端的流式体验和工具调用时序。
"""
payload['stream'] = True
converter = AnthropicStreamConverter()
def generate():
"""消费上游 Anthropic 事件流,并逐步映射为聊天补全 SSE。"""
attach_upstream_request(turn, payload, headers)
resp, err = forward_request(url, headers, payload, stream=True)
if err:
attach_error(turn, {'stage': 'forward_request', 'message': str(err)})
set_stream_summary(turn, {'status': 'error'})
finalize_turn(turn)
yield chat_error_chunk(str(err))
return
event_count = 0
client_chunks: list[Any] = []
last_usage: dict[str, Any] | None = None
for event_type, event_data in iter_anthropic_sse(resp):
append_upstream_event(turn, {'type': event_type, 'data': event_data})
if event_type == 'message_start':
message_usage = event_data.get('message', {}).get('usage', {})
if isinstance(message_usage, dict):
last_usage = {
'prompt_tokens': message_usage.get('input_tokens', 0),
'completion_tokens': 0,
'total_tokens': message_usage.get('input_tokens', 0),
}
elif event_type == 'message_delta':
delta_usage = event_data.get('usage', {})
if isinstance(delta_usage, dict):
prompt_tokens = 0
if isinstance(last_usage, dict):
prompt_tokens = last_usage.get('prompt_tokens', 0)
completion_tokens = delta_usage.get('output_tokens', 0)
last_usage = {
'prompt_tokens': prompt_tokens,
'completion_tokens': completion_tokens,
'total_tokens': prompt_tokens + completion_tokens,
}
if event_count < 10:
_dbg(
f'上游事件#{event_count} 类型={event_type} 数据='
+ json.dumps(event_data, ensure_ascii=False, default=str)[:500]
)
for chunk_str in converter.process_event(event_type, event_data):
try:
chunk_obj = json.loads(chunk_str)
chunk_obj['model'] = ctx.client_model
if isinstance(chunk_obj.get('usage'), dict):
last_usage = chunk_obj['usage']
chunk_str = json.dumps(chunk_obj, ensure_ascii=False)
except (json.JSONDecodeError, TypeError):
pass
client_chunks.append(chunk_str)
append_client_event(turn, {'type': 'chat_chunk', 'data': chunk_str})
if event_count < 10:
_dbg(f'返回片段#{event_count}={chunk_str[:500]}')
yield sse_data_message(chunk_str)
event_count += 1
_dbg(f'流式响应结束,共 {event_count} 个事件')
append_client_event(turn, {'type': 'done'})
yield sse_data_message('[DONE]')
usage_tracker.record(ctx.client_model, last_usage)
set_stream_summary(turn, {
'event_count': event_count,
'client_chunk_count': len(client_chunks),
'usage': last_usage,
})
attach_client_response(turn, {
'type': 'chat.completion.stream.summary',
'model': ctx.client_model,
'chunk_count': len(client_chunks),
'usage': last_usage,
})
duration_ms = int((perf_counter() - request_started_at) * 1000)
request_history.record(
route='chat',
client_model=ctx.client_model,
actual_model=ctx.upstream_model,
backend=ctx.backend,
upstream_url=url,
usage=last_usage,
duration_ms=duration_ms,
started_at=(turn or {}).get('started_at'),
)
finalize_turn(turn, usage=last_usage, duration_ms=duration_ms)
return sse_response(generate())
def _finalize_chat_response(
ctx: RouteContext,
data: dict[str, Any],
*,
turn: dict[str, Any] | None,
debug_label: str,
request_started_at: float,
upstream_url: str,
):
"""统一收尾非流式聊天补全响应。
三条后端链路最终都会回到 Chat Completions 格式,因此这里集中做:
- 回填给 Cursor 展示的模型名
- 输出统一调试日志
- 输出统一令牌统计日志
"""
data['model'] = ctx.client_model
_dbg(debug_label + '=' + json.dumps(data, ensure_ascii=False, default=str)[:1000])
log_usage('聊天补全', data.get('usage', {}), input_key='prompt_tokens', output_key='completion_tokens')
usage = data.get('usage')
duration_ms = int((perf_counter() - request_started_at) * 1000)
usage_tracker.record(ctx.client_model, usage)
request_history.record(
route='chat',
client_model=ctx.client_model,
actual_model=ctx.upstream_model,
backend=ctx.backend,
upstream_url=upstream_url,
usage=usage,
duration_ms=duration_ms,
started_at=(turn or {}).get('started_at'),
)
attach_client_response(turn, data)
finalize_turn(turn, usage=usage, duration_ms=duration_ms)
for choice in data.get('choices', []):
msg = choice.get('message', {})
if msg.get('reasoning_content'):
thinking_cache.store_from_response(
request.get_json(silent=True, force=True).get('messages', []),
msg['reasoning_content'],
)
break
return jsonify(data)
def _log_messages(payload: dict[str, Any]) -> None:
"""记录消息摘要,方便排查请求形态是否符合预期。"""
for index, message in enumerate(payload.get('messages', [])):
role = message.get('role', '?')
content = message.get('content')
extra = ''
if 'tool_calls' in message:
extra += f' 工具调用数={len(message["tool_calls"])}'
if message.get('tool_call_id'):
extra += f' 工具调用ID={message["tool_call_id"]}'
if isinstance(content, list):
content_info = f'列表[{len(content)}]'
elif isinstance(content, str):
content_info = f'文本[{len(content)}]'
else:
content_info = type(content).__name__
logger.info(' 消息[%s] 角色=%s 内容=%s%s', index, role, content_info, extra)