初始化提交

This commit is contained in:
h88782481 2026-03-09 14:18:42 +08:00
commit 202731df74
28 changed files with 3140 additions and 0 deletions

31
.dockerignore Normal file
View file

@ -0,0 +1,31 @@
# 版本管理
.git/
.gitignore
# 环境变量
.env
.env.*
!.env.example
# Python 缓存
__pycache__/
*.pyc
*.pyo
*.pyd
# 虚拟环境
.venv/
venv/
# IDE
.vscode/
.idea/
.cursor/
# Docker 自身
Dockerfile
compose.yml
.dockerignore
# 文档
README.md

6
.env.example Normal file
View file

@ -0,0 +1,6 @@
PROXY_TARGET_URL=https://your-relay-station.com
PROXY_API_KEY=sk-xxx
PROXY_PORT=3029
API_TIMEOUT=300
ACCESS_API_KEY=your-access-key
DEBUG=false

20
.gitignore vendored Normal file
View file

@ -0,0 +1,20 @@
# Python 缓存
__pycache__/
*.pyc
*.pyo
*.pyd
# 环境变量
.env
# 虚拟环境
.venv/
venv/
# 运行时数据
data/
# IDE
.vscode/
.idea/
.cursor/

25
Dockerfile Normal file
View file

@ -0,0 +1,25 @@
# ---- 阶段 1: 安装依赖 ----
FROM python:3.13-alpine AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# ---- 阶段 2: 运行环境 ----
FROM python:3.13-alpine
WORKDIR /app
COPY --from=builder /install /usr/local
COPY *.py ./
COPY routes/ routes/
COPY adapters/ adapters/
COPY utils/ utils/
COPY static/ static/
RUN mkdir -p data
EXPOSE 3029
CMD ["python", "start.py"]

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

129
README.md Normal file
View file

@ -0,0 +1,129 @@
# API 2 Cursor
让 Cursor 通过第三方中转站使用任意 LLM 模型的 API 代理服务。
## 它解决什么问题
Cursor 根据模型名发送不同格式的请求:
| Cursor 模型名风格 | 请求格式 |
|---|---|
| `claude-sonnet-*``glm-*` | `/v1/chat/completions` (OpenAI CC) |
| `gpt-*``claude-opus-*` | `/v1/responses` (OpenAI Responses) |
而中转站通常只支持 `/v1/chat/completions``/v1/messages`
本项目在中间做协议转换,**不管 Cursor 发什么格式,都能正确转发到中转站;不管中转站返回什么格式,都让 Cursor 能正确接收**。
## 架构
```
Cursor API 2 Cursor 中转站
│ │ │
├─ /v1/chat/completions ──→ chat.py ─┬─ openai 后端 ────────→ /v1/chat/completions
│ └─ anthropic 后端 ────→ /v1/messages
│ │
├─ /v1/responses ──────→ responses.py → 转为 CC → 同上 → 转回 Responses
│ │
└─ /v1/messages ───────→ messages.py → 直接透传 ────────────→ /v1/messages
```
## 快速开始
### 直接运行
```bash
cd api2cursor
pip install -r requirements.txt
cp .env.example .env
# 编辑 .env 填入中转站地址和密钥
python start.py
```
### Docker 部署
```bash
cd api2cursor
cp .env.example .env
# 编辑 .env
docker compose up -d
```
服务启动后访问 `http://localhost:3029/admin` 进入管理面板。
## 配置
### 环境变量
| 变量 | 说明 | 默认值 |
|---|---|---|
| `PROXY_TARGET_URL` | 上游中转站地址 | `https://api.anthropic.com` |
| `PROXY_API_KEY` | 上游 API 密钥 | |
| `PROXY_PORT` | 服务监听端口 | `3029` |
| `API_TIMEOUT` | 请求超时(秒) | `300` |
| `ACCESS_API_KEY` | 访问鉴权密钥,留空不启用 | |
| `DEBUG` | 调试模式,输出详细请求/响应日志 | `false` |
### 模型映射
在管理面板 (`/admin`) 中配置模型映射:
- **Cursor 模型名** — 在 Cursor 自定义模型中填入的名称
- **上游模型名** — 发送到中转站的实际模型名
- **后端类型**`openai` (CC 格式) / `anthropic` (Messages 格式) / `auto` (自动检测)
- **自定义地址/密钥** — 可选,覆盖全局设置,实现分流到不同中转站
**示例**:在 Cursor 中添加 `claude-sonnet-4-5-20250929`,映射到上游 `gpt-5.3-codex`,后端选 `openai`。Cursor 会用 CC 格式发送请求,代理直接转发到中转站的 `/v1/chat/completions`
> **提示**:使用 Claude 风格的模型名(如 `claude-sonnet-4-5-20250929`)可以让 Cursor 显示思考过程thinking
### 在 Cursor 中配置
1. 打开 Cursor 设置 → Models
2. 添加自定义模型,名称填映射中配置的 Cursor 模型名
3. Override OpenAI Base URL 填 `http://localhost:3029`
4. API Key 填 `ACCESS_API_KEY` 的值(未配置则随意填)
## 项目结构
```
api2cursor/
├── start.py # 启动入口
├── app.py # Flask 应用工厂
├── config.py # 环境变量配置
├── settings.py # 持久化配置管理
├── routes/ # 路由层
│ ├── chat.py # /v1/chat/completions
│ ├── responses.py # /v1/responses
│ ├── messages.py # /v1/messages (透传)
│ └── admin.py # 管理面板 + API
├── adapters/ # 适配层(格式转换)
│ ├── openai_anthropic.py# CC ↔ Messages 双向转换
│ ├── openai_fixer.py # OpenAI 请求/响应修复
│ └── responses_adapter.py# Responses ↔ CC 双向转换
├── utils/ # 工具层
│ ├── http.py # 请求转发、SSE 解析
│ ├── tool_fixer.py # 工具参数修复
│ └── think_tag.py # <think> 标签提取
└── static/ # 管理面板前端
├── admin.html
├── admin.css
└── admin.js
```
## 兼容性修复
代理自动处理以下兼容性问题:
- Cursor 扁平格式 tools → 标准 OpenAI 嵌套格式
- `reasoningContent``reasoning_content`
- `<think>` 标签 → `reasoning_content`
- 旧版 `function_call` → 新版 `tool_calls`
- `tool_calls` 缺失 `id` / `index` / `type` 字段补全
- 智能引号 → 普通引号StrReplace 工具精确匹配修复)
- `file_path``path` 字段映射
- `finish_reason` 修正
## 许可证
[MIT](LICENSE)

0
adapters/__init__.py Normal file
View file

View file

@ -0,0 +1,350 @@
"""OpenAI Chat Completions ↔ Anthropic Messages 格式转换
请求方向: CC MessagesCursor CC 请求转为 Anthropic 格式发给上游
响应方向: Messages CC上游 Anthropic 响应转为 CC 格式返回给 Cursor
包含非流式和流式两种转换
"""
import json
import uuid
import logging
from utils.tool_fixer import normalize_args, repair_str_replace_args, fix_anthropic_tool_use
from utils.http import gen_id
logger = logging.getLogger(__name__)
# Anthropic stop_reason → OpenAI finish_reason
_STOP_REASON_MAP = {
'end_turn': 'stop',
'max_tokens': 'length',
'tool_use': 'tool_calls',
'stop_sequence': 'stop',
}
# ═══════════════════════════════════════════════════════════
# 请求转换: CC → Messages
# ═══════════════════════════════════════════════════════════
def cc_to_messages_request(payload):
"""将 OpenAI CC 格式请求转换为 Anthropic Messages 格式"""
messages = payload.get('messages', [])
anthropic_msgs = []
system_parts = []
for msg in messages:
role = msg.get('role', '')
content = msg.get('content', '')
# system 消息提取到顶层
if role == 'system':
system_parts.append(_flatten_text(content))
continue
anthropic_role = 'assistant' if role == 'assistant' else 'user'
anthropic_content = _convert_content(msg)
# assistant 的 tool_calls → tool_use content blocks
if role == 'assistant' and 'tool_calls' in msg:
blocks = _to_blocks(anthropic_content)
for tc in msg['tool_calls']:
func = tc.get('function', {})
arguments = func.get('arguments', '{}')
if isinstance(arguments, str):
try:
arguments = json.loads(arguments)
except json.JSONDecodeError:
arguments = {}
blocks.append({
'type': 'tool_use',
'id': tc.get('id', f'toolu_{uuid.uuid4().hex[:24]}'),
'name': func.get('name', ''),
'input': arguments,
})
anthropic_content = blocks
# tool 角色 → user + tool_result
if role == 'tool':
text = content if isinstance(content, str) else json.dumps(content)
anthropic_content = [{
'type': 'tool_result',
'tool_use_id': msg.get('tool_call_id', ''),
'content': text,
}]
anthropic_role = 'user'
if not anthropic_content and anthropic_content != 0:
continue
anthropic_msgs.append({'role': anthropic_role, 'content': anthropic_content})
# Anthropic 要求角色必须交替
anthropic_msgs = _merge_same_role(anthropic_msgs)
result = {
'model': payload.get('model', 'claude-sonnet-4-20250514'),
'messages': anthropic_msgs,
'max_tokens': max(payload.get('max_tokens') or 8192, 8192),
}
if system_parts:
result['system'] = '\n\n'.join(system_parts)
if 'tools' in payload:
result['tools'] = _convert_tools(payload['tools'])
for key in ('temperature', 'top_p', 'stream'):
if key in payload:
result[key] = payload[key]
return result
# ═══════════════════════════════════════════════════════════
# 非流式响应转换: Messages → CC
# ═══════════════════════════════════════════════════════════
def messages_to_cc_response(data, request_id=None):
"""将 Anthropic Messages 响应转换为 OpenAI CC 格式"""
request_id = request_id or gen_id('chatcmpl-')
data = fix_anthropic_tool_use(data)
content_text = ''
reasoning = ''
tool_calls = []
for block in data.get('content', []):
if not isinstance(block, dict):
continue
btype = block.get('type', '')
if btype == 'text':
content_text += block.get('text', '')
elif btype == 'thinking':
reasoning += block.get('thinking', '')
elif btype == 'tool_use':
args = block.get('input', {})
if isinstance(args, dict):
args = normalize_args(args)
args = repair_str_replace_args(block.get('name', ''), args)
tool_calls.append({
'index': len(tool_calls),
'id': block.get('id', f'toolu_{uuid.uuid4().hex[:24]}'),
'type': 'function',
'function': {
'name': block.get('name', ''),
'arguments': json.dumps(args, ensure_ascii=False) if isinstance(args, dict) else str(args),
},
})
stop_reason = data.get('stop_reason', 'end_turn')
message = {'role': 'assistant', 'content': content_text or None}
if reasoning:
message['reasoning_content'] = reasoning
if tool_calls:
message['tool_calls'] = tool_calls
usage = data.get('usage', {})
return {
'id': request_id,
'object': 'chat.completion',
'model': data.get('model', 'claude'),
'choices': [{
'index': 0,
'message': message,
'finish_reason': _STOP_REASON_MAP.get(stop_reason, 'stop'),
}],
'usage': {
'prompt_tokens': usage.get('input_tokens', 0),
'completion_tokens': usage.get('output_tokens', 0),
'total_tokens': usage.get('input_tokens', 0) + usage.get('output_tokens', 0),
},
}
# ═══════════════════════════════════════════════════════════
# 流式响应转换: Anthropic SSE → CC chunks
# ═══════════════════════════════════════════════════════════
class AnthropicStreamConverter:
"""将 Anthropic SSE 事件逐个转换为 OpenAI CC 流式 chunk"""
def __init__(self, request_id=None):
self._id = request_id or gen_id('chatcmpl-')
self._tool_index = -1
self._input_tokens = 0
self._output_tokens = 0
def process_event(self, event_type, event_data):
"""处理一个 Anthropic SSE 事件,返回 CC chunk JSON 字符串列表"""
chunks = []
if event_type == 'message_start':
msg = event_data.get('message', {})
self._input_tokens = msg.get('usage', {}).get('input_tokens', 0)
chunk = self._make_chunk(delta={'role': 'assistant', 'content': ''})
if msg.get('model'):
chunk['model'] = msg['model']
chunks.append(json.dumps(chunk))
elif event_type == 'content_block_start':
block = event_data.get('content_block', {})
if block.get('type') == 'tool_use':
self._tool_index += 1
chunks.append(json.dumps(self._make_chunk(delta={
'tool_calls': [{
'index': self._tool_index,
'id': block.get('id', f'toolu_{uuid.uuid4().hex[:24]}'),
'type': 'function',
'function': {'name': block.get('name', ''), 'arguments': ''},
}]
})))
elif event_type == 'content_block_delta':
delta = event_data.get('delta', {})
dtype = delta.get('type', '')
if dtype == 'text_delta' and delta.get('text'):
chunks.append(json.dumps(self._make_chunk(
delta={'content': delta['text']})))
elif dtype == 'thinking_delta' and delta.get('thinking'):
chunks.append(json.dumps(self._make_chunk(
delta={'reasoning_content': delta['thinking']})))
elif dtype == 'input_json_delta' and delta.get('partial_json'):
chunks.append(json.dumps(self._make_chunk(delta={
'tool_calls': [{
'index': self._tool_index,
'function': {'arguments': delta['partial_json']},
}]
})))
elif event_type == 'message_delta':
delta = event_data.get('delta', {})
usage = event_data.get('usage', {})
self._output_tokens = usage.get('output_tokens', 0)
finish = _STOP_REASON_MAP.get(delta.get('stop_reason', ''), 'stop')
chunk = self._make_chunk(delta={}, finish_reason=finish)
chunk['usage'] = {
'prompt_tokens': self._input_tokens,
'completion_tokens': self._output_tokens,
'total_tokens': self._input_tokens + self._output_tokens,
}
chunks.append(json.dumps(chunk))
return chunks
def _make_chunk(self, delta, finish_reason=None):
choice = {'index': 0, 'delta': delta}
if finish_reason:
choice['finish_reason'] = finish_reason
return {
'id': self._id,
'object': 'chat.completion.chunk',
'model': 'claude',
'choices': [choice],
}
# ═══════════════════════════════════════════════════════════
# 内部辅助函数
# ═══════════════════════════════════════════════════════════
def _flatten_text(content):
"""将 content 扁平化为纯文本"""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for p in content:
if isinstance(p, str):
parts.append(p)
elif isinstance(p, dict) and p.get('type') == 'text':
parts.append(p.get('text', ''))
return '\n'.join(parts)
return str(content)
def _convert_content(msg):
"""将 OpenAI 消息的 content 字段转为 Anthropic 格式"""
content = msg.get('content', '')
if content is None:
return ''
if isinstance(content, str):
return content
if isinstance(content, list):
blocks = []
for part in content:
if isinstance(part, str):
blocks.append({'type': 'text', 'text': part})
elif isinstance(part, dict):
ptype = part.get('type', '')
if ptype == 'text':
blocks.append({'type': 'text', 'text': part.get('text', '')})
elif ptype == 'image_url':
blocks.append(_convert_image(part))
elif ptype in ('tool_use', 'tool_result'):
blocks.append(part)
return blocks
return str(content)
def _convert_image(part):
"""将 OpenAI image_url 格式转为 Anthropic image 格式"""
url_data = part.get('image_url', {})
url = url_data.get('url', '') if isinstance(url_data, dict) else str(url_data)
if url.startswith('data:'):
media_type, _, b64 = url.partition(';base64,')
return {
'type': 'image',
'source': {
'type': 'base64',
'media_type': media_type.replace('data:', '') or 'image/png',
'data': b64,
},
}
return {'type': 'image', 'source': {'type': 'url', 'url': url}}
def _convert_tools(tools):
"""将 OpenAI tools 转为 Anthropic tools 格式(兼容 Cursor 扁平格式)"""
result = []
for tool in tools:
if tool.get('type') == 'function' and 'function' in tool:
func = tool['function']
result.append({
'name': func.get('name', ''),
'description': func.get('description', ''),
'input_schema': func.get('parameters', {'type': 'object', 'properties': {}}),
})
elif 'name' in tool and 'input_schema' in tool:
result.append({
'name': tool.get('name', ''),
'description': tool.get('description', ''),
'input_schema': tool.get('input_schema', {'type': 'object', 'properties': {}}),
})
return result
def _to_blocks(content):
"""将 content 统一转为 blocks 列表"""
if isinstance(content, str):
return [{'type': 'text', 'text': content}] if content else []
if isinstance(content, list):
return list(content)
return [{'type': 'text', 'text': str(content)}] if content else []
def _merge_same_role(messages):
"""合并相邻同角色消息Anthropic 要求角色必须交替)"""
if not messages:
return messages
merged = [messages[0]]
for msg in messages[1:]:
if msg['role'] == merged[-1]['role']:
prev = _to_blocks(merged[-1]['content'])
curr = _to_blocks(msg['content'])
merged[-1]['content'] = prev + curr
else:
merged.append(msg)
return merged

