| |
| |
| |
|
|
| import os |
| import uuid |
| from fastapi import APIRouter, Depends, HTTPException, Request, status, UploadFile, File , Body |
| from fastapi.responses import StreamingResponse |
| from api.database import User, Conversation, Message |
| from api.models import QueryRequest, ConversationOut, ConversationCreate, UserUpdate |
| from api.auth import current_active_user |
| from api.database import get_db |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy import select, delete |
| from utils.generation import request_generation, select_model, check_model_availability |
| from utils.web_search import web_search |
| import io |
| import asyncio |
| import json |
| from openai import OpenAI |
| from motor.motor_asyncio import AsyncIOMotorClient |
| from datetime import datetime |
| import logging |
| from typing import List, Optional |
| |
| from utils.constants import MODEL_ALIASES, MODEL_NAME, SECONDARY_MODEL_NAME, TERTIARY_MODEL_NAME, CLIP_BASE_MODEL, CLIP_LARGE_MODEL, ASR_MODEL, TTS_MODEL, IMAGE_GEN_MODEL, SECONDARY_IMAGE_GEN_MODEL, IMAGE_INFERENCE_API |
| import psutil |
| import time |
| router = APIRouter() |
| from pydantic import BaseModel |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| HF_TOKEN = os.getenv("HF_TOKEN") |
| if not HF_TOKEN: |
| logger.error("HF_TOKEN is not set in environment variables.") |
| raise ValueError("HF_TOKEN is required for Inference API.") |
|
|
| BACKUP_HF_TOKEN = os.getenv("BACKUP_HF_TOKEN") |
| if not BACKUP_HF_TOKEN: |
| logger.warning("BACKUP_HF_TOKEN is not set. Fallback to secondary model will not work if primary token fails.") |
|
|
| ROUTER_API_URL = os.getenv("ROUTER_API_URL", "https://router.huggingface.co") |
| API_ENDPOINT = os.getenv("API_ENDPOINT", "https://router.huggingface.co/v1") |
| FALLBACK_API_ENDPOINT = os.getenv("FALLBACK_API_ENDPOINT", "https://api-inference.huggingface.co/v1") |
|
|
|
|
| print("=" * 100) |
| print("MODEL_NAME =", os.getenv("MODEL_NAME")) |
| print("SECONDARY_MODEL_NAME =", os.getenv("SECONDARY_MODEL_NAME")) |
| print("=" * 100) |
|
|
|
|
| print(f"🔑 FULL HF_TOKEN: {HF_TOKEN}") |
|
|
|
|
| |
| MONGO_URI = os.getenv("MONGODB_URI") |
| client = AsyncIOMotorClient(MONGO_URI) |
| db = client["hager"] |
| session_message_counts = db["session_message_counts"] |
|
|
| class ImageGenRequest(BaseModel): |
| prompt: str |
| output_format: str = "image" |
|
|
| |
| async def handle_session(request: Request): |
| if not hasattr(request, "session"): |
| raise HTTPException(status_code=500, detail="Session middleware not configured") |
| session_id = request.session.get("session_id") |
| if not session_id: |
| session_id = str(uuid.uuid4()) |
| request.session["session_id"] = session_id |
| await session_message_counts.insert_one({"session_id": session_id, "message_count": 0}) |
| |
| session_doc = await session_message_counts.find_one({"session_id": session_id}) |
| if not session_doc: |
| session_doc = {"session_id": session_id, "message_count": 0} |
| await session_message_counts.insert_one(session_doc) |
| |
| message_count = session_doc["message_count"] + 1 |
| await session_message_counts.update_one( |
| {"session_id": session_id}, |
| {"$set": {"message_count": message_count}} |
| ) |
| if message_count > 4: |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, |
| detail="Message limit reached. Please log in to continue." |
| ) |
| return session_id |
|
|
| |
| def enhance_system_prompt(system_prompt: str, message: str, user: Optional[User] = None) -> str: |
| base_system = ( |
| "You are a distinguished senior backend engineer and systems architect with over a decade of experience " |
| "designing, deploying, and maintaining production systems that serve millions of users. " |
| "Your judgment is trusted because you consistently choose the right tradeoffs between simplicity, " |
| "scalability, cost, and operational reality.\n\n" |
|
|
| "====================\n" |
| "YOUR THINKING FRAMEWORK\n" |
| "====================\n" |
| "1. **Understand first, then answer** – grasp the intent, constraints, and what a good solution looks like.\n" |
| "2. **State your assumptions clearly before answering** – list any missing requirements, scale assumptions, or constraints you're assuming.\n" |
| "3. **Adapt to complexity** – simple questions get crisp answers; complex questions get structured reasoning.\n" |
| "4. **Always explain WHY before HOW** – reasoning first, then implementation.\n" |
| "5. **Be concrete** – provide commands, TTL values, realistic numbers, and examples.\n" |
| "6. **For system design or architecture questions, naturally cover:**\n" |
| " - Minimal Viable Architecture (what actually works)\n" |
| " - Step‑by‑step data flow\n" |
| " - State management (sessions, presence, caching – include concrete implementation details when relevant)\n" |
| " - Failure scenarios & recovery\n" |
| " - Edge cases (streaming, concurrency, backpressure, retries, idempotency)\n" |
| " - Scaling path (only if needed, clearly labeled as optional)\n" |
| "7. **End with a concise summary** that clearly states key decisions, tradeoffs made, and recommended next steps.\n\n" |
|
|
| "====================\n" |
| "CORE ENGINEERING PRINCIPLES\n" |
| "====================\n" |
| "- **KISS and YAGNI** – start simple. Add complexity only when a real bottleneck demands it.\n" |
| "- **Tradeoffs > absolutes** – discuss latency, consistency, operational burden, and cost.\n" |
| "- **Redis is your go-to tool** for caching, presence, sessions, and lightweight message storage.\n" |
| "- **Presence tracking** – always use TTL (e.g., `EXPIRE presence:user:123 30`) and client heartbeat (ping/pong every 20–30s).\n" |
| "- **Offline messages** – Redis Streams (`XADD`, `XREAD`) or Lists (`LPUSH`, `RPOP`) for moderate scale.\n" |
| "- **Kafka, Kubernetes, Prometheus, ELK** – use only when clearly justified; they are NOT defaults.\n" |
| "- **Never use per-user Kafka consumers** – this creates thousands of consumers and rebalancing chaos.\n\n" |
|
|
| "====================\n" |
| "WHAT DISTINGUISHES YOUR ANSWERS\n" |
| "====================\n" |
| "- **Concrete, not abstract** – provide real commands (HSET, EXPIRE, XADD, XREAD), TTL values, and API examples.\n" |
| "- **Operational mindset** – consider debugging, monitoring, maintenance, and real-world traffic patterns.\n" |
| "- **No fluff** – avoid vague phrases like 'use a scalable system' without explanation.\n" |
| "- **Non‑repetitive** – never repeat the same sentence or idea. If you need to emphasize, rephrase or use bullet points.\n" |
| "- **Honest about assumptions** – state assumptions clearly when uncertain, and prefer simplicity.\n" |
| "- **Step‑by-step clarity** – break complex flows into numbered steps or bullet points.\n" |
| "- **Tradeoff transparency** – compare multiple solutions honestly.\n" |
| "- **Correct formatting** – use Markdown tables, lists, and code blocks where appropriate.\n" |
| "- **Edge case awareness** – always consider streaming, concurrency, backpressure, retries, and idempotency where relevant.\n\n" |
|
|
| "====================\n" |
| "LANGUAGE & STYLE\n" |
| "====================\n" |
| "- Respond in the same language the user used (Arabic → Arabic, English → English).\n" |
| "- Keep all technology names in English (Redis, WebSocket, Kafka, Kubernetes).\n" |
| "- Never start by repeating the user's question – dive directly into the answer.\n" |
| "- For Arabic responses, use precise technical vocabulary but keep tech names in English.\n" |
| ) |
|
|
| |
| if any(0x0600 <= ord(char) <= 0x06FF for char in message): |
| base_system += "\n- Use precise Arabic technical terms; keep technology names in English." |
|
|
| |
| if user and user.additional_info: |
| base_system += f"\n\n- User context: {user.additional_info}. Adjust depth, examples, and complexity accordingly." |
|
|
| |
| if system_prompt and system_prompt.strip(): |
| base_system += f"\n\nAdditional user instructions from settings:\n{system_prompt}" |
|
|
| return base_system |
|
|
| @router.get("/api/settings") |
| async def get_settings(user: User = Depends(current_active_user)): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| return { |
| "available_models": [ |
| {"alias": "advanced", "description": "High-performance model for complex queries"}, |
| {"alias": "standard", "description": "Balanced model for general use"}, |
| {"alias": "light", "description": "Lightweight model for quick responses"} |
| ], |
| "conversation_styles": ["default", "concise", "analytical", "creative"], |
| "user_settings": { |
| "display_name": user.display_name, |
| "preferred_model": user.preferred_model, |
| "job_title": user.job_title, |
| "education": user.education, |
| "interests": user.interests, |
| "additional_info": user.additional_info, |
| "conversation_style": user.conversation_style |
| } |
| } |
|
|
| @router.get("/api/model-info") |
| async def model_info(): |
| return { |
| "available_models": [ |
| {"alias": "advanced", "description": "High-performance model for complex queries"}, |
| {"alias": "standard", "description": "Balanced model for general use"}, |
| {"alias": "light", "description": "Lightweight model for quick responses"}, |
| {"alias": "image_base", "description": "Basic image analysis model"}, |
| {"alias": "image_advanced", "description": "Advanced image analysis model"}, |
| {"alias": "audio", "description": "Audio transcription model (default)"}, |
| {"alias": "tts", "description": "Text-to-speech model (default)"} |
| ], |
| "api_base": API_ENDPOINT, |
| "fallback_api_base": FALLBACK_API_ENDPOINT, |
| "status": "online" |
| } |
|
|
| @router.get("/api/performance") |
| async def performance_stats(): |
| return { |
| "queue_size": int(os.getenv("QUEUE_SIZE", 80)), |
| "concurrency_limit": int(os.getenv("CONCURRENCY_LIMIT", 20)), |
| "uptime": time.time() - psutil.boot_time() |
| } |
|
|
|
|
| @router.post("/api/chat") |
| async def chat_endpoint( |
| request: Request, |
| req: QueryRequest, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| logger.info(f"Received chat request: {req}") |
| |
| if not user: |
| await handle_session(request) |
| |
| conversation = None |
| if user: |
| title = req.title or (req.message[:50] + "..." if len(req.message) > 50 else req.message or "Untitled Conversation") |
| result = await db.execute( |
| select(Conversation).filter(Conversation.user_id == user.id).order_by(Conversation.updated_at.desc()) |
| ) |
| conversation = result.scalar_one_or_none() |
| if not conversation: |
| conversation_id = str(uuid.uuid4()) |
| conversation = Conversation( |
| conversation_id=conversation_id, |
| user_id=user.id, |
| title=title |
| ) |
| db.add(conversation) |
| await db.commit() |
| await db.refresh(conversation) |
| |
| user_msg = Message(role="user", content=req.message, conversation_id=conversation.id) |
| db.add(user_msg) |
| await db.commit() |
| |
| preferred_model = user.preferred_model if user else None |
| model_name, api_endpoint = select_model(req.message, input_type="text", preferred_model=preferred_model) |
| |
| is_available, api_key, selected_endpoint = check_model_availability(model_name, HF_TOKEN) |
| if not is_available: |
| logger.warning(f"Model {model_name} is not available at {api_endpoint}, trying fallback model.") |
| model_name = SECONDARY_MODEL_NAME |
| is_available, api_key, selected_endpoint = check_model_availability(model_name, HF_TOKEN) |
| if not is_available: |
| logger.error(f"Fallback model {model_name} is not available at {selected_endpoint}") |
| raise HTTPException(status_code=503, detail=f"No available models. Tried {MODEL_NAME} and {SECONDARY_MODEL_NAME}.") |
| |
| system_prompt = enhance_system_prompt(req.system_prompt, req.message, user) |
| |
| stream = request_generation( |
| api_key=api_key, |
| api_base=selected_endpoint, |
| message=req.message, |
| system_prompt=system_prompt, |
| model_name=model_name, |
| chat_history=req.history, |
| temperature=req.temperature, |
| max_new_tokens=req.max_new_tokens or 2048, |
| deep_search=req.enable_browsing, |
| input_type="text", |
| output_format=req.output_format |
| ) |
| |
| if req.output_format == "audio": |
| audio_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing audio chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}") |
| if isinstance(chunk, bytes): |
| audio_chunks.append(chunk) |
| else: |
| logger.warning(f"Unexpected non-bytes chunk in audio stream: {chunk}") |
| if not audio_chunks: |
| logger.error("No audio data generated.") |
| raise HTTPException(status_code=502, detail="No audio data generated. Model may be unavailable.") |
| audio_data = b"".join(audio_chunks) |
| return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav") |
| except Exception as e: |
| logger.error(f"Audio generation failed: {e}") |
| raise HTTPException(status_code=502, detail=f"Audio generation failed: {str(e)}") |
| |
| async def stream_response(): |
| nonlocal_stream = stream |
| response_chunks = [] |
| try: |
| for chunk in nonlocal_stream: |
| if isinstance(chunk, str) and chunk.strip() and chunk not in ["analysis", "assistantfinal"]: |
| response_chunks.append(chunk) |
| yield chunk.encode('utf-8') |
| await asyncio.sleep(0.05) |
| else: |
| logger.warning(f"Skipping chunk: {chunk}") |
| |
| response = "".join(response_chunks) |
| if not response.strip(): |
| logger.warning(f"Empty response from {model_name}. Trying fallback model {SECONDARY_MODEL_NAME}.") |
| fallback_model_name = SECONDARY_MODEL_NAME |
| is_available_fb, api_key_fb, selected_endpoint_fb = check_model_availability(fallback_model_name, HF_TOKEN) |
| if not is_available_fb: |
| logger.error(f"Fallback model {fallback_model_name} is not available at {selected_endpoint_fb}") |
| yield f"Error: No available models. Tried {MODEL_NAME} and {SECONDARY_MODEL_NAME}.".encode('utf-8') |
| return |
| |
| fallback_stream = request_generation( |
| api_key=api_key_fb, |
| api_base=selected_endpoint_fb, |
| message=req.message, |
| system_prompt=system_prompt, |
| model_name=fallback_model_name, |
| chat_history=req.history, |
| temperature=req.temperature, |
| max_new_tokens=req.max_new_tokens or 2048, |
| deep_search=req.enable_browsing, |
| input_type="text", |
| output_format=req.output_format |
| ) |
| response_chunks = [] |
| for chunk in fallback_stream: |
| if isinstance(chunk, str) and chunk.strip() and chunk not in ["analysis", "assistantfinal"]: |
| response_chunks.append(chunk) |
| yield chunk.encode('utf-8') |
| await asyncio.sleep(0.05) |
| else: |
| logger.warning(f"Skipping fallback chunk: {chunk}") |
| |
| response = "".join(response_chunks) |
| if not response.strip(): |
| logger.error(f"Empty response from fallback model {fallback_model_name}.") |
| yield f"Error: Empty response from both {MODEL_NAME} and {SECONDARY_MODEL_NAME}.".encode('utf-8') |
| return |
| |
| if user and conversation: |
| assistant_msg = Message(role="assistant", content=response, conversation_id=conversation.id) |
| db.add(assistant_msg) |
| await db.commit() |
| conversation.updated_at = datetime.utcnow() |
| await db.commit() |
| yield json.dumps({ |
| "conversation_id": conversation.conversation_id, |
| "conversation_url": f"https://mgzon-mgzon-app.hf.space/chat/{conversation.conversation_id}", |
| "conversation_title": conversation.title |
| }, ensure_ascii=False).encode('utf-8') |
| except Exception as e: |
| logger.error(f"Chat generation failed: {e}") |
| yield f"Error: Chat generation failed: {str(e)}".encode('utf-8') |
|
|
| return StreamingResponse(stream_response(), media_type="text/plain") |
| |
| @router.post("/api/image-generation") |
| async def image_generation_endpoint( |
| request: Request, |
| req: dict, |
| file: Optional[UploadFile] = File(None), |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| await handle_session(request) |
| |
| prompt = req.get("prompt", "") |
| output_format = req.get("output_format", "image") |
| if not prompt.strip(): |
| raise HTTPException(status_code=400, detail="Prompt is required for image generation.") |
| |
| model_name, api_endpoint = select_model(prompt, input_type="image_gen") |
| |
| is_available, api_key, selected_endpoint = check_model_availability(model_name, HF_TOKEN) |
| if not is_available: |
| logger.error(f"Model {model_name} is not available at {api_endpoint}") |
| raise HTTPException(status_code=503, detail=f"Model {model_name} is not available. Please try another model.") |
| |
| image_data = None |
| if file: |
| image_data = await file.read() |
| |
| system_prompt = enhance_system_prompt( |
| "You are an expert in generating high-quality images based on detailed prompts. Ensure the output is visually appealing and matches the user's description.", |
| prompt, user |
| ) |
| |
| stream = request_generation( |
| api_key=api_key, |
| api_base=selected_endpoint, |
| message=prompt, |
| system_prompt=system_prompt, |
| model_name=model_name, |
| temperature=0.7, |
| max_new_tokens=2048, |
| input_type="image_gen", |
| image_data=image_data, |
| output_format=output_format |
| ) |
| |
| if output_format == "image": |
| image_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing image chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}") |
| if isinstance(chunk, bytes): |
| image_chunks.append(chunk) |
| else: |
| logger.warning(f"Unexpected non-bytes chunk in image stream: {chunk}") |
| if not image_chunks: |
| logger.error("No image data generated.") |
| raise HTTPException(status_code=500, detail="No image data generated for image generation.") |
| image_data = b"".join(image_chunks) |
| return StreamingResponse(io.BytesIO(image_data), media_type="image/png") |
| except Exception as e: |
| logger.error(f"Image generation failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Image generation failed: {str(e)}") |
| |
| response_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing text chunk: {chunk[:100]}...") |
| if isinstance(chunk, str) and chunk.strip() and chunk not in ["analysis", "assistantfinal"]: |
| response_chunks.append(chunk) |
| else: |
| logger.warning(f"Skipping chunk: {chunk}") |
| response = "".join(response_chunks) |
| if not response.strip(): |
| logger.error("Empty response generated.") |
| raise HTTPException(status_code=500, detail="Empty response generated from model.") |
| return {"response": response} |
| except Exception as e: |
| logger.error(f"Image generation failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Image generation failed: {str(e)}") |
|
|
|
|
| @router.post("/api/audio-transcription") |
| async def audio_transcription_endpoint( |
| request: Request, |
| file: UploadFile = File(...), |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| logger.info(f"Received audio transcription request for file: {file.filename}") |
| |
| if not user: |
| await handle_session(request) |
| |
| conversation = None |
| if user: |
| title = "Audio Transcription" |
| result = await db.execute( |
| select(Conversation).filter(Conversation.user_id == user.id).order_by(Conversation.updated_at.desc()) |
| ) |
| conversation = result.scalar_one_or_none() |
| if not conversation: |
| conversation_id = str(uuid.uuid4()) |
| conversation = Conversation( |
| conversation_id=conversation_id, |
| user_id=user.id, |
| title=title |
| ) |
| db.add(conversation) |
| await db.commit() |
| await db.refresh(conversation) |
| |
| user_msg = Message(role="user", content="Audio message", conversation_id=conversation.id) |
| db.add(user_msg) |
| await db.commit() |
| |
| model_name, api_endpoint = select_model("transcribe audio", input_type="audio") |
| |
| is_available, api_key, selected_endpoint = check_model_availability(model_name, HF_TOKEN) |
| if not is_available: |
| logger.error(f"Model {model_name} is not available at {api_endpoint}") |
| raise HTTPException(status_code=503, detail=f"Model {model_name} is not available. Please try another model.") |
| |
| audio_data = await file.read() |
| stream = request_generation( |
| api_key=api_key, |
| api_base=selected_endpoint, |
| message="Transcribe audio", |
| system_prompt="Transcribe the provided audio using Whisper. Ensure accurate transcription in the detected language.", |
| model_name=model_name, |
| temperature=0.7, |
| max_new_tokens=2048, |
| input_type="audio", |
| audio_data=audio_data, |
| output_format="text" |
| ) |
| response_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing transcription chunk: {chunk[:100]}...") |
| if isinstance(chunk, str): |
| response_chunks.append(chunk) |
| else: |
| logger.warning(f"Unexpected non-string chunk in transcription stream: {chunk}") |
| response = "".join(response_chunks) |
| if not response.strip(): |
| logger.error("Empty transcription generated.") |
| raise HTTPException(status_code=500, detail="Empty transcription generated from model.") |
| logger.info(f"Audio transcription response: {response[:100]}...") |
| except Exception as e: |
| logger.error(f"Audio transcription failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Audio transcription failed: {str(e)}") |
| |
| if user and conversation: |
| assistant_msg = Message(role="assistant", content=response, conversation_id=conversation.id) |
| db.add(assistant_msg) |
| await db.commit() |
| conversation.updated_at = datetime.utcnow() |
| await db.commit() |
| return { |
| "transcription": response, |
| "conversation_id": conversation.conversation_id, |
| "conversation_url": f"https://mgzon-mgzon-app.hf.space/chat/{conversation.conversation_id}", |
| "conversation_title": conversation.title |
| } |
| |
| return {"transcription": response} |
|
|
| @router.post("/api/text-to-speech") |
| async def text_to_speech_endpoint( |
| request: Request, |
| req: dict, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| await handle_session(request) |
| |
| text = req.get("text", "") |
| if not text.strip(): |
| raise HTTPException(status_code=400, detail="Text input is required for text-to-speech.") |
| |
| model_name, api_endpoint = select_model("text to speech", input_type="tts") |
| |
| is_available, api_key, selected_endpoint = check_model_availability(model_name, HF_TOKEN) |
| if not is_available: |
| logger.error(f"Model {model_name} is not available at {api_endpoint}") |
| raise HTTPException(status_code=503, detail=f"Model {model_name} is not available. Please try another model.") |
| |
| stream = request_generation( |
| api_key=api_key, |
| api_base=selected_endpoint, |
| message=text, |
| system_prompt="Convert the provided text to speech using a text-to-speech model. Ensure clear and natural pronunciation, especially for Arabic text.", |
| model_name=model_name, |
| temperature=0.7, |
| max_new_tokens=2048, |
| input_type="tts", |
| output_format="audio" |
| ) |
| audio_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing TTS chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}") |
| if isinstance(chunk, bytes): |
| audio_chunks.append(chunk) |
| else: |
| logger.warning(f"Unexpected non-bytes chunk in TTS stream: {chunk}") |
| if not audio_chunks: |
| logger.error("No audio data generated for TTS.") |
| raise HTTPException(status_code=500, detail="No audio data generated for text-to-speech.") |
| audio_data = b"".join(audio_chunks) |
| return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav") |
| except Exception as e: |
| logger.error(f"Text-to-speech generation failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Text-to-speech generation failed: {str(e)}") |
|
|
| @router.post("/api/code") |
| async def code_endpoint( |
| request: Request, |
| req: dict, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| await handle_session(request) |
| |
| framework = req.get("framework") |
| task = req.get("task") |
| code = req.get("code", "") |
| output_format = req.get("output_format", "text") |
| if not task: |
| raise HTTPException(status_code=400, detail="Task description is required.") |
| |
| prompt = f"Generate code for task: {task} using {framework}. Existing code: {code}" |
| preferred_model = user.preferred_model if user else None |
| model_name, api_endpoint = select_model(prompt, input_type="text", preferred_model=preferred_model) |
| |
| is_available, api_key, selected_endpoint = check_model_availability(model_name, HF_TOKEN) |
| if not is_available: |
| logger.error(f"Model {model_name} is not available at {api_endpoint}") |
| raise HTTPException(status_code=503, detail=f"Model {model_name} is not available. Please try another model.") |
| |
| system_prompt = enhance_system_prompt( |
| "You are a coding expert. Provide detailed, well-commented code with examples and explanations.", |
| prompt, user |
| ) |
| |
| stream = request_generation( |
| api_key=api_key, |
| api_base=selected_endpoint, |
| message=prompt, |
| system_prompt=system_prompt, |
| model_name=model_name, |
| temperature=0.7, |
| max_new_tokens=2048, |
| input_type="text", |
| output_format=output_format |
| ) |
| if output_format == "audio": |
| audio_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing code audio chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}") |
| if isinstance(chunk, bytes): |
| audio_chunks.append(chunk) |
| else: |
| logger.warning(f"Unexpected non-bytes chunk in code audio stream: {chunk}") |
| if not audio_chunks: |
| logger.error("No audio data generated for code.") |
| raise HTTPException(status_code=500, detail="No audio data generated for code.") |
| audio_data = b"".join(audio_chunks) |
| return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav") |
| except Exception as e: |
| logger.error(f"Code audio generation failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Code audio generation failed: {str(e)}") |
| |
| response_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing code text chunk: {chunk[:100]}...") |
| if isinstance(chunk, str) and chunk.strip() and chunk not in ["analysis", "assistantfinal"]: |
| response_chunks.append(chunk) |
| else: |
| logger.warning(f"Skipping code chunk: {chunk}") |
| response = "".join(response_chunks) |
| if not response.strip(): |
| logger.error("Empty code response generated.") |
| raise HTTPException(status_code=500, detail="Empty code response generated from model.") |
| return {"generated_code": response} |
| except Exception as e: |
| logger.error(f"Code generation failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Code generation failed: {str(e)}") |
|
|
| @router.post("/api/analysis") |
| async def analysis_endpoint( |
| request: Request, |
| req: dict, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| await handle_session(request) |
| |
| message = req.get("text", "") |
| output_format = req.get("output_format", "text") |
| if not message.strip(): |
| raise HTTPException(status_code=400, detail="Text input is required for analysis.") |
| |
| preferred_model = user.preferred_model if user else None |
| model_name, api_endpoint = select_model(message, input_type="text", preferred_model=preferred_model) |
| |
| is_available, api_key, selected_endpoint = check_model_availability(model_name, HF_TOKEN) |
| if not is_available: |
| logger.error(f"Model {model_name} is not available at {api_endpoint}") |
| raise HTTPException(status_code=503, detail=f"Model {model_name} is not available. Please try another model.") |
| |
| system_prompt = enhance_system_prompt( |
| "You are an expert analyst. Provide detailed analysis with step-by-step reasoning and examples.", |
| message, user |
| ) |
| |
| stream = request_generation( |
| api_key=api_key, |
| api_base=selected_endpoint, |
| message=message, |
| system_prompt=system_prompt, |
| model_name=model_name, |
| temperature=0.7, |
| max_new_tokens=2048, |
| input_type="text", |
| output_format=output_format |
| ) |
| if output_format == "audio": |
| audio_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing analysis audio chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}") |
| if isinstance(chunk, bytes): |
| audio_chunks.append(chunk) |
| else: |
| logger.warning(f"Unexpected non-bytes chunk in analysis audio stream: {chunk}") |
| if not audio_chunks: |
| logger.error("No audio data generated for analysis.") |
| raise HTTPException(status_code=500, detail="No audio data generated for analysis.") |
| audio_data = b"".join(audio_chunks) |
| return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav") |
| except Exception as e: |
| logger.error(f"Analysis audio generation failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Analysis audio generation failed: {str(e)}") |
| |
| response_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing analysis text chunk: {chunk[:100]}...") |
| if isinstance(chunk, str) and chunk.strip() and chunk not in ["analysis", "assistantfinal"]: |
| response_chunks.append(chunk) |
| else: |
| logger.warning(f"Skipping analysis chunk: {chunk}") |
| response = "".join(response_chunks) |
| if not response.strip(): |
| logger.error("Empty analysis response generated.") |
| raise HTTPException(status_code=500, detail="Empty analysis response generated from model.") |
| return {"analysis": response} |
| except Exception as e: |
| logger.error(f"Analysis generation failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Analysis generation failed: {str(e)}") |
|
|
| @router.post("/api/image-analysis") |
| async def image_analysis_endpoint( |
| request: Request, |
| file: UploadFile = File(...), |
| output_format: str = "text", |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| await handle_session(request) |
| |
| conversation = None |
| if user: |
| title = "Image Analysis" |
| result = await db.execute( |
| select(Conversation).filter(Conversation.user_id == user.id).order_by(Conversation.updated_at.desc()) |
| ) |
| conversation = result.scalar_one_or_none() |
| if not conversation: |
| conversation_id = str(uuid.uuid4()) |
| conversation = Conversation( |
| conversation_id=conversation_id, |
| user_id=user.id, |
| title=title |
| ) |
| db.add(conversation) |
| await db.commit() |
| await db.refresh(conversation) |
| |
| user_msg = Message(role="user", content="Image analysis request", conversation_id=conversation.id) |
| db.add(user_msg) |
| await db.commit() |
| |
| preferred_model = user.preferred_model if user else None |
| model_name, api_endpoint = select_model("analyze image", input_type="image", preferred_model=preferred_model) |
| |
| is_available, api_key, selected_endpoint = check_model_availability(model_name, HF_TOKEN) |
| if not is_available: |
| logger.error(f"Model {model_name} is not available at {api_endpoint}") |
| raise HTTPException(status_code=503, detail=f"Model {model_name} is not available. Please try another model.") |
| |
| image_data = await file.read() |
| system_prompt = enhance_system_prompt( |
| "You are an expert in image analysis. Provide detailed descriptions or classifications based on the query.", |
| "Analyze this image", user |
| ) |
| |
| stream = request_generation( |
| api_key=api_key, |
| api_base=selected_endpoint, |
| message="Analyze this image", |
| system_prompt=system_prompt, |
| model_name=model_name, |
| temperature=0.7, |
| max_new_tokens=2048, |
| input_type="image", |
| image_data=image_data, |
| output_format=output_format |
| ) |
| if output_format == "audio": |
| audio_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing image analysis audio chunk: {chunk[:100] if isinstance(chunk, str) else 'bytes'}") |
| if isinstance(chunk, bytes): |
| audio_chunks.append(chunk) |
| else: |
| logger.warning(f"Unexpected non-bytes chunk in image analysis audio stream: {chunk}") |
| if not audio_chunks: |
| logger.error("No audio data generated for image analysis.") |
| raise HTTPException(status_code=500, detail="No audio data generated for image analysis.") |
| audio_data = b"".join(audio_chunks) |
| return StreamingResponse(io.BytesIO(audio_data), media_type="audio/wav") |
| except Exception as e: |
| logger.error(f"Image analysis audio generation failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Image analysis audio generation failed: {str(e)}") |
| |
| response_chunks = [] |
| try: |
| for chunk in stream: |
| logger.debug(f"Processing image analysis text chunk: {chunk[:100]}...") |
| if isinstance(chunk, str) and chunk.strip() and chunk not in ["analysis", "assistantfinal"]: |
| response_chunks.append(chunk) |
| else: |
| logger.warning(f"Skipping image analysis chunk: {chunk}") |
| response = "".join(response_chunks) |
| if not response.strip(): |
| logger.error("Empty image analysis response generated.") |
| raise HTTPException(status_code=500, detail="Empty image analysis response generated from model.") |
| |
| if user and conversation: |
| assistant_msg = Message(role="assistant", content=response, conversation_id=conversation.id) |
| db.add(assistant_msg) |
| await db.commit() |
| conversation.updated_at = datetime.utcnow() |
| await db.commit() |
| return { |
| "image_analysis": response, |
| "conversation_id": conversation.conversation_id, |
| "conversation_url": f"https://mgzon-mgzon-app.hf.space/chat/{conversation.conversation_id}", |
| "conversation_title": conversation.title |
| } |
| |
| return {"image_analysis": response} |
| except Exception as e: |
| logger.error(f"Image analysis failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Image analysis failed: {str(e)}") |
|
|
| @router.get("/api/test-model") |
| async def test_model(model: str = MODEL_NAME, endpoint: str = API_ENDPOINT): |
| try: |
| is_available, api_key, selected_endpoint = check_model_availability(model, HF_TOKEN) |
| if not is_available: |
| logger.error(f"Model {model} is not available at {endpoint}") |
| raise HTTPException(status_code=503, detail=f"Model {model} is not available.") |
| |
| client = OpenAI(api_key=api_key, base_url=selected_endpoint, timeout=60.0) |
| response = client.chat.completions.create( |
| model=model, |
| messages=[{"role": "user", "content": "Test"}], |
| max_tokens=50 |
| ) |
| logger.debug(f"Test model response: {response.choices[0].message.content}") |
| return {"status": "success", "response": response.choices[0].message.content} |
| except Exception as e: |
| logger.error(f"Test model failed: {e}") |
| raise HTTPException(status_code=500, detail=f"Test model failed: {str(e)}") |
|
|
| @router.post("/api/conversations", response_model=ConversationOut) |
| async def create_conversation( |
| req: ConversationCreate, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| conversation_id = str(uuid.uuid4()) |
| conversation = Conversation( |
| conversation_id=conversation_id, |
| title=req.title or "Untitled Conversation", |
| user_id=user.id |
| ) |
| db.add(conversation) |
| await db.commit() |
| await db.refresh(conversation) |
| return ConversationOut.from_orm(conversation) |
|
|
| @router.get("/api/conversations/{conversation_id}", response_model=ConversationOut) |
| async def get_conversation( |
| conversation_id: str, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| result = await db.execute( |
| select(Conversation).filter( |
| Conversation.conversation_id == conversation_id, |
| Conversation.user_id == user.id |
| ) |
| ) |
| conversation = result.scalar_one_or_none() |
| if not conversation: |
| raise HTTPException(status_code=404, detail="Conversation not found") |
| return ConversationOut.from_orm(conversation) |
|
|
| @router.get("/api/conversations", response_model=List[ConversationOut]) |
| async def list_conversations( |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| result = await db.execute( |
| select(Conversation).filter(Conversation.user_id == user.id).order_by(Conversation.created_at.desc()) |
| ) |
| conversations = result.scalars().all() |
| return [ConversationOut.from_orm(conv) for conv in conversations] |
|
|
| @router.put("/api/conversations/{conversation_id}/title") |
| async def update_conversation_title( |
| conversation_id: str, |
| title: str, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| result = await db.execute( |
| select(Conversation).filter( |
| Conversation.conversation_id == conversation_id, |
| Conversation.user_id == user.id |
| ) |
| ) |
| conversation = result.scalar_one_or_none() |
| if not conversation: |
| raise HTTPException(status_code=404, detail="Conversation not found") |
| |
| conversation.title = title |
| conversation.updated_at = datetime.utcnow() |
| await db.commit() |
| return {"message": "Conversation title updated", "title": conversation.title} |
|
|
| @router.delete("/api/conversations/{conversation_id}") |
| async def delete_conversation( |
| conversation_id: str, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| result = await db.execute( |
| select(Conversation).filter( |
| Conversation.conversation_id == conversation_id, |
| Conversation.user_id == user.id |
| ) |
| ) |
| conversation = result.scalar_one_or_none() |
| if not conversation: |
| raise HTTPException(status_code=404, detail="Conversation not found") |
| |
| await db.execute(delete(Message).filter(Message.conversation_id == conversation.id)) |
| await db.delete(conversation) |
| await db.commit() |
| return {"message": "Conversation deleted successfully"} |
|
|
| @router.get("/users/me") |
| async def get_user_settings(user: User = Depends(current_active_user)): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| return { |
| "id": user.id, |
| "email": user.email, |
| "display_name": user.display_name, |
| "preferred_model": user.preferred_model, |
| "job_title": user.job_title, |
| "education": user.education, |
| "interests": user.interests, |
| "additional_info": user.additional_info, |
| "conversation_style": user.conversation_style, |
| "is_active": user.is_active, |
| "is_superuser": user.is_superuser |
| } |
|
|
|
|
|
|
| @router.get("/api/verify-token") |
| async def verify_token(user: User = Depends(current_active_user)): |
| if not user: |
| raise HTTPException(status_code=401, detail="Invalid or expired token") |
| return { |
| "status": "valid", |
| "user": { |
| "id": user.id, |
| "email": user.email, |
| "is_active": user.is_active |
| } |
| } |
|
|
| @router.put("/users/me") |
| async def update_user_settings( |
| settings: UserUpdate, |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| |
| if settings.preferred_model and settings.preferred_model not in MODEL_ALIASES: |
| raise HTTPException(status_code=400, detail="Invalid model alias") |
| |
| if settings.display_name is not None: |
| user.display_name = settings.display_name |
| if settings.preferred_model is not None: |
| user.preferred_model = settings.preferred_model |
| if settings.job_title is not None: |
| user.job_title = settings.job_title |
| if settings.education is not None: |
| user.education = settings.education |
| if settings.interests is not None: |
| user.interests = settings.interests |
| if settings.additional_info is not None: |
| user.additional_info = settings.additional_info |
| if settings.conversation_style is not None: |
| user.conversation_style = settings.conversation_style |
| |
| await db.commit() |
| await db.refresh(user) |
| return {"message": "Settings updated successfully", "user": { |
| "id": user.id, |
| "email": user.email, |
| "display_name": user.display_name, |
| "preferred_model": user.preferred_model, |
| "job_title": user.job_title, |
| "education": user.education, |
| "interests": user.interests, |
| "additional_info": user.additional_info, |
| "conversation_style": user.conversation_style, |
| "is_active": user.is_active, |
| "is_superuser": user.is_superuser |
| }} |
| |
|
|
|
|
| @router.post("/api/conversations/sync", response_model=ConversationOut) |
| async def sync_conversation( |
| request: Request, |
| payload: dict = Body(...), |
| user: User = Depends(current_active_user), |
| db: AsyncSession = Depends(get_db) |
| ): |
| if not user: |
| raise HTTPException(status_code=401, detail="Login required") |
| |
| messages = payload.get("messages", []) |
| title = payload.get("title", "Untitled Conversation") |
| conversation_id = payload.get("conversation_id") |
| |
| logger.info(f"Syncing conversation for user {user.email}, conversation_id: {conversation_id}") |
|
|
| try: |
| |
| if conversation_id: |
| result = await db.execute( |
| select(Conversation).filter( |
| Conversation.conversation_id == conversation_id, |
| Conversation.user_id == user.id |
| ) |
| ) |
| conversation = result.scalar_one_or_none() |
| if not conversation: |
| raise HTTPException(status_code=404, detail="Conversation not found") |
| |
| |
| conversation.title = title |
| conversation.updated_at = datetime.utcnow() |
| |
| |
| await db.execute( |
| delete(Message).filter(Message.conversation_id == conversation.id) |
| ) |
| |
| |
| for msg in messages: |
| new_message = Message( |
| conversation_id=conversation.id, |
| role=msg.get("role", "user"), |
| content=msg.get("content", ""), |
| created_at=datetime.utcnow() |
| ) |
| db.add(new_message) |
| |
| await db.commit() |
| await db.refresh(conversation) |
| logger.info(f"Updated conversation {conversation_id} for user {user.email}") |
| else: |
| |
| conversation_id = str(uuid.uuid4()) |
| conversation = Conversation( |
| conversation_id=conversation_id, |
| user_id=user.id, |
| title=title, |
| created_at=datetime.utcnow(), |
| updated_at=datetime.utcnow() |
| ) |
| db.add(conversation) |
| await db.commit() |
| await db.refresh(conversation) |
| |
| |
| for msg in messages: |
| new_message = Message( |
| conversation_id=conversation.id, |
| role=msg.get("role", "user"), |
| content=msg.get("content", ""), |
| created_at=datetime.utcnow() |
| ) |
| db.add(new_message) |
| |
| await db.commit() |
| logger.info(f"Created new conversation {conversation_id} for user {user.email}") |
|
|
| return ConversationOut.from_orm(conversation) |
| except Exception as e: |
| logger.error(f"Error syncing conversation: {str(e)}") |
| raise HTTPException(status_code=500, detail=f"Failed to sync conversation: {str(e)}") |
|
|