import os import json import uuid import tempfile from datetime import datetime from typing import List, Dict, Any, Optional from fastapi import FastAPI, HTTPException, File, UploadFile from fastapi.middleware.cors import CORSMiddleware from pymongo import MongoClient from pydantic import BaseModel from google import genai from google.genai import types app = FastAPI(title="Nomus AI Agent Calendar (V2 - Sync Tools)") # Allow CORS for Frontend integration app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # MongoDB Configuration (Sync - Using pymongo for stable AI tools) MONGO_URI = os.environ.get("MONGO_URI") client = MongoClient(MONGO_URI) db = client.nomus_db tasks_collection = db.tasks chat_collection = db.chat_history # Gemini Configuration (New SDK: google-genai) GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") model_client = genai.Client(api_key=GOOGLE_API_KEY) MODEL_NAME = "gemini-1.5-flash-lite-latest" # --- TOOLS (SKILLS - SYNC) --- def list_tasks() -> List[Dict]: """Trả về danh sách tất cả công việc hiện tại trong lịch.""" print("Executing tool: list_tasks") tasks = list(tasks_collection.find({}, {"_id": 0})) return tasks def check_conflicts(start_time: str, end_time: str) -> List[Dict]: """Kiểm tra xem có việc nào trùng trong khoảng thời gian (ISO format). Trả về danh sách xung đột.""" print(f"Executing tool: check_conflicts ({start_time} to {end_time})") conflicts = list(tasks_collection.find({ "$or": [ {"start_time": {"$lt": end_time}, "end_time": {"$gt": start_time}} ] }, {"_id": 0})) return conflicts def add_task(title: str, description: str, start_time: str, end_time: str, priority: str = "medium", tags: List[str] = [], reminder: str = "") -> str: """Thêm một công việc mới vào lịch. Yêu cầu tiêu đề, mô tả, giờ bắt đầu và kết thúc (ISO).""" print(f"Executing tool: add_task ({title})") task_data = { "id": str(uuid.uuid4()), "title": title, "description": description, "start_time": start_time, "end_time": end_time, "priority": priority, "tags": tags, "reminder": reminder or start_time } tasks_collection.insert_one(task_data) return f"Đã thêm thành công: {title}" # --- DB HELPERS --- def save_chat_message(role: str, content: str): chat_collection.insert_one({ "role": role, "content": content, "timestamp": datetime.now().isoformat(), "date": datetime.now().strftime("%Y-%m-%d") }) def get_daily_chat() -> List[Dict]: today = datetime.now().strftime("%Y-%m-%d") chats = list(chat_collection.find({"date": today}, {"_id": 0}).sort("timestamp", 1)) return chats # --- AGENT LOGIC --- SYSTEM_PROMPT = """Bạn là Trợ lý Lịch Nomus (Nomus AI Agent). Nhiệm vụ: Giúp người dùng sắp xếp cuộc sống, kiểm tra xung đột và đề xuất lịch trình tối ưu. Ngôn ngữ: Tiếng Việt. HƯỚNG DẪN: 1. Khi người dùng muốn tạo lịch, hãy liệt kê công việc (list_tasks) và kiểm tra xung đột (check_conflicts) trước. 2. Sau khi sắp xếp xong, hãy gọi add_task để lưu vào DB. 3. Luôn phản hồi lịch sự, ngắn gọn và xác nhận các việc đã làm. 4. Trả về kết quả cuối cùng theo định dạng hội thoại. """ class ScheduleRequest(BaseModel): text: str current_time: Optional[str] = None @app.post("/schedule") async def handle_agent_request(req: ScheduleRequest): if not GOOGLE_API_KEY: raise HTTPException(status_code=500, detail="GOOGLE_API_KEY not set.") curr_time = req.current_time or datetime.now().isoformat() save_chat_message("user", req.text) try: # Using NEW google-genai SDK response = model_client.models.generate_content( model=MODEL_NAME, contents=req.text, config=types.GenerateContentConfig( system_instruction=SYSTEM_PROMPT + f"\nThời gian hiện tại: {curr_time}", tools=[list_tasks, check_conflicts, add_task], automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=False) ) ) assistant_msg = response.text save_chat_message("assistant", assistant_msg) # Fetch current tasks for FE sync all_tasks = list(tasks_collection.find({}, {"_id": 0})) return { "message": assistant_msg, "tasks": all_tasks, "suggestions": [] } except Exception as e: print(f"Gemini Error: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/chat") async def get_chat(): return {"history": get_daily_chat()} @app.get("/tasks") async def get_tasks(): tasks = list(tasks_collection.find({}, {"_id": 0})) return {"tasks": tasks} @app.get("/health") async def health_check(): return {"status": "ok", "message": f"Nomus AI (GenAI SDK) is ready using {MODEL_NAME}"} class ManualTaskRequest(BaseModel): title: str description: str start_time: str end_time: str priority: str = "medium" tags: List[str] = [] reminder: Optional[str] = None @app.post("/tasks") async def create_manual_task(task: ManualTaskRequest): task_data = task.dict() task_data["id"] = str(uuid.uuid4()) if not task_data["reminder"]: task_data["reminder"] = task_data["start_time"] tasks_collection.insert_one(task_data) return {"message": "Task created", "id": task_data["id"]} @app.patch("/tasks/{task_id}") async def update_task(task_id: str, update: Dict): if not update: return {"message": "No data"} result = tasks_collection.update_one({"id": task_id}, {"$set": update}) if result.modified_count == 0: raise HTTPException(status_code=404, detail="Task not found") return {"message": "Task updated"} @app.delete("/tasks/{task_id}") async def delete_task(task_id: str): result = tasks_collection.delete_one({"id": task_id}) return {"message": "Task deleted"} @app.post("/transcribe") async def transcribe_audio(file: UploadFile = File(...)): if not GOOGLE_API_KEY: raise HTTPException(status_code=500, detail="GOOGLE_API_KEY not set.") try: with tempfile.NamedTemporaryFile(delete=False, suffix=".webm") as tmp: tmp.write(await file.read()) tmp_path = tmp.name # New SDK upload and generate content with open(tmp_path, "rb") as audio_file: response = model_client.models.generate_content( model=MODEL_NAME, contents=[ "Chuyển đoạn âm thanh này thành văn bản tiếng Việt chính xác nhất. Chỉ trả về văn bản.", types.Part.from_bytes(data=audio_file.read(), mime_type="audio/webm") ] ) os.remove(tmp_path) return {"text": response.text} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)