267
adapters/openai_fixer.py Normal file
View file

@ -0,0 +1,267 @@
"""OpenAI 格式修复
修复 Cursor 发出的 OpenAI 格式请求和上游返回的响应中的各种兼容性问题
请求修复: Cursor 扁平格式 tools 标准嵌套格式, tool_choice 规范化
响应修复: reasoningContent reasoning_content, <think> 标签提取,
function_call tool_calls, tool_calls 字段补全, 参数修复
"""
import json
import logging
from utils.http import gen_id
from utils.tool_fixer import normalize_args, repair_str_replace_args
from utils.think_tag import extract_from_text
logger = logging.getLogger(__name__)
# ─── 请求预处理 ───────────────────────────────────
def normalize_request(payload, upstream_model=None):
"""预处理 Cursor 发来的 OpenAI 格式请求"""
if upstream_model:
payload['model'] = upstream_model
# Cursor 可能在 CC 端点发送 Anthropic 格式的 tool_use/tool_result 消息
if 'messages' in payload:
payload['messages'] = _convert_anthropic_messages(payload['messages'])
if 'tools' not in payload:
return payload
# 修复 Cursor 可能发出的扁平格式 tools
normalized = []
for tool in payload['tools']:
if tool.get('type') == 'function' and 'function' in tool:
normalized.append(tool)
elif 'name' in tool:
normalized.append({
'type': 'function',
'function': {
'name': tool.get('name', ''),
'description': tool.get('description', ''),
'parameters': tool.get('input_schema')
or tool.get('parameters')
or {'type': 'object', 'properties': {}},
},
})
else:
normalized.append(tool)
payload['tools'] = normalized
# tool_choice 规范化
tc = payload.get('tool_choice')
if isinstance(tc, dict):
if tc.get('type') == 'auto':
payload['tool_choice'] = 'auto'
elif tc.get('type') == 'any':
payload['tool_choice'] = 'required'
return payload
def _convert_anthropic_messages(messages):
"""将消息中的 Anthropic 格式 tool_use/tool_result 转为 OpenAI 格式
Cursor 有时在 CC 端点中发送 Anthropic 风格的内容块
assistant: [{"type":"tool_use", "id":"...", "name":"Read", "input":{...}}]
user: [{"type":"tool_result", "tool_use_id":"...", "content":[...]}]
OpenAI 格式应为
assistant: {"tool_calls":[{"id":"...", "function":{"name":"Read","arguments":"..."}}]}
tool: {"tool_call_id":"...", "content":"..."}
"""
converted = []
for msg in messages:
content = msg.get('content')
if not isinstance(content, list):
converted.append(msg)
continue
has_tool_use = any(
isinstance(b, dict) and b.get('type') == 'tool_use' for b in content
)
has_tool_result = any(
isinstance(b, dict) and b.get('type') == 'tool_result' for b in content
)
if not has_tool_use and not has_tool_result:
converted.append(msg)
continue
role = msg.get('role', '')
if role == 'assistant' and has_tool_use:
text_parts = []
tool_calls = []
for block in content:
if not isinstance(block, dict):
continue
if block.get('type') == 'text':
text_parts.append(block.get('text', ''))
elif block.get('type') == 'tool_use':
tool_calls.append({
'id': block.get('id', gen_id('call_')),
'type': 'function',
'function': {
'name': block.get('name', ''),
'arguments': json.dumps(
block.get('input', {}), ensure_ascii=False
),
},
})
new_msg = {'role': 'assistant'}
new_msg['content'] = '\n'.join(text_parts) if text_parts else None
if tool_calls:
new_msg['tool_calls'] = tool_calls
converted.append(new_msg)
elif has_tool_result:
other_parts = []
for block in content:
if not isinstance(block, dict):
continue
if block.get('type') == 'tool_result':
rc = block.get('content', '')
if isinstance(rc, list):
rc = '\n'.join(
b.get('text', '') for b in rc
if isinstance(b, dict) and b.get('type') == 'text'
)
elif not isinstance(rc, str):
rc = str(rc)
converted.append({
'role': 'tool',
'tool_call_id': block.get('tool_use_id', ''),
'content': rc,
})
else:
other_parts.append(block)
if other_parts:
converted.append({'role': role, 'content': other_parts})
else:
converted.append(msg)
return converted
# ─── 非流式响应修复 ───────────────────────────────
def fix_response(data):
"""修复上游返回的非流式 OpenAI 响应"""
if not isinstance(data, dict):
return data
for choice in (data.get('choices') or []):
msg = choice.get('message') or {}
# reasoningContent → reasoning_content
if 'reasoningContent' in msg and 'reasoning_content' not in msg:
msg['reasoning_content'] = msg.pop('reasoningContent')
# <think> 标签 → reasoning_content
content = msg.get('content') or ''
if isinstance(content, str) and '<think>' in content and not msg.get('reasoning_content'):
cleaned, reasoning = extract_from_text(content)
if reasoning:
msg['reasoning_content'] = reasoning
msg['content'] = cleaned
logger.info(f'提取 <think> 标签 → reasoning_content ({len(reasoning)} 字符)')
# 旧版 function_call → 新版 tool_calls
if 'function_call' in msg and 'tool_calls' not in msg:
fc = msg.pop('function_call')
msg['tool_calls'] = [{
'id': gen_id('call_'),
'type': 'function',
'function': {
'name': fc.get('name', ''),
'arguments': fc.get('arguments', '{}'),
},
}]
if choice.get('finish_reason') == 'function_call':
choice['finish_reason'] = 'tool_calls'
# 修复 tool_calls 字段
_fix_tool_calls(msg, choice)
return data
# ─── 流式 chunk 修复 ──────────────────────────────
def fix_stream_chunk(data):
"""修复上游返回的流式 OpenAI chunk"""
if not isinstance(data, dict):
return data
for choice in (data.get('choices') or []):
delta = choice.get('delta') or {}
# reasoningContent → reasoning_content
if 'reasoningContent' in delta and 'reasoning_content' not in delta:
delta['reasoning_content'] = delta.pop('reasoningContent')
# 旧版 function_call → tool_calls
if 'function_call' in delta and 'tool_calls' not in delta:
fc = delta.pop('function_call')
tc = {'index': 0, 'type': 'function', 'function': {}}
if 'name' in fc:
tc['id'] = gen_id('call_')
tc['function']['name'] = fc['name']
if 'arguments' in fc:
tc['function']['arguments'] = fc['arguments']
delta['tool_calls'] = [tc]
if choice.get('finish_reason') == 'function_call':
choice['finish_reason'] = 'tool_calls'
# 补全 tool_calls 字段
for tc in (delta.get('tool_calls') or []):
if 'index' not in tc:
tc['index'] = 0
func = tc.get('function') or {}
if 'id' in tc or 'name' in func:
if not tc.get('id'):
tc['id'] = gen_id('call_')
if 'type' not in tc:
tc['type'] = 'function'
if choice.get('finish_reason') == 'function_call':
choice['finish_reason'] = 'tool_calls'
return data
# ─── 内部辅助 ─────────────────────────────────────
def _fix_tool_calls(msg, choice):
"""修复消息中的 tool_calls 字段"""
tool_calls = msg.get('tool_calls')
if not tool_calls:
return
for i, tc in enumerate(tool_calls):
if not tc.get('id'):
tc['id'] = gen_id('call_')
if 'index' not in tc:
tc['index'] = i
if tc.get('type') != 'function':
tc['type'] = 'function'
func = tc.get('function', {})
args_raw = func.get('arguments', '{}')
try:
args = json.loads(args_raw) if isinstance(args_raw, str) else (args_raw or {})
except json.JSONDecodeError:
args = {}
args = normalize_args(args)
args = repair_str_replace_args(func.get('name', ''), args)
func['arguments'] = json.dumps(args, ensure_ascii=False)
if choice.get('finish_reason') not in ('tool_calls', 'function_call'):
choice['finish_reason'] = 'tool_calls'

