"""OpenAI 格式修复
这个模块专门处理 OpenAI Chat Completions 兼容层里的“脏活”:
- 请求方向:把 Cursor 发来的近似 OpenAI 格式修整成更标准的请求
- 响应方向:把上游返回的近似 OpenAI 格式修整成 Cursor 更容易消费的结果
这里之所以集中做兼容性修复,而不是散落在路由层,是因为这些规则本质上属于
“协议清洗”而不是“请求编排”。路由层只应该关心把请求送到哪里,修复规则则应该
在适配层统一收口,避免两条主链路各自维护一份类似逻辑。
"""
from __future__ import annotations
import json
import logging
from typing import Any
from utils.http import gen_id
from utils.think_tag import extract_from_text
from utils.tool_fixer import normalize_args, repair_str_replace_args
logger = logging.getLogger(__name__)
JsonDict = dict[str, Any]
# ─── 请求预处理 ───────────────────────────────────
def normalize_request(payload: JsonDict, upstream_model: str | None = None) -> JsonDict:
"""预处理 Cursor 发来的 OpenAI 风格请求。
这个函数只做“让请求更像标准 OpenAI CC”的整理,不负责路由或网络层决策。
当前处理的重点有两类:
1. Cursor 偶尔会在 CC 端点混入 Anthropic 风格内容块,需要先转回 OpenAI 语义。
2. 工具定义和 tool_choice 可能是 Cursor 的便捷写法,需要标准化后再发给上游。
"""
if upstream_model:
payload['model'] = upstream_model
if 'messages' in payload:
payload['messages'] = _convert_anthropic_messages(payload['messages'])
if 'tools' not in payload:
return payload
payload['tools'] = [_normalize_tool_definition(tool) for tool in payload['tools']]
_normalize_tool_choice(payload)
return payload
# ─── 消息兼容转换 ─────────────────────────────────
def _convert_anthropic_messages(messages: Any) -> Any:
"""将消息中的 Anthropic tool_use/tool_result 块转回 OpenAI 风格消息。
Cursor 在少数场景下会把 Anthropic 风格内容块直接发到
`/v1/chat/completions`。如果不在这里先转换,后续上游即使是 OpenAI 兼容接口,
也未必能理解这类内容块。
"""
if not isinstance(messages, list):
return messages
converted: list[JsonDict] = []
for message in messages:
converted.extend(_convert_single_message(message))
return converted
def _convert_single_message(message: Any) -> list[JsonDict]:
"""将单条消息转换为 1 条或多条 OpenAI 风格消息。"""
if not isinstance(message, dict):
return [message]
content = message.get('content')
if not isinstance(content, list):
return [message]
has_tool_use, has_tool_result = _detect_tool_blocks(content)
if not has_tool_use and not has_tool_result:
return [message]
role = message.get('role', '')
if role == 'assistant' and has_tool_use:
return [_convert_assistant_tool_use_message(content)]
if has_tool_result:
return _convert_tool_result_message(role, content)
return [message]
def _detect_tool_blocks(content: list[Any]) -> tuple[bool, bool]:
"""识别内容块里是否包含 Anthropic 风格工具调用或工具结果。"""
has_tool_use = any(
isinstance(block, dict) and block.get('type') == 'tool_use'
for block in content
)
has_tool_result = any(
isinstance(block, dict) and block.get('type') == 'tool_result'
for block in content
)
return has_tool_use, has_tool_result
def _convert_assistant_tool_use_message(content: list[Any]) -> JsonDict:
"""将 assistant 的 tool_use 内容块转为 OpenAI tool_calls。"""
text_parts: list[str] = []
tool_calls: list[JsonDict] = []
for block in content:
if not isinstance(block, dict):
continue
if block.get('type') == 'text':
text_parts.append(block.get('text', ''))
elif block.get('type') == 'tool_use':
tool_calls.append({
'id': block.get('id', gen_id('call_')),
'type': 'function',
'function': {
'name': block.get('name', ''),
'arguments': json.dumps(block.get('input', {}), ensure_ascii=False),
},
})
result: JsonDict = {
'role': 'assistant',
'content': '\n'.join(text_parts) if text_parts else None,
}
if tool_calls:
result['tool_calls'] = tool_calls
return result
def _convert_tool_result_message(role: str, content: list[Any]) -> list[JsonDict]:
"""将 tool_result 块拆成 OpenAI 的 tool 消息,并保留其余内容块。"""
converted: list[JsonDict] = []
other_parts: list[Any] = []
for block in content:
if not isinstance(block, dict):
continue
if block.get('type') == 'tool_result':
converted.append({
'role': 'tool',
'tool_call_id': block.get('tool_use_id', ''),
'content': _stringify_tool_result_content(block.get('content', '')),
})
else:
other_parts.append(block)
if other_parts:
converted.append({'role': role, 'content': other_parts})
return converted
def _stringify_tool_result_content(content: Any) -> str:
"""将 tool_result 的 content 规范为字符串。
OpenAI 的 tool 消息内容天然更偏向字符串;而 Anthropic 的 tool_result 允许列表块。
这里做一次降维,避免后续上游把结构化结果误当成普通消息块。
"""
if isinstance(content, str):
return content
if isinstance(content, list):
return '\n'.join(
block.get('text', '')
for block in content
if isinstance(block, dict) and block.get('type') == 'text'
)
return str(content)
def _normalize_tool_definition(tool: Any) -> Any:
"""将 Cursor 可能使用的扁平工具定义补成标准 OpenAI function tool。
这里不主动过滤未知字段,只做最小标准化,避免在兼容层里过早丢失调用方提供的
额外上下文。
"""
if not isinstance(tool, dict):
return tool
if tool.get('type') == 'function' and 'function' in tool:
return tool
if 'name' not in tool:
return tool
return {
'type': 'function',
'function': {
'name': tool.get('name', ''),
'description': tool.get('description', ''),
'parameters': (
tool.get('input_schema')
or tool.get('parameters')
or {'type': 'object', 'properties': {}}
),
},
}
def _normalize_tool_choice(payload: JsonDict) -> None:
"""规范化 tool_choice。
这里保留当前项目已有的映射约定:
- `{"type": "auto"}` → `"auto"`
- `{"type": "any"}` → `"required"`
这样做是因为部分上游只接受 OpenAI 常见的字符串写法,而不接受 Cursor/Anthropic
风格的对象写法。
"""
tool_choice = payload.get('tool_choice')
if not isinstance(tool_choice, dict):
return
if tool_choice.get('type') == 'auto':
payload['tool_choice'] = 'auto'
elif tool_choice.get('type') == 'any':
payload['tool_choice'] = 'required'
# ─── 非流式响应修复 ───────────────────────────────
def fix_response(data: Any) -> Any:
"""修复上游返回的非流式 OpenAI 响应。"""
if not isinstance(data, dict):
return data
for choice in data.get('choices') or []:
_fix_response_choice(choice)
return data
def _fix_response_choice(choice: Any) -> None:
"""修复单个非流式 choice。"""
if not isinstance(choice, dict):
return
message = choice.get('message') or {}
if not isinstance(message, dict):
return
_promote_reasoning_field(message)
_extract_reasoning_from_content(message)
_convert_legacy_message_function_call(message, choice)
_fix_tool_calls(message, choice)
def _promote_reasoning_field(container: JsonDict) -> None:
"""兼容不同上游返回的 reasoning 字段命名差异。"""
if 'reasoningContent' in container and 'reasoning_content' not in container:
container['reasoning_content'] = container.pop('reasoningContent')
def _extract_reasoning_from_content(message: JsonDict) -> None:
"""从 `...` 中提取 reasoning_content。
有些上游把思考内容直接塞进 content 字符串里,而不是单独返回 reasoning 字段。
这里主动提取,是为了让 Cursor 端更稳定地展示思考过程。
"""
content = message.get('content') or ''
if not isinstance(content, str):
return
if '' not in content or message.get('reasoning_content'):
return
cleaned, reasoning = extract_from_text(content)
if not reasoning:
return
message['reasoning_content'] = reasoning
message['content'] = cleaned
logger.info('已提取 标签内容并映射为 reasoning_content,长度=%s', len(reasoning))
def _convert_legacy_message_function_call(message: JsonDict, choice: JsonDict) -> None:
"""将旧版 function_call 字段升级为新版 tool_calls。"""
if 'function_call' not in message or 'tool_calls' in message:
return
function_call = message.pop('function_call') or {}
message['tool_calls'] = [{
'id': gen_id('call_'),
'type': 'function',
'function': {
'name': function_call.get('name', ''),
'arguments': function_call.get('arguments', '{}'),
},
}]
_rewrite_function_call_finish_reason(choice)
# ─── 流式 chunk 修复 ──────────────────────────────
def fix_stream_chunk(data: Any) -> Any:
"""修复上游返回的流式 OpenAI chunk。"""
if not isinstance(data, dict):
return data
for choice in data.get('choices') or []:
_fix_stream_choice(choice)
return data
def _fix_stream_choice(choice: Any) -> None:
"""修复单个流式 choice。"""
if not isinstance(choice, dict):
return
delta = choice.get('delta') or {}
if not isinstance(delta, dict):
return
_promote_reasoning_field(delta)
_convert_legacy_delta_function_call(delta, choice)
_sanitize_tool_call_deltas(delta)
_ensure_stream_tool_calls(delta)
_rewrite_function_call_finish_reason(choice)
def _convert_legacy_delta_function_call(delta: JsonDict, choice: JsonDict) -> None:
"""将流式旧版 function_call 增量升级为 tool_calls 增量。"""
if 'function_call' not in delta or 'tool_calls' in delta:
return
function_call = delta.pop('function_call') or {}
tool_call: JsonDict = {'index': 0, 'type': 'function', 'function': {}}
if 'name' in function_call:
tool_call['id'] = gen_id('call_')
tool_call['function']['name'] = function_call['name']
if 'arguments' in function_call:
tool_call['function']['arguments'] = function_call['arguments']
delta['tool_calls'] = [tool_call]
_rewrite_function_call_finish_reason(choice)
def _sanitize_tool_call_deltas(delta: JsonDict) -> None:
"""清理流式 tool_calls 中的空白字段。
某些 OpenAI 兼容提供商在后续 tool_calls chunk 中错误地发送空字符串的
id/type/function.name,导致 Cursor 用空值覆盖真实值。
不处理 function.arguments,因为空字符串是合法的增量拼接值。
"""
for tc in delta.get('tool_calls') or []:
if not isinstance(tc, dict):
continue
if 'id' in tc and not str(tc['id']).strip():
del tc['id']
if 'type' in tc and not str(tc['type']).strip():
del tc['type']
func = tc.get('function')
if isinstance(func, dict) and 'name' in func and not str(func['name']).strip():
del func['name']
def _ensure_stream_tool_calls(delta: JsonDict) -> None:
"""补全流式 tool_calls 的最小必需字段。
流式增量中的 tool_calls 往往是不完整片段,这里只补齐索引、ID、类型等元信息,
不主动改写 arguments 内容,避免破坏增量拼接语义。
"""
for tool_call in delta.get('tool_calls') or []:
if 'index' not in tool_call:
tool_call['index'] = 0
function_data = tool_call.get('function') or {}
if 'id' in tool_call or 'name' in function_data:
if not tool_call.get('id'):
tool_call['id'] = gen_id('call_')
if 'type' not in tool_call:
tool_call['type'] = 'function'
# ─── tool_calls 修复 ──────────────────────────────
def _fix_tool_calls(message: JsonDict, choice: JsonDict) -> None:
"""修复非流式消息中的 tool_calls 字段。"""
tool_calls = message.get('tool_calls')
if not tool_calls:
return
for index, tool_call in enumerate(tool_calls):
_fill_tool_call_metadata(tool_call, index=index)
_normalize_tool_call_arguments(tool_call)
if choice.get('finish_reason') not in ('tool_calls', 'function_call'):
choice['finish_reason'] = 'tool_calls'
def _fill_tool_call_metadata(tool_call: JsonDict, *, index: int) -> None:
"""补齐非流式 tool_call 的通用元数据。"""
if not tool_call.get('id'):
tool_call['id'] = gen_id('call_')
if 'index' not in tool_call:
tool_call['index'] = index
if tool_call.get('type') != 'function':
tool_call['type'] = 'function'
def _normalize_tool_call_arguments(tool_call: JsonDict) -> None:
"""规范化 tool_call 参数。
这里会顺带调用工具参数修复器,原因是很多兼容性问题不在协议层,而在工具参数本身:
比如 `file_path`/`path` 命名差异、智能引号、StrReplace 精确匹配失败等。
"""
function_data = tool_call.get('function') or {}
raw_arguments = function_data.get('arguments', '{}')
try:
arguments = (
json.loads(raw_arguments)
if isinstance(raw_arguments, str)
else (raw_arguments or {})
)
except json.JSONDecodeError:
arguments = {}
arguments = normalize_args(arguments)
arguments = repair_str_replace_args(function_data.get('name', ''), arguments)
function_data['arguments'] = json.dumps(arguments, ensure_ascii=False)
def _rewrite_function_call_finish_reason(choice: JsonDict) -> None:
"""将旧版 finish_reason=function_call 升级为 tool_calls。"""
if choice.get('finish_reason') == 'function_call':
choice['finish_reason'] = 'tool_calls'