| import json
|
| import re
|
| import asyncio
|
| import logging
|
| import os
|
| import time
|
| from typing import Any, Dict, List, Optional
|
|
|
| from bson import ObjectId
|
| from fastapi import APIRouter, File, Form, Header, HTTPException, UploadFile, WebSocket, WebSocketDisconnect
|
|
|
| from core import TEAM_AGENT_MODEL, projects_collection, team_chat_collection, teams_collection
|
| from prompts import TEAM_AGENT_SYSTEM_PROMPT
|
| from schemas import TeamChatMessageRequest, TeamChatRequest
|
| from services import (
|
| build_document_grounded_answer,
|
| build_requirement_node_options_from_documents,
|
| compact_team_doc_qa_memory,
|
| create_issue_for_project,
|
| create_task_from_agent,
|
| get_team_doc_qa_memory,
|
| resolve_requirement_node_reference_from_documents,
|
| get_selected_team_messages,
|
| get_team_agent_context,
|
| get_team_chat_context,
|
| get_team_documents_by_ids,
|
| get_vn_now,
|
| list_team_documents,
|
| require_session_user,
|
| retrieve_document_context_with_tree,
|
| run_team_agent_with_nvidia,
|
| save_team_chat_message,
|
| save_team_document,
|
| store_uploaded_image,
|
| unique_ids,
|
| update_issue_from_agent,
|
| )
|
|
|
| router = APIRouter()
|
| team_doc_route_logger = logging.getLogger("nomus.team_documents.route")
|
| team_doc_route_logger.setLevel(
|
| getattr(logging, os.environ.get("TEAM_DOC_LOG_LEVEL", "INFO").upper(), logging.INFO)
|
| )
|
|
|
|
|
| def _json_safe(value: Any) -> Any:
|
| if isinstance(value, ObjectId):
|
| return str(value)
|
| if isinstance(value, list):
|
| return [_json_safe(item) for item in value]
|
| if isinstance(value, dict):
|
| return {key: _json_safe(item) for key, item in value.items()}
|
| return value
|
|
|
|
|
| def _team_chat_room_key(team_id: str, project_id: Optional[str]) -> str:
|
| return f"{team_id}::{project_id or 'global'}"
|
|
|
|
|
| class TeamChatRealtimeHub:
|
| def __init__(self) -> None:
|
| self._connections: Dict[str, List[WebSocket]] = {}
|
| self._lock = asyncio.Lock()
|
|
|
| async def connect(self, room_key: str, websocket: WebSocket) -> None:
|
| await websocket.accept()
|
| async with self._lock:
|
| room = self._connections.setdefault(room_key, [])
|
| room.append(websocket)
|
|
|
| async def disconnect(self, room_key: str, websocket: WebSocket) -> None:
|
| async with self._lock:
|
| room = self._connections.get(room_key, [])
|
| self._connections[room_key] = [ws for ws in room if ws is not websocket]
|
| if not self._connections[room_key]:
|
| self._connections.pop(room_key, None)
|
|
|
| async def broadcast(self, room_key: str, payload: Dict[str, Any]) -> None:
|
| async with self._lock:
|
| sockets = list(self._connections.get(room_key, []))
|
|
|
| dead_sockets: List[WebSocket] = []
|
| for ws in sockets:
|
| try:
|
| await ws.send_json(payload)
|
| except Exception:
|
| dead_sockets.append(ws)
|
|
|
| if dead_sockets:
|
| async with self._lock:
|
| current = self._connections.get(room_key, [])
|
| self._connections[room_key] = [ws for ws in current if ws not in dead_sockets]
|
| if not self._connections[room_key]:
|
| self._connections.pop(room_key, None)
|
|
|
|
|
| team_chat_realtime_hub = TeamChatRealtimeHub()
|
|
|
|
|
| async def _broadcast_team_chat_message(team_id: str, project_id: Optional[str], message: Dict[str, Any]) -> None:
|
| room_key = _team_chat_room_key(team_id, project_id)
|
| await team_chat_realtime_hub.broadcast(
|
| room_key,
|
| _json_safe({
|
| "type": "team_chat_message",
|
| "team_id": team_id,
|
| "project_id": project_id,
|
| "message": message,
|
| }),
|
| )
|
|
|
|
|
| def _looks_like_action_request(message: str) -> bool:
|
| text = (message or "").lower()
|
| action_patterns = [
|
| r"\btạo\b",
|
| r"\bcreate\b",
|
| r"\bcập nhật\b",
|
| r"\bupdate\b",
|
| r"\bassign\b",
|
| r"\bgiao\b",
|
| r"\bthực thi\b",
|
| r"\blàm luôn\b",
|
| r"\bissue\b",
|
| r"\btask\b",
|
| ]
|
| return any(re.search(pattern, text) for pattern in action_patterns)
|
|
|
|
|
| def _extract_json_payload(raw_text: str) -> Optional[Dict[str, Any]]:
|
| if not raw_text:
|
| return None
|
|
|
| candidates: List[str] = []
|
| stripped = raw_text.strip()
|
| if stripped:
|
| candidates.append(stripped)
|
|
|
| fenced_match = re.search(r"```(?:json)?\s*([\s\S]*?)```", stripped, re.IGNORECASE)
|
| if fenced_match:
|
| candidates.insert(0, fenced_match.group(1).strip())
|
|
|
| object_match = re.search(r"\{[\s\S]*\}", stripped)
|
| if object_match:
|
| candidates.insert(0, object_match.group())
|
|
|
| for candidate in candidates:
|
| try:
|
| parsed = json.loads(candidate)
|
| if isinstance(parsed, dict):
|
| return parsed
|
| except Exception:
|
| continue
|
| return None
|
|
|
|
|
| def _normalize_actions(parsed: Dict[str, Any], req: TeamChatRequest) -> List[Dict[str, Any]]:
|
| actions = parsed.get("actions")
|
| if isinstance(actions, list):
|
| normalized = [action for action in actions if isinstance(action, dict)]
|
| elif parsed.get("action"):
|
| normalized = [{"type": parsed.get("action"), "payload": parsed.get("payload") or {}}]
|
| else:
|
| normalized = []
|
|
|
| allowed_action_types = {"create_issue", "update_issue", "create_task"}
|
| filtered: List[Dict[str, Any]] = []
|
| for action in normalized:
|
| action_type = str(action.get("type") or "").strip()
|
| if action_type not in allowed_action_types:
|
| continue
|
| payload = action.get("payload") or {}
|
| if not isinstance(payload, dict):
|
| payload = {}
|
| if action_type == "update_issue":
|
| payload.setdefault("issue_id", req.issue_anchor_id)
|
| filtered.append({"type": action_type, "payload": payload})
|
| return filtered
|
|
|
|
|
| def _execute_agent_action(action: Dict[str, Any], user_id: str, req: TeamChatRequest) -> Dict[str, Any]:
|
| action_type = action.get("type")
|
| payload = action.get("payload") or {}
|
|
|
| if action_type == "create_issue":
|
| if not req.project_id:
|
| raise HTTPException(status_code=400, detail="Missing project_id for create_issue")
|
| issue = create_issue_for_project(req.project_id, user_id, payload)
|
| return {"type": action_type, "status": "ok", "issue": issue}
|
|
|
| if action_type == "update_issue":
|
| issue_id = payload.get("issue_id") or req.issue_anchor_id
|
| if not issue_id:
|
| raise HTTPException(status_code=400, detail="Missing issue_id for update_issue")
|
| issue = update_issue_from_agent(issue_id, payload)
|
| return {"type": action_type, "status": "ok", "issue": issue}
|
|
|
| if action_type == "create_task":
|
| task = create_task_from_agent(payload)
|
| return {"type": action_type, "status": "ok", "task": task}
|
|
|
| raise HTTPException(status_code=400, detail=f"Unsupported action: {action_type}")
|
|
|
|
|
| def _merge_requirement_node_reference(payload: Dict[str, Any], node_ref: Dict[str, Any]) -> Dict[str, Any]:
|
| if not isinstance(node_ref, dict) or not node_ref:
|
| return payload
|
|
|
| merged = dict(payload)
|
| merged.setdefault("requirement_node_id", node_ref.get("node_id") or node_ref.get("section_id"))
|
| merged.setdefault("requirement_node_title", node_ref.get("node_title") or node_ref.get("section_title"))
|
| merged.setdefault("requirement_node_path", node_ref.get("node_path"))
|
| merged.setdefault("requirement_node_path_titles", node_ref.get("node_path_titles", []))
|
| merged.setdefault("requirement_node_path_ids", node_ref.get("node_path_ids", []))
|
| merged.setdefault("requirement_node_depth", node_ref.get("node_depth"))
|
| merged.setdefault("requirement_document_id", node_ref.get("document_id"))
|
| merged.setdefault("requirement_document_name", node_ref.get("document_name"))
|
| return merged
|
|
|
|
|
| def _has_requirement_node_payload(payload: Dict[str, Any]) -> bool:
|
| return any(
|
| str(payload.get(field_name) or "").strip()
|
| for field_name in ("requirement_node_id", "requirement_node_title", "requirement_node_path")
|
| )
|
|
|
|
|
| def _clip(text: str, limit: int) -> str:
|
| return text if len(text) <= limit else text[:limit] + "…[truncated]"
|
|
|
|
|
| def _truncate_prompt_context(
|
| ctx: Dict[str, Any],
|
| *,
|
| max_section_content: int = 1500,
|
| max_section_summary: int = 300,
|
| max_sections: int = 8,
|
| max_messages: int = 12,
|
| max_msg_content: int = 400,
|
| max_grounded_answer: int = 2000,
|
| max_qa_memory: int = 1500,
|
| max_index_nodes: int = 25,
|
| max_index_summary: int = 200,
|
| ) -> Dict[str, Any]:
|
| """Return a size-bounded copy of prompt_context before sending to NVIDIA."""
|
| ctx = dict(ctx)
|
|
|
|
|
| sections = list(ctx.get("documents_sections") or [])[:max_sections]
|
| safe_sections = []
|
| for sec in sections:
|
| sec = dict(sec)
|
|
|
| for field in ("section_content", "content"):
|
| val = sec.get(field)
|
| if val and len(str(val)) > max_section_content:
|
| sec[field] = _clip(str(val), max_section_content)
|
| for field in ("section_summary", "summary", "section_context"):
|
| val = sec.get(field)
|
| if val and len(str(val)) > max_section_summary:
|
| sec[field] = _clip(str(val), max_section_summary)
|
| safe_sections.append(sec)
|
| ctx["documents_sections"] = safe_sections
|
|
|
|
|
| for key in ("selected_messages", "fallback_messages"):
|
| msgs = list(ctx.get(key) or [])
|
| if len(msgs) > max_messages:
|
| msgs = msgs[-max_messages:]
|
| trimmed = []
|
| for m in msgs:
|
| m = dict(m)
|
| content = str(m.get("content") or "")
|
| if len(content) > max_msg_content:
|
| m["content"] = _clip(content, max_msg_content)
|
| trimmed.append(m)
|
| ctx[key] = trimmed
|
|
|
|
|
| answer = str(ctx.get("document_grounded_answer") or "")
|
| if len(answer) > max_grounded_answer:
|
| ctx["document_grounded_answer"] = _clip(answer, max_grounded_answer)
|
|
|
|
|
| memory = str(ctx.get("doc_qa_memory") or "")
|
| if len(memory) > max_qa_memory:
|
| ctx["doc_qa_memory"] = _clip(memory, max_qa_memory)
|
|
|
|
|
| doc_indexes = list(ctx.get("documents_index") or [])
|
| for di in doc_indexes:
|
| nodes = di.get("nodes") or []
|
| if len(nodes) > max_index_nodes:
|
| di["nodes"] = nodes[:max_index_nodes]
|
| for node in di.get("nodes", []):
|
| for field in ("summary", "scope"):
|
| val = node.get(field)
|
| if val and len(str(val)) > max_index_summary:
|
| node[field] = _clip(str(val), max_index_summary)
|
| ctx["documents_index"] = doc_indexes
|
|
|
|
|
| ctx.pop("documents_retrieval_meta", None)
|
|
|
| ctx.pop("documents_citations", None)
|
| ctx.pop("document_grounded_citations", None)
|
|
|
|
|
| _MAX_PROMPT_CHARS = 180_000
|
| serialized = json.dumps(ctx, ensure_ascii=False, default=str)
|
| if len(serialized) > _MAX_PROMPT_CHARS:
|
|
|
| for drop_key in ("documents_sections", "documents_index", "agent_context", "doc_qa_memory"):
|
| if len(serialized) <= _MAX_PROMPT_CHARS:
|
| break
|
| if drop_key in ctx:
|
| ctx[drop_key] = [] if isinstance(ctx.get(drop_key), list) else ""
|
| serialized = json.dumps(ctx, ensure_ascii=False, default=str)
|
|
|
| return ctx
|
|
|
|
|
| def _assert_team_project_access(user: Dict[str, Any], team_id: str, project_id: Optional[str]) -> Optional[Dict[str, Any]]:
|
| team = teams_collection.find_one({"id": team_id}, {"_id": 0})
|
| if not team or user["id"] not in unique_ids([team.get("owner_id", "")], team.get("member_ids", [])):
|
| raise HTTPException(status_code=404, detail="Team not found")
|
|
|
| project = None
|
| if project_id:
|
| project = projects_collection.find_one({"id": project_id}, {"_id": 0})
|
| if not project or user["id"] not in unique_ids([project.get("owner_id", "")], project.get("member_ids", [])):
|
| raise HTTPException(status_code=404, detail="Project not found")
|
| return project
|
|
|
|
|
| @router.websocket("/ws/team-chat/{team_id}")
|
| async def ws_team_chat(
|
| websocket: WebSocket,
|
| team_id: str,
|
| project_id: Optional[str] = None,
|
| session_token: Optional[str] = None,
|
| ):
|
| if not session_token:
|
| await websocket.close(code=4401, reason="Missing session token")
|
| return
|
|
|
| try:
|
| user = require_session_user(session_token)
|
| _assert_team_project_access(user, team_id, project_id)
|
| except HTTPException as exc:
|
| await websocket.close(code=4403, reason=str(exc.detail))
|
| return
|
|
|
| room_key = _team_chat_room_key(team_id, project_id)
|
| await team_chat_realtime_hub.connect(room_key, websocket)
|
|
|
| try:
|
| await websocket.send_json(
|
| {
|
| "type": "connected",
|
| "team_id": team_id,
|
| "project_id": project_id,
|
| }
|
| )
|
| while True:
|
| incoming = await websocket.receive_text()
|
| if incoming.strip().lower() == "ping":
|
| await websocket.send_json({"type": "pong"})
|
| except WebSocketDisconnect:
|
| pass
|
| finally:
|
| await team_chat_realtime_hub.disconnect(room_key, websocket)
|
|
|
|
|
| @router.get("/teams/{team_id}/chat")
|
| async def get_team_chat(team_id: str, project_id: Optional[str] = None, x_session_token: Optional[str] = Header(None, alias="X-Session-Token")):
|
| user = require_session_user(x_session_token)
|
| _assert_team_project_access(user, team_id, project_id)
|
|
|
| query = {"team_id": team_id}
|
| if project_id:
|
| query["project_id"] = project_id
|
| history = list(team_chat_collection.find(query, {"_id": 0}).sort("timestamp", 1).limit(300))
|
| return {"history": history}
|
|
|
|
|
| @router.get("/teams/{team_id}/documents")
|
| async def get_team_documents(team_id: str, project_id: Optional[str] = None, x_session_token: Optional[str] = Header(None, alias="X-Session-Token")):
|
| started = time.perf_counter()
|
| user = require_session_user(x_session_token)
|
| _assert_team_project_access(user, team_id, project_id)
|
| docs = list_team_documents(team_id=team_id, project_id=project_id)
|
| elapsed = time.perf_counter() - started
|
| team_doc_route_logger.info(
|
| "[team-doc] list endpoint team_id=%s project_id=%s user_id=%s docs=%d duration=%.3fs",
|
| team_id,
|
| project_id,
|
| user.get("id"),
|
| len(docs),
|
| elapsed,
|
| )
|
| return {"documents": docs}
|
|
|
|
|
| @router.post("/teams/{team_id}/documents")
|
| async def upload_team_document(
|
| team_id: str,
|
| file: UploadFile = File(...),
|
| project_id: Optional[str] = Form(None),
|
| x_session_token: Optional[str] = Header(None, alias="X-Session-Token"),
|
| ):
|
| started = time.perf_counter()
|
| user = require_session_user(x_session_token)
|
| _assert_team_project_access(user, team_id, project_id)
|
|
|
| raw_read_started = time.perf_counter()
|
| raw_bytes = await file.read()
|
| raw_read_elapsed = time.perf_counter() - raw_read_started
|
| if not raw_bytes:
|
| raise HTTPException(status_code=400, detail="Empty file")
|
|
|
| file_name = file.filename or "document.txt"
|
| mime_type = file.content_type or "text/plain"
|
| team_doc_route_logger.info(
|
| "[team-doc] upload endpoint start team_id=%s project_id=%s user_id=%s file=%s mime=%s raw_bytes=%d read_duration=%.3fs",
|
| team_id,
|
| project_id,
|
| user.get("id"),
|
| file_name,
|
| mime_type,
|
| len(raw_bytes),
|
| raw_read_elapsed,
|
| )
|
|
|
| doc = save_team_document(
|
| team_id=team_id,
|
| project_id=project_id,
|
| uploader_id=user["id"],
|
| file_name=file_name,
|
| mime_type=mime_type,
|
| raw_bytes=raw_bytes,
|
| )
|
|
|
| elapsed = time.perf_counter() - started
|
| team_doc_route_logger.info(
|
| "[team-doc] upload endpoint done team_id=%s project_id=%s user_id=%s file=%s doc_id=%s total_nodes=%s duration=%.3fs",
|
| team_id,
|
| project_id,
|
| user.get("id"),
|
| file_name,
|
| doc.get("id"),
|
| (doc.get("tree") or {}).get("total_nodes", (doc.get("tree_meta") or {}).get("total_nodes", 0)),
|
| elapsed,
|
| )
|
|
|
| return {
|
| "document": {
|
| "id": doc["id"],
|
| "team_id": doc["team_id"],
|
| "project_id": doc.get("project_id"),
|
| "name": doc["name"],
|
| "mime_type": doc["mime_type"],
|
| "created_at": doc["created_at"],
|
| "updated_at": doc["updated_at"],
|
| "total_nodes": (doc.get("tree") or {}).get("total_nodes", (doc.get("tree_meta") or {}).get("total_nodes", 0)),
|
| }
|
| }
|
|
|
|
|
| @router.post("/teams/{team_id}/chat/images")
|
| async def upload_team_chat_images(
|
| team_id: str,
|
| files: List[UploadFile] = File(...),
|
| project_id: Optional[str] = Form(None),
|
| x_session_token: Optional[str] = Header(None, alias="X-Session-Token"),
|
| ):
|
| user = require_session_user(x_session_token)
|
| _assert_team_project_access(user, team_id, project_id)
|
|
|
| if not files:
|
| raise HTTPException(status_code=400, detail="No image files provided")
|
|
|
| scope_id = team_id if not project_id else f"{team_id}__{project_id}"
|
| assets: List[Dict[str, Any]] = []
|
| for file in files:
|
| raw_bytes = await file.read()
|
| if not raw_bytes:
|
| continue
|
| asset = store_uploaded_image(
|
| raw_bytes=raw_bytes,
|
| original_name=file.filename or "image",
|
| scope="team",
|
| scope_id=scope_id,
|
| )
|
| assets.append(asset)
|
|
|
| if not assets:
|
| raise HTTPException(status_code=400, detail="All uploaded files were empty")
|
|
|
| return {
|
| "assets": assets,
|
| "urls": [asset["url"] for asset in assets],
|
| }
|
|
|
|
|
| @router.post("/teams/{team_id}/chat/messages")
|
| async def post_team_chat_message(
|
| team_id: str,
|
| req: TeamChatMessageRequest,
|
| x_session_token: Optional[str] = Header(None, alias="X-Session-Token"),
|
| ):
|
| user = require_session_user(x_session_token)
|
| _assert_team_project_access(user, team_id, req.project_id)
|
|
|
| content = (req.message or "").strip()
|
| if not content and not (req.attachment_urls or []):
|
| raise HTTPException(status_code=400, detail="Message is empty")
|
|
|
| user_msg = save_team_chat_message(
|
| team_id,
|
| "user",
|
| content,
|
| req.project_id,
|
| attachment_urls=req.attachment_urls,
|
| )
|
| await _broadcast_team_chat_message(team_id, req.project_id, user_msg)
|
| return {"message": user_msg}
|
|
|
|
|
| @router.post("/teams/chat")
|
| async def team_chat(req: TeamChatRequest, x_session_token: Optional[str] = Header(None, alias="X-Session-Token")):
|
| user = require_session_user(x_session_token)
|
| project = _assert_team_project_access(user, req.team_id, req.project_id)
|
|
|
| user_msg = save_team_chat_message(
|
| req.team_id,
|
| "user",
|
| req.message,
|
| req.project_id,
|
| attachment_urls=req.attachment_urls,
|
| )
|
| await _broadcast_team_chat_message(req.team_id, req.project_id, user_msg)
|
|
|
| has_bot_mention = "@bot" in (req.message or "")
|
| if req.require_bot_mention and not has_bot_mention:
|
| assistant_doc = save_team_chat_message(
|
| req.team_id,
|
| "assistant",
|
| "Mình đã lưu tin nhắn team chat. Nếu muốn AI xử lý issue/task, hãy gọi bằng @bot và chọn các messages liên quan.",
|
| req.project_id,
|
| )
|
| await _broadcast_team_chat_message(req.team_id, req.project_id, assistant_doc)
|
| return {
|
| "message": assistant_doc,
|
| "tool_intent": None,
|
| "tool_intents": [],
|
| "tool_result": None,
|
| "tool_results": [],
|
| "missing_fields": [],
|
| "needs_confirmation": False,
|
| "context_window_size": 0,
|
| "selected_message_count": 0,
|
| "document_section_count": 0,
|
| "used_bot_mention": False,
|
| "agent_model": TEAM_AGENT_MODEL,
|
| "timestamp": get_vn_now().isoformat(),
|
| "saved_user_message": user_msg,
|
| }
|
|
|
| selected_messages = get_selected_team_messages(req.team_id, req.selected_message_ids, req.project_id)
|
| fallback_messages: List[Dict[str, Any]] = []
|
| if not selected_messages:
|
| fallback_messages = get_team_chat_context(req.team_id, req.issue_anchor_id, window=2)
|
| base_messages = selected_messages or fallback_messages
|
|
|
| team_docs = get_team_documents_by_ids(req.team_id, req.document_ids, req.project_id)
|
| doc_context = retrieve_document_context_with_tree(
|
| req.message,
|
| team_docs,
|
| selected_messages=base_messages,
|
| )
|
| preferred_requirement_node_reference = resolve_requirement_node_reference_from_documents(
|
| team_docs,
|
| req.preferred_requirement_node_id,
|
| )
|
| qa_memory = get_team_doc_qa_memory(req.team_id, req.project_id)
|
|
|
|
|
|
|
| _MAX_INDEX_NODES_PER_DOC = 25
|
| doc_indexes = []
|
| for doc in team_docs:
|
| tree = doc.get("tree") or {}
|
| nodes = tree.get("nodes") or []
|
| top_nodes = [n for n in nodes if (n.get("level") or 0) <= 2][:_MAX_INDEX_NODES_PER_DOC]
|
| if not top_nodes:
|
| top_nodes = nodes[:_MAX_INDEX_NODES_PER_DOC]
|
| doc_indexes.append(
|
| {
|
| "document_id": doc.get("id"),
|
| "document_name": doc.get("name"),
|
| "total_nodes": tree.get("total_nodes", len(nodes)),
|
| "nodes": [
|
| {
|
| "id": node.get("id"),
|
| "parent_id": node.get("parent_id"),
|
| "title": _clip(str(node.get("title") or ""), 120),
|
| "summary": _clip(str(node.get("summary") or ""), 200),
|
| "scope": _clip(str(node.get("scope") or ""), 100),
|
| "level": node.get("level"),
|
| }
|
| for node in top_nodes
|
| ],
|
| }
|
| )
|
|
|
| tools_description = """
|
| OUTPUT REQUIREMENTS:
|
| - Trả về JSON thuần túy.
|
| - Trường reply là câu trả lời ngắn gọn cho người dùng.
|
| - Trường actions là danh sách action có thể thực thi.
|
| - Nếu thiếu thông tin, đặt actions = [] và missing_fields ghi rõ thiếu gì.
|
| - Nếu user chưa cho phép tự động hóa, needs_confirmation = true khi có action cần làm.
|
| - Nếu câu hỏi thiên về tra cứu tài liệu, ưu tiên trả lời dựa trên documents_sections/document_grounded_answer và có thể để actions = [].
|
| - Nếu doc_qa_only=true thì bắt buộc actions = [] và chỉ tập trung trả lời theo tài liệu.
|
| - Nếu thiếu requirement node hợp lệ cho create_issue hoặc create_task, hãy để missing_fields có requirement_node_id thay vì tự đoán.
|
|
|
| ACTION PAYLOAD GỢI Ý:
|
| - create_issue: title, description, severity, status, assignee_id|assignee_email|assignee_name, tags, requirement_text, attachment_urls, requirement_node_id, requirement_node_title, requirement_node_path, requirement_node_path_titles, requirement_node_path_ids, requirement_node_depth, requirement_document_id, requirement_document_name
|
| - update_issue: issue_id, title, description, severity, status, assignee_id|assignee_email|assignee_name, tags, attachment_urls, requirement_text, requirement_node_id, requirement_node_title, requirement_node_path, requirement_node_path_titles, requirement_node_path_ids, requirement_node_depth, requirement_document_id, requirement_document_name
|
| - create_task: title, description, start_time, end_time, priority, tags, reminder, requirement_node_id, requirement_node_title, requirement_node_path, requirement_node_path_titles, requirement_node_path_ids, requirement_node_depth, requirement_document_id, requirement_document_name
|
|
|
| LƯU Ý CONTEXT:
|
| - selected_messages là nguồn hội thoại chính, ưu tiên tuyệt đối.
|
| - Nếu selected_messages rỗng thì mới dùng fallback_messages gần nhất.
|
| - documents_index là cây tài liệu; documents_sections là các section đã drill-down và lấy nguyên văn.
|
| - document_grounded_answer là bản nháp trả lời đã bám evidence; có thể tái sử dụng và tinh chỉnh.
|
|
|
| OUTPUT SHAPE:
|
| {
|
| "reply": "...",
|
| "needs_confirmation": false,
|
| "missing_fields": [],
|
| "actions": [{"type": "create_issue|update_issue|create_task", "payload": {...}}],
|
| "summary": "..."
|
| }
|
| """
|
|
|
| doc_context_for_answer = dict(doc_context)
|
| doc_sections = list(doc_context.get("sections", []))
|
| if preferred_requirement_node_reference:
|
| preferred_node_id = str(preferred_requirement_node_reference.get("node_id") or "").strip()
|
| preferred_section_id = str(preferred_requirement_node_reference.get("node_id") or "").strip()
|
| preferred_match = None
|
| for section in doc_sections:
|
| section_id = str(section.get("section_id") or "").strip()
|
| if section_id == preferred_section_id or section_id == preferred_node_id:
|
| preferred_match = section
|
| break
|
| if preferred_match:
|
| doc_sections = [preferred_match] + [section for section in doc_sections if section is not preferred_match]
|
| doc_context_for_answer["sections"] = doc_sections
|
| doc_context_for_answer["citations"] = [
|
| citation for citation in doc_context.get("citations", []) if str(citation.get("section_id") or "").strip() != preferred_section_id
|
| ]
|
| doc_context_for_answer["citations"].insert(0, {
|
| "document_id": preferred_match.get("document_id"),
|
| "document_name": preferred_match.get("document_name"),
|
| "section_id": preferred_match.get("section_id"),
|
| "section_title": preferred_match.get("section_title"),
|
| "section_path": preferred_match.get("section_path"),
|
| "section_path_titles": preferred_match.get("section_path_titles", []),
|
| "section_path_ids": preferred_match.get("section_path_ids", []),
|
| "source": "preferred_node",
|
| })
|
|
|
| document_grounded = build_document_grounded_answer(
|
| query=req.message,
|
| selected_messages=base_messages,
|
| doc_context=doc_context_for_answer,
|
| qa_memory=qa_memory,
|
| )
|
| requirement_node_reference = preferred_requirement_node_reference or document_grounded.get("requirement_node_reference") or doc_context.get("requirement_node_reference") or {}
|
|
|
| prompt_context = {
|
| "current_user": {"id": user["id"], "name": user.get("name"), "email": user.get("email")},
|
| "team_id": req.team_id,
|
| "project": project,
|
| "issue_anchor_id": req.issue_anchor_id,
|
| "selected_messages": base_messages,
|
| "selected_message_ids": req.selected_message_ids,
|
| "fallback_messages": fallback_messages,
|
| "documents_index": doc_indexes,
|
| "documents_sections": doc_context.get("sections", []),
|
| "documents_citations": doc_context.get("citations", []),
|
| "documents_retrieval_meta": doc_context.get("retrieval_meta", {}),
|
| "document_grounded_answer": document_grounded.get("answer", ""),
|
| "document_grounded_citations": document_grounded.get("citations", []),
|
| "document_grounded_confidence": document_grounded.get("confidence", "medium"),
|
| "doc_qa_memory": qa_memory,
|
| "requirement_node_reference": requirement_node_reference,
|
| "preferred_requirement_node_reference": preferred_requirement_node_reference,
|
| "new_message": req.message,
|
| "new_message_attachments": req.attachment_urls,
|
| "allow_auto_tool_call": req.allow_auto_tool_call,
|
| "doc_qa_only": req.doc_qa_only,
|
| "current_time": get_vn_now().isoformat(),
|
| "agent_context": get_team_agent_context(req.team_id, req.project_id, req.issue_anchor_id, window=2),
|
| }
|
|
|
| raw_text = run_team_agent_with_nvidia(
|
| system_prompt=TEAM_AGENT_SYSTEM_PROMPT + "\n" + tools_description,
|
| payload=_truncate_prompt_context(prompt_context),
|
| ).strip()
|
|
|
| parsed = _extract_json_payload(raw_text) or {}
|
| actions = _normalize_actions(parsed, req) if parsed else []
|
| reply_text = str(parsed.get("reply") or parsed.get("summary") or "").strip()
|
| if not reply_text:
|
| reply_text = "Mình đã đọc ngữ cảnh chọn lọc và tài liệu liên quan."
|
| missing_fields = parsed.get("missing_fields") if isinstance(parsed.get("missing_fields"), list) else []
|
| needs_confirmation = bool(parsed.get("needs_confirmation"))
|
|
|
|
|
| should_force_doc_qa = req.doc_qa_only or not _looks_like_action_request(req.message)
|
| if document_grounded.get("answer") and should_force_doc_qa:
|
| reply_text = str(document_grounded.get("answer") or reply_text).strip()
|
| actions = []
|
| missing_fields = []
|
| needs_confirmation = False
|
|
|
| if should_force_doc_qa:
|
| try:
|
| qa_memory_result = await compact_team_doc_qa_memory(
|
| req.team_id,
|
| req.project_id,
|
| req.message,
|
| str(document_grounded.get("answer") or ""),
|
| doc_context_for_answer,
|
| base_messages,
|
| citations=document_grounded.get("citations", []),
|
| )
|
| qa_memory = str(qa_memory_result.get("memory_summary") or qa_memory or "").strip()
|
| except Exception:
|
| pass
|
|
|
| grounded_confidence = str(document_grounded.get("confidence") or "medium").strip().lower()
|
| grounded_needs_clarification = bool(document_grounded.get("needs_clarification"))
|
| if should_force_doc_qa and (grounded_confidence == "low" or grounded_needs_clarification):
|
| followup = str(document_grounded.get("clarifying_question") or "").strip()
|
| if followup:
|
| reply_text = f"{reply_text}\n\n{followup}".strip()
|
| actions = []
|
| missing_fields = []
|
| needs_confirmation = False
|
|
|
| if req.doc_qa_only:
|
| actions = []
|
| missing_fields = []
|
| needs_confirmation = False
|
|
|
| tool_results: List[Dict[str, Any]] = []
|
| execution_errors: List[str] = []
|
| node_selection_options = build_requirement_node_options_from_documents(team_docs, limit=8)
|
| node_confirmation_required = False
|
| if actions:
|
| for action in actions:
|
| if action.get("type") not in {"create_issue", "create_task"}:
|
| continue
|
| merged_preview = _merge_requirement_node_reference(
|
| dict(action.get("payload") or {}),
|
| requirement_node_reference if isinstance(requirement_node_reference, dict) else {},
|
| )
|
| if not _has_requirement_node_payload(merged_preview):
|
| node_confirmation_required = True
|
| needs_confirmation = True
|
| missing_fields = list({*missing_fields, "requirement_node_id"})
|
| break
|
| if req.allow_auto_tool_call and actions:
|
| for action in actions:
|
| try:
|
| enriched_action = dict(action)
|
| enriched_action["payload"] = _merge_requirement_node_reference(
|
| dict(action.get("payload") or {}),
|
| requirement_node_reference if isinstance(requirement_node_reference, dict) else {},
|
| )
|
| if action.get("type") in {"create_issue", "create_task"} and node_confirmation_required and not _has_requirement_node_payload(enriched_action["payload"]):
|
| needs_confirmation = True
|
| tool_results.append(
|
| {
|
| "type": action.get("type"),
|
| "status": "needs_confirmation",
|
| "error": "missing_requirement_node",
|
| "node_selection_options": node_selection_options,
|
| }
|
| )
|
| continue
|
| tool_results.append(_execute_agent_action(enriched_action, user["id"], req))
|
| except HTTPException as exc:
|
| action_type = str(action.get("type") or "unknown")
|
| execution_errors.append(f"{action_type}: {exc.detail}")
|
| tool_results.append({"type": action_type, "status": "error", "error": exc.detail})
|
|
|
| assistant_text = reply_text
|
| if actions and not req.allow_auto_tool_call:
|
| confirmation_suffix = "Bạn có muốn mình tự xử lý luôn không?"
|
| if confirmation_suffix not in assistant_text:
|
| assistant_text = f"{assistant_text} {confirmation_suffix}".strip()
|
| if needs_confirmation and not assistant_text.endswith("?"):
|
| assistant_text = f"{assistant_text} Bạn có muốn mình tự xử lý luôn không?".strip()
|
|
|
| if not actions and isinstance(document_grounded.get("citations"), list) and document_grounded.get("citations"):
|
| section_lookup: Dict[str, Dict[str, str]] = {}
|
| for sec in doc_context.get("sections", []):
|
| sec_id = str(sec.get("section_id") or "").strip()
|
| if not sec_id:
|
| continue
|
| section_lookup[sec_id] = {
|
| "document_name": str(sec.get("document_name") or "Tài liệu"),
|
| "section_title": str(sec.get("section_title") or sec_id),
|
| }
|
|
|
| source_refs: List[str] = []
|
| for item in document_grounded.get("citations", [])[:3]:
|
| section_id = str(item.get("section_id") or "").strip()
|
| if not section_id:
|
| continue
|
| lookup = section_lookup.get(section_id)
|
| if lookup:
|
| source_refs.append(f"{lookup['document_name']} > {lookup['section_title']}")
|
| else:
|
| source_refs.append(f"Section {section_id}")
|
| if source_refs:
|
| assistant_text = f"{assistant_text}\n\nNguồn tham chiếu: {', '.join(source_refs)}"
|
|
|
| if requirement_node_reference and not actions:
|
| node_display = str(requirement_node_reference.get("node_display") or "").strip()
|
| if node_display:
|
| assistant_text = f"{assistant_text}\n\nNode đề xuất: {node_display}".strip()
|
|
|
| if tool_results:
|
| summary_bits = []
|
| for item in tool_results:
|
| if item.get("status") == "error":
|
| continue
|
| if item.get("type") == "create_issue":
|
| summary_bits.append(f"đã tạo issue {item['issue'].get('title', '')}".strip())
|
| elif item.get("type") == "update_issue":
|
| summary_bits.append(f"đã cập nhật issue {item['issue'].get('title', '')}".strip())
|
| elif item.get("type") == "create_task":
|
| summary_bits.append(f"đã tạo task {item['task'].get('title', '')}".strip())
|
| if summary_bits:
|
| assistant_text = f"{assistant_text}\n\nKết quả: {', '.join(summary_bits)}."
|
| if requirement_node_reference:
|
| node_display = str(requirement_node_reference.get("node_display") or "").strip()
|
| if node_display:
|
| assistant_text = f"{assistant_text}\nNode: {node_display}".strip()
|
| if execution_errors:
|
| assistant_text = f"{assistant_text}\n\nMột số action chưa xử lý được: {', '.join(execution_errors)}."
|
|
|
| if needs_confirmation and node_selection_options:
|
| assistant_text = f"{assistant_text}\n\nMình cần bạn chọn requirement node trước khi tạo issue/task.".strip()
|
|
|
| assistant_doc = save_team_chat_message(req.team_id, "assistant", assistant_text, req.project_id)
|
| await _broadcast_team_chat_message(req.team_id, req.project_id, assistant_doc)
|
|
|
| return {
|
| "message": assistant_doc,
|
| "tool_intent": actions[0] if len(actions) == 1 else actions,
|
| "tool_intents": actions,
|
| "tool_result": tool_results[0] if len(tool_results) == 1 else tool_results,
|
| "tool_results": tool_results,
|
| "missing_fields": missing_fields,
|
| "needs_confirmation": needs_confirmation,
|
| "context_window_size": len(base_messages),
|
| "selected_message_count": len(selected_messages),
|
| "document_section_count": len(doc_context.get("sections", [])),
|
| "document_citations": doc_context.get("citations", []),
|
| "document_retrieval_meta": doc_context.get("retrieval_meta", {}),
|
| "document_grounded": document_grounded,
|
| "document_grounded_confidence": document_grounded.get("confidence", "medium"),
|
| "document_grounded_confidence_score": document_grounded.get("confidence_score", 0.0),
|
| "requirement_node_reference": requirement_node_reference,
|
| "node_selection_required": bool(node_confirmation_required),
|
| "node_selection_options": node_selection_options if node_confirmation_required else [],
|
| "node_selection_reason": "missing_requirement_node" if node_confirmation_required else None,
|
| "doc_qa_only": req.doc_qa_only,
|
| "used_bot_mention": has_bot_mention,
|
| "agent_model": TEAM_AGENT_MODEL,
|
| "timestamp": get_vn_now().isoformat(),
|
| }
|
|
|