View file

@ -0,0 +1,533 @@
"""Responses API 适配
Cursor GPT/Claude-Opus 等模型使用 /v1/responses 格式
本模块将 Responses 格式与 Chat Completions 格式互相转换
请求: Responses CC
响应: CC Responses非流式 + 流式
流式: 支持从 CC chunks Anthropic SSE 事件直接转换
"""
import json
import logging
from utils.http import gen_id
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════
# 请求转换: Responses → CC
# ═══════════════════════════════════════════════════════════
def responses_to_cc(payload):
"""将 /v1/responses 请求转换为 /v1/chat/completions 格式"""
messages = []
if payload.get('instructions'):
messages.append({'role': 'system', 'content': payload['instructions']})
input_data = payload.get('input', [])
if isinstance(input_data, str):
messages.append({'role': 'user', 'content': input_data})
elif isinstance(input_data, list):
_convert_input_items(input_data, messages)
result = {
'model': payload.get('model', ''),
'messages': messages,
'stream': payload.get('stream', False),
}
if 'tools' in payload:
result['tools'] = _convert_tools(payload['tools'])
for key in ('temperature', 'top_p'):
if key in payload:
result[key] = payload[key]
if 'max_output_tokens' in payload:
result['max_tokens'] = payload['max_output_tokens']
if 'tool_choice' in payload:
result['tool_choice'] = payload['tool_choice']
return result
# ═══════════════════════════════════════════════════════════
# 非流式响应转换: CC → Responses
# ═══════════════════════════════════════════════════════════
def cc_to_responses(cc_resp, model=''):
"""将 CC 响应转换为 Responses 格式"""
choice = (cc_resp.get('choices') or [{}])[0]
msg = choice.get('message') or {}
finish = choice.get('finish_reason', 'stop')
output = []
if msg.get('reasoning_content'):
output.append({
'type': 'reasoning',
'id': gen_id('rs_'),
'summary': [{'type': 'summary_text', 'text': msg['reasoning_content']}],
})
if msg.get('content'):
output.append({
'type': 'message',
'id': gen_id('msg_'),
'status': 'completed',
'role': 'assistant',
'content': [{'type': 'output_text', 'text': msg['content']}],
})
for tc in (msg.get('tool_calls') or []):
func = tc.get('function') or {}
output.append({
'type': 'function_call',
'id': gen_id('fc_'),
'status': 'completed',
'call_id': tc.get('id', gen_id('call_')),
'name': func.get('name', ''),
'arguments': func.get('arguments', '{}'),
})
usage = cc_resp.get('usage', {})
return {
'id': cc_resp.get('id', gen_id('resp_')),
'object': 'response',
'status': 'incomplete' if finish == 'length' else 'completed',
'model': model or cc_resp.get('model', ''),
'output': output,
'usage': {
'input_tokens': usage.get('prompt_tokens', 0),
'output_tokens': usage.get('completion_tokens', 0),
'total_tokens': usage.get('total_tokens', 0),
},
}
# ═══════════════════════════════════════════════════════════
# 流式转换器: CC chunks / Anthropic SSE → Responses SSE
# ═══════════════════════════════════════════════════════════
class ResponsesStreamConverter:
"""有状态转换器:将 CC 流式 chunk 或 Anthropic SSE 事件转为 Responses SSE 事件"""
def __init__(self, response_id=None, model=''):
self.resp_id = response_id or gen_id('resp_')
self.model = model
# 思考内容缓冲
self._rs_buf = ''
self._rs_started = False
self._rs_closed = False
self._rs_id = gen_id('rs_')
# 文本内容缓冲
self._text_buf = ''
self._text_started = False
self._text_closed = False
self._msg_id = gen_id('msg_')
# 工具调用缓冲 {index: {name, args, call_id, fc_id}}
self._tools = {}
self._output_items = []
self._finished = False
self._input_tokens = 0
# ─── 公开接口 ─────────────────────────────────
def start_events(self):
"""生成流开始事件"""
return [self._sse('response.created', {
'id': self.resp_id, 'object': 'response',
'status': 'in_progress', 'model': self.model, 'output': [],
})]
def process_cc_chunk(self, chunk):
"""处理 CC 格式的流式 chunk返回 Responses SSE 事件列表"""
events = []
for choice in (chunk.get('choices') or []):
delta = choice.get('delta') or {}
finish = choice.get('finish_reason')
if delta.get('reasoning_content'):
events.extend(self._on_reasoning(delta['reasoning_content']))
if delta.get('content') is not None and delta['content'] != '':
events.extend(self._on_text(delta['content']))
for tc in (delta.get('tool_calls') or []):
events.extend(self._on_tool_call(tc))
if finish and not self._finished:
self._finished = True
events.extend(self._do_finish(finish, chunk.get('usage')))
return events
def process_anthropic_event(self, event_type, event_data):
"""直接处理 Anthropic SSE 事件(跳过 CC 中间转换,更高效)"""
events = []
if event_type == 'message_start':
usage = event_data.get('message', {}).get('usage', {})
self._input_tokens = usage.get('input_tokens', 0)
elif event_type == 'content_block_start':
block = event_data.get('content_block', {})
btype = block.get('type', '')
if btype == 'thinking' and not self._rs_started:
self._rs_started = True
events.append(self._sse('response.output_item.added', {
'type': 'reasoning', 'id': self._rs_id, 'summary': [],
}))
elif btype == 'text':
events.extend(self._ensure_text_started())
elif btype == 'tool_use':
events.extend(self._start_tool_from_block(block))
elif event_type == 'content_block_delta':
delta = event_data.get('delta', {})
dtype = delta.get('type', '')
if dtype == 'thinking_delta' and delta.get('thinking'):
self._rs_buf += delta['thinking']
events.append(self._sse('response.reasoning_summary_text.delta', {
'type': 'summary_text', 'delta': delta['thinking'],
}))
elif dtype == 'text_delta' and delta.get('text'):
self._text_buf += delta['text']
events.append(self._sse('response.output_text.delta', {
'type': 'output_text', 'delta': delta['text'],
}))
elif dtype == 'input_json_delta' and delta.get('partial_json') and self._tools:
idx = max(self._tools.keys())
self._tools[idx]['args'] += delta['partial_json']
events.append(self._sse('response.function_call_arguments.delta', {
'type': 'function_call', 'delta': delta['partial_json'],
}))
elif event_type == 'message_delta':
delta = event_data.get('delta', {})
stop = delta.get('stop_reason', 'end_turn')
usage = event_data.get('usage', {})
finish = {'tool_use': 'tool_calls', 'max_tokens': 'length'}.get(stop, 'stop')
if not self._finished:
self._finished = True
u = {
'input_tokens': self._input_tokens,
'output_tokens': usage.get('output_tokens', 0),
'total_tokens': self._input_tokens + usage.get('output_tokens', 0),
}
events.extend(self._do_finish(finish, u))
return events
def finalize(self):
"""流结束时补发未关闭的事件"""
if self._finished:
return []
self._finished = True
return self._do_finish('stop', None)
# ─── 内部事件处理 ─────────────────────────────
def _on_reasoning(self, text):
"""处理思考内容 delta"""
events = []
if not self._rs_started:
self._rs_started = True
events.append(self._sse('response.output_item.added', {
'type': 'reasoning', 'id': self._rs_id, 'summary': [],
}))
self._rs_buf += text
events.append(self._sse('response.reasoning_summary_text.delta', {
'type': 'summary_text', 'delta': text,
}))
return events
def _on_text(self, text):
"""处理文本内容 delta"""
events = self._ensure_text_started()
self._text_buf += text
events.append(self._sse('response.output_text.delta', {
'type': 'output_text', 'delta': text,
}))
return events
def _on_tool_call(self, tc):
"""处理工具调用 delta"""
events = []
idx = tc.get('index', 0)
func = tc.get('function') or {}
if idx not in self._tools:
if self._rs_started and not self._rs_closed:
events.extend(self._close_reasoning())
if self._text_started and not self._text_closed:
events.extend(self._close_text())
call_id = tc.get('id', gen_id('call_'))
name = func.get('name', '')
fc_id = gen_id('fc_')
self._tools[idx] = {'name': name, 'args': '', 'call_id': call_id, 'fc_id': fc_id}
events.append(self._sse('response.output_item.added', {
'type': 'function_call', 'id': fc_id,
'status': 'in_progress', 'call_id': call_id,
'name': name, 'arguments': '',
}))
if func.get('name'):
self._tools[idx]['name'] = func['name']
if func.get('arguments', ''):
self._tools[idx]['args'] += func['arguments']
events.append(self._sse('response.function_call_arguments.delta', {
'type': 'function_call', 'delta': func['arguments'],
}))
return events
def _ensure_text_started(self):
"""确保文本输出项已开始"""
events = []
if self._rs_started and not self._rs_closed:
events.extend(self._close_reasoning())
if not self._text_started:
self._text_started = True
events.append(self._sse('response.output_item.added', {
'type': 'message', 'id': self._msg_id,
'status': 'in_progress', 'role': 'assistant', 'content': [],
}))
events.append(self._sse('response.content_part.added', {
'type': 'output_text', 'text': '',
}))
return events
def _start_tool_from_block(self, block):
"""从 Anthropic tool_use block 开始新的工具调用"""
events = []
if self._rs_started and not self._rs_closed:
events.extend(self._close_reasoning())
if self._text_started and not self._text_closed:
events.extend(self._close_text())
idx = len(self._tools)
tool_id = block.get('id', gen_id('toolu_'))
name = block.get('name', '')
fc_id = gen_id('fc_')
self._tools[idx] = {'name': name, 'args': '', 'call_id': tool_id, 'fc_id': fc_id}
events.append(self._sse('response.output_item.added', {
'type': 'function_call', 'id': fc_id,
'status': 'in_progress', 'call_id': tool_id,
'name': name, 'arguments': '',
}))
return events
# ─── 关闭/结束事件 ────────────────────────────
def _close_reasoning(self):
if self._rs_closed:
return []
self._rs_closed = True
rs = {
'type': 'reasoning', 'id': self._rs_id,
'summary': [{'type': 'summary_text', 'text': self._rs_buf}],
}
self._output_items.append(rs)
return [
self._sse('response.reasoning_summary_text.done', {
'type': 'summary_text', 'text': self._rs_buf,
}),
self._sse('response.output_item.done', rs),
]
def _close_text(self):
if self._text_closed:
return []
self._text_closed = True
msg = {
'type': 'message', 'id': self._msg_id,
'status': 'completed', 'role': 'assistant',
'content': [{'type': 'output_text', 'text': self._text_buf}],
}
self._output_items.append(msg)
return [
self._sse('response.output_text.done', {'type': 'output_text', 'text': self._text_buf}),
self._sse('response.output_item.done', msg),
]
def _do_finish(self, finish_reason, usage):
"""生成流结束的所有关闭事件"""
events = []
if self._rs_started and not self._rs_closed:
events.extend(self._close_reasoning())
if self._text_started and not self._text_closed:
events.extend(self._close_text())
for idx in sorted(self._tools.keys()):
buf = self._tools[idx]
events.append(self._sse('response.function_call_arguments.done', {
'type': 'function_call', 'arguments': buf['args'],
}))
fc = {
'type': 'function_call', 'id': buf['fc_id'],
'status': 'completed', 'call_id': buf['call_id'],
'name': buf['name'], 'arguments': buf['args'],
}
events.append(self._sse('response.output_item.done', fc))
self._output_items.append(fc)
usage_data = usage if isinstance(usage, dict) else {}
events.append(self._sse('response.completed', {
'id': self.resp_id, 'object': 'response',
'status': 'incomplete' if finish_reason == 'length' else 'completed',
'model': self.model, 'output': self._output_items, 'usage': usage_data,
}))
return events
def _sse(self, event_type, data):
"""构建 SSE 事件字符串"""
return f'event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n'
# ═══════════════════════════════════════════════════════════
# 内部辅助函数
# ═══════════════════════════════════════════════════════════
def _convert_input_items(items, messages):
"""将 Responses input 数组转换为 CC messages"""
i = 0
while i < len(items):
item = items[i]
if isinstance(item, str):
messages.append({'role': 'user', 'content': item})
i += 1
continue
if not isinstance(item, dict):
i += 1
continue
item_type = item.get('type', '')
role = item.get('role', '')
# 简单角色消息(无 type 字段)
if role and not item_type:
content = item.get('content', '')
if isinstance(content, list):
content = _extract_text(content)
messages.append({'role': role, 'content': content or ''})
i += 1
continue
# Responses message 对象
if item_type == 'message' or (role and not item_type):
role = item.get('role', 'assistant')
content = _extract_text(item.get('content', []))
msg = {'role': role, 'content': content or ''}
if role == 'assistant':
tool_calls, consumed = _collect_function_calls(items, i + 1)
if tool_calls:
msg['tool_calls'] = tool_calls
if not msg['content']:
msg['content'] = None
messages.append(msg)
i += 1 + consumed
continue
messages.append(msg)
i += 1
continue
# function_call工具调用
if item_type == 'function_call':
tc = {
'id': item.get('call_id') or gen_id('call_'),
'type': 'function',
'function': {
'name': item.get('name', ''),
'arguments': item.get('arguments', '{}'),
},
}
if messages and messages[-1]['role'] == 'assistant':
messages[-1].setdefault('tool_calls', []).append(tc)
if not messages[-1].get('content'):
messages[-1]['content'] = None
else:
messages.append({'role': 'assistant', 'content': None, 'tool_calls': [tc]})
i += 1
continue
# function_call_output工具结果
if item_type == 'function_call_output':
output = item.get('output', '')
if not isinstance(output, str):
output = json.dumps(output, ensure_ascii=False)
messages.append({
'role': 'tool',
'tool_call_id': item.get('call_id', ''),
'content': output,
})
i += 1
continue
if role:
messages.append({'role': role, 'content': str(item.get('content', ''))})
i += 1
def _collect_function_calls(items, start):
"""收集紧随 assistant message 之后的连续 function_call 项"""
tool_calls = []
j = start
while j < len(items):
nxt = items[j]
if isinstance(nxt, dict) and nxt.get('type') == 'function_call':
tool_calls.append({
'id': nxt.get('call_id') or gen_id('call_'),
'type': 'function',
'function': {
'name': nxt.get('name', ''),
'arguments': nxt.get('arguments', '{}'),
},
})
j += 1
else:
break
return tool_calls, j - start
def _extract_text(content):
"""从 content 中提取纯文本"""
if isinstance(content, str):
return content
if not isinstance(content, list):
return str(content) if content else ''
texts = []
for part in content:
if isinstance(part, str):
texts.append(part)
elif isinstance(part, dict):
t = part.get('type', '')
if t in ('output_text', 'input_text', 'text'):
texts.append(part.get('text', ''))
elif t == 'refusal':
texts.append(part.get('refusal', ''))
return '\n'.join(texts) if texts else ''
def _convert_tools(tools):
"""将 Responses tools 转为 CC tools 格式"""
result = []
for t in tools:
if t.get('type') != 'function':
continue
if 'function' in t:
result.append(t)
else:
result.append({
'type': 'function',
'function': {
'name': t.get('name', ''),
'description': t.get('description', ''),
'parameters': t.get('parameters', {'type': 'object', 'properties': {}}),
},
})
return result

