""" Utility functions for the application """ import json import re import time from typing import Dict, List, Optional, Any, Tuple, Generator import requests from app.core.config import settings def debug_log(message: str, *args) -> None: """Log debug message if debug mode is enabled""" if settings.DEBUG_LOGGING: if args: print(f"[DEBUG] {message % args}") else: print(f"[DEBUG] {message}") def generate_request_ids() -> Tuple[str, str]: """Generate unique IDs for chat and message""" timestamp = int(time.time()) chat_id = f"{timestamp * 1000}-{timestamp}" msg_id = str(timestamp * 1000000) return chat_id, msg_id def get_browser_headers(referer_chat_id: str = "") -> Dict[str, str]: """Get browser headers for API requests""" headers = settings.CLIENT_HEADERS.copy() if referer_chat_id: headers["Referer"] = f"{settings.CLIENT_HEADERS['Origin']}/c/{referer_chat_id}" return headers def get_anonymous_token() -> str: """Get anonymous token for authentication""" headers = get_browser_headers() headers.update({ "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9", "Referer": f"{settings.CLIENT_HEADERS['Origin']}/", }) try: response = requests.get( f"{settings.CLIENT_HEADERS['Origin']}/api/v1/auths/", headers=headers, timeout=10.0 ) if response.status_code != 200: raise Exception(f"anon token status={response.status_code}") data = response.json() token = data.get("token") if not token: raise Exception("anon token empty") return token except Exception as e: debug_log(f"获取匿名token失败: {e}") raise def get_auth_token() -> str: """Get authentication token (anonymous or fixed)""" if settings.ANONYMOUS_MODE: try: token = get_anonymous_token() debug_log(f"匿名token获取成功: {token[:10]}...") return token except Exception as e: debug_log(f"匿名token获取失败,回退固定token: {e}") return settings.BACKUP_TOKEN def transform_thinking_content(content: str) -> str: """Transform thinking content according to configuration""" # Remove summary tags content = re.sub(r'(?s).*?', '', content) # Clean up remaining tags content = content.replace("", "").replace("", "").replace("", "") content = content.strip() if settings.THINKING_PROCESSING == "think": content = re.sub(r']*>', '', content) content = content.replace("", "") elif settings.THINKING_PROCESSING == "strip": content = re.sub(r']*>', '', content) content = content.replace("", "") # Remove line prefixes content = content.lstrip("> ") content = content.replace("\n> ", "\n") return content.strip() def call_upstream_api( upstream_req: Any, chat_id: str, auth_token: str ) -> requests.Response: """Call upstream API with proper headers""" headers = get_browser_headers(chat_id) headers["Authorization"] = f"Bearer {auth_token}" debug_log(f"调用上游API: {settings.API_ENDPOINT}") debug_log(f"上游请求体: {upstream_req.model_dump_json()}") response = requests.post( settings.API_ENDPOINT, json=upstream_req.model_dump(exclude_none=True), headers=headers, timeout=60.0, stream=True ) debug_log(f"上游响应状态: {response.status_code}") return response