# -*- coding: utf-8 -*-
"""
OpenAI Compatible API Server for Z.AI
=====================================
This module provides an OpenAI-compatible API server that forwards requests
to the Z.AI chat service with proper authentication and response formatting.
"""
import json
import os
import re
import time
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Any, Union, Generator, Tuple, Literal
import requests
from fastapi import FastAPI, Request, Response, HTTPException, Header
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
# =============================================================================
# Configuration Constants
# =============================================================================
class ServerConfig:
    """Process-wide settings, read from environment variables.

    Every attribute is computed once, when this class body executes; later
    changes to the environment are not picked up. Numeric settings are passed
    through ``int()``, so a non-numeric value raises ``ValueError`` at that
    moment. Boolean settings are true only for the (case-insensitive) string
    ``"true"``.
    """

    # --- Upstream API and credentials ---
    API_ENDPOINT: str = os.environ.get("API_ENDPOINT", "https://chat.z.ai/api/chat/completions")
    AUTH_TOKEN: str = os.environ.get("AUTH_TOKEN", "sk-your-api-key")
    # Falls back to AUTH_TOKEN when ANTHROPIC_API_KEY is not set.
    ANTHROPIC_API_KEY: str = os.environ.get("ANTHROPIC_API_KEY", AUTH_TOKEN)
    # NOTE(review): the default below is a credential-like token committed to
    # source; consider requiring BACKUP_TOKEN from the environment instead.
    BACKUP_TOKEN: str = os.environ.get("BACKUP_TOKEN", "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6IjMxNmJjYjQ4LWZmMmYtNGExNS04NTNkLWYyYTI5YjY3ZmYwZiIsImVtYWlsIjoiR3Vlc3QtMTc1NTg0ODU4ODc4OEBndWVzdC5jb20ifQ.PktllDySS3trlyuFpTeIZf-7hl8Qu1qYF3BxjgIul0BrNux2nX9hVzIjthLXKMWAf9V0qM8Vm_iyDqkjPGsaiQ")

    # --- Model names ---
    PRIMARY_MODEL: str = os.environ.get("PRIMARY_MODEL", "GLM-4.5")
    THINKING_MODEL: str = os.environ.get("THINKING_MODEL", "GLM-4.5-Thinking")
    SEARCH_MODEL: str = os.environ.get("SEARCH_MODEL", "GLM-4.5-Search")

    # --- Server ---
    LISTEN_PORT: int = int(os.environ.get("LISTEN_PORT", "8080"))
    DEBUG_LOGGING: bool = os.environ.get("DEBUG_LOGGING", "true").lower() == "true"

    # --- Features ---
    # How thinking output is post-processed:
    #   "strip" - remove the <details> tags
    #   "think" - rewrite the <details> tags as <think> tags
    #   "raw"   - leave the <details> tags untouched
    THINKING_PROCESSING: str = os.environ.get("THINKING_PROCESSING", "think")
    ANONYMOUS_MODE: bool = os.environ.get("ANONYMOUS_MODE", "true").lower() == "true"
    TOOL_SUPPORT: bool = os.environ.get("TOOL_SUPPORT", "true").lower() == "true"
    # Upper bound on how many characters of a reply are scanned for tool calls.
    SCAN_LIMIT: int = int(os.environ.get("SCAN_LIMIT", "200000"))

    # --- Browser-like headers sent with upstream requests ---
    CLIENT_HEADERS: Dict[str, str] = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
        "Accept-Language": "zh-CN",
        "sec-ch-ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "X-FE-Version": "prod-fe-1.0.70",
        "Origin": "https://chat.z.ai",
    }
# =============================================================================
# Data Models
# =============================================================================
class Message(BaseModel):
    """One chat message, as used in requests and in non-streaming choices."""

    # Author role; any string is accepted by this model.
    role: str
    # Text body; may be absent.
    content: Optional[str] = None
    # Reasoning text kept apart from the main body; may be absent.
    reasoning_content: Optional[str] = None
    # Tool-call descriptors as plain dictionaries; may be absent.
    tool_calls: Optional[List[Dict[str, Any]]] = None
class OpenAIRequest(BaseModel):
    """Body of an incoming OpenAI-style chat completion request."""

    # Name of the model the client asked for.
    model: str
    # Conversation so far.
    messages: List[Message]
    # Whether the client wants a streamed reply; off unless requested.
    stream: Optional[bool] = False
    # Optional sampling / length controls.
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    # Optional tool definitions and the client's tool-selection directive.
    tools: Optional[List[Dict[str, Any]]] = None
    tool_choice: Optional[Any] = None
class ModelItem(BaseModel):
    """Descriptor of a model: identifier, display name and owner."""

    id: str
    name: str
    owned_by: str
class UpstreamRequest(BaseModel):
    """Request body sent to the upstream chat service."""

    # Required fields.
    stream: bool
    model: str
    messages: List[Message]
    # Free-form option maps; empty unless the caller fills them in.
    params: Dict[str, Any] = {}
    features: Dict[str, Any] = {}
    # Optional fields; all default to None.
    background_tasks: Optional[Dict[str, bool]] = None
    chat_id: Optional[str] = None
    id: Optional[str] = None
    mcp_servers: Optional[List[str]] = None
    model_item: Optional[ModelItem] = None
    tool_servers: Optional[List[str]] = None
    variables: Optional[Dict[str, str]] = None
    # Clears pydantic's protected-namespace list; presumably so that the
    # `model_item` field name does not trigger a namespace warning.
    model_config = {'protected_namespaces': ()}
class Delta(BaseModel):
    """Incremental piece of a streamed choice; every field is optional."""

    role: Optional[str] = None
    content: Optional[str] = None
    reasoning_content: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
class Choice(BaseModel):
    """One entry of a response's ``choices`` list."""

    # Position of this choice in the list.
    index: int
    # Complete message, when one is provided.
    message: Optional[Message] = None
    # Incremental update, when one is provided.
    delta: Optional[Delta] = None
    # Why generation stopped, once known.
    finish_reason: Optional[str] = None
class Usage(BaseModel):
    """Token counters; each one starts at zero."""

    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
class OpenAIResponse(BaseModel):
    """Envelope of an OpenAI-style response or streamed chunk."""

    id: str
    # Object type label, e.g. "chat.completion.chunk" for streamed chunks.
    object: str
    # Creation time as an integer timestamp.
    created: int
    model: str
    choices: List[Choice]
    # Token accounting, when available.
    usage: Optional[Usage] = None
class UpstreamError(BaseModel):
    """Error reported by the upstream service: a message and a numeric code."""

    detail: str
    code: int
class UpstreamDataInner(BaseModel):
    """Nested section of an upstream payload; may carry an error."""

    error: Optional[UpstreamError] = None
class UpstreamDataData(BaseModel):
    """The ``data`` object of an upstream stream event."""

    # Newly produced text for this event; empty when there is none.
    delta_content: str = ""
    # Replacement/edited text for this event; empty when there is none.
    edit_content: str = ""
    # Phase label reported by upstream (this file compares it against
    # "thinking", "answer" and "done").
    phase: str = ""
    # True once upstream signals the end of the stream.
    done: bool = False
    usage: Optional[Usage] = None
    # An error may be attached here directly or inside `inner`.
    error: Optional[UpstreamError] = None
    inner: Optional[UpstreamDataInner] = None