70
app.py Normal file
View file

@ -0,0 +1,70 @@
"""Flask 应用工厂
创建并配置 Flask 应用
- 注册所有路由蓝图
- 设置 JSON 错误处理器避免返回 HTML
- 配置全局鉴权中间件
"""
import logging
from flask import Flask, jsonify, request
from flask_cors import CORS
import settings
from config import Config
from routes import register_routes
logger = logging.getLogger(__name__)
def create_app():
app = Flask(__name__)
CORS(app)
settings.load()
# ─── JSON 错误处理器 ──────────────────────────
@app.errorhandler(404)
def not_found(e):
return jsonify({'error': {'message': '未找到', 'type': 'not_found'}}), 404
@app.errorhandler(405)
def method_not_allowed(e):
return jsonify({'error': {'message': '方法不允许', 'type': 'method_not_allowed'}}), 405
@app.errorhandler(500)
def internal_error(e):
return jsonify({'error': {'message': '服务器内部错误', 'type': 'server_error'}}), 500
# ─── 全局鉴权中间件 ──────────────────────────
@app.before_request
def check_access():
if not Config.ACCESS_API_KEY:
return
# 无需鉴权的路径
skip = ('/health', '/admin', '/static/', '/api/admin')
if any(request.path == p or request.path.startswith(p) for p in skip):
return
auth = request.headers.get('Authorization', '')
token = auth[7:] if auth.startswith('Bearer ') else request.headers.get('x-api-key', '')
if token != Config.ACCESS_API_KEY:
logger.warning(f'鉴权拒绝: {request.path}')
return jsonify({
'error': {'message': 'API 密钥无效', 'type': 'authentication_error'}
}), 401
# ─── 健康检查 ────────────────────────────────
@app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'ok', 'target': settings.get_url()})
# ─── 注册路由蓝图 ────────────────────────────
register_routes(app)
return app

16
compose.yml Normal file
View file

@ -0,0 +1,16 @@
services:
api2cursor:
build: .
ports:
- "${PROXY_PORT:-3029}:${PROXY_PORT:-3029}"
env_file:
- .env
volumes:
- ./data:/app/data
restart: unless-stopped
healthcheck:
test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:${PROXY_PORT:-3029}/health"]
interval: 30s
timeout: 5s
retries: 3
start_period: 10s

18
config.py Normal file
View file

@ -0,0 +1,18 @@
"""环境变量配置"""
import os
class Config:
# 上游 API 地址
PROXY_TARGET_URL = os.getenv('PROXY_TARGET_URL', 'https://api.anthropic.com')
# 上游 API 密钥
PROXY_API_KEY = os.getenv('PROXY_API_KEY', '')
# 服务监听端口
PROXY_PORT = int(os.getenv('PROXY_PORT', '3029'))
# 请求超时时间(秒)
API_TIMEOUT = int(os.getenv('API_TIMEOUT', '300'))
# 访问鉴权密钥,留空则不启用鉴权
ACCESS_API_KEY = os.getenv('ACCESS_API_KEY', '')
# 调试模式:开启后输出详细的请求/响应日志
DEBUG = os.getenv('DEBUG', '').lower() in ('1', 'true', 'yes', 'on')

5
requirements.txt Normal file
View file

@ -0,0 +1,5 @@
flask
flask-cors
requests
python-dotenv
waitress

14
routes/__init__.py Normal file
View file

@ -0,0 +1,14 @@
"""路由注册"""
def register_routes(app):
"""将所有路由蓝图注册到 Flask 应用"""
from routes.chat import bp as chat_bp
from routes.responses import bp as responses_bp
from routes.messages import bp as messages_bp
from routes.admin import bp as admin_bp
app.register_blueprint(chat_bp)
app.register_blueprint(responses_bp)
app.register_blueprint(messages_bp)
app.register_blueprint(admin_bp)

195
routes/admin.py Normal file
View file

@ -0,0 +1,195 @@
"""路由: 管理面板
提供 Web 管理界面和 API
- /admin 管理面板页面
- /v1/models 模型列表 Cursor 查询
- /api/admin/* 登录验证全局设置 CRUD模型映射 CRUD
"""
import os
import logging
from flask import Blueprint, request, jsonify, send_from_directory
import settings
from config import Config
logger = logging.getLogger(__name__)
_STATIC_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'static')
bp = Blueprint('admin', __name__)
# ─── 静态页面 ─────────────────────────────────────
@bp.route('/admin')
@bp.route('/admin/')
def admin_page():
return send_from_directory(_STATIC_DIR, 'admin.html')
@bp.route('/static/<path:filename>')
def static_files(filename):
return send_from_directory(_STATIC_DIR, filename)
# ─── 模型列表 ─────────────────────────────────────
@bp.route('/v1/models', methods=['GET'])
def list_models():
mappings = settings.get().get('model_mappings', {})
models = [{
'id': name,
'object': 'model',
'owned_by': info.get('backend', 'custom'),
} for name, info in mappings.items()]
if not models:
models.append({
'id': 'claude-sonnet-4-5-20250929',
'object': 'model',
'owned_by': 'anthropic',
})
return jsonify({'object': 'list', 'data': models})
# ─── 登录验证 ─────────────────────────────────────
@bp.route('/api/admin/login', methods=['POST'])
def admin_login():
data = request.get_json(force=True)
if not Config.ACCESS_API_KEY:
return jsonify({'ok': True, 'message': '未配置鉴权'})
if data.get('key', '') == Config.ACCESS_API_KEY:
return jsonify({'ok': True})
return jsonify({'ok': False, 'message': '密钥错误'}), 401
# ─── 全局设置 ─────────────────────────────────────
@bp.route('/api/admin/settings', methods=['GET'])
def get_settings():
err = _check_auth()
if err:
return err
s = settings.get()
return jsonify({
'proxy_target_url': s.get('proxy_target_url', ''),
'proxy_api_key': s.get('proxy_api_key', ''),
'env_target_url': Config.PROXY_TARGET_URL,
'env_api_key': '***' if Config.PROXY_API_KEY else '',
})
@bp.route('/api/admin/settings', methods=['PUT'])
def update_settings():
err = _check_auth()
if err:
return err
data = request.get_json(force=True)
s = settings.get()
for key in ('proxy_target_url', 'proxy_api_key'):
if key in data:
s[key] = data[key]
return _save_and_respond(s, '全局设置已更新')
# ─── 模型映射 CRUD ────────────────────────────────
@bp.route('/api/admin/mappings', methods=['GET'])
def list_mappings():
err = _check_auth()
if err:
return err
return jsonify(settings.get().get('model_mappings', {}))
@bp.route('/api/admin/mappings', methods=['POST'])
def add_mapping():
err = _check_auth()
if err:
return err
data = request.get_json(force=True)
name = data.get('name', '').strip()
if not name:
return jsonify({'error': '名称不能为空'}), 400
s = settings.get()
mappings = s.setdefault('model_mappings', {})
mappings[name] = {
'upstream_model': data.get('upstream_model', name),
'backend': data.get('backend', 'auto'),
'target_url': data.get('target_url', ''),
'api_key': data.get('api_key', ''),
}
return _save_and_respond(s, f'映射已添加: {name}')
@bp.route('/api/admin/mappings/<path:name>', methods=['PUT'])
def update_mapping(name):
err = _check_auth()
if err:
return err
data = request.get_json(force=True)
s = settings.get()
mappings = s.get('model_mappings', {})
if name not in mappings:
return jsonify({'error': '映射不存在'}), 404
new_name = data.get('name', name).strip()
entry = {
'upstream_model': data.get('upstream_model', name),
'backend': data.get('backend', 'auto'),
'target_url': data.get('target_url', ''),
'api_key': data.get('api_key', ''),
}
if new_name != name:
del mappings[name]
mappings[new_name] = entry
s['model_mappings'] = mappings
return _save_and_respond(s, f'映射已更新: {name}{new_name}')
@bp.route('/api/admin/mappings/<path:name>', methods=['DELETE'])
def delete_mapping(name):
err = _check_auth()
if err:
return err
s = settings.get()
mappings = s.get('model_mappings', {})
if name in mappings:
del mappings[name]
s['model_mappings'] = mappings
return _save_and_respond(s, f'映射已删除: {name}')
return jsonify({'ok': True})
# ─── 内部辅助 ─────────────────────────────────────
def _check_auth():
"""Admin API 鉴权,返回 None 表示通过"""
if not Config.ACCESS_API_KEY:
return None
auth = request.headers.get('Authorization', '')
token = auth[7:] if auth.startswith('Bearer ') else request.headers.get('x-api-key', '')
if token != Config.ACCESS_API_KEY:
return jsonify({'error': '未授权'}), 401
return None
def _save_and_respond(data, log_msg):
"""保存配置并返回响应"""
try:
settings.save(data)
except OSError as e:
logger.error(f'保存失败: {e}')
return jsonify({'error': {'message': f'保存失败: {e}', 'type': 'save_error'}}), 500
logger.info(log_msg)
return jsonify({'ok': True})

