import os import json import uuid import tempfile from datetime import datetime from typing import List, Dict, Any, Optional from fastapi import FastAPI, HTTPException, File, UploadFile from fastapi.middleware.cors import CORSMiddleware from motor.motor_asyncio import AsyncIOMotorClient from pydantic import BaseModel import google.generativeai as genai app = FastAPI(title="Nomus AI Agent Calendar (Gemini Edition)") # Allow CORS for Frontend integration app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # MongoDB Configuration MONGO_URI = os.environ.get("MONGO_URI") client = AsyncIOMotorClient(MONGO_URI) db = client.nomus_db tasks_collection = db.tasks chat_collection = db.chat_history # Gemini Configuration GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") genai.configure(api_key=GOOGLE_API_KEY) MODEL_NAME = "gemini-flash-lite-latest" # --- TOOLS (SKILLS) --- async def list_tasks() -> List[Dict]: """Trả về danh sách tất cả công việc hiện tại trong lịch.""" tasks = [] cursor = tasks_collection.find({}, {"_id": 0}) async for doc in cursor: tasks.append(doc) return tasks async def check_conflicts(start_time: str, end_time: str) -> List[Dict]: """Kiểm tra xem có việc nào trùng trong khoảng thời gian (ISO format). Trả về danh sách xung đột.""" cursor = tasks_collection.find({ "$or": [ {"start_time": {"$lt": end_time}, "end_time": {"$gt": start_time}} ] }, {"_id": 0}) conflicts = [] async for doc in cursor: conflicts.append(doc) return conflicts async def add_task(title: str, description: str, start_time: str, end_time: str, priority: str = "medium", tags: List[str] = [], reminder: str = "") -> str: """Thêm một công việc mới vào lịch. Yêu cầu tiêu đề, mô tả, giờ bắt đầu và kết thúc (ISO).""" task_data = { "id": str(uuid.uuid4()), "title": title, "description": description, "start_time": start_time, "end_time": end_time, "priority": priority, "tags": tags, "reminder": reminder or start_time } await tasks_collection.insert_one(task_data) return f"Đã thêm thành công: {title}" # Map tool names for manual or auto dispatch TOOLS = [list_tasks, check_conflicts, add_task] # --- DB HELPERS --- async def save_chat_message(role: str, content: str): await chat_collection.insert_one({ "role": role, "content": content, "timestamp": datetime.now().isoformat(), "date": datetime.now().strftime("%Y-%m-%d") }) async def get_daily_chat() -> List[Dict]: today = datetime.now().strftime("%Y-%m-%d") chats = [] cursor = chat_collection.find({"date": today}, {"_id": 0}).sort("timestamp", 1) async for doc in cursor: chats.append(doc) return chats # --- AGENT LOGIC --- SYSTEM_PROMPT = """Bạn là Trợ lý Lịch Nomus (Nomus AI Agent). Nhiệm vụ: Giúp người dùng sắp xếp cuộc sống, kiểm tra xung đột và đề xuất lịch trình tối ưu. Ngôn ngữ: Tiếng Việt. HƯỚNG DẪN: 1. Khi người dùng muốn tạo lịch, hãy liệt kê công việc (list_tasks) và kiểm tra xung đột (check_conflicts) trước. 2. Sau khi sắp xếp xong, hãy gọi add_task để lưu vào DB. 3. Luôn phản hồi lịch sự, ngắn gọn và xác nhận các việc đã làm. 4. Trả về kết quả cuối cùng theo định dạng JSON nếu cần đồng bộ FE (mặc dù Gemini Tool Call sẽ tự làm việc này). """ class ScheduleRequest(BaseModel): text: str current_time: Optional[str] = None @app.post("/schedule") async def handle_agent_request(req: ScheduleRequest): if not GOOGLE_API_KEY: raise HTTPException(status_code=500, detail="GOOGLE_API_KEY not set.") curr_time = req.current_time or datetime.now().isoformat() await save_chat_message("user", req.text) # Initialize Gemini Model with tools model = genai.GenerativeModel( model_name=MODEL_NAME, tools=TOOLS, system_instruction=SYSTEM_PROMPT + f"\nThời gian hiện tại: {curr_time}" ) # Use Automatic Function Calling (Chat Session) chat = model.start_chat(enable_automatic_function_calling=True) try: response = chat.send_message(req.text) assistant_msg = response.text await save_chat_message("assistant", assistant_msg) # We also want to return the tasks to the FE for instant update # Since tools were called, the DB is updated, we just fetch latest tasks all_tasks = [] cursor = tasks_collection.find({}, {"_id": 0}) async for doc in cursor: all_tasks.append(doc) return { "message": assistant_msg, "tasks": all_tasks, "suggestions": [] } except Exception as e: print(f"Gemini Error: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/chat") async def get_chat(): return {"history": await get_daily_chat()} @app.get("/tasks") async def get_tasks(): tasks = [] cursor = tasks_collection.find({}, {"_id": 0}) async for doc in cursor: tasks.append(doc) return {"tasks": tasks} @app.get("/health") async def health_check(): return {"status": "ok", "message": f"Nomus AI (Gemini) is ready using {MODEL_NAME}"} class ManualTaskRequest(BaseModel): title: str description: str start_time: str end_time: str priority: str = "medium" tags: List[str] = [] reminder: Optional[str] = None @app.post("/tasks") async def create_manual_task(task: ManualTaskRequest): task_data = task.dict() task_data["id"] = str(uuid.uuid4()) if not task_data["reminder"]: task_data["reminder"] = task_data["start_time"] await tasks_collection.insert_one(task_data) return {"message": "Task created", "id": task_data["id"]} @app.patch("/tasks/{task_id}") async def update_task(task_id: str, update: Dict): if not update: return {"message": "No data"} result = await tasks_collection.update_one({"id": task_id}, {"$set": update}) if result.modified_count == 0: raise HTTPException(status_code=404, detail="Task not found") return {"message": "Task updated"} @app.delete("/tasks/{task_id}") async def delete_task(task_id: str): result = await tasks_collection.delete_one({"id": task_id}) return {"message": "Task deleted"} @app.post("/transcribe") async def transcribe_audio(file: UploadFile = File(...)): if not GOOGLE_API_KEY: raise HTTPException(status_code=500, detail="GOOGLE_API_KEY not set.") try: # Save upload to temp file with tempfile.NamedTemporaryFile(delete=False, suffix=".webm") as tmp: tmp.write(await file.read()) tmp_path = tmp.name # Upload to Gemini gemini_file = genai.upload_file(path=tmp_path, mime_type="audio/webm") model = genai.GenerativeModel(model_name=MODEL_NAME) response = model.generate_content([ "Chuyển đoạn âm thanh này thành văn bản tiếng Việt chính xác nhất. Chỉ trả về văn bản.", gemini_file ]) # Cleanup os.remove(tmp_path) genai.delete_file(gemini_file.name) return {"text": response.text} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)