class UpstreamData(BaseModel):
    """A single event decoded from the upstream stream."""

    # Event type label as sent by upstream.
    type: str
    # Event payload.
    data: UpstreamDataData
    # Error attached at the top level of the event, if any.
    error: Optional[UpstreamError] = None
class Model(BaseModel):
    """Entry of the model listing."""

    id: str
    # Object type label; always "model" unless overridden.
    object: str = "model"
    # Creation time as an integer timestamp.
    created: int
    owned_by: str
# Anthropic API compatibility models
class ContentBlock(BaseModel):
    """A typed piece of message content that carries text."""

    # Block type label.
    type: str
    # Text carried by the block.
    text: str
class AnthropicMessage(BaseModel):
    """One message of an Anthropic-style request."""

    # Only "user" and "assistant" are accepted.
    role: Literal["user", "assistant"]
    # Either a plain string or a list of content blocks.
    content: Union[str, List[ContentBlock]]
class AnthropicRequest(BaseModel):
    """Body of an incoming Anthropic-style messages request."""

    model: str
    messages: List[AnthropicMessage]
    # Optional system prompt, as a string or as content blocks.
    system: Optional[Union[str, List[ContentBlock]]] = None
    # Output length limit; 1024 when the client does not send one.
    max_tokens: int = 1024
    # Streaming is off unless requested.
    stream: bool = False
    temperature: Optional[float] = None
class ModelsResponse(BaseModel):
    """Response wrapper for the model listing."""

    # Object type label; always "list" unless overridden.
    object: str = "list"
    # The listed models.
    data: List[Model]
# Anthropic API compatibility functions
def stream_anthropic_generator(upstream_response: "requests.Response", request_id: str, requested_model: str):
    """Translate an upstream SSE stream into Anthropic-style streaming events.

    The generator always produces a complete, well-formed sequence:
    ``message_start``, ``content_block_start``, zero or more
    ``content_block_delta`` events, then ``content_block_stop``,
    ``message_delta`` and ``message_stop``. The closing events are sent even
    when the upstream stream ends without an explicit completion marker.

    Args:
        upstream_response: Streaming response whose ``iter_lines()`` yields
            the upstream ``data:`` lines as bytes.
        request_id: Identifier placed in the ``message_start`` event.
        requested_model: Model name echoed back in the ``message_start`` event.

    Yields:
        str: One serialized SSE event (``event:`` line, ``data:`` line and a
        terminating blank line) at a time.
    """
    usage = {"input_tokens": 0, "output_tokens": 0}

    def _sse(event_name, payload):
        """Serialize one event in SSE wire format."""
        return f"event: {event_name}\ndata: {json.dumps(payload)}\n\n"

    # The data of message_start carries the event type AND the message object.
    yield _sse("message_start", {
        "type": "message_start",
        "message": {
            "id": request_id,
            "type": "message",
            "role": "assistant",
            "content": [],
            "model": requested_model,
            "stop_reason": None,
            "stop_sequence": None,
            "usage": dict(usage),
        },
    })
    yield _sse("content_block_start", {
        "type": "content_block_start",
        "index": 0,
        "content_block": {"type": "text", "text": ""},
    })

    try:
        for line in upstream_response.iter_lines():
            if not line.startswith(b"data:"):
                continue
            data_str = line[5:].strip()
            if not data_str:
                continue
            try:
                data = json.loads(data_str.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Not JSON (or not valid UTF-8): ignore the line.
                continue

            # Skip anything that is not shaped like {"data": {...}}.
            payload = data.get("data") if isinstance(data, dict) else None
            if not isinstance(payload, dict):
                continue

            delta_content = payload.get("delta_content", "")
            phase = payload.get("phase", "")

            if delta_content:
                out_content = transform_thinking_content(delta_content) if phase == "thinking" else delta_content
                if out_content:
                    # Rough estimate: about four characters per token.
                    usage["output_tokens"] += len(out_content) // 4
                    yield _sse("content_block_delta", {
                        "type": "content_block_delta",
                        "index": 0,
                        "delta": {"type": "text_delta", "text": out_content},
                    })

            if payload.get("done", False) or phase == "done":
                break
    finally:
        # Release the streamed connection, including when the consumer stops
        # iterating early.
        close = getattr(upstream_response, "close", None)
        if callable(close):
            close()

    yield _sse("content_block_stop", {"type": "content_block_stop", "index": 0})
    # `usage` is a sibling of `delta` in message_delta, not a member of it.
    yield _sse("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": "end_turn", "stop_sequence": None},
        "usage": {
            "input_tokens": usage["input_tokens"],
            "output_tokens": usage["output_tokens"],
        },
    })
    yield _sse("message_stop", {"type": "message_stop"})
def transform_thinking_content(content: str) -> str:
    """Clean up a piece of upstream "thinking" text.

    Steps, in order:
      1. drop ``<summary>...</summary>`` sections;
      2. drop stray ``</thinking>``, ``<Full>`` and ``</Full>`` tags;
      3. handle ``<details>`` tags according to
         ``ServerConfig.THINKING_PROCESSING``: ``"think"`` rewrites them as
         ``<think>``/``</think>``, ``"strip"`` removes them, any other value
         leaves them unchanged;
      4. remove the blockquote marker (``"> "``) from the start of the text
         and from the start of every following line.

    Args:
        content: Raw thinking text from upstream.

    Returns:
        The cleaned text with surrounding whitespace removed.
    """
    # Remove summary sections (may span several lines).
    content = re.sub(r'(?s)<summary>.*?</summary>', '', content)
    # Remove leftover wrapper tags.
    content = content.replace("</thinking>", "").replace("<Full>", "").replace("</Full>", "")
    content = content.strip()

    mode = ServerConfig.THINKING_PROCESSING
    if mode == "think":
        content = re.sub(r'<details[^>]*>', '<think>', content)
        content = content.replace("</details>", "</think>")
    elif mode == "strip":
        content = re.sub(r'<details[^>]*>', '', content)
        content = content.replace("</details>", "")

    # Remove ONE leading blockquote marker ("> ", or a lone ">").
    # str.lstrip("> ") is not used: it strips every leading '>' and ' '
    # character, which would also eat real text such as ">= 5" or ">>> x".
    content = re.sub(r'^>(?: |$)', '', content)
    content = content.replace("\n> ", "\n")
    return content.strip()