216
routes/chat.py Normal file
View file

@ -0,0 +1,216 @@
"""路由: /v1/chat/completions
处理 Cursor 发来的 OpenAI Chat Completions 格式请求
根据模型映射的 backend 字段分发到 OpenAI Anthropic 后端
"""
import json
import logging
from flask import Blueprint, request, jsonify
import settings
from config import Config
from adapters.openai_fixer import normalize_request, fix_response, fix_stream_chunk
from adapters.openai_anthropic import (
cc_to_messages_request, messages_to_cc_response, AnthropicStreamConverter,
)
from adapters.responses_adapter import responses_to_cc
from utils.http import (
build_openai_headers, build_anthropic_headers,
forward_request, sse_response,
iter_openai_sse, iter_anthropic_sse,
)
from utils.think_tag import ThinkTagExtractor
logger = logging.getLogger(__name__)
def _dbg(msg):
"""DEBUG 模式下输出详细日志"""
if Config.DEBUG:
logger.info(f'[调试] {msg}')
bp = Blueprint('chat', __name__)
@bp.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
payload = request.get_json(force=True)
is_stream = payload.get('stream', False)
# 保留 Cursor 发送的原始模型名,响应时需要回填
cursor_model = payload.get('model', 'unknown')
msg_count = len(payload.get('messages', []))
# 容错Responses 格式误入 CC 端点
if msg_count == 0 and 'input' in payload:
logger.info('检测到 Responses 格式(有 input 无 messages自动转换')
payload = responses_to_cc(payload)
msg_count = len(payload.get('messages', []))
elif msg_count == 0:
logger.warning(f'messages 为空, payload keys: {list(payload.keys())}')
mapping = settings.resolve_model(cursor_model)
backend = mapping['backend']
upstream = mapping['upstream_model']
url_base = mapping['target_url']
api_key = mapping['api_key']
logger.info(
f'[CC] {cursor_model}{upstream} '
f'后端={backend} 流式={is_stream} 消息数={msg_count}'
)
_log_messages(payload)
if backend == 'openai':
return _via_openai(payload, upstream, url_base, api_key, is_stream, cursor_model)
else:
return _via_anthropic(payload, upstream, url_base, api_key, is_stream, cursor_model)
# ─── OpenAI 后端 ──────────────────────────────────
def _via_openai(payload, upstream, url_base, api_key, is_stream, cursor_model):
"""通过 OpenAI 兼容后端转发"""
_dbg(f'Cursor 原始请求 keys={list(payload.keys())} '
f'其他字段={json.dumps({k: v for k, v in payload.items() if k != "messages"}, ensure_ascii=False, default=str)[:500]}')
payload = normalize_request(payload, upstream)
_dbg(f'normalize 后 model={payload.get("model")} tools数={len(payload.get("tools", []))}')
headers = build_openai_headers(api_key)
url = f'{url_base.rstrip("/")}/v1/chat/completions'
if not is_stream:
payload['stream'] = False
resp, err = forward_request(url, headers, payload)
if err:
return err
raw = resp.json()
_dbg(f'上游原始响应={json.dumps(raw, ensure_ascii=False, default=str)[:1000]}')
data = fix_response(raw)
data['model'] = cursor_model
_dbg(f'修复后响应={json.dumps(data, ensure_ascii=False, default=str)[:1000]}')
usage = data.get('usage', {})
logger.info(
f'[CC] 完成 prompt={usage.get("prompt_tokens", 0)} '
f'completion={usage.get("completion_tokens", 0)}'
)
return jsonify(data)
# 流式处理
payload['stream'] = True
_n = [0]
def generate():
resp, err = forward_request(url, headers, payload, stream=True)
if err:
yield f'data: {json.dumps({"error": {"message": err, "type": "upstream_error"}})}\n\n'
return
think_ext = ThinkTagExtractor()
for chunk in iter_openai_sse(resp):
if chunk is None: # [DONE]
_dbg(f'流结束,共 {_n[0]} 个 chunk')
yield 'data: [DONE]\n\n'
return
if _n[0] < 10:
_dbg(f'上游原始 chunk#{_n[0]}={json.dumps(chunk, ensure_ascii=False, default=str)[:500]}')
chunk = fix_stream_chunk(chunk)
chunk['model'] = cursor_model
for out in think_ext.process_chunk(chunk):
if _n[0] < 10:
_dbg(f'发给Cursor chunk#{_n[0]}={json.dumps(out, ensure_ascii=False, default=str)[:500]}')
yield f'data: {json.dumps(out)}\n\n'
_n[0] += 1
return sse_response(generate())
# ─── Anthropic 后端 ───────────────────────────────
def _via_anthropic(payload, upstream, url_base, api_key, is_stream, cursor_model):
"""通过 Anthropic 后端转发CC → Messages → CC"""
payload['model'] = upstream
anthropic_payload = cc_to_messages_request(payload)
_dbg(f'CC→Messages 转换后 keys={list(anthropic_payload.keys())} '
f'messages数={len(anthropic_payload.get("messages", []))}')
headers = build_anthropic_headers(api_key)
url = f'{url_base.rstrip("/")}/v1/messages'
if not is_stream:
anthropic_payload['stream'] = False
resp, err = forward_request(url, headers, anthropic_payload)
if err:
return err
raw = resp.json()
_dbg(f'上游原始响应={json.dumps(raw, ensure_ascii=False, default=str)[:1000]}')
data = messages_to_cc_response(raw)
data['model'] = cursor_model
_dbg(f'Messages→CC 转换后={json.dumps(data, ensure_ascii=False, default=str)[:1000]}')
usage = data.get('usage', {})
logger.info(
f'[CC] 完成 prompt={usage.get("prompt_tokens", 0)} '
f'completion={usage.get("completion_tokens", 0)}'
)
return jsonify(data)
# 流式处理
anthropic_payload['stream'] = True
converter = AnthropicStreamConverter()
_n = [0]
def generate():
resp, err = forward_request(url, headers, anthropic_payload, stream=True)
if err:
yield f'data: {json.dumps({"error": {"message": err, "type": "upstream_error"}})}\n\n'
return
for event_type, event_data in iter_anthropic_sse(resp):
if _n[0] < 10:
_dbg(f'上游事件#{_n[0]} {event_type}={json.dumps(event_data, ensure_ascii=False, default=str)[:500]}')
for chunk_str in converter.process_event(event_type, event_data):
try:
chunk_obj = json.loads(chunk_str)
chunk_obj['model'] = cursor_model
chunk_str = json.dumps(chunk_obj)
except (json.JSONDecodeError, TypeError):
pass
if _n[0] < 10:
_dbg(f'发给Cursor chunk#{_n[0]}={chunk_str[:500]}')
yield f'data: {chunk_str}\n\n'
_n[0] += 1
_dbg(f'流结束,共 {_n[0]} 个事件')
yield 'data: [DONE]\n\n'
return sse_response(generate())
def _log_messages(payload):
"""记录请求中的消息摘要"""
for i, msg in enumerate(payload.get('messages', [])):
role = msg.get('role', '?')
content = msg.get('content')
extra = ''
if 'tool_calls' in msg:
extra += f' tool_calls={len(msg["tool_calls"])}'
if msg.get('tool_call_id'):
extra += f' tool_call_id={msg["tool_call_id"]}'
if isinstance(content, list):
info = f'list[{len(content)}]'
elif isinstance(content, str):
info = f'str[{len(content)}]'
else:
info = type(content).__name__
logger.info(f' 消息[{i}] {role} {info}{extra}')

147
routes/messages.py Normal file
View file

@ -0,0 +1,147 @@
"""路由: /v1/messages
Anthropic Messages API 透传 Cursor 直接发送 Anthropic 格式请求时
直接转发到上游并原样返回处理非标准的 reasoning_content 字段
将其注入为标准的 thinking blocks
"""
import json
import logging
import requests as req_lib
from flask import Blueprint, request, jsonify
import settings
from config import Config
from utils.http import build_anthropic_headers, forward_request, sse_response
logger = logging.getLogger(__name__)
bp = Blueprint('messages', __name__)
@bp.route('/v1/messages', methods=['POST'])
def messages_passthrough():
payload = request.get_json(force=True)
model = payload.get('model', 'unknown')
is_stream = payload.get('stream', False)
logger.info(f'[透传] model={model} 流式={is_stream}')
url_base = settings.get_url()
api_key = settings.get_key()
headers = build_anthropic_headers(api_key)
url = f'{url_base.rstrip("/")}/v1/messages'
if not is_stream:
resp, err = forward_request(url, headers, payload)
if err:
return err
data = resp.json()
_inject_thinking(data)
return jsonify(data)
# 流式透传
def generate():
try:
resp = req_lib.post(
url, headers=headers, json=payload,
timeout=Config.API_TIMEOUT, stream=True,
)
if resp.status_code != 200:
body = resp.content.decode('utf-8', errors='replace')
logger.warning(f'上游返回 {resp.status_code}: {body[:300]}')
yield f'data: {json.dumps({"error": {"message": body, "type": "upstream_error"}})}\n\n'
return
yield from _process_stream(resp)
except req_lib.RequestException as e:
logger.error(f'请求上游失败: {e}')
yield f'data: {json.dumps({"error": {"message": str(e), "type": "proxy_error"}})}\n\n'
return sse_response(generate())
# ─── 内部辅助 ─────────────────────────────────────
def _inject_thinking(data):
"""将非标准 reasoning_content 字段注入为 Anthropic thinking block"""
rc = data.pop('reasoning_content', None) or data.pop('reasoningContent', None)
if not rc:
return
content = data.get('content')
if not isinstance(content, list):
content = []
# 避免重复注入
if any(isinstance(b, dict) and b.get('type') == 'thinking' for b in content):
return
content.insert(0, {'type': 'thinking', 'thinking': rc})
data['content'] = content
logger.info(f'已注入 thinking block ({len(rc)} 字符)')
def _process_stream(resp):
"""处理 /v1/messages 流式响应,检测并注入 thinking 事件"""
reasoning_buf = ''
injected = False
for line in resp.iter_lines():
if not line:
continue
decoded = line.decode('utf-8', errors='replace')
if not decoded.startswith('data:'):
yield decoded + '\n\n'
continue
data_str = decoded[5:].strip()
if not data_str:
yield decoded + '\n\n'
continue
try:
event_data = json.loads(data_str)
except json.JSONDecodeError:
yield decoded + '\n\n'
continue
modified = False
# 提取 reasoning_content
for container_key in ('message', 'delta'):
container = event_data.get(container_key)
if not container:
continue
rc = container.pop('reasoning_content', None) or container.pop('reasoningContent', None)
if rc:
reasoning_buf += rc
modified = True
# 在首个 text_delta 前注入 thinking blocks
if reasoning_buf and not injected:
if event_data.get('delta', {}).get('type') == 'text_delta':
injected = True
yield from _emit_thinking_blocks(reasoning_buf)
reasoning_buf = ''
yield f'data: {json.dumps(event_data)}\n\n' if modified else decoded + '\n\n'
def _emit_thinking_blocks(text):
"""生成 thinking block 的 SSE 事件序列"""
yield (
f'event: content_block_start\n'
f'data: {json.dumps({"type": "content_block_start", "index": 0, "content_block": {"type": "thinking", "thinking": ""}})}\n\n'
)
yield (
f'event: content_block_delta\n'
f'data: {json.dumps({"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": text}})}\n\n'
)
yield (
f'event: content_block_stop\n'
f'data: {json.dumps({"type": "content_block_stop", "index": 0})}\n\n'
)

126
routes/responses.py Normal file
View file

