#!/usr/bin/env python3
"""Batch TTS generator with auto-restart on failure.

Discovers two-digit chunk files in CHUNK_DIR, renders each one to an audio
file in OUT_DIR by invoking the voice script, and restarts the hosted space
when generation keeps failing.
"""
import glob
import os
import shutil
import subprocess
import sys
import time

TOKEN = os.environ.get("HF_TOKEN", "")
CHUNK_DIR = "/tmp/tts/smchunks"
OUT_DIR = "/tmp/tts/smout"
VOICE = "scarlett_johansson"
FMT = "ogg"
SCRIPT = "/home/runner/.openclaw/workspace/scripts/voice.py"
RESTART_SCRIPT = "/home/runner/.openclaw/workspace/scripts/restart_space.py"

os.makedirs(OUT_DIR, exist_ok=True)


def restart_space():
    """Restart the space, wait for startup, and probe it with one request.

    Runs RESTART_SCRIPT with HF_TOKEN exported, sleeps 95 seconds, then
    issues a single curl request against the /tts endpoint.

    Returns:
        True if the probe answered with HTTP 200, False otherwise
        (including when the probe itself times out).
    """
    print("Restarting space...")
    subprocess.run(
        ["python3", RESTART_SCRIPT],
        env={**os.environ, "HF_TOKEN": TOKEN},
        capture_output=True,
    )
    print("Waiting 95s for startup...")
    time.sleep(95)

    # Quick test
    try:
        r = subprocess.run(
            ["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}",
             "--max-time", "60",
             "https://hf4uwho-pocket-tts.hf.space/tts?text=x&voice=af_alloy&format=ogg"],
            capture_output=True, text=True, timeout=70
        )
        ok = r.stdout.strip() == "200"
    except subprocess.TimeoutExpired:
        # A hung probe means the space is not usable; report it as a failed
        # test instead of letting the exception abort the whole batch.
        ok = False
    print(f"Space test: {'OK' if ok else 'FAIL'}")
    return ok


def generate_chunk(idx):
    """Render chunk number *idx* to OUT_DIR/chunk_NN.FMT.

    An existing output file larger than 1000 bytes is treated as already
    done. Otherwise the voice script is run on the chunk file; the path it
    prints on stdout is moved into place if it exists and is larger than
    1000 bytes.

    Args:
        idx: Integer chunk number; formatted as two digits for file names.

    Returns:
        True if the output file exists afterwards (generated or skipped),
        False on any failure, including a voice-script timeout.
    """
    chunk_file = f"{CHUNK_DIR}/{idx:02d}"
    outfile = f"{OUT_DIR}/chunk_{idx:02d}.{FMT}"

    if os.path.exists(outfile) and os.path.getsize(outfile) > 1000:
        print(f" Chunk {idx:02d} already exists, skipping")
        return True

    print(f" Generating chunk {idx:02d}...", end=" ", flush=True)
    try:
        r = subprocess.run(
            ["python3", SCRIPT, "--file", chunk_file, VOICE, FMT],
            capture_output=True, text=True, timeout=600
        )
    except subprocess.TimeoutExpired:
        # Treat a hung generation like any other failure so the caller's
        # retry/restart logic can handle it.
        print("FAIL (timeout)")
        return False

    # Find the output file
    gen_file = r.stdout.strip()
    if r.returncode == 0 and gen_file:
        if os.path.exists(gen_file) and os.path.getsize(gen_file) > 1000:
            # shutil.move also works when gen_file lives on a different
            # filesystem than OUT_DIR, where os.rename would raise.
            shutil.move(gen_file, outfile)
            sz = os.path.getsize(outfile)
            print(f"OK ({sz} bytes)")
            return True

    print(f"FAIL (exit={r.returncode})")
    return False


# Get all chunk files
chunk_files = sorted(glob.glob(f"{CHUNK_DIR}/[0-9][0-9]"))
total = len(chunk_files)
print(f"Total chunks: {total}")

if total == 0:
    # Nothing to render: skip the restart and the 95 s wait entirely.
    print(f"No chunk files found in {CHUNK_DIR}")
    sys.exit(1)

# Restart space first
if not restart_space():
    print("Space not responding after restart, retrying...")
    if not restart_space():
        print("FATAL: Space won't start")
        sys.exit(1)
# Generate every chunk in order. One failure waits 30 s and retries the same
# chunk; two consecutive failures restart the space before retrying.
# NOTE(review): a chunk that can never succeed is retried without limit --
# consider capping the number of restarts per chunk.
failures_in_a_row = 0
i = 0
while i < total:
    idx = int(os.path.basename(chunk_files[i]))
    success = generate_chunk(idx)
    if success:
        failures_in_a_row = 0
        i += 1
    else:
        failures_in_a_row += 1
        if failures_in_a_row >= 2:
            print(f" {failures_in_a_row} failures, restarting space...")
            if not restart_space():
                print("Space won't restart, waiting 120s and retrying...")
                time.sleep(120)
                if not restart_space():
                    print("FATAL: Space won't restart")
                    sys.exit(1)
            failures_in_a_row = 0
        else:
            print(" Single failure, waiting 30s and retrying chunk...")
            time.sleep(30)

print(f"\nAll {total} chunks generated!")

# Concatenate
ffmpeg_path = "/tmp/ffmpeg-master-latest-linux64-gpl/bin/ffmpeg"
concat_list = f"{OUT_DIR}/concat.txt"
with open(concat_list, "w") as f:
    # Use the indices taken from the chunk file names rather than
    # range(total): the names need not start at 00 or be contiguous, and
    # counting from zero would leave the highest-numbered chunks out.
    for chunk_name in chunk_files:
        chunk_idx = int(os.path.basename(chunk_name))
        chunk_path = f"{OUT_DIR}/chunk_{chunk_idx:02d}.{FMT}"
        if os.path.exists(chunk_path):
            f.write(f"file '{chunk_path}'\n")

final = f"{OUT_DIR}/giselle_60min_full.ogg"
r = subprocess.run(
    [ffmpeg_path, "-y", "-f", "concat", "-safe", "0", "-i", concat_list,
     "-c", "copy", final],
    capture_output=True, text=True, timeout=60
)

if r.returncode == 0:
    sz = os.path.getsize(final)

    # Get duration
    # ffprobe sits next to ffmpeg. Build its path from the directory:
    # str.replace() would also rewrite the "ffmpeg" inside the directory
    # name and point at a path that does not exist.
    ffprobe_path = os.path.join(os.path.dirname(ffmpeg_path), "ffprobe")
    try:
        r2 = subprocess.run(
            [ffprobe_path, "-v", "quiet", "-show_entries", "format=duration",
             "-of", "csv=p=0", final],
            capture_output=True, text=True, timeout=10
        )
        dur = float(r2.stdout.strip())
    except (OSError, ValueError, subprocess.TimeoutExpired):
        # The duration is informational only; a missing ffprobe, a timeout
        # or empty/non-numeric output must not fail a successful run.
        dur = 0

    print(f"FINAL: {final}")
    print(f"SIZE: {sz} bytes")
    print(f"DURATION: {dur/60:.1f} minutes")
else:
    print(f"Concat failed: {r.stderr[:500]}")
    # Exit non-zero like the other fatal paths so callers can detect it.
    sys.exit(1)