"""轻量 Thinking 缓存 纯内存缓存,在多轮对话中保存和恢复 thinking/reasoning 内容。 解决 Cursor 不会把 thinking 内容回传给 API 的问题, 某些模型(如推理模型)在缺少历史 thinking 时表现会下降。 """ from __future__ import annotations import hashlib import json import logging import re import time from typing import Any logger = logging.getLogger(__name__) _THINK_RE = re.compile(r'.*?', re.DOTALL) _UNCLOSED_THINK_RE = re.compile(r'.*$', re.DOTALL) _TOOL_ID_RE = re.compile(r'[^a-zA-Z0-9_-]') _TTL = 86400 # 24 hours class ThinkingCache: """纯内存 thinking 缓存,TTL 2 小时。""" def __init__(self): self._store: dict[str, tuple[str, float]] = {} def inject(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]: """遍历 assistant 消息,缺少 reasoning_content 时从缓存注入。""" sid = self._session_id(messages) if not sid: return messages now = time.time() for msg in messages: if msg.get('role') != 'assistant': continue if msg.get('reasoning_content'): continue key = sid + ':' + self._message_hash(msg) entry = self._store.get(key) if entry and (now - entry[1]) < _TTL: msg['reasoning_content'] = entry[0] logger.debug('已从缓存注入 thinking (%d 字符)', len(entry[0])) return messages def store_from_response( self, messages: list[dict[str, Any]], reasoning_content: str, ) -> None: """将响应中的 thinking 内容存入缓存。""" if not reasoning_content: return sid = self._session_id(messages) if not sid: return fake_msg: dict[str, Any] = {'role': 'assistant', 'content': '', 'tool_calls': []} key = sid + ':' + self._message_hash(fake_msg) self._store[key] = (reasoning_content, time.time()) self._cleanup() def store_assistant_thinking( self, messages: list[dict[str, Any]], assistant_msg: dict[str, Any], ) -> None: """从完整的 assistant 消息中提取并缓存 thinking。""" rc = assistant_msg.get('reasoning_content', '') if not rc: return sid = self._session_id(messages) if not sid: return key = sid + ':' + self._message_hash(assistant_msg) self._store[key] = (rc, time.time()) self._cleanup() def _session_id(self, messages: list[dict[str, Any]]) -> str: first_user = '' first_assistant = '' for msg in messages: role = msg.get('role', '') if role in ('system', 'developer'): continue if role == 'user' and not first_user: first_user = self._normalize_content( msg.get('content', '') ) elif role == 'assistant' and not first_assistant: first_assistant = self._normalize_content( msg.get('content', '') ) if first_user and first_assistant: break if not first_user or not first_assistant: return '' raw = first_user + '|' + first_assistant return hashlib.sha256(raw.encode()).hexdigest()[:16] def _message_hash(self, msg: dict[str, Any]) -> str: content = self._normalize_content(msg.get('content', '')) tool_ids = sorted( self._normalize_tool_id(tc.get('id', '')) for tc in msg.get('tool_calls', []) if isinstance(tc, dict) ) raw = json.dumps({'c': content, 't': tool_ids}, ensure_ascii=False) return hashlib.sha256(raw.encode()).hexdigest()[:16] @staticmethod def _normalize_content(content: Any) -> str: if isinstance(content, list): parts = [] for p in content: if isinstance(p, dict) and p.get('type') == 'text': parts.append(p.get('text', '')) elif isinstance(p, str): parts.append(p) text = '\n'.join(parts) elif isinstance(content, str): text = content else: text = str(content) if content else '' text = _THINK_RE.sub('', text) text = _UNCLOSED_THINK_RE.sub('', text) return text.strip() @staticmethod def _normalize_tool_id(tid: str) -> str: return _TOOL_ID_RE.sub('', tid) def _cleanup(self) -> None: """惰性清理过期条目(每 100 次写入触发一次全量扫描)。""" if len(self._store) < 100: return now = time.time() expired = [k for k, (_, ts) in self._store.items() if (now - ts) >= _TTL] for k in expired: del self._store[k] thinking_cache = ThinkingCache()