# =============================================================================
# SSE Parser
# =============================================================================
class SSEParser:
    """Line-by-line reader for a Server-Sent Events response.

    Each ``field: value`` line obtained from ``response.iter_lines()`` is
    turned into a small dictionary. Lines are handled independently;
    consecutive ``data`` lines are not merged into a single event. The parser
    can be used as a context manager, which closes the response on exit.
    """

    def __init__(self, response: requests.Response, debug_mode: bool = False):
        """Remember the response to read from.

        Args:
            response: Response whose ``iter_lines()`` supplies the SSE lines.
            debug_mode: When true, progress messages are printed.
        """
        self.response = response
        self.debug_mode = debug_mode
        self.buffer = ""  # not read by any method of this class
        self.line_count = 0  # lines pulled from the response so far

    def debug_log(self, format_str: str, *args) -> None:
        """Print a ``[SSE_PARSER]`` message when debug mode is on.

        ``format_str`` is %-formatted with ``args`` only when ``args`` is
        non-empty.
        """
        if not self.debug_mode:
            return
        text = format_str % args if args else format_str
        print(f"[SSE_PARSER] {text}")

    def iter_events(self) -> Generator[Dict[str, Any], None, None]:
        """Yield one dictionary per recognized SSE line.

        Yields:
            dict: ``{'type': 'data', 'data': ..., 'raw': ...}`` for data lines
            (with ``'is_json': False`` added when the value is not JSON), or
            ``{'type': 'event' | 'id' | 'retry', ...}`` for the other fields.
            Blank lines, comment lines, lines without a colon, unknown fields
            and undecodable lines produce nothing.
        """
        self.debug_log("开始解析 SSE 流")
        for raw_line in self.response.iter_lines():
            self.line_count += 1
            if not raw_line:
                continue

            # Turn bytes into text; undecodable lines are dropped.
            if isinstance(raw_line, bytes):
                try:
                    raw_line = raw_line.decode('utf-8')
                except UnicodeDecodeError:
                    self.debug_log(f"第{self.line_count}行解码失败,跳过")
                    continue

            # Comments start with ':'; lines without any ':' carry no field.
            if raw_line.startswith(':') or ':' not in raw_line:
                continue

            name, _, payload = raw_line.partition(':')
            name = name.strip()
            payload = payload.lstrip()

            if name == 'data':
                self.debug_log(f"收到数据 (第{self.line_count}行): {payload}")
                try:
                    decoded = json.loads(payload)
                    yield {
                        'type': 'data',
                        'data': decoded,
                        'raw': payload
                    }
                except json.JSONDecodeError:
                    # Hand the raw text through, flagged as non-JSON.
                    yield {
                        'type': 'data',
                        'data': payload,
                        'raw': payload,
                        'is_json': False
                    }
            elif name == 'event':
                yield {'type': 'event', 'event': payload}
            elif name == 'id':
                yield {'type': 'id', 'id': payload}
            elif name == 'retry':
                try:
                    delay = int(payload)
                    yield {'type': 'retry', 'retry': delay}
                except ValueError:
                    self.debug_log(f"无效的 retry 值: {payload}")

    def iter_data_only(self) -> Generator[Dict[str, Any], None, None]:
        """Yield only the ``data`` dictionaries from :meth:`iter_events`."""
        for item in self.iter_events():
            if item['type'] != 'data':
                continue
            yield item

    def iter_json_data(self, model_class: Optional[type] = None) -> Generator[Dict[str, Any], None, None]:
        """Yield the ``data`` events whose value parsed as JSON.

        Args:
            model_class: Optional pydantic model class. When given, each raw
                value is validated with ``model_validate_json`` and the
                resulting model instance replaces ``'data'``; events that
                fail validation are logged and skipped.

        Yields:
            dict: The JSON data events (validated when ``model_class`` is set).
        """
        for item in self.iter_events():
            if item['type'] != 'data' or not item.get('is_json', True):
                continue
            try:
                if not model_class:
                    yield item
                else:
                    validated = model_class.model_validate_json(item['raw'])
                    yield {
                        'type': 'data',
                        'data': validated,
                        'raw': item['raw']
                    }
            except Exception as e:
                self.debug_log(f"数据验证失败: {e}")
                continue

    def close(self) -> None:
        """Close the wrapped response if it offers a ``close`` attribute."""
        if not hasattr(self.response, 'close'):
            return
        self.response.close()

    def __enter__(self):
        """Enter the context; returns the parser itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Leave the context, closing the response."""
        self.close()
# =============================================================================
# Function Call Utilities
# =============================================================================
def generate_tool_prompt(tools: List[Dict[str, Any]]) -> str:
    """Render tool definitions as a prompt section for the model.

    Only entries whose ``type`` is ``"function"`` are rendered. Each one gets
    a heading, a purpose line and, when it declares parameters, one bullet per
    parameter. The rendered sections are followed by fixed instructions that
    tell the model how to format a tool call.

    Args:
        tools: OpenAI-style tool definitions.

    Returns:
        The prompt text, or ``""`` when there is nothing to render.
    """
    if not tools:
        return ""

    sections: List[str] = []
    for entry in tools:
        if entry.get("type") != "function":
            continue

        spec = entry.get("function", {}) or {}
        schema = spec.get("parameters", {}) or {}
        lines = [
            f"## {spec.get('name', 'unknown')}",
            f"**Purpose**: {spec.get('description', '')}",
        ]

        properties = schema.get("properties", {}) or {}
        required = set(schema.get("required", []) or [])
        if properties:
            lines.append("**Parameters**:")
            for pname, pinfo in properties.items():
                pinfo = pinfo or {}
                marker = "**Required**" if pname in required else "*Optional*"
                lines.append(
                    f"- `{pname}` ({pinfo.get('type', 'any')}) - {marker}: {pinfo.get('description', '')}"
                )

        sections.append("\n".join(lines))

    if not sections:
        return ""

    # Fixed trailer describing the expected tool-call reply format.
    usage_lines = (
        "\n\n# USAGE INSTRUCTIONS\n",
        "When you need to execute a function, respond ONLY with a JSON object containing tool_calls:\n",
        "```json\n",
        "{\n",
        ' "tool_calls": [\n',
        " {\n",
        ' "id": "call_" + unique_id,\n',
        ' "type": "function",\n',
        ' "function": {\n',
        ' "name": "function_name",\n',
        ' "arguments": {\n',
        ' "param1": "value1"\n',
        ' }\n',
        " }\n",
        " }\n",
        " ]\n",
        "}\n",
        "```\n",
        "Important: No explanatory text before or after the JSON.\n",
    )
    return "\n\n# AVAILABLE FUNCTIONS\n" + "\n\n---\n".join(sections) + "".join(usage_lines)
def _append_to_last_user_message(messages: List[Dict[str, Any]], suffix: str) -> None:
    """Append ``suffix`` to the final message of ``messages`` if it is a user turn.

    The final entry is replaced by a modified copy, so the caller's original
    message dictionary is left untouched.
    """
    if not messages or messages[-1].get("role") != "user":
        return
    last = dict(messages[-1])
    content = last.get("content", "")
    if content is None:
        content = ""
    last["content"] = content + suffix
    messages[-1] = last


