diff --git a/routes/chat.py b/routes/chat.py index 4fc03e8..3f695a4 100644 --- a/routes/chat.py +++ b/routes/chat.py @@ -165,6 +165,9 @@ def _handle_openai_stream( for chunk in iter_openai_sse(resp): if chunk is None: _dbg(f'流式响应结束,共 {chunk_count} 个数据片段') + close_chunk = think_extractor.finalize() + if close_chunk: + yield sse_data_message(close_chunk) yield sse_data_message('[DONE]') return diff --git a/routes/messages.py b/routes/messages.py index e552683..0a010ec 100644 --- a/routes/messages.py +++ b/routes/messages.py @@ -93,9 +93,14 @@ def _inject_thinking(data): def _process_stream(resp): - """处理 /v1/messages 流式响应,检测并注入 thinking 事件""" + """处理 /v1/messages 流式响应,检测并注入 thinking 事件 + + 追踪上游 content block 的 index,在注入 thinking blocks 时使用独立的 index, + 并将后续上游 block 的 index 偏移,避免冲突。 + """ reasoning_buf = '' injected = False + index_offset = 0 for line in resp.iter_lines(): if not line: @@ -119,7 +124,6 @@ def _process_stream(resp): modified = False - # 提取 reasoning_content for container_key in ('message', 'delta'): container = event_data.get(container_key) if not container: @@ -129,13 +133,17 @@ def _process_stream(resp): reasoning_buf += rc modified = True - # 在首个 text_delta 前注入 thinking blocks if reasoning_buf and not injected: if event_data.get('delta', {}).get('type') == 'text_delta': injected = True yield from _emit_thinking_blocks(reasoning_buf) + index_offset = 1 reasoning_buf = '' + if index_offset and 'index' in event_data: + event_data['index'] = event_data['index'] + index_offset + modified = True + yield f'data: {json.dumps(event_data)}\n\n' if modified else decoded + '\n\n' diff --git a/settings.py b/settings.py index 9580d21..58e657e 100644 --- a/settings.py +++ b/settings.py @@ -5,13 +5,13 @@ - model_mappings: Cursor 模型名 → {upstream_model, backend, target_url, api_key, custom_instructions} """ +import copy import json import os import threading from config import Config -# 数据目录放在项目根目录下,便于 Docker 卷挂载 _ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) DATA_DIR = os.path.join(_ROOT_DIR, 'data') SETTINGS_FILE = os.path.join(DATA_DIR, 'settings.json') @@ -38,10 +38,10 @@ def load(): with open(SETTINGS_FILE, 'r', encoding='utf-8') as f: _cache = {**_DEFAULTS, **json.load(f)} except (json.JSONDecodeError, OSError): - _cache = dict(_DEFAULTS) + _cache = copy.deepcopy(_DEFAULTS) else: - _cache = dict(_DEFAULTS) - return dict(_cache) + _cache = copy.deepcopy(_DEFAULTS) + return copy.deepcopy(_cache) def save(data): @@ -58,10 +58,13 @@ def save(data): def get(): - """获取当前配置快照,优先返回内存缓存中的结果。""" - if _cache is None: - return load() - return dict(_cache) + """获取当前配置的深拷贝快照,保证调用方修改不影响缓存。""" + with _lock: + if _cache is None: + pass + else: + return copy.deepcopy(_cache) + return load() def get_url(): diff --git a/utils/http.py b/utils/http.py index 3b5d8c2..c0f4506 100644 --- a/utils/http.py +++ b/utils/http.py @@ -126,10 +126,15 @@ def iter_responses_sse(response) -> Iterator[tuple[str, dict[str, Any]]]: def _iter_event_sse(response) -> Iterator[tuple[str, dict[str, Any]]]: - """解析带 event/data 的通用 SSE 流。""" + """解析带 event/data 的通用 SSE 流。 + + SSE 规范中空行是事件分隔符,遇到空行时重置 event_type, + 避免前一个事件的类型泄漏到下一个事件。 + """ event_type = '' for line in response.iter_lines(): if not line: + event_type = '' continue decoded = line.decode('utf-8', errors='replace') if decoded.startswith('event:'): diff --git a/utils/think_tag.py b/utils/think_tag.py index 269e7b6..2a783e0 100644 --- a/utils/think_tag.py +++ b/utils/think_tag.py @@ -30,24 +30,84 @@ class ThinkTagExtractor: 处理跨 chunk 的 ... 标签,将标签内的文本 转为 reasoning_content delta,标签外的文本保持为 content delta。 + + 额外处理: + - content 和 tool_calls 同时出现时拆分为两个独立 chunk(Cursor 会丢弃同时包含两者的 content) + - tool_calls 首次出现时在前面插入换行,确保文本以换行结束 + - 流结束时如果 think 标签仍未关闭,自动合成关闭 chunk """ def __init__(self): """初始化跨 chunk 的 thinking 状态跟踪。""" self._in_thinking = False + self._tool_calls_seen = False def process_chunk(self, chunk): """处理一个流式 chunk,返回转换后的 chunk 列表""" for choice in (chunk.get('choices') or []): delta = choice.get('delta') or {} + + has_tool_calls = bool(delta.get('tool_calls')) + has_content = delta.get('content') is not None and delta.get('content') != '' + + # content 和 tool_calls 同时出现:拆分为两个独立事件 + if has_content and has_tool_calls: + results = [] + content_chunk = self._make(chunk, content=delta['content']) + results.extend(self._process_content(content_chunk, delta['content'])) + tc_chunk = { + 'id': chunk.get('id', ''), + 'object': 'chat.completion.chunk', + 'model': chunk.get('model', ''), + 'choices': [{'index': 0, 'delta': {'tool_calls': delta['tool_calls']}, + 'finish_reason': choice.get('finish_reason')}], + } + results.extend(self._handle_tool_calls_chunk(tc_chunk)) + return results + + if has_tool_calls: + return self._handle_tool_calls_chunk(chunk) + if delta.get('reasoning_content'): return [chunk] content = delta.get('content') if content is None or content == '': return [chunk] - return self._split(chunk, content) + return self._process_content(chunk, content) return [chunk] + def finalize(self): + """流结束时调用,如果 think 标签仍未关闭则返回关闭 chunk""" + if not self._in_thinking: + return None + self._in_thinking = False + return { + 'id': '', + 'object': 'chat.completion.chunk', + 'model': '', + 'choices': [{'index': 0, 'delta': {'content': '\n\n\n'}, 'finish_reason': None}], + } + + def _process_content(self, chunk, content): + """处理包含 content 的 chunk""" + return self._split(chunk, content) + + def _handle_tool_calls_chunk(self, chunk): + """处理包含 tool_calls 的 chunk,首次出现时在前面插入换行""" + results = [] + if not self._tool_calls_seen: + self._tool_calls_seen = True + if self._in_thinking: + self._in_thinking = False + results.append(self._make(chunk, content='\n\n\n')) + else: + results.append(self._make(chunk, content='\n')) + elif self._in_thinking: + self._in_thinking = False + results.append(self._make(chunk, content='\n\n\n')) + results.append(chunk) + return results + def _split(self, chunk, text): """根据 标签拆分文本为多个 chunk""" results = []