@ -0,0 +1,126 @@
"""路由: /v1/responses
处理 Cursor GPT/Claude-Opus 等模型发出的 Responses API 格式请求
转换为 CC 格式后分发到对应后端响应再转回 Responses 格式
"""
import json
import logging
from flask import Blueprint, request, jsonify
import settings
from adapters.responses_adapter import responses_to_cc, cc_to_responses, ResponsesStreamConverter
from adapters.openai_fixer import normalize_request, fix_response, fix_stream_chunk
from adapters.openai_anthropic import cc_to_messages_request, messages_to_cc_response
from utils.http import (
build_openai_headers, build_anthropic_headers,
forward_request, sse_response,
iter_openai_sse, iter_anthropic_sse,
)
from utils.think_tag import ThinkTagExtractor
logger = logging.getLogger(__name__)
bp = Blueprint('responses', __name__)
@bp.route('/v1/responses', methods=['POST'])
def responses_endpoint():
payload = request.get_json(force=True)
model = payload.get('model', 'unknown')
is_stream = payload.get('stream', False)
mapping = settings.resolve_model(model)
backend = mapping['backend']
upstream = mapping['upstream_model']
url_base = mapping['target_url']
api_key = mapping['api_key']
logger.info(f'[Responses] {model}{upstream} 后端={backend} 流式={is_stream}')
# Responses → CC
cc_payload = responses_to_cc(payload)
cc_payload['model'] = upstream
if backend == 'openai':
return _via_openai(cc_payload, url_base, api_key, is_stream, model)
else:
return _via_anthropic(cc_payload, url_base, api_key, is_stream, model)
# ─── OpenAI 后端 ──────────────────────────────────
def _via_openai(cc_payload, url_base, api_key, is_stream, display_model):
"""通过 OpenAI 后端处理"""
cc_payload = normalize_request(cc_payload)
headers = build_openai_headers(api_key)
url = f'{url_base.rstrip("/")}/v1/chat/completions'
if not is_stream:
cc_payload['stream'] = False
resp, err = forward_request(url, headers, cc_payload)
if err:
return err
return jsonify(cc_to_responses(fix_response(resp.json()), display_model))
# 流式处理
cc_payload['stream'] = True
converter = ResponsesStreamConverter(model=display_model)
def generate():
yield from converter.start_events()
resp, err = forward_request(url, headers, cc_payload, stream=True)
if err:
yield f'event: error\ndata: {json.dumps({"error": err})}\n\n'
return
think_ext = ThinkTagExtractor()
for chunk in iter_openai_sse(resp):
if chunk is None:
yield from converter.finalize()
return
chunk = fix_stream_chunk(chunk)
for out in think_ext.process_chunk(chunk):
yield from converter.process_cc_chunk(out)
return sse_response(generate())
# ─── Anthropic 后端 ───────────────────────────────
def _via_anthropic(cc_payload, url_base, api_key, is_stream, display_model):
"""通过 Anthropic 后端处理"""
anthropic_payload = cc_to_messages_request(cc_payload)
headers = build_anthropic_headers(api_key)
url = f'{url_base.rstrip("/")}/v1/messages'
if not is_stream:
anthropic_payload['stream'] = False
resp, err = forward_request(url, headers, anthropic_payload)
if err:
return err
cc_data = messages_to_cc_response(resp.json())
return jsonify(cc_to_responses(cc_data, display_model))
# 流式处理Anthropic SSE → Responses SSE跳过 CC 中间态)
anthropic_payload['stream'] = True
converter = ResponsesStreamConverter(model=display_model)
def generate():
yield from converter.start_events()
resp, err = forward_request(url, headers, anthropic_payload, stream=True)
if err:
yield f'event: error\ndata: {json.dumps({"error": err})}\n\n'
return
for event_type, event_data in iter_anthropic_sse(resp):
yield from converter.process_anthropic_event(event_type, event_data)
yield from converter.finalize()
return sse_response(generate())

97
settings.py Normal file
View file

@ -0,0 +1,97 @@
"""持久化配置管理
使用 data/settings.json 存储可通过管理面板修改的设置
- proxy_target_url / proxy_api_key: 可覆盖环境变量的全局配置
- model_mappings: Cursor 模型名 {upstream_model, backend, target_url, api_key}
"""
import json
import os
import threading
from config import Config
# 数据目录放在项目根目录下,便于 Docker 卷挂载
_ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(_ROOT_DIR, 'data')
SETTINGS_FILE = os.path.join(DATA_DIR, 'settings.json')
_lock = threading.Lock()
_cache = None
_DEFAULTS = {
'proxy_target_url': '',
'proxy_api_key': '',
'model_mappings': {},
}
def load():
"""从文件加载配置"""
global _cache
with _lock:
if os.path.exists(SETTINGS_FILE):
try:
with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
_cache = {**_DEFAULTS, **json.load(f)}
except (json.JSONDecodeError, OSError):
_cache = dict(_DEFAULTS)
else:
_cache = dict(_DEFAULTS)
return dict(_cache)
def save(data):
"""保存配置到文件"""
global _cache
with _lock:
os.makedirs(DATA_DIR, exist_ok=True)
_cache = {**_DEFAULTS, **data}
with open(SETTINGS_FILE, 'w', encoding='utf-8') as f:
json.dump(_cache, f, ensure_ascii=False, indent=2)
def get():
"""获取当前配置(优先使用缓存)"""
if _cache is None:
return load()
return dict(_cache)
def get_url():
"""获取生效的上游 URL配置文件优先环境变量兜底"""
return get().get('proxy_target_url') or Config.PROXY_TARGET_URL
def get_key():
"""获取生效的 API 密钥:配置文件优先,环境变量兜底"""
return get().get('proxy_api_key') or Config.PROXY_API_KEY
def resolve_model(model_name):
"""解析模型映射,返回 {upstream_model, backend, target_url, api_key}"""
settings = get()
mappings = settings.get('model_mappings', {})
base_url, base_key = get_url(), get_key()
if model_name in mappings:
m = mappings[model_name]
return {
'upstream_model': m.get('upstream_model') or model_name,
'backend': m.get('backend') or _auto_detect(model_name),
'target_url': m.get('target_url') or base_url,
'api_key': m.get('api_key') or base_key,
}
return {
'upstream_model': model_name,
'backend': _auto_detect(model_name),
'target_url': base_url,
'api_key': base_key,
}
def _auto_detect(name):
"""根据模型名自动判断后端类型"""
lower = (name or '').lower()
return 'anthropic' if ('claude' in lower or 'anthropic' in lower) else 'openai'

38
start.py Normal file
View file

@ -0,0 +1,38 @@
"""启动入口
用法: python start.py
"""
import logging
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(name)s] %(levelname)s: %(message)s',
)
from config import Config
from app import create_app
def main():
app = create_app()
print(f'代理服务启动于 0.0.0.0:{Config.PROXY_PORT}')
print(f'上游地址: {Config.PROXY_TARGET_URL}')
print(f'管理面板: http://localhost:{Config.PROXY_PORT}/admin')
from waitress import serve
serve(
app,
host='0.0.0.0',
port=Config.PROXY_PORT,
channel_timeout=Config.API_TIMEOUT,
send_bytes=1,
)
if __name__ == '__main__':
main()

76
static/admin.css Normal file
View file

@ -0,0 +1,76 @@
*,*::before,*::after{box-sizing:border-box;margin:0;padding:0}
:root{
--bg:#0b1120;--surface:#151d2e;--card:#1a2332;--input:#212d3f;
--border:#2a3a50;--text:#e2e8f0;--muted:#8899ab;--primary:#3b82f6;
--primary-hover:#2563eb;--green:#22c55e;--red:#ef4444;--yellow:#eab308;
--radius:10px;
}
body{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI','PingFang SC','Microsoft YaHei',sans-serif;background:var(--bg);color:var(--text);min-height:100vh;line-height:1.6}
input,select,button,textarea{font-family:inherit;font-size:inherit}
a{color:var(--primary);text-decoration:none}
code{background:var(--input);padding:1px 5px;border-radius:4px;font-size:12px;font-family:Consolas,Monaco,monospace}
.container{max-width:960px;margin:0 auto;padding:0 20px}
#login{display:flex;align-items:center;justify-content:center;min-height:100vh;background:linear-gradient(145deg,#0b1120 0%,#121a2e 50%,#0b1120 100%)}
.login-card{background:var(--card);border:1px solid var(--border);border-radius:16px;padding:40px;width:380px;box-shadow:0 20px 60px rgba(0,0,0,.4)}
.login-card h1{font-size:22px;font-weight:700;margin-bottom:6px;text-align:center}
.login-card p{color:var(--muted);font-size:13px;text-align:center;margin-bottom:28px}
.field{margin-bottom:16px}
.field label{display:block;font-size:13px;color:var(--muted);margin-bottom:6px;font-weight:500}
.input-wrap{position:relative}
.input-wrap input,.input-wrap select{width:100%;background:var(--input);border:1px solid var(--border);border-radius:8px;padding:10px 14px;color:var(--text);font-size:14px;outline:none;transition:border-color .2s}
.input-wrap input:focus,.input-wrap select:focus{border-color:var(--primary)}
.input-wrap .eye{position:absolute;right:10px;top:50%;transform:translateY(-50%);background:none;border:none;color:var(--muted);cursor:pointer;padding:4px;font-size:16px}
select{cursor:pointer;appearance:none;background-image:url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='12' height='12' fill='%238899ab'%3E%3Cpath d='M6 8L1 3h10z'/%3E%3C/svg%3E");background-repeat:no-repeat;background-position:right 12px center}
.btn{display:inline-flex;align-items:center;gap:6px;padding:9px 18px;border-radius:8px;border:none;font-size:14px;font-weight:500;cursor:pointer;transition:all .15s}
.btn-primary{background:var(--primary);color:#fff}.btn-primary:hover{background:var(--primary-hover)}
.btn-green{background:var(--green);color:#fff}.btn-green:hover{opacity:.9}
.btn-red{background:transparent;color:var(--red);border:1px solid var(--red)}.btn-red:hover{background:var(--red);color:#fff}
.btn-ghost{background:transparent;color:var(--muted);border:1px solid var(--border)}.btn-ghost:hover{color:var(--text);border-color:var(--muted)}
.btn-sm{padding:6px 12px;font-size:13px;border-radius:6px}
.btn-block{width:100%;justify-content:center}
.btn:disabled{opacity:.5;cursor:not-allowed}
#dashboard{display:none}
header{background:var(--surface);border-bottom:1px solid var(--border);padding:14px 0;position:sticky;top:0;z-index:50}
header .inner{display:flex;align-items:center}
header h1{font-size:17px;font-weight:700;flex:1}
header .right{display:flex;align-items:center;gap:12px}
.status{font-size:12px;padding:4px 10px;border-radius:20px;background:rgba(34,197,94,.15);color:var(--green)}
main{padding:28px 0 60px}
.card{background:var(--card);border:1px solid var(--border);border-radius:var(--radius);padding:24px;margin-bottom:24px}
.card-header{display:flex;align-items:center;justify-content:space-between;margin-bottom:20px}
.card-header h2{font-size:16px;font-weight:600}
.card-header .badge{font-size:12px;color:var(--muted);background:var(--input);padding:3px 10px;border-radius:12px}
.hint{font-size:12px;color:var(--muted);margin-top:4px}
.mapping-list{display:flex;flex-direction:column;gap:10px}
.mapping-item{background:var(--surface);border:1px solid var(--border);border-radius:8px;padding:16px;transition:border-color .2s}
.mapping-item:hover{border-color:var(--primary)}
.mapping-top{display:flex;align-items:center;gap:12px;flex-wrap:wrap}
.mapping-name{font-weight:600;font-size:15px;color:var(--primary)}
.mapping-arrow{color:var(--muted);font-size:13px}
.mapping-upstream{font-size:14px;color:var(--text)}
.mapping-meta{display:flex;gap:8px;flex-wrap:wrap}
.tag{font-size:11px;padding:2px 8px;border-radius:4px;font-weight:500}
.tag-anthropic{background:rgba(249,115,22,.15);color:#fb923c}
.tag-openai{background:rgba(16,185,129,.15);color:#34d399}
.tag-auto{background:rgba(139,92,246,.15);color:#a78bfa}
.tag-override{background:rgba(59,130,246,.1);color:var(--primary)}
.mapping-actions{margin-left:auto;display:flex;gap:6px}
.empty{text-align:center;padding:40px;color:var(--muted)}
.modal-overlay{display:none;position:fixed;inset:0;background:rgba(0,0,0,.6);z-index:100;align-items:center;justify-content:center}
.modal-overlay.active{display:flex}
.modal{background:var(--card);border:1px solid var(--border);border-radius:14px;padding:28px;width:520px;max-width:90vw;max-height:85vh;overflow-y:auto;box-shadow:0 20px 60px rgba(0,0,0,.5)}
.modal h3{font-size:17px;font-weight:600;margin-bottom:20px}
.modal-footer{display:flex;justify-content:flex-end;gap:10px;margin-top:24px;padding-top:16px;border-top:1px solid var(--border)}
.toast-area{position:fixed;top:20px;right:20px;z-index:200;display:flex;flex-direction:column;gap:8px}
.toast{padding:12px 20px;border-radius:8px;font-size:14px;font-weight:500;box-shadow:0 8px 24px rgba(0,0,0,.3);animation:slideIn .3s ease}
.toast-ok{background:#065f46;color:#a7f3d0}
.toast-err{background:#7f1d1d;color:#fca5a5}
@keyframes slideIn{from{transform:translateX(100px);opacity:0}to{transform:none;opacity:1}}

127
static/admin.html Normal file
View file

@ -0,0 +1,127 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>API 2 Cursor - 管理面板</title>
<link rel="stylesheet" href="/static/admin.css">
</head>
<body>
<!-- 登录 -->
<div id="login">
<div class="login-card">
<h1>API 2 Cursor</h1>
<p>模型映射管理面板</p>
<div class="field">
<label>ACCESS_API_KEY</label>
<div class="input-wrap">
<input type="password" id="loginKey" placeholder="请输入访问密钥" onkeydown="if(event.key==='Enter')doLogin()">
<button class="eye" onclick="togglePwd('loginKey')">&#128065;</button>
</div>
</div>
<button class="btn btn-primary btn-block" onclick="doLogin()">登 录</button>
</div>
</div>
<!-- 仪表盘 -->
<div id="dashboard">
<header>
<div class="container inner">
<h1>API 2 Cursor</h1>
<div class="right">
<span class="status" id="statusBadge">已连接</span>
<button class="btn btn-ghost btn-sm" onclick="doLogout()">退出</button>
</div>
</div>
</header>
<main class="container">
<!-- 全局设置 -->
<div class="card">
<div class="card-header">
<h2>全局设置</h2>
</div>
<div class="field">
<label>中转站地址 (Proxy Target URL)</label>
<div class="input-wrap"><input type="text" id="targetUrl" placeholder="https://your-relay.com"></div>
<div class="hint" id="envUrl"></div>
</div>
<div class="field">
<label>中转站 API Key</label>
<div class="input-wrap">
<input type="password" id="proxyKey" placeholder="sk-xxx 或 Bearer token">
<button class="eye" onclick="togglePwd('proxyKey')">&#128065;</button>
</div>
<div class="hint" id="envKey"></div>
</div>
<button class="btn btn-primary" onclick="saveSettings()">保存设置</button>
</div>
<!-- 模型映射 -->
<div class="card">
<div class="card-header">
<h2>模型映射</h2>
<button class="btn btn-green btn-sm" onclick="openAddModal()">+ 添加映射</button>
</div>
<div class="hint" style="margin-top:-12px;margin-bottom:16px">
Cursor 发送请求时会带上模型名 → 代理根据映射表找到上游实际模型名和后端协议进行转发。
<br>提示:建议在 Cursor 里使用 Claude 风格的模型名(如 <code>claude-sonnet-4-5-20250929</code>),这样 Cursor 会走 <code>/v1/chat/completions</code> 格式GPT 风格的模型名会走 <code>/v1/responses</code> 格式,两种都已支持。
</div>
<div id="mappingList"></div>
</div>
</main>
</div>
<!-- 弹窗 -->
<div class="modal-overlay" id="modal">
<div class="modal">
<h3 id="modalTitle">添加模型映射</h3>
<div class="field">
<label>Cursor 模型名 <span style="color:var(--red)">*</span></label>
<div class="input-wrap"><input type="text" id="mName" placeholder="例: claude-sonnet-4-5-20250929"></div>
<div class="hint">在 Cursor 自定义模型中添加的名称</div>
</div>
<div class="field">
<label>上游实际模型名 <span style="color:var(--red)">*</span></label>
<div class="input-wrap"><input type="text" id="mUpstream" placeholder="例: gpt-5.4 或 claude-sonnet-4-5-20250929"></div>
<div class="hint">发送到中转站的真实模型名称</div>
</div>
<div class="field">
<label>后端类型</label>
<div class="input-wrap">
<select id="mBackend">
<option value="auto">自动检测</option>
<option value="anthropic">Anthropic (/v1/messages)</option>
<option value="openai">OpenAI (/v1/chat/completions)</option>
</select>
</div>
<div class="hint">
<b>anthropic</b>:转换为 Anthropic Messages 格式 — 适用于中转站通过 <code>/v1/messages</code> 提供 Claude 模型<br>
<b>openai</b>:保持 OpenAI Chat Completions 格式 — 适用于 GPT、DeepSeek、Codex 或通过 <code>/v1/chat/completions</code> 提供所有模型的中转站<br>
<b>自动检测</b>:根据上游模型名判断(含 claude → anthropic其他 → openai
</div>
</div>
<div class="field">
<label>自定义地址 <span style="color:var(--muted)">(可选,留空使用全局设置)</span></label>
<div class="input-wrap"><input type="text" id="mUrl" placeholder="留空则使用全局中转站地址"></div>
</div>
<div class="field">
<label>自定义 API Key <span style="color:var(--muted)">(可选,留空使用全局设置)</span></label>
<div class="input-wrap">
<input type="password" id="mKey" placeholder="留空则使用全局 API Key">
<button class="eye" onclick="togglePwd('mKey')">&#128065;</button>
</div>
</div>
<div class="modal-footer">
<button class="btn btn-ghost" onclick="closeModal()">取消</button>
<button class="btn btn-primary" id="modalSaveBtn" onclick="saveMapping()">保存</button>
</div>
</div>
</div>
<div class="toast-area" id="toasts"></div>
<script src="/static/admin.js"></script>
</body>
</html>

247
static/admin.js Normal file
View file

@ -0,0 +1,247 @@
const API = '';
let authKey = '';
let editingName = null;
function togglePwd(id) {
const el = document.getElementById(id);
el.type = el.type === 'password' ? 'text' : 'password';
}
function toast(msg, ok = true) {
const area = document.getElementById('toasts');
const el = document.createElement('div');
el.className = 'toast ' + (ok ? 'toast-ok' : 'toast-err');
el.textContent = msg;
area.appendChild(el);
setTimeout(() => el.remove(), 3000);
}
async function api(path, opts = {}) {
const headers = { 'Content-Type': 'application/json' };
if (authKey) headers['Authorization'] = 'Bearer ' + authKey;
const res = await fetch(API + path, { ...opts, headers });
const ct = res.headers.get('content-type') || '';
if (!ct.includes('application/json')) {
const text = await res.text();
if (!res.ok) throw new Error('HTTP ' + res.status + ': ' + text.substring(0, 100));
throw new Error('服务器返回了非 JSON 响应');
}
const data = await res.json();
if (!res.ok) {
const e = data.error;
const msg = (typeof e === 'object' && e !== null) ? (e.message || JSON.stringify(e)) : (e || data.message || 'HTTP ' + res.status);
throw new Error(msg);
}
return data;
}
// ─── 登录 ───────────────────────────────────────────
async function doLogin() {
const key = document.getElementById('loginKey').value.trim();
if (!key) { toast('请输入密钥', false); return; }
try {
const r = await api('/api/admin/login', { method: 'POST', body: JSON.stringify({ key }) });
if (r.ok) {
authKey = key;
sessionStorage.setItem('_ak', key);
document.getElementById('login').style.display = 'none';
document.getElementById('dashboard').style.display = 'block';
loadDashboard();
}
} catch (e) {
toast('密钥无效', false);
}
}
function doLogout() {
authKey = '';
sessionStorage.removeItem('_ak');
document.getElementById('dashboard').style.display = 'none';
document.getElementById('login').style.display = 'flex';
}
// ─── 仪表盘 ─────────────────────────────────────────
async function loadDashboard() {
try {
const s = await api('/api/admin/settings');
document.getElementById('targetUrl').value = s.proxy_target_url || '';
document.getElementById('proxyKey').value = s.proxy_api_key || '';
document.getElementById('envUrl').textContent = s.env_target_url ? '环境变量: ' + s.env_target_url : '';
document.getElementById('envKey').textContent = s.env_api_key ? '环境变量: (已配置)' : '环境变量: (未设置)';
await loadMappings();
checkHealth();
} catch (e) {
toast('加载设置失败: ' + e.message, false);
}
}
async function checkHealth() {
try {
const r = await fetch(API + '/health');
const d = await r.json();
const b = document.getElementById('statusBadge');
if (d.status === 'ok') {
b.textContent = '已连接';
b.style.background = 'rgba(34,197,94,.15)';
b.style.color = 'var(--green)';
} else {
b.textContent = '异常';
}
} catch {
const b = document.getElementById('statusBadge');
b.textContent = '离线';
b.style.background = 'rgba(239,68,68,.15)';
b.style.color = 'var(--red)';
}
}
async function saveSettings() {
try {
await api('/api/admin/settings', {
method: 'PUT',
body: JSON.stringify({
proxy_target_url: document.getElementById('targetUrl').value.trim(),
proxy_api_key: document.getElementById('proxyKey').value.trim(),
}),
});
toast('设置已保存');
} catch (e) {
toast('保存失败: ' + e.message, false);
}
}
// ─── 模型映射 ───────────────────────────────────────
async function loadMappings() {
const mappings = await api('/api/admin/mappings');
const el = document.getElementById('mappingList');
const keys = Object.keys(mappings);
if (!keys.length) {
el.innerHTML = '<div class="empty">暂无模型映射<br><span style="font-size:13px">点击「+ 添加映射」开始配置</span></div>';
return;
}
el.innerHTML = '<div class="mapping-list">' + keys.map(name => {
const m = mappings[name];
const backend = m.backend || 'auto';
const tagClass = backend === 'anthropic' ? 'tag-anthropic' : backend === 'openai' ? 'tag-openai' : 'tag-auto';
const tagLabel = backend === 'auto' ? '自动' : backend;
const hasOverride = m.target_url || m.api_key;
return `<div class="mapping-item">
<div class="mapping-top">
<span class="mapping-name">${esc(name)}</span>
<span class="mapping-arrow">&rarr;</span>
<span class="mapping-upstream">${esc(m.upstream_model || name)}</span>
<div class="mapping-meta">
<span class="tag ${tagClass}">${tagLabel}</span>
${hasOverride ? '<span class="tag tag-override">自定义地址</span>' : ''}
</div>
<div class="mapping-actions">
<button class="btn btn-ghost btn-sm" onclick="openEditModal('${esc(name)}')">编辑</button>
<button class="btn btn-red btn-sm" onclick="deleteMapping('${esc(name)}')">删除</button>
</div>
</div>
</div>`;
}).join('') + '</div>';
}
function esc(s) { return String(s).replace(/&/g,'&amp;').replace(/</g,'&lt;').replace(/>/g,'&gt;').replace(/"/g,'&quot;').replace(/'/g,'&#39;'); }
// ─── 弹窗 ──────────────────────────────────────────
function openAddModal() {
editingName = null;
document.getElementById('modalTitle').textContent = '添加模型映射';
document.getElementById('mName').value = '';
document.getElementById('mName').disabled = false;
document.getElementById('mUpstream').value = '';
document.getElementById('mBackend').value = 'auto';
document.getElementById('mUrl').value = '';
document.getElementById('mKey').value = '';
document.getElementById('modal').classList.add('active');
}
async function openEditModal(name) {
editingName = name;
document.getElementById('modalTitle').textContent = '编辑模型映射';
try {
const mappings = await api('/api/admin/mappings');
const m = mappings[name];
if (!m) { toast('映射未找到', false); return; }
document.getElementById('mName').value = name;
document.getElementById('mName').disabled = false;
document.getElementById('mUpstream').value = m.upstream_model || '';
document.getElementById('mBackend').value = m.backend || 'auto';
document.getElementById('mUrl').value = m.target_url || '';
document.getElementById('mKey').value = m.api_key || '';
document.getElementById('modal').classList.add('active');
} catch (e) {
toast('错误: ' + e.message, false);
}
}
function closeModal() {
document.getElementById('modal').classList.remove('active');
editingName = null;
}
async function saveMapping() {
const name = document.getElementById('mName').value.trim();
const upstream = document.getElementById('mUpstream').value.trim();
if (!name) { toast('请填写 Cursor 模型名', false); return; }
if (!upstream) { toast('请填写上游模型名', false); return; }
const payload = {
name,
upstream_model: upstream,
backend: document.getElementById('mBackend').value,
target_url: document.getElementById('mUrl').value.trim(),
api_key: document.getElementById('mKey').value.trim(),
};
try {
if (editingName) {
await api('/api/admin/mappings/' + encodeURIComponent(editingName), {
method: 'PUT', body: JSON.stringify(payload),
});
toast('映射已更新');
} else {
await api('/api/admin/mappings', {
method: 'POST', body: JSON.stringify(payload),
});
toast('映射已添加');
}
closeModal();
await loadMappings();
} catch (e) {
toast('操作失败: ' + e.message, false);
}
}
async function deleteMapping(name) {
if (!confirm('确定要删除映射「' + name + '」吗?')) return;
try {
await api('/api/admin/mappings/' + encodeURIComponent(name), { method: 'DELETE' });
toast('映射已删除');
await loadMappings();
} catch (e) {
toast('删除失败: ' + e.message, false);
}
}
// ─── 初始化 ─────────────────────────────────────────
(function init() {
const saved = sessionStorage.getItem('_ak');
if (saved) {
authKey = saved;
document.getElementById('login').style.display = 'none';
document.getElementById('dashboard').style.display = 'block';
loadDashboard();
}
})();
document.getElementById('modal').addEventListener('click', function(e) {
if (e.target === this) closeModal();
});
document.addEventListener('keydown', function(e) {
if (e.key === 'Escape') closeModal();
});

0
utils/__init__.py Normal file
View file

131
utils/http.py Normal file
View file

@ -0,0 +1,131 @@
"""HTTP 工具 - 请求头构建、上游转发、SSE 流解析、响应构建"""
import json
import uuid
import logging
import requests
from flask import Response, jsonify
from config import Config
logger = logging.getLogger(__name__)
def gen_id(prefix=''):
"""生成唯一 ID"""
return f'{prefix}{uuid.uuid4().hex[:24]}'
# ─── 请求头构建 ────────────────────────────────────
def build_openai_headers(api_key):
"""构建 OpenAI 兼容请求头"""
return {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
}
def build_anthropic_headers(api_key):
"""构建 Anthropic 请求头,根据密钥前缀自动选择鉴权方式"""
headers = {
'anthropic-version': '2023-06-01',
'Content-Type': 'application/json',
}
if api_key.startswith('sk-'):
headers['x-api-key'] = api_key
else:
headers['Authorization'] = f'Bearer {api_key}'
return headers
# ─── 响应构建 ──────────────────────────────────────
def sse_response(generator):
"""将生成器包装为 SSE 流式响应"""
return Response(
generator,
content_type='text/event-stream',
headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
)
def error_json(message, error_type='proxy_error', status=502):
"""构建 JSON 错误响应"""
return jsonify({'error': {'message': str(message), 'type': error_type}}), status
# ─── 上游请求转发 ──────────────────────────────────
def forward_request(url, headers, payload, stream=False):
"""转发请求到上游 API
返回值:
成功: (response, None)
失败流式: (None, error_body_str)
失败非流式: (None, Flask Response)
"""
try:
resp = requests.post(
url, headers=headers, json=payload,
timeout=Config.API_TIMEOUT, stream=stream,
)
if resp.status_code != 200:
body = resp.content.decode('utf-8', errors='replace')
logger.warning(f'上游返回 {resp.status_code}: {body[:300]}')
if stream:
return None, f'上游错误 {resp.status_code}: {body}'
return None, Response(
resp.content, status=resp.status_code,
content_type=resp.headers.get('Content-Type', 'application/json'),
)
return resp, None
except requests.RequestException as e:
logger.error(f'请求上游失败: {e}')
if stream:
return None, str(e)
return None, error_json(str(e))
# ─── SSE 流解析 ───────────────────────────────────
def iter_openai_sse(response):
"""解析 OpenAI SSE 流yield chunk 字典yield None 表示 [DONE]"""
for line in response.iter_lines():
if not line:
continue
decoded = line.decode('utf-8', errors='replace')
if not decoded.startswith('data:'):
continue
data_str = decoded[5:].strip()
if data_str == '[DONE]':
yield None
return
try:
yield json.loads(data_str)
except json.JSONDecodeError:
continue
def iter_anthropic_sse(response):
"""解析 Anthropic SSE 流yield (event_type, data_dict) 元组"""
event_type = ''
for line in response.iter_lines():
if not line:
continue
decoded = line.decode('utf-8', errors='replace')
if decoded.startswith('event:'):
event_type = decoded[6:].strip()
elif decoded.startswith('data:'):
data_str = decoded[5:].strip()
if not data_str:
continue
try:
yield event_type, json.loads(data_str)
except json.JSONDecodeError:
continue

101
utils/think_tag.py Normal file
View file

@ -0,0 +1,101 @@
"""<think> 标签提取器
部分上游 API DeepSeek使用 <think>...</think> 标签包裹思考内容
Cursor 期望 reasoning_content 字段本模块在流式和非流式响应中
提取 <think> 标签内容并转为 reasoning_content
"""
import re
_THINK_RE = re.compile(r'<think>(.*?)</think>', re.DOTALL)
def extract_from_text(content):
"""从文本中提取 <think> 标签(非流式)
返回: (cleaned_content, reasoning_content)
"""
if not isinstance(content, str) or '<think>' not in content:
return content, None
m = _THINK_RE.search(content)
if not m:
return content, None
reasoning = m.group(1).strip()
cleaned = (content[:m.start()] + content[m.end():]).strip() or None
return cleaned, reasoning
class ThinkTagExtractor:
"""流式 <think> 标签提取器
处理跨 chunk <think>...</think> 标签将标签内的文本
转为 reasoning_content delta标签外的文本保持为 content delta
"""
def __init__(self):
self._in_thinking = False
def process_chunk(self, chunk):
"""处理一个流式 chunk返回转换后的 chunk 列表"""
for choice in (chunk.get('choices') or []):
delta = choice.get('delta') or {}
if delta.get('reasoning_content'):
return [chunk]
content = delta.get('content')
if content is None or content == '':
return [chunk]
return self._split(chunk, content)
return [chunk]
def _split(self, chunk, text):
"""根据 <think> 标签拆分文本为多个 chunk"""
results = []
if self._in_thinking:
end = text.find('</think>')
if end >= 0:
self._in_thinking = False
if text[:end]:
results.append(self._make(chunk, reasoning=text[:end]))
rest = text[end + 8:].lstrip('\n')
if rest:
results.append(self._make(chunk, content=rest))
else:
results.append(self._make(chunk, reasoning=text))
else:
start = text.find('<think>')
if start >= 0:
before = text[:start]
after = text[start + 7:]
if before:
results.append(self._make(chunk, content=before))
end = after.find('</think>')
if end >= 0:
if after[:end]:
results.append(self._make(chunk, reasoning=after[:end]))
rest = after[end + 8:].lstrip('\n')
if rest:
results.append(self._make(chunk, content=rest))
else:
self._in_thinking = True
if after:
results.append(self._make(chunk, reasoning=after))
else:
results.append(chunk)
return results or [chunk]
@staticmethod
def _make(template, content=None, reasoning=None):
"""根据模板 chunk 构造新的 delta chunk"""
delta = {}
if content is not None:
delta['content'] = content
if reasoning is not None:
delta['reasoning_content'] = reasoning
return {
'id': template.get('id', ''),
'object': 'chat.completion.chunk',
'model': template.get('model', ''),
'choices': [{'index': 0, 'delta': delta, 'finish_reason': None}],
}

134
utils/tool_fixer.py Normal file
View file

@ -0,0 +1,134 @@
"""工具参数修复
修复 LLM 生成的工具调用参数中的常见问题
- 智能引号 普通引号
- file_path path 字段映射
- StrReplace 工具的 old_string 精确匹配修复
- Anthropic tool_use 块的 ID stop_reason 修复
"""
import os
import re
import uuid
# 智能引号字符集
_SMART_DOUBLE = frozenset('«»\u201c\u201d\u275e\u201f\u201e\u275d')
_SMART_SINGLE = frozenset('\u2018\u2019\u201a\u201b')
def normalize_args(args):
"""规范化工具参数file_path → path"""
if isinstance(args, dict) and 'file_path' in args and 'path' not in args:
args['path'] = args.pop('file_path')
return args
def repair_str_replace_args(tool_name, args):
"""修复 StrReplace/search_replace 工具的精确匹配问题
old_string 包含智能引号导致无法精确匹配文件内容时
用容错正则在文件中查找唯一匹配并替换为实际内容
"""
if not isinstance(args, dict):
return args
name_lower = (tool_name or '').lower()
if 'str_replace' not in name_lower and 'search_replace' not in name_lower:
return args
old_str = args.get('old_string') or args.get('old_str')
if not old_str:
return args
file_path = args.get('path') or args.get('file_path')
if not file_path or not os.path.isfile(file_path):
return args
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
except Exception:
return args
# 已精确匹配,无需修复
if old_str in content:
return args
# 构建容错正则尝试匹配
pattern = _build_fuzzy_pattern(old_str)
try:
matches = list(re.finditer(pattern, content))
except re.error:
return args
# 仅在唯一匹配时修复,避免歧义
if len(matches) != 1:
return args
matched = matches[0].group()
if 'old_string' in args:
args['old_string'] = matched
elif 'old_str' in args:
args['old_str'] = matched
# 同步修复 new_string 中的智能引号
new_str = args.get('new_string') or args.get('new_str')
if new_str:
fixed = _replace_smart_quotes(new_str)
if 'new_string' in args:
args['new_string'] = fixed
elif 'new_str' in args:
args['new_str'] = fixed
return args
def fix_anthropic_tool_use(response_data):
"""修复 Anthropic 响应中的 tool_use 块(补全 ID、修正 stop_reason"""
if not isinstance(response_data, dict):
return response_data
content = response_data.get('content', [])
if not isinstance(content, list):
return response_data
has_tool_use = False
for block in content:
if isinstance(block, dict) and block.get('type') == 'tool_use':
has_tool_use = True
if not block.get('id'):
block['id'] = f'toolu_{uuid.uuid4().hex[:24]}'
if has_tool_use and response_data.get('stop_reason') != 'tool_use':
response_data['stop_reason'] = 'tool_use'
return response_data
# ─── 内部辅助 ──────────────────────────────────────
def _build_fuzzy_pattern(text):
"""构建容错正则:智能引号可互换、空白可伸缩、反斜杠可重复"""
parts = []
for ch in text:
if ch in _SMART_DOUBLE or ch == '"':
parts.append('["\u00ab\u201c\u201d\u275e\u201f\u201e\u275d\u00bb]')
elif ch in _SMART_SINGLE or ch == "'":
parts.append("['\u2018\u2019\u201a\u201b]")
elif ch in (' ', '\t'):
parts.append(r'\s+')
elif ch == '\\':
parts.append(r'\\{1,2}')
else:
parts.append(re.escape(ch))
return ''.join(parts)
def _replace_smart_quotes(text):
"""将智能引号替换为普通 ASCII 引号"""
return ''.join(
'"' if ch in _SMART_DOUBLE else
"'" if ch in _SMART_SINGLE else
ch for ch in text
)