""" Avatar Chatbot - HuggingFace Spaces Edition Avatar profile system: each avatar has ref.png, persona.txt, idlevideos/ Chunked pipeline: LLM (once) → split sentences → TTS+FLOAT per chunk → stream to frontend """ from fastapi import FastAPI, HTTPException, Request from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse from pathlib import Path import mimetypes import os import re import logging import subprocess import time as _time import uuid import cv2 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() # ---- Avatar system ---- AVATARS_DIR = Path("/app/avatars") _current_avatar = None # name of active avatar folder TTS_DIR = Path("/tmp/tts_output") LIPSYNC_DIR = Path("/tmp/lipsync_output") REPO_ID = os.environ.get("SPACE_ID", "ffbeqwbe/AvatarChatbot") FLOAT_ENABLED = os.environ.get("FLOAT_ENABLED", "true").lower() == "true" for d in [TTS_DIR, LIPSYNC_DIR]: d.mkdir(parents=True, exist_ok=True) def _ts(): return _time.strftime("%H:%M:%S", _time.gmtime()) + f".{int(_time.time()*1000)%1000:03d}" def _get_avatar_dir(name=None): """Get path to an avatar's directory.""" if name is None: name = _current_avatar if name: return AVATARS_DIR / name return None def _get_idle_videos_dir(): """Get the current avatar's idle videos directory.""" d = _get_avatar_dir() if d: return d / "idlevideos" return Path("/tmp/videos") def _list_idle_videos(): """List idle video files for the current avatar.""" d = _get_idle_videos_dir() if not d.exists(): return [] return [ {"name": f.stem, "filename": f.name} for f in sorted(d.iterdir()) if f.suffix.lower() in (".mp4", ".webm", ".mkv", ".mov") and f.stat().st_size > 1024 ] import threading _gpu_lock = threading.Lock() _sessions = {} GREETING_DATA = None def _discover_avatars(): """Find all avatar profiles in the avatars directory.""" if not AVATARS_DIR.exists(): logger.warning(f"[AVATARS] Directory not found: {AVATARS_DIR}") return [] avatars = [] for d in sorted(AVATARS_DIR.iterdir()): if d.is_dir(): has_ref = (d / "ref.png").exists() or (d / "ref.jpg").exists() has_persona = (d / "persona.txt").exists() has_videos = (d / "idlevideos").exists() avatars.append({ "name": d.name, "has_ref": has_ref, "has_persona": has_persona, "has_videos": has_videos, }) return avatars def _parse_avatar_config(avatar_dir): """Parse config.txt from avatar folder. Returns dict of key=value pairs.""" config = {} config_path = avatar_dir / "config.txt" if config_path.exists(): for line in open(config_path).readlines(): line = line.strip() if '=' in line and not line.startswith('#'): key, val = line.split('=', 1) config[key.strip().lower()] = val.strip() return config def _switch_avatar(avatar_name: str): """Switch to a different avatar profile.""" global _current_avatar avatar_dir = AVATARS_DIR / avatar_name if not avatar_dir.exists(): logger.error(f"[AVATARS] Avatar not found: {avatar_name}") return False _current_avatar = avatar_name logger.info(f"[AVATARS] Switching to avatar: {avatar_name}") # 0. Parse config.txt config = _parse_avatar_config(avatar_dir) # 1. Load persona into LLM persona_path = avatar_dir / "persona.txt" from llm import load_persona_from_file persona_text = load_persona_from_file(str(persona_path)) logger.info(f"[AVATARS] Persona loaded: {persona_text[:80]}..." if persona_text else "[AVATARS] No persona.txt found") # 2. Set TTS voice from config (default: diana) tts_voice = config.get("groqttsname", "diana") from groq_tts import set_voice set_voice(tts_voice) # 3. Update FLOAT reference image if FLOAT_ENABLED: try: from float_lipsync import get_lipsync lipsync = get_lipsync() if lipsync.ready: ref_path = avatar_dir / "ref.png" if not ref_path.exists(): ref_path = avatar_dir / "ref.jpg" if ref_path.exists(): lipsync.update_reference_image(str(ref_path)) logger.info(f"[AVATARS] Reference image updated: {ref_path}") else: logger.warning(f"[AVATARS] No ref image in {avatar_dir}") except Exception as e: logger.error(f"[AVATARS] Failed to update ref image: {e}") # 3. Idle videos — no action needed, _get_idle_videos_dir() uses _current_avatar dynamically logger.info(f"[AVATARS] ✓ Switched to {avatar_name}") return True @app.on_event("startup") async def startup(): # Discover avatars and set default avatars = _discover_avatars() logger.info(f"[STARTUP] Found {len(avatars)} avatars: {[a['name'] for a in avatars]}") if avatars: # Use first avatar as default default = avatars[0]["name"] # Load persona for default avatar (before FLOAT init so ref image is set) global _current_avatar _current_avatar = default persona_path = AVATARS_DIR / default / "persona.txt" from llm import load_persona_from_file load_persona_from_file(str(persona_path)) # Load config (TTS voice etc.) config = _parse_avatar_config(AVATARS_DIR / default) tts_voice = config.get("groqttsname", "diana") logger.info(f"[STARTUP] Default avatar: {default} | voice: {tts_voice}") await init_tts() # Apply default avatar's TTS voice (after TTS init) if avatars: config = _parse_avatar_config(AVATARS_DIR / _current_avatar) tts_voice = config.get("groqttsname", "diana") from groq_tts import set_voice set_voice(tts_voice) await init_stt() if FLOAT_ENABLED: await init_float() await generate_greeting() async def init_tts(): logger.info(f"[{_ts()}] [STARTUP] Initializing TTS...") t0 = _time.time() try: import groq_tts as tts_module tts_module.initialize() logger.info(f"[{_ts()}] [STARTUP] ✓ TTS ready in {_time.time()-t0:.2f}s") except Exception as e: logger.error(f"[{_ts()}] [STARTUP] TTS init failed: {type(e).__name__}: {e}", exc_info=True) async def generate_greeting(): global GREETING_DATA greeting_text = "Hello! Feel free to ask me anything." logger.info(f"[{_ts()}] [GREETING] Generating warmup greeting...") t0 = _time.time() try: from groq_tts import generate_audio audio_path = generate_audio(greeting_text) audio_url = None audio_duration = 0 if audio_path and os.path.exists(audio_path): audio_url = f"/api/audio/{os.path.basename(audio_path)}" try: r = subprocess.run(["ffprobe", "-v", "quiet", "-show_entries", "format=duration", "-of", "csv=p=0", audio_path], capture_output=True, text=True, check=True) audio_duration = float(r.stdout.strip()) except Exception: audio_duration = 5.0 lipsync_video_url = None if FLOAT_ENABLED: try: from float_lipsync import get_lipsync lipsync = get_lipsync() if lipsync.ready and audio_path: lp = lipsync.generate(audio_path) if lp and os.path.exists(lp): lipsync_video_url = f"/api/stream/{os.path.basename(lp)}" except Exception as e: logger.warning(f"[GREETING] FLOAT failed: {e}") GREETING_DATA = { "text": greeting_text, "audio_url": audio_url, "audio_duration": audio_duration, "lipsync_video_url": lipsync_video_url, } logger.info(f"[{_ts()}] [GREETING] Ready in {_time.time()-t0:.2f}s (lipsync: {lipsync_video_url is not None})") except Exception as e: logger.error(f"[{_ts()}] [GREETING] Failed: {e}", exc_info=True) GREETING_DATA = {"text": greeting_text, "audio_url": None, "audio_duration": 0, "lipsync_video_url": None} async def init_stt(): """Initialize Vosk speech-to-text.""" logger.info(f"[{_ts()}] [STARTUP] Initializing STT...") try: import vosk_stt if vosk_stt.initialize(): logger.info(f"[{_ts()}] [STARTUP] ✓ STT ready") else: logger.warning(f"[{_ts()}] [STARTUP] STT init failed (voice input disabled)") except Exception as e: logger.warning(f"[{_ts()}] [STARTUP] STT not available: {e}") async def init_float(): import torch logger.info(f"[STARTUP] CUDA available: {torch.cuda.is_available()}") if not torch.cuda.is_available(): logger.warning("[STARTUP] No GPU - FLOAT disabled"); return try: from float_lipsync import get_lipsync from huggingface_hub import hf_hub_download, snapshot_download import shutil ckpt = Path("/app/checkpoints") ckpt.mkdir(parents=True, exist_ok=True) w2v = ckpt / "wav2vec2-base-960h" if not w2v.exists(): logger.info("[STARTUP] Downloading wav2vec2-base-960h...") snapshot_download(repo_id="facebook/wav2vec2-base-960h", local_dir=str(w2v)) emo = ckpt / "wav2vec-english-speech-emotion-recognition" if not emo.exists(): logger.info("[STARTUP] Downloading emotion model...") snapshot_download(repo_id="r-f/wav2vec-english-speech-emotion-recognition", local_dir=str(emo)) fp = ckpt / "float.pth" if not fp.exists() or fp.stat().st_size < 1024: logger.info("[STARTUP] Downloading float.pth...") dl = hf_hub_download(repo_id=REPO_ID, repo_type="space", filename="app/checkpoints/float.pth", local_dir="/tmp/hf_download") shutil.copy2(dl, fp) logger.info(f"[STARTUP] float.pth: {fp.stat().st_size/1e6:.1f} MB") # Use current avatar's ref.png for initial FLOAT setup ref_path = None avatar_dir = _get_avatar_dir() if avatar_dir: for name in ["ref.png", "ref.jpg"]: p = avatar_dir / name if p.exists(): ref_path = str(p) break # Fallback to legacy location if not ref_path: assets = Path("/app/assets") assets.mkdir(parents=True, exist_ok=True) ref = assets / "ref.png" if not ref.exists() or ref.stat().st_size < 1024: for name in ["ref.png", "ref.jpg", "main2.png"]: try: dl = hf_hub_download(repo_id=REPO_ID, repo_type="space", filename=f"app/assets/{name}", local_dir="/tmp/hf_download") shutil.copy2(dl, assets / name) ref = assets / name break except Exception: continue else: logger.warning("[STARTUP] No ref image found"); return ref_path = str(ref) lipsync = get_lipsync() lipsync.initialize({ "ref_path": ref_path, "ckpt_path": str(fp), "wav2vec_model_path": str(w2v), "audio2emotion_path": str(emo), }) logger.info("[STARTUP] ✓ FLOAT ready!") except Exception as e: logger.error(f"[STARTUP] FLOAT init failed: {e}", exc_info=True) # ---- Avatar API endpoints ---- @app.get("/api/avatars") async def list_avatars(): """List all available avatar profiles.""" avatars = _discover_avatars() return JSONResponse({ "avatars": avatars, "current": _current_avatar, }) @app.post("/api/switch-avatar") async def switch_avatar(request: Request): """Switch to a different avatar. Resets chat history.""" body = await request.json() avatar_name = body.get("avatar", "").strip() if not avatar_name: raise HTTPException(400, "No avatar specified") success = _switch_avatar(avatar_name) if not success: raise HTTPException(404, f"Avatar not found: {avatar_name}") return JSONResponse({ "status": "ok", "avatar": avatar_name, "videos": _list_idle_videos(), }) @app.get("/api/videos") async def list_videos(): vids = _list_idle_videos() return JSONResponse({"videos": vids}) # ---- Avatar creation ---- _creation_status = {"active": False, "progress": 0, "total": 0, "message": "", "avatar_name": "", "done": False, "error": None} @app.post("/api/create-avatar") async def create_avatar(request: Request): """Create a new avatar from uploaded image + settings. Kicks off idle video generation.""" global _creation_status if _creation_status["active"]: raise HTTPException(409, "Avatar creation already in progress") import base64 body = await request.json() name = body.get("name", "").strip() voice = body.get("voice", "diana").strip() persona = body.get("persona", "").strip() image_b64 = body.get("image", "") if not name: raise HTTPException(400, "Name is required") if not image_b64: raise HTTPException(400, "Image is required") safe_name = re.sub(r'[^\w\-]', '', name) if not safe_name: raise HTTPException(400, "Invalid name") avatar_dir = AVATARS_DIR / safe_name if avatar_dir.exists(): raise HTTPException(409, f"Avatar '{safe_name}' already exists") avatar_dir.mkdir(parents=True, exist_ok=True) idle_dir = avatar_dir / "idlevideos" idle_dir.mkdir(exist_ok=True) try: if ',' in image_b64: image_b64 = image_b64.split(',', 1)[1] image_bytes = base64.b64decode(image_b64) ref_path = avatar_dir / "ref.png" import numpy as np nparr = np.frombuffer(image_bytes, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None: raise ValueError("Could not decode image") cv2.imwrite(str(ref_path), img) logger.info(f"[CREATE] Saved ref image: {ref_path} ({img.shape})") except Exception as e: import shutil shutil.rmtree(str(avatar_dir), ignore_errors=True) raise HTTPException(400, f"Invalid image: {e}") with open(avatar_dir / "config.txt", "w") as f: f.write(f"groqttsname={voice}\n") with open(avatar_dir / "persona.txt", "w") as f: f.write(persona if persona else f"You are {name}.") logger.info(f"[CREATE] Avatar profile created: {safe_name} | voice: {voice}") _creation_status = { "active": True, "progress": 0, "total": 6, "message": "Starting idle video generation...", "avatar_name": safe_name, "done": False, "error": None } import asyncio asyncio.get_event_loop().run_in_executor(None, _run_idle_generation, safe_name, str(ref_path), str(idle_dir)) return JSONResponse({ "status": "started", "avatar": safe_name, "message": "Avatar created. Generating idle videos (this takes a few minutes)..." }) def _run_idle_generation(avatar_name, ref_path, idle_dir): """Background task: generate idle clips and switch to new avatar when done.""" global _creation_status try: from idle_generator import get_idle_generator generator = get_idle_generator() def on_progress(clip_idx, total, message): _creation_status["progress"] = clip_idx _creation_status["total"] = total _creation_status["message"] = message with _gpu_lock: generator.generate( ref_image_path=ref_path, output_dir=idle_dir, avatar_name=avatar_name, num_clips=6, progress_callback=on_progress, ) _switch_avatar(avatar_name) _creation_status["done"] = True _creation_status["message"] = f"Avatar '{avatar_name}' ready!" logger.info(f"[CREATE] ✓ Avatar '{avatar_name}' fully created and activated") except Exception as e: _creation_status["error"] = str(e) _creation_status["message"] = f"Error: {e}" logger.error(f"[CREATE] Failed: {e}", exc_info=True) finally: _creation_status["active"] = False @app.get("/api/create-avatar/status") async def create_avatar_status(): """Poll creation progress.""" return JSONResponse(_creation_status) @app.get("/api/stream/{filename}") async def stream_video(filename: str): path = None # Search in current avatar's idle videos, lipsync output, and TTS output search_dirs = [_get_idle_videos_dir(), LIPSYNC_DIR, TTS_DIR] for d in search_dirs: if d is None: continue c = d / filename if c.exists() and c.is_file(): path = c break if not path: raise HTTPException(404, "Not found") mime = mimetypes.guess_type(filename)[0] or "video/mp4" def gen(): with open(path, "rb") as f: while chunk := f.read(1024*1024): yield chunk return StreamingResponse(gen(), media_type=mime, headers={"Content-Length": str(path.stat().st_size), "Accept-Ranges": "bytes"}) @app.get("/api/audio/{filename}") async def stream_audio(filename: str): path = TTS_DIR / filename if not path.exists(): raise HTTPException(404, "Not found") mime = mimetypes.guess_type(filename)[0] or "audio/wav" def gen(): with open(path, "rb") as f: while chunk := f.read(1024*1024): yield chunk return StreamingResponse(gen(), media_type=mime, headers={"Content-Length": str(path.stat().st_size)}) def split_sentences(text): parts = re.split(r'(?<=[.!?])\s+', text) parts = [s.strip() for s in parts if s.strip()] MIN_WORDS = 25 merged = [] buffer = "" for p in parts: if buffer: buffer += " " + p else: buffer = p if len(buffer.split()) >= MIN_WORDS: merged.append(buffer) buffer = "" if buffer: if merged: merged[-1] += " " + buffer else: merged.append(buffer) return merged if merged else [text] @app.post("/api/chat") async def chat(request: Request): t0 = _time.time() body = await request.json() user_text = body.get("text", "").strip() if not user_text: raise HTTPException(400, "No text") logger.info(f"[{_ts()}] [CHAT] User: {user_text}") t1 = _time.time() from llm import generate_response llm_result = generate_response(user_text) reply_text = llm_result['text'] # With voice direction tags — sent to TTS clean_text = llm_result.get('clean_text', reply_text) # Without tags — shown in UI t2 = _time.time() logger.info(f"[{_ts()}] [CHAT] Reply: {reply_text}") logger.info(f"[{_ts()}] [TIMING] LLM: {t2-t1:.2f}s") sentences = split_sentences(reply_text) if not sentences: sentences = [reply_text] logger.info(f"[{_ts()}] [CHAT] Split into {len(sentences)} chunks") session_id = str(uuid.uuid4())[:8] _sessions[session_id] = { "sentences": sentences, "next_index": 0, "total": len(sentences), "done": False, "created": _time.time(), } return JSONResponse({ "text": clean_text, "tts_text": reply_text, "emotion": llm_result.get('emotion', 'neutral'), "session_id": session_id, "total_chunks": len(sentences), }) @app.post("/api/chat/next") async def chat_next(request: Request): t_req = _time.time() body = await request.json() session_id = body.get("session_id", "") session = _sessions.get(session_id) if not session: raise HTTPException(404, "Session not found") idx = session["next_index"] if idx >= session["total"]: session["done"] = True return JSONResponse({"done": True}) sentence = session["sentences"][idx] session["next_index"] = idx + 1 is_last = (idx + 1 >= session["total"]) total = session["total"] logger.info(f"[{_ts()}] [REQ] /api/chat/next chunk {idx+1}/{total} received") import asyncio result = await asyncio.to_thread(_generate_chunk, sentence, idx, total) t_resp = _time.time() logger.info(f"[{_ts()}] [REQ] /api/chat/next chunk {idx+1}/{total} responding | request-to-response: {t_resp-t_req:.2f}s") if is_last: session["done"] = True if len(_sessions) > 10: oldest = sorted(_sessions.keys(), key=lambda k: _sessions[k]["created"]) for k in oldest[:-10]: del _sessions[k] # Strip voice direction tags for UI display import re as _re clean_sentence = _re.sub(r'\[[^\]]*\]\s*', '', sentence).strip() return JSONResponse({ "done": False, "chunk_index": idx, "total_chunks": total, "is_last": is_last, "sentence": clean_sentence, "audio_url": result["audio_url"], "audio_duration": result["audio_duration"], "lipsync_video_url": result["lipsync_video_url"], }) def _generate_chunk(sentence, idx, total): logger.info(f"[{_ts()}] [CHUNK {idx+1}/{total}] START | \"{sentence[:80]}\"") t0 = _time.time() import torch with _gpu_lock: from groq_tts import generate_audio t_tts_start = _time.time() audio_path = generate_audio(sentence) t_tts_end = _time.time() if torch.cuda.is_available(): torch.cuda.empty_cache() audio_url = None audio_duration = 0 audio_size = 0 if audio_path and os.path.exists(audio_path): audio_size = os.path.getsize(audio_path) audio_url = f"/api/audio/{os.path.basename(audio_path)}" try: r = subprocess.run(["ffprobe","-v","quiet","-show_entries","format=duration", "-of","csv=p=0", audio_path], capture_output=True, text=True, check=True) audio_duration = float(r.stdout.strip()) except Exception: audio_duration = 3.0 t_probe_end = _time.time() else: t_probe_end = _time.time() logger.info(f"[{_ts()}] [CHUNK {idx+1}/{total}] TTS: {t_tts_end-t_tts_start:.2f}s | audio: {audio_duration:.1f}s, {audio_size/1024:.0f}KB") lipsync_video_url = None if FLOAT_ENABLED and audio_path and os.path.exists(audio_path): try: from float_lipsync import get_lipsync lipsync = get_lipsync() if lipsync.ready: t_float_start = _time.time() lp = lipsync.generate(audio_path) t_float_done = _time.time() logger.info(f"[{_ts()}] [CHUNK {idx+1}/{total}] FLOAT: {t_float_done-t_float_start:.2f}s") if lp and os.path.exists(lp): lipsync_video_url = f"/api/stream/{os.path.basename(lp)}" logger.info(f"[{_ts()}] [CHUNK {idx+1}/{total}] VIDEO: {os.path.getsize(lp)/1024:.0f}KB") except Exception as e: logger.error(f"[{_ts()}] [CHUNK {idx+1}/{total}] LIPSYNC FAILED: {e}", exc_info=True) if torch.cuda.is_available(): torch.cuda.empty_cache() total_time = _time.time() - t0 logger.info(f"[{_ts()}] [CHUNK {idx+1}/{total}] DONE | total: {total_time:.2f}s") return { "audio_url": audio_url, "audio_duration": audio_duration, "lipsync_video_url": lipsync_video_url, } @app.post("/api/reset") async def reset_chat(): from llm import reset_conversation reset_conversation() return JSONResponse({"status": "ok"}) @app.post("/api/transcribe") async def transcribe(request: Request): """Transcribe audio blob with Vosk. Accepts base64 JSON or raw body.""" try: content_type = request.headers.get("content-type", "") if "json" in content_type: body = await request.json() import base64 audio_bytes = base64.b64decode(body.get("audio", "")) audio_ct = body.get("content_type", "audio/webm") else: audio_bytes = await request.body() audio_ct = content_type or "audio/webm" if not audio_bytes or len(audio_bytes) < 500: return JSONResponse({"text": ""}) import asyncio import vosk_stt text = await asyncio.to_thread(vosk_stt.transcribe_audio, audio_bytes, audio_ct) return JSONResponse({"text": text}) except Exception as e: logger.error(f"[{_ts()}] [STT] Transcribe error: {e}", exc_info=True) return JSONResponse({"text": "", "error": str(e)}) @app.get("/api/greeting") async def get_greeting(): if GREETING_DATA: return JSONResponse(GREETING_DATA) return JSONResponse({"text": None}) @app.get("/", response_class=HTMLResponse) async def index(): return HTML_PAGE HTML_PAGE = """\