def _render_tool_result(message: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a ``tool``/``function`` message into an assistant text message."""
    tool_name = message.get("name", "unknown")
    payload = message.get("content", "")
    if payload is None:
        text = ""
    elif isinstance(payload, (dict, list)):
        # Structured results are shown as JSON, not as a Python repr.
        text = json.dumps(payload, ensure_ascii=False)
    else:
        text = str(payload)

    if text.strip():
        content = f"工具 {tool_name} 返回结果:\n```json\n{text}\n```"
    else:
        # Nothing to show: report completion instead of an empty code block.
        content = f"工具 {tool_name} 执行完成"
    return {"role": "assistant", "content": content}


def process_messages_with_tools(
    messages: List[Dict[str, Any]],
    tools: Optional[List[Dict[str, Any]]] = None,
    tool_choice: Optional[Any] = None
) -> List[Dict[str, Any]]:
    """Prepare messages for an upstream model that has no native tool support.

    When tools are supplied, ``ServerConfig.TOOL_SUPPORT`` is on and
    ``tool_choice`` is not ``"none"``:
      * the generated tool prompt is appended to every system message, or a
        new system message is inserted first when there is none;
      * a hint is appended to the last message when it is a user turn and
        ``tool_choice`` is ``"required"``, ``"auto"`` or names a function.

    In every case, ``tool``/``function`` messages are rewritten as assistant
    messages that quote the tool result.

    Args:
        messages: Conversation as plain dictionaries; not modified.
        tools: Optional OpenAI-style tool definitions.
        tool_choice: Optional tool-selection directive from the client.

    Returns:
        A new list of message dictionaries.
    """
    if tools and ServerConfig.TOOL_SUPPORT and (tool_choice != "none"):
        tools_prompt = generate_tool_prompt(tools)
        processed: List[Dict[str, Any]] = []

        if any(m.get("role") == "system" for m in messages):
            for m in messages:
                if m.get("role") != "system":
                    processed.append(m)
                    continue
                patched = dict(m)
                content = patched.get("content", "")
                if content is None:
                    content = ""
                patched["content"] = content + tools_prompt
                processed.append(patched)
        else:
            processed = [{"role": "system", "content": "你是一个有用的助手。" + tools_prompt}] + list(messages)

        # Nudge the model according to the requested tool choice.
        if tool_choice in ("required", "auto"):
            _append_to_last_user_message(processed, "\n\n请根据需要使用提供的工具函数。")
        elif isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
            fname = (tool_choice.get("function") or {}).get("name")
            if fname:
                _append_to_last_user_message(processed, f"\n\n请使用 {fname} 函数来处理这个请求。")
    else:
        processed = list(messages)

    # Tool results are replayed to the model as assistant text.
    return [
        _render_tool_result(m) if m.get("role") in ("tool", "function") else m
        for m in processed
    ]
# Tool Extraction Patterns
# A JSON object inside a ```json fenced block; group 1 is the object text.
TOOL_CALL_FENCE_PATTERN = re.compile(r"```json\s*(\{.*?\})\s*```", flags=re.S)
# A brace-delimited span mentioning "tool_calls" outside a fence. The span
# ends at the first "}" after the key, so it may stop short of the full object.
TOOL_CALL_INLINE_PATTERN = re.compile(r"(\{[^{}]{0,10000}\"tool_calls\".*?\})", flags=re.S)
# A natural-language call: group 1 is the function name, group 2 the arguments.
FUNCTION_CALL_PATTERN = re.compile(r"调用函数\s*[::]\s*([\w\-\.]+)\s*(?:参数|arguments)[::]\s*(\{.*?\})", flags=re.S)
def extract_tool_invocations(text: str) -> Optional[List[Dict[str, Any]]]:
    """Find tool calls embedded in a model reply.

    Three strategies are tried in order, and the first hit wins:
      1. a JSON object inside a fenced json code block with a non-empty
         ``tool_calls`` list;
      2. a JSON object elsewhere in the text with a non-empty ``tool_calls``
         list;
      3. a natural-language call (function name followed by JSON arguments),
         which is converted to a single synthesized tool call.

    Only the first ``ServerConfig.SCAN_LIMIT`` characters are examined.

    Args:
        text: The model's reply text.

    Returns:
        The list of tool-call dictionaries, or ``None`` when none is found.
    """
    if not text:
        return None

    # Bound the work done on very large replies.
    scannable_text = text[:ServerConfig.SCAN_LIMIT]
    decoder = json.JSONDecoder()

    # Attempt 1: fenced JSON blocks.
    for json_block in TOOL_CALL_FENCE_PATTERN.findall(scannable_text):
        try:
            parsed_data = json.loads(json_block)
        except json.JSONDecodeError:
            continue
        tool_calls = parsed_data.get("tool_calls") if isinstance(parsed_data, dict) else None
        if tool_calls and isinstance(tool_calls, list):
            return tool_calls

    # Attempt 2: inline JSON. The pattern only locates where a candidate
    # object starts; its own span stops at the first "}" and would cut nested
    # objects short. raw_decode parses exactly one complete JSON value from
    # that position, however deeply it nests.
    for inline_match in TOOL_CALL_INLINE_PATTERN.finditer(scannable_text):
        try:
            parsed_data, _ = decoder.raw_decode(scannable_text[inline_match.start(1):])
        except json.JSONDecodeError:
            continue
        tool_calls = parsed_data.get("tool_calls") if isinstance(parsed_data, dict) else None
        if tool_calls and isinstance(tool_calls, list):
            return tool_calls

    # Attempt 3: natural-language call with JSON arguments.
    natural_lang_match = FUNCTION_CALL_PATTERN.search(scannable_text)
    if natural_lang_match:
        function_name = natural_lang_match.group(1).strip()
        arguments_tail = scannable_text[natural_lang_match.start(2):]
        try:
            # Validates the arguments and finds where the JSON object really
            # ends, so nested arguments are kept whole.
            _, end = decoder.raw_decode(arguments_tail)
        except json.JSONDecodeError:
            return None
        return [{
            "id": f"invoke_{int(time.time() * 1000000)}",
            "type": "function",
            "function": {
                "name": function_name,
                "arguments": arguments_tail[:end]
            }
        }]

    return None
def remove_tool_json_content(text: str) -> str:
    """Strip tool-call JSON from reply text so only prose remains.

    Fenced json blocks are removed when they parse to an object that has a
    ``tool_calls`` key; other fenced blocks are kept. Inline objects that
    mention ``tool_calls`` are removed in full, including nested content.

    Args:
        text: The model's reply text.

    Returns:
        The text without tool-call JSON, with surrounding whitespace removed.
    """
    def remove_tool_call_block(match: re.Match) -> str:
        """Drop a fenced block only if it really is a tool-call object."""
        try:
            parsed_data = json.loads(match.group(1))
        except json.JSONDecodeError:
            return match.group(0)
        if isinstance(parsed_data, dict) and "tool_calls" in parsed_data:
            return ""
        return match.group(0)

    # Remove fenced tool JSON blocks.
    cleaned_text = TOOL_CALL_FENCE_PATTERN.sub(remove_tool_call_block, text)

    # Remove inline tool JSON. The pattern's own span ends at the first "}",
    # which would leave the tail of a nested object behind, so the real end
    # of the object is found by decoding it.
    decoder = json.JSONDecoder()
    kept: List[str] = []
    cursor = 0
    for match in TOOL_CALL_INLINE_PATTERN.finditer(cleaned_text):
        start = match.start()
        if start < cursor:
            # Already inside a span that has been removed.
            continue
        try:
            parsed_data, length = decoder.raw_decode(cleaned_text[start:])
        except json.JSONDecodeError:
            # Malformed JSON: best effort, drop what the pattern matched.
            end = match.end()
        else:
            if not (isinstance(parsed_data, dict) and "tool_calls" in parsed_data):
                # Valid JSON that is not a tool-call object: keep it.
                continue
            end = start + length
        kept.append(cleaned_text[cursor:start])
        cursor = end
    kept.append(cleaned_text[cursor:])

    return "".join(kept).strip()
# =============================================================================
# Utility Functions
# =============================================================================
def debug_log(message: str, *args) -> None:
    """Print a ``[DEBUG]`` line when ``ServerConfig.DEBUG_LOGGING`` is on.

    Args:
        message: Text to print; %-formatted with ``args`` only when ``args``
            is non-empty.
        *args: Optional values for the %-placeholders in ``message``.
    """
    if not ServerConfig.DEBUG_LOGGING:
        return
    rendered = message % args if args else message
    print(f"[DEBUG] {rendered}")
def generate_request_ids() -> Tuple[str, str]:
    """Generate the chat id and message id for one upstream request.

    Both ids come from a single clock reading taken at sub-second resolution,
    so requests arriving within the same second no longer share ids.

    Returns:
        ``(chat_id, msg_id)`` where ``chat_id`` is
        ``"<epoch milliseconds>-<epoch seconds>"`` and ``msg_id`` is the epoch
        time in microseconds, as a decimal string.
    """
    # Integer nanoseconds avoid float rounding when deriving the three units.
    now_ns = time.time_ns()
    micros = now_ns // 1_000
    millis = now_ns // 1_000_000
    seconds = now_ns // 1_000_000_000
    chat_id = f"{millis}-{seconds}"
    msg_id = str(micros)
    return chat_id, msg_id
def get_browser_headers(referer_chat_id: str = "") -> Dict[str, str]:
    """Build the browser-like headers for an upstream request.

    Args:
        referer_chat_id: When non-empty, a ``Referer`` header pointing at that
            chat's page under the configured origin is added.

    Returns:
        A new header dictionary; the configured defaults are not modified.
    """
    defaults = ServerConfig.CLIENT_HEADERS
    if not referer_chat_id:
        return defaults.copy()
    return {
        **defaults,
        "Referer": f"{ServerConfig.CLIENT_HEADERS['Origin']}/c/{referer_chat_id}",
    }
def get_anonymous_token() -> str:
    """Fetch an anonymous token from the upstream auth endpoint.

    Returns:
        The ``token`` value of the endpoint's JSON reply.

    Raises:
        Exception: When the request fails, the status is not 200 or the reply
            carries no token. The failure is logged and then re-raised.
    """
    request_headers = get_browser_headers()
    request_headers["Accept"] = "*/*"
    request_headers["Accept-Language"] = "zh-CN,zh;q=0.9"
    request_headers["Referer"] = f"{ServerConfig.CLIENT_HEADERS['Origin']}/"

    try:
        reply = requests.get(
            f"{ServerConfig.CLIENT_HEADERS['Origin']}/api/v1/auths/",
            headers=request_headers,
            timeout=10.0
        )
        if reply.status_code != 200:
            raise Exception(f"anon token status={reply.status_code}")
        token = reply.json().get("token")
        if not token:
            raise Exception("anon token empty")
        return token
    except Exception as e:
        debug_log(f"获取匿名token失败: {e}")
        raise
def get_auth_token() -> str:
    """Choose the token used to authenticate against upstream.

    With ``ServerConfig.ANONYMOUS_MODE`` on, an anonymous token is requested;
    if that fails for any reason, or the mode is off,
    ``ServerConfig.BACKUP_TOKEN`` is returned.
    """
    if not ServerConfig.ANONYMOUS_MODE:
        return ServerConfig.BACKUP_TOKEN

    try:
        anonymous = get_anonymous_token()
        # Only a short prefix is logged, not the whole token.
        debug_log(f"匿名token获取成功: {anonymous[:10]}...")
    except Exception as e:
        debug_log(f"匿名token获取失败,回退固定token: {e}")
        return ServerConfig.BACKUP_TOKEN
    return anonymous
def create_openai_response_chunk(
    model: str,
    delta: Optional[Delta] = None,
    finish_reason: Optional[str] = None
) -> OpenAIResponse:
    """Build one ``chat.completion.chunk`` object for a streamed reply.

    Args:
        model: Model name to report in the chunk.
        delta: Incremental content; an empty ``Delta`` is used when omitted.
        finish_reason: Set on the final chunk; ``None`` otherwise.

    Returns:
        A response with a single choice at index 0. Its id and creation time
        are taken from the current clock, in whole seconds.
    """
    chunk_id = f"chatcmpl-{int(time.time())}"
    created_at = int(time.time())
    only_choice = Choice(
        index=0,
        delta=delta or Delta(),
        finish_reason=finish_reason,
    )
    return OpenAIResponse(
        id=chunk_id,
        object="chat.completion.chunk",
        created=created_at,
        model=model,
        choices=[only_choice],
    )
def handle_upstream_error(error: UpstreamError) -> Generator[str, None, None]:
    """Terminate a client stream after upstream reported an error.

    The error is written to the debug log only; the client receives a normal
    final chunk (``finish_reason="stop"``) followed by the ``[DONE]`` marker.

    Args:
        error: The error reported by upstream.

    Yields:
        str: The two closing SSE lines.
    """
    debug_log(f"上游错误: code={error.code}, detail={error.detail}")

    closing_chunk = create_openai_response_chunk(
        model=ServerConfig.PRIMARY_MODEL,
        finish_reason="stop"
    )
    yield f"data: {closing_chunk.model_dump_json()}\n\n"
    yield "data: [DONE]\n\n"
def call_upstream_api(
    upstream_req: UpstreamRequest,
    chat_id: str,
    auth_token: str
) -> requests.Response:
    """POST a request to the upstream endpoint and return the open response.

    Args:
        upstream_req: Request body; fields that are ``None`` are left out of
            the JSON that is sent.
        chat_id: Chat id used for the ``Referer`` header.
        auth_token: Bearer token for the ``Authorization`` header.

    Returns:
        The response, opened with ``stream=True``; its body has not been read.
    """
    request_headers = {
        **get_browser_headers(chat_id),
        "Authorization": f"Bearer {auth_token}",
    }

    debug_log(f"调用上游API: {ServerConfig.API_ENDPOINT}")
    debug_log(f"上游请求体: {upstream_req.model_dump_json()}")

    upstream_reply = requests.post(
        ServerConfig.API_ENDPOINT,
        json=upstream_req.model_dump(exclude_none=True),
        headers=request_headers,
        timeout=60.0,
        stream=True
    )
    debug_log(f"上游响应状态: {upstream_reply.status_code}")
    return upstream_reply
# =============================================================================
# Response Handlers
# =============================================================================
class ResponseHandler:
    """Shared plumbing for handlers that forward one request upstream."""

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str):
        """Store what is needed to perform the upstream call.

        Args:
            upstream_req: Request body to send upstream.
            chat_id: Chat id associated with the request.
            auth_token: Token used to authenticate the call.
        """
        self.upstream_req = upstream_req
        self.chat_id = chat_id
        self.auth_token = auth_token

    def _call_upstream(self) -> requests.Response:
        """Perform the upstream call; failures are logged and re-raised."""
        try:
            reply = call_upstream_api(self.upstream_req, self.chat_id, self.auth_token)
        except Exception as e:
            debug_log(f"调用上游失败: {e}")
            raise
        return reply

    def _handle_upstream_error(self, response: requests.Response) -> None:
        """Log a non-success upstream response.

        The response body is read only when debug logging is enabled.
        """
        debug_log(f"上游返回错误状态: {response.status_code}")
        if not ServerConfig.DEBUG_LOGGING:
            return
        debug_log(f"上游错误响应: {response.text}")
class StreamResponseHandler(ResponseHandler):
"""Handler for streaming responses"""
def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
super().__init__(upstream_req, chat_id, auth_token)
self.has_tools = has_tools
self.buffered_content = ""
self.tool_calls = None
def handle(self) -> Generator[str, None, None]:
"""Handle streaming response"""
debug_log(f"开始处理流式响应 (chat_id={self.chat_id})")
try:
response = self._call_upstream()
except Exception:
yield "data: {\"error\": \"Failed to call upstream\"}\n\n"
return
if response.status_code != 200:
self._handle_upstream_error(response)
yield "data: {\"error\": \"Upstream error\"}\n\n"
return
# Send initial role chunk
first_chunk = create_openai_response_chunk(
model=ServerConfig.PRIMARY_MODEL,
delta=Delta(role="assistant")
)
yield f"data: {first_chunk.model_dump_json()}\n\n"
# Process stream
debug_log("开始读取上游SSE流")
sent_initial_answer = False
with SSEParser(response, debug_mode=ServerConfig.DEBUG_LOGGING) as parser:
for event in parser.iter_json_data(UpstreamData):
upstream_data = event['data']
# Check for errors
if self._has_error(upstream_data):
error = self._get_error(upstream_data)
yield from handle_upstream_error(error)
break
debug_log(f"解析成功 - 类型: {upstream_data.type}, 阶段: {upstream_data.data.phase}, "
f"内容长度: {len(upstream_data.data.delta_content)}, 完成: {upstream_data.data.done}")
# Process content
yield from self._process_content(upstream_data, sent_initial_answer)
# Check if done
if upstream_data.data.done or upstream_data.data.phase == "done":
debug_log("检测到流结束信号")
yield from self._send_end_chunk()
break
def _has_error(self, upstream_data: UpstreamData) -> bool:
"""Check if upstream data contains error"""
return bool(
upstream_data.error or
upstream_data.data.error or
(upstream_data.data.inner and upstream_data.data.inner.error)
)
def _get_error(self, upstream_data: UpstreamData) -> UpstreamError:
"""Get error from upstream data"""
return (
upstream_data.error or
upstream_data.data.error or
(upstream_data.data.inner.error if upstream_data.data.inner else None)
)
def _process_content(
    self,
    upstream_data: UpstreamData,
    sent_initial_answer: bool
) -> Generator[str, None, None]:
    """Turn one upstream event into zero or more SSE content chunks.

    Args:
        upstream_data: Parsed upstream event; ``data.delta_content``,
            ``data.edit_content`` and ``data.phase`` are read.
        sent_initial_answer: Caller's view of whether the initial answer
            (taken from ``edit_content``) was already emitted. Because a
            bool is passed by value, the handler also remembers this on
            ``self`` so the initial answer is emitted at most once per
            handler instance regardless of what the caller passes later.

    Yields:
        ``data: ...`` SSE lines. Nothing is yielded when tools are enabled;
        the content is appended to ``self.buffered_content`` instead.
    """
    data = upstream_data.data
    content = data.delta_content or data.edit_content
    if not content:
        return

    # Transform thinking content
    if data.phase == "thinking":
        content = transform_thinking_content(content)

    # Buffer content if tools are enabled; it is emitted at end of stream.
    if self.has_tools:
        self.buffered_content += content
        return

    # Handle initial answer content (the part of edit_content that follows
    # the closing details tag). Kept in its own variable so a delta carried
    # by the same event is not overwritten.
    already_sent = sent_initial_answer or getattr(self, "_sent_initial_answer", False)
    if not already_sent and data.edit_content and data.phase == "answer":
        initial_answer = self._extract_edit_content(data.edit_content)
        if initial_answer:
            debug_log(f"发送普通内容: {initial_answer}")
            chunk = create_openai_response_chunk(
                model=ServerConfig.PRIMARY_MODEL,
                delta=Delta(content=initial_answer)
            )
            yield f"data: {chunk.model_dump_json()}\n\n"
            # Persist on the instance: rebinding the parameter would be
            # invisible to the caller and to later calls.
            self._sent_initial_answer = True

    # Handle delta content. When delta_content is set, ``content`` is that
    # delta (already transformed if this is the thinking phase).
    if data.delta_content and content:
        if data.phase == "thinking":
            debug_log(f"发送思考内容: {content}")
            chunk = create_openai_response_chunk(
                model=ServerConfig.PRIMARY_MODEL,
                delta=Delta(reasoning_content=content)
            )
        else:
            debug_log(f"发送普通内容: {content}")
            chunk = create_openai_response_chunk(
                model=ServerConfig.PRIMARY_MODEL,
                delta=Delta(content=content)
            )
        yield f"data: {chunk.model_dump_json()}\n\n"
def _extract_edit_content(self, edit_content: str) -> str:
"""Extract content from edit_content field"""
parts = edit_content.split("</details>")
return parts[1] if len(parts) > 1 else ""
def _send_end_chunk(self) -> Generator[str, None, None]:
    """Close the stream: flush buffered tool output, then finish and DONE.

    With tools enabled, the buffered content is scanned for tool
    invocations. If any are found they are sent as one ``tool_calls`` chunk
    and the finish reason is ``"tool_calls"``; otherwise the buffered text
    (with tool JSON removed) is sent as a content chunk if non-empty and the
    finish reason is ``"stop"``. Without tools the finish reason is
    ``"stop"``. The final chunk and the DONE line are always emitted.
    """
    finish_reason = "stop"

    if self.has_tools:
        # Try to extract tool calls from buffered content
        self.tool_calls = extract_tool_invocations(self.buffered_content)

        if self.tool_calls:
            # Re-shape each extracted call, adding its position as "index".
            tool_calls_list = [
                {
                    "index": position,
                    "id": call.get("id"),
                    "type": call.get("type", "function"),
                    "function": call.get("function", {}),
                }
                for position, call in enumerate(self.tool_calls)
            ]
            out_chunk = create_openai_response_chunk(
                model=ServerConfig.PRIMARY_MODEL,
                delta=Delta(tool_calls=tool_calls_list)
            )
            yield f"data: {out_chunk.model_dump_json()}\n\n"
            finish_reason = "tool_calls"
        else:
            # No tool calls: send whatever regular text was buffered.
            trimmed_content = remove_tool_json_content(self.buffered_content)
            if trimmed_content:
                content_chunk = create_openai_response_chunk(
                    model=ServerConfig.PRIMARY_MODEL,
                    delta=Delta(content=trimmed_content)
                )
                yield f"data: {content_chunk.model_dump_json()}\n\n"

    # Send final chunk
    end_chunk = create_openai_response_chunk(
        model=ServerConfig.PRIMARY_MODEL,
        finish_reason=finish_reason
    )
    yield f"data: {end_chunk.model_dump_json()}\n\n"
    yield "data: [DONE]\n\n"
    debug_log("流式响应完成")
class NonStreamResponseHandler(ResponseHandler):
    """Handler for non-streaming responses.

    Drains the upstream SSE stream, concatenates its ``delta_content`` and
    returns a single ``chat.completion`` JSON body.
    """

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        """Store the request context and whether tool extraction applies."""
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools

    def handle(self) -> JSONResponse:
        """Collect the full upstream reply and return it as one JSON response.

        Returns:
            A ``JSONResponse`` holding the serialized ``OpenAIResponse``
            (``None`` fields excluded).

        Raises:
            HTTPException: 502 when the upstream call raises or returns a
                non-200 status.
        """
        debug_log(f"开始处理非流式响应 (chat_id={self.chat_id})")

        try:
            response = self._call_upstream()
        except Exception as e:
            debug_log(f"调用上游失败: {e}")
            # Chain the cause so the original failure stays in the traceback.
            raise HTTPException(status_code=502, detail="Failed to call upstream") from e

        if response.status_code != 200:
            self._handle_upstream_error(response)
            raise HTTPException(status_code=502, detail="Upstream error")

        # Collect full response
        full_content = []
        debug_log("开始收集完整响应内容")

        with SSEParser(response, debug_mode=ServerConfig.DEBUG_LOGGING) as parser:
            for event in parser.iter_json_data(UpstreamData):
                upstream_data = event['data']

                if upstream_data.data.delta_content:
                    content = upstream_data.data.delta_content
                    if upstream_data.data.phase == "thinking":
                        content = transform_thinking_content(content)
                    # The transform may leave nothing worth keeping.
                    if content:
                        full_content.append(content)

                if upstream_data.data.done or upstream_data.data.phase == "done":
                    debug_log("检测到完成信号,停止收集")
                    break

        final_content = "".join(full_content)
        debug_log(f"内容收集完成,最终长度: {len(final_content)}")

        # Handle tool calls for non-streaming
        tool_calls = None
        finish_reason = "stop"
        message_content = final_content

        if self.has_tools:
            tool_calls = extract_tool_invocations(final_content)
            if tool_calls:
                # Content must be null when tool_calls are present (OpenAI spec)
                message_content = None
                finish_reason = "tool_calls"
            else:
                # Remove tool JSON from content
                message_content = remove_tool_json_content(final_content)

        # Read the clock once so the id suffix and ``created`` always agree.
        created = int(time.time())
        response_data = OpenAIResponse(
            id=f"chatcmpl-{created}",
            object="chat.completion",
            created=created,
            model=ServerConfig.PRIMARY_MODEL,
            choices=[Choice(
                index=0,
                message=Message(
                    role="assistant",
                    content=message_content,
                    tool_calls=tool_calls
                ),
                finish_reason=finish_reason
            )],
            usage=Usage()
        )

        debug_log("非流式响应发送完成")
        return JSONResponse(content=response_data.model_dump(exclude_none=True))
# =============================================================================
# FastAPI Application
# =============================================================================
# Application object; the middleware and route decorators below attach to it.
app = FastAPI(
    version="1.0.0",
    title="OpenAI Compatible API Server",
    description="An OpenAI-compatible API server for Z.AI chat service",
)
# CORS middleware
@app.middleware("http")
async def add_cors_headers(request: Request, call_next):
    """Attach a fixed set of permissive CORS headers to every response.

    NOTE(review): a wildcard ``Access-Control-Allow-Origin`` combined with
    ``Access-Control-Allow-Credentials: true`` is not honoured by browsers
    for credentialed requests; confirm whether the credentials header is
    actually needed.
    """
    cors_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type, Authorization",
        "Access-Control-Allow-Credentials": "true"
    }

    response = await call_next(request)
    # Set (or overwrite) each header on the downstream response.
    for header_name, header_value in cors_headers.items():
        response.headers[header_name] = header_value
    return response
# =============================================================================
# API Endpoints
# =============================================================================
@app.options("/")
async def handle_options():
    """Answer OPTIONS requests on the root path with an empty 200 response."""
    empty_ok = Response(status_code=200)
    return empty_ok
@app.get("/")
async def root():
    """Return a small JSON banner identifying the server."""
    banner = {"message": "OpenAI Compatible API Server"}
    return banner
@app.get("/v1/models")
async def list_models():
    """Return the three configured model ids as a ``ModelsResponse``.

    Every entry shares the same ``created`` timestamp (the time of the
    request) and is reported as owned by ``z.ai``.
    """
    now = int(time.time())
    advertised_ids = (
        ServerConfig.PRIMARY_MODEL,
        ServerConfig.THINKING_MODEL,
        ServerConfig.SEARCH_MODEL,
    )
    return ModelsResponse(
        data=[
            Model(id=model_id, created=now, owned_by="z.ai")
            for model_id in advertised_ids
        ]
    )
@app.post("/v1/chat/completions")
async def chat_completions(
    request: OpenAIRequest,
    authorization: str = Header(...)
):
    """Handle OpenAI-style chat completion requests.

    Validates the bearer token against ``ServerConfig.AUTH_TOKEN``, converts
    the incoming messages (including tool definitions) into an upstream
    request, and delegates to the streaming or non-streaming handler
    depending on ``request.stream``.

    Raises:
        HTTPException: 401 for a missing/invalid Authorization header or API
            key; 500 for any unexpected error while building or dispatching
            the request. HTTPExceptions raised further down are passed
            through unchanged.
    """
    import hmac  # local import: only needed for the constant-time key check

    debug_log("收到chat completions请求")

    try:
        # Validate API key
        if not authorization.startswith("Bearer "):
            debug_log("缺少或无效的Authorization头")
            raise HTTPException(status_code=401, detail="Missing or invalid Authorization header")

        api_key = authorization[7:]
        expected_key = ServerConfig.AUTH_TOKEN
        # Constant-time comparison; a non-string configured token never matches.
        key_ok = isinstance(expected_key, str) and hmac.compare_digest(
            api_key.encode("utf-8"), expected_key.encode("utf-8")
        )
        if not key_ok:
            # Do not write the supplied credential to the log.
            debug_log("无效的API key")
            raise HTTPException(status_code=401, detail="Invalid API key")

        debug_log("API key验证通过")
        debug_log(f"请求解析成功 - 模型: {request.model}, 流式: {request.stream}, 消息数: {len(request.messages)}")

        # Generate IDs
        chat_id, msg_id = generate_request_ids()

        # Process messages with tools
        processed_messages = process_messages_with_tools(
            [m.model_dump() for m in request.messages],
            request.tools,
            request.tool_choice
        )

        # Convert back to Message objects
        upstream_messages = []
        for msg in processed_messages:
            content = msg.get("content")
            # Ensure content is not None for Message model
            if content is None:
                content = ""
            upstream_messages.append(Message(
                role=msg["role"],
                content=content,
                reasoning_content=msg.get("reasoning_content")
            ))

        # Determine model features
        is_thinking = request.model == ServerConfig.THINKING_MODEL
        is_search = request.model == ServerConfig.SEARCH_MODEL
        search_mcp = "deep-web-search" if is_search else ""

        # Build upstream request
        upstream_req = UpstreamRequest(
            stream=True,  # Always use streaming from upstream
            chat_id=chat_id,
            id=msg_id,
            model="0727-360B-API",  # Actual upstream model ID
            messages=upstream_messages,
            params={},
            features={
                "enable_thinking": is_thinking,
                "web_search": is_search,
                "auto_web_search": is_search,
            },
            background_tasks={
                "title_generation": False,
                "tags_generation": False,
            },
            mcp_servers=[search_mcp] if search_mcp else [],
            model_item=ModelItem(
                id="0727-360B-API",
                name="GLM-4.5",
                owned_by="openai"
            ),
            tool_servers=[],
            variables={
                "{{USER_NAME}}": "User",
                "{{USER_LOCATION}}": "Unknown",
                "{{CURRENT_DATETIME}}": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
        )

        # Get authentication token
        auth_token = get_auth_token()

        # Tools are active only when supported, supplied and not disabled by
        # the caller. bool() keeps the handlers' ``has_tools: bool`` honest
        # (the bare ``and`` chain could yield a list or None).
        has_tools = bool(
            ServerConfig.TOOL_SUPPORT
            and request.tools
            and request.tool_choice != "none"
        )

        # Handle response based on stream flag
        if request.stream:
            handler = StreamResponseHandler(upstream_req, chat_id, auth_token, has_tools)
            return StreamingResponse(
                handler.handle(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                }
            )
        else:
            handler = NonStreamResponseHandler(upstream_req, chat_id, auth_token, has_tools)
            return handler.handle()

    except HTTPException:
        # Already a well-formed HTTP error; let FastAPI render it.
        raise
    except Exception as e:
        debug_log(f"处理请求时发生错误: {str(e)}")
        import traceback
        debug_log(f"错误堆栈: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
# Anthropic API compatible endpoint
def _flatten_anthropic_content(content) -> str:
    """Collapse Anthropic-style content into plain text.

    A plain string is returned unchanged; otherwise *content* is iterated as
    blocks and the ``text`` of every block whose ``type`` is ``"text"`` is
    concatenated in order (other block types are dropped).
    """
    if isinstance(content, str):
        return content
    return "".join(block.text for block in content if block.type == "text")


@app.post("/v1/messages")
async def handle_anthropic_message(
    req: AnthropicRequest,
    x_api_key: str = Header(None, alias="x-api-key"),
    authorization: str = Header(None, alias="authorization")
):
    """Handle Anthropic-style message requests.

    The API key is taken from ``x-api-key`` or, failing that, from a bearer
    ``Authorization`` header, and checked against
    ``ServerConfig.ANTHROPIC_API_KEY``. The request is converted to an
    upstream request (always streamed from upstream) and the reply is either
    re-streamed via ``stream_anthropic_generator`` or collected into a single
    Anthropic-shaped message dict.

    Raises:
        HTTPException: 401 for a missing or wrong API key; 502 when the
            upstream call fails or returns an error status.
    """
    import hmac  # local import: only needed for the constant-time key check

    debug_log("收到 Anthropic message 请求")

    # Validate the API key
    api_key = None
    if x_api_key:
        api_key = x_api_key
    elif authorization and authorization.startswith("Bearer "):
        api_key = authorization[7:]

    expected_key = ServerConfig.ANTHROPIC_API_KEY
    # Constant-time comparison; a missing key or non-string config never matches.
    key_ok = (
        bool(api_key)
        and isinstance(expected_key, str)
        and hmac.compare_digest(api_key.encode("utf-8"), expected_key.encode("utf-8"))
    )
    if not key_ok:
        # Do not write the supplied credential to the log.
        debug_log("无效的 API key")
        raise HTTPException(status_code=401, detail="Invalid API key")

    debug_log("API key 验证通过")
    debug_log(f"请求解析成功 - 模型: {req.model}, 流式: {req.stream}, 消息数: {len(req.messages)}")

    # Resolve the upstream model name (used only in the log line below).
    upstream_model = "GLM-4.5"
    if req.model == ServerConfig.THINKING_MODEL:
        upstream_model = "GLM-4.5-Thinking"
    elif req.model == ServerConfig.SEARCH_MODEL:
        upstream_model = "GLM-4.5-Search"
    debug_log(f"收到请求 (模型: {req.model}) -> 代理到上游 (模型: {upstream_model})")

    # Generate IDs
    chat_id, msg_id = generate_request_ids()

    # Convert messages: an optional system prompt first, then the
    # conversation, each flattened to plain text.
    upstream_messages = []
    if req.system:
        upstream_messages.append(Message(
            role="system",
            content=_flatten_anthropic_content(req.system)
        ))
    for msg in req.messages:
        upstream_messages.append(Message(
            role=msg.role,
            content=_flatten_anthropic_content(msg.content)
        ))

    # Build the upstream request
    upstream_req = UpstreamRequest(
        stream=True,  # always stream from upstream
        chat_id=chat_id,
        id=msg_id,
        model="0727-360B-API",  # actual upstream model ID
        messages=upstream_messages,
        params={},
        features={"enable_thinking": True},
        background_tasks={
            "title_generation": False,
            "tags_generation": False,
        },
        mcp_servers=[],
        model_item=ModelItem(
            id="0727-360B-API",
            name="GLM-4.5",
            owned_by="openai"
        ),
        tool_servers=[],
        variables={
            "{{USER_NAME}}": "User",
            "{{USER_LOCATION}}": "Unknown",
            "{{CURRENT_DATETIME}}": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
    )

    # Get the upstream auth token
    auth_token = get_auth_token()

    try:
        # Call the upstream API.
        # NOTE: requests.post is a blocking call made from an async handler.
        headers = get_browser_headers(chat_id)
        headers["Authorization"] = f"Bearer {auth_token}"
        response = requests.post(
            ServerConfig.API_ENDPOINT,
            json=upstream_req.model_dump(exclude_none=True),
            headers=headers,
            timeout=60.0,
            stream=True
        )
        response.raise_for_status()
    except requests.HTTPError as e:
        debug_log(f"上游 API 返回错误状态: {e.response.status_code}, 响应: {e.response.text}")
        # The streamed response is not needed any more; release it.
        e.response.close()
        raise HTTPException(status_code=502, detail="Upstream API error") from e
    except requests.RequestException as e:
        debug_log(f"请求上游 API 失败: {e}")
        raise HTTPException(status_code=502, detail=f"Failed to call upstream API: {e}") from e

    request_id = f"msg_{uuid.uuid4().hex}"

    if req.stream:
        # Streaming reply: the generator takes over the upstream response.
        return StreamingResponse(
            stream_anthropic_generator(response, request_id, req.model),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
        )

    # Non-streaming reply: drain the SSE stream and join the text pieces.
    collected = []
    try:
        for line in response.iter_lines():
            if not line.startswith(b"data:"):
                continue
            data_str = line[5:].strip()
            if not data_str:
                continue
            try:
                data = json.loads(data_str.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Skip lines that are not valid JSON.
                continue

            payload = data.get("data") if isinstance(data, dict) else None
            if not isinstance(payload, dict):
                # Nothing usable in this event.
                continue

            delta_content = payload.get("delta_content", "")
            phase = payload.get("phase", "")
            if delta_content:
                out_content = transform_thinking_content(delta_content) if phase == "thinking" else delta_content
                if out_content:
                    collected.append(out_content)
            if payload.get("done", False) or phase == "done":
                break
    finally:
        # stream=True keeps the connection open until it is closed or fully
        # read; we may stop early, so close it explicitly.
        response.close()

    full_content = "".join(collected)
    return {
        "id": request_id,
        "type": "message",
        "role": "assistant",
        "model": req.model,
        "content": [{"type": "text", "text": full_content}],
        "stop_reason": "end_turn",
        # Rough estimate: four characters per token; input tokens are not counted.
        "usage": {"input_tokens": 0, "output_tokens": len(full_content) // 4}
    }
# =============================================================================
# Main Entry Point
# =============================================================================
if __name__ == "__main__":
    import uvicorn

    # Serve the app defined in this module ("main:app") on all interfaces,
    # on the configured port, with auto-reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=ServerConfig.LISTEN_PORT,
        reload=True,
    )