Spaces:
Build error
Build error
| # managers/gemini_manager.py | |
| # | |
| # Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos | |
| # | |
| # Version: 1.1.0 | |
| # | |
| # This file defines the GeminiManager, a specialist responsible for all Natural | |
| # Language Processing, reasoning, and vision-language tasks. It acts as the | |
| # Scriptwriter, Editor, and Cinematic Director for the ADUC framework, generating | |
| # storyboards, prompts, and making creative decisions. | |
| import os | |
| import logging | |
| import json | |
| from pathlib import Path | |
| import gradio as gr | |
| from PIL import Image | |
| import google.generativeai as genai | |
| import re | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
def robust_json_parser(raw_text: str) -> dict:
    """
    Parse the first JSON object embedded in an LLM response.

    The response may wrap the object in a Markdown ```json fence or surround
    it with free-form commentary. Two strategies are tried in order:

    1. The contents of a ```json ... ``` fenced block.
    2. The object that starts at the first '{' of the text. Decoding stops at
       that object's closing brace, so trailing commentary that happens to
       contain braces does not break parsing.

    Args:
        raw_text: The raw text returned by the model.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the text has no '{' followed by a '}', or if no
            candidate decodes as valid JSON.
    """
    clean_text = raw_text.strip()

    # Strategy 1: JSON delimited by ```json ... ```
    fenced = re.search(r'```json\s*(\{.*?\})\s*```', clean_text, re.DOTALL)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            # The lazy pattern can cut the object short (e.g. a string value
            # containing "}```"); fall through to the brace-based strategy.
            pass

    # Strategy 2: decode the object that starts at the first '{'.
    start_index = clean_text.find('{')
    end_index = clean_text.rfind('}')
    if start_index == -1 or end_index <= start_index:
        raise ValueError("No valid JSON object could be found in the AI's response.")

    try:
        # raw_decode parses exactly one JSON document and ignores whatever
        # follows it, unlike json.loads on a first-'{'-to-last-'}' slice.
        parsed, _ = json.JSONDecoder().raw_decode(clean_text[start_index:])
    except json.JSONDecodeError as e:
        logger.error(
            "Failed to decode JSON. The AI returned the following text:\n---\n%s\n---",
            raw_text,
        )
        raise ValueError(f"The AI returned an invalid JSON format: {e}") from e
    return parsed
class GeminiManager:
    """
    Manages interactions with the Google Gemini API, acting as the primary
    reasoning and language specialist for the ADUC framework.

    The API key is read from the GEMINI_API_KEY environment variable. When it
    is missing, ``self.model`` is ``None`` and every task method raises
    ``gr.Error`` through ``_check_model``.
    """

    # Single source of truth for the model id, so the startup log message
    # cannot drift away from the model that is actually instantiated.
    MODEL_NAME = 'gemini-2.5-pro'

    def __init__(self):
        """Configure the Gemini client from GEMINI_API_KEY, or disable the specialist."""
        self.api_key = os.environ.get("GEMINI_API_KEY")
        if self.api_key:
            genai.configure(api_key=self.api_key)
            self.model = genai.GenerativeModel(self.MODEL_NAME)
            logger.info("Gemini Specialist (%s) initialized successfully.", self.MODEL_NAME)
        else:
            self.model = None
            logger.warning("Gemini API key not found. Specialist disabled.")

    def _check_model(self):
        """Raises an error if the Gemini API is not configured."""
        if not self.model:
            raise gr.Error("The Google Gemini API key is not configured (GEMINI_API_KEY).")

    def _read_prompt_template(self, filename: str) -> str:
        """
        Reads a prompt template file from the 'prompts' directory.

        Args:
            filename: Name of the template file inside 'prompts'.

        Returns:
            The template text, decoded as UTF-8.

        Raises:
            gr.Error: If the template file does not exist.
        """
        # The 'prompts' directory is resolved two levels above this file.
        prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
        try:
            with open(prompts_dir / filename, "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError as e:
            raise gr.Error(f"Prompt template file not found: prompts/{filename}") from e

    @staticmethod
    def _image_scope():
        """
        Return a context manager that closes every image registered on it.

        ``Image.open`` leaves the underlying file open. Registering each image
        with ``enter_context`` releases the handles as soon as the ``with``
        block ends, including when the request fails part-way.
        """
        # Function-scope import: the only place contextlib is needed.
        from contextlib import ExitStack
        return ExitStack()

    def generate_storyboard(self, prompt: str, num_keyframes: int, ref_image_paths: list[str]) -> list[str]:
        """
        Delegated task: Acts as a Scriptwriter to generate a storyboard.

        Args:
            prompt: User prompt, substituted as ``user_prompt`` in the template.
            num_keyframes: Required number of scenes, substituted as
                ``num_fragments`` in the template.
            ref_image_paths: Paths of reference images sent after the prompt.

        Returns:
            The value of ``scene_storyboard`` from the model's JSON answer.

        Raises:
            gr.Error: If the API is not configured, or if any step fails
                (template, request, JSON parsing, wrong number of scenes).
        """
        self._check_model()
        try:
            template = self._read_prompt_template("unified_storyboard_prompt.txt")
            storyboard_prompt = template.format(user_prompt=prompt, num_fragments=num_keyframes)
            with self._image_scope() as images:
                model_contents = [storyboard_prompt]
                model_contents.extend(images.enter_context(Image.open(p)) for p in ref_image_paths)
                logger.info("Calling Gemini to generate storyboard...")
                response = self.model.generate_content(model_contents)
            logger.info("Gemini responded with (raw storyboard): %s", response.text)

            storyboard_data = robust_json_parser(response.text)
            storyboard = storyboard_data.get("scene_storyboard", [])
            # Guard first: a null or scalar value would otherwise make len()
            # raise a TypeError that hides the real problem.
            if not isinstance(storyboard, list):
                raise ValueError("The 'scene_storyboard' field returned by the AI is not a list.")
            if not storyboard or len(storyboard) != num_keyframes:
                raise ValueError(f"Incorrect number of scenes generated. Expected {num_keyframes}, got {len(storyboard)}.")
            return storyboard
        except Exception as e:
            raise gr.Error(f"The Scriptwriter (Gemini) failed: {e}") from e

    def select_keyframes_from_pool(self, storyboard: list, base_image_paths: list[str], pool_image_paths: list[str]) -> list[str]:
        """
        Delegated task: Acts as a Photographer/Editor to select keyframes.

        Base images are labelled ``BASE-1..n`` and pool images ``IMG-1..n``;
        the model must answer with one pool identifier per storyboard scene.

        Args:
            storyboard: Scene descriptions, one per keyframe to select.
            base_image_paths: Paths of the story's reference images.
            pool_image_paths: Paths of the candidate images to choose from.

        Returns:
            The pool image paths matching ``selected_image_identifiers``, in
            the order the model returned them.

        Raises:
            gr.Error: If the API is not configured, the pool is empty, or any
                step fails (including an unknown identifier in the answer).
        """
        self._check_model()
        if not pool_image_paths:
            raise gr.Error("The 'image pool' (Additional Images) is empty.")
        try:
            template = self._read_prompt_template("keyframe_selection_prompt.txt")
            image_map = {f"IMG-{i+1}": path for i, path in enumerate(pool_image_paths)}
            base_image_map = {f"BASE-{i+1}": path for i, path in enumerate(base_image_paths)}
            storyboard_str = "\n".join([f"- Scene {i+1}: {s}" for i, s in enumerate(storyboard)])
            selection_prompt = template.format(storyboard_str=storyboard_str, image_identifiers=list(image_map.keys()))

            with self._image_scope() as images:
                model_contents = ["# Reference Images (Story Base)"]
                for identifier, path in base_image_map.items():
                    model_contents.extend([f"Identifier: {identifier}", images.enter_context(Image.open(path))])
                model_contents.append("\n# Image Pool (Scene Bank)")
                for identifier, path in image_map.items():
                    model_contents.extend([f"Identifier: {identifier}", images.enter_context(Image.open(path))])
                model_contents.append(selection_prompt)
                logger.info("Calling Gemini to select keyframes from pool...")
                response = self.model.generate_content(model_contents)
            logger.info("Gemini responded with (raw keyframe selection): %s", response.text)

            selection_data = robust_json_parser(response.text)
            selected_identifiers = selection_data.get("selected_image_identifiers", [])
            if not isinstance(selected_identifiers, list):
                raise ValueError("The 'selected_image_identifiers' field returned by the AI is not a list.")
            if len(selected_identifiers) != len(storyboard):
                raise ValueError("The AI did not select the correct number of images for the scenes.")
            # Report invented identifiers explicitly instead of letting the
            # lookup below fail with a bare KeyError.
            unknown = [identifier for identifier in selected_identifiers if identifier not in image_map]
            if unknown:
                raise ValueError(f"The AI selected image identifiers that are not in the pool: {unknown}")
            return [image_map[identifier] for identifier in selected_identifiers]
        except Exception as e:
            raise gr.Error(f"The Photographer (Gemini) failed to select images: {e}") from e

    def get_anticipatory_keyframe_prompt(self, global_prompt: str, scene_history: str, current_scene_desc: str, future_scene_desc: str, last_image_path: str, fixed_ref_paths: list[str]) -> str:
        """
        Delegated task: Acts as an Art Director to generate an image prompt.

        Args:
            global_prompt: Overall story goal, sent as context.
            scene_history: Substituted as ``historico_prompt`` in the template.
            current_scene_desc: Substituted as ``cena_atual`` in the template.
            future_scene_desc: Substituted as ``cena_futura`` in the template.
            last_image_path: Path of the current base image ([IMG-BASE]).
            fixed_ref_paths: Paths of general reference images; an entry equal
                to ``last_image_path`` is skipped so it is not sent twice.

        Returns:
            The model's text, stripped, with backticks and double quotes removed.

        Raises:
            gr.Error: If the API is not configured or any step fails.
        """
        self._check_model()
        try:
            template = self._read_prompt_template("anticipatory_keyframe_prompt.txt")
            director_prompt = template.format(
                historico_prompt=scene_history,
                cena_atual=current_scene_desc,
                cena_futura=future_scene_desc
            )
            with self._image_scope() as images:
                model_contents = [
                    "# CONTEXT:",
                    f"- Global Story Goal: {global_prompt}",
                    "# VISUAL ASSETS:",
                    "Current Base Image [IMG-BASE]:",
                    images.enter_context(Image.open(last_image_path))
                ]
                ref_counter = 1
                for path in fixed_ref_paths:
                    if path != last_image_path:
                        model_contents.extend([
                            f"General Reference Image [IMG-REF-{ref_counter}]:",
                            images.enter_context(Image.open(path)),
                        ])
                        ref_counter += 1
                model_contents.append(director_prompt)
                logger.info("Calling Gemini to generate anticipatory keyframe prompt...")
                response = self.model.generate_content(model_contents)
            logger.info("Gemini responded with (raw keyframe prompt): %s", response.text)
            final_flux_prompt = response.text.strip().replace("`", "").replace("\"", "")
            return final_flux_prompt
        except Exception as e:
            raise gr.Error(f"The Art Director (Gemini) failed: {e}") from e

    def get_cinematic_decision(self, global_prompt: str, story_history: str,
                               past_keyframe_path: str, present_keyframe_path: str, future_keyframe_path: str,
                               past_scene_desc: str, present_scene_desc: str, future_scene_desc: str) -> dict:
        """
        Delegated task: Acts as a Film Director to make editing decisions and generate motion prompts.

        Unlike the other tasks, a failure here does not raise: the error is
        logged and a 'continuous' transition is returned as a fallback.

        Args:
            global_prompt: Overall story goal.
            story_history: Story so far.
            past_keyframe_path: Path of the past keyframe image.
            present_keyframe_path: Path of the present keyframe image.
            future_keyframe_path: Path of the future keyframe image.
            past_scene_desc: Description of the past scene.
            present_scene_desc: Description of the present scene.
            future_scene_desc: Description of the future scene.

        Returns:
            A dict containing at least ``transition_type`` and ``motion_prompt``.

        Raises:
            gr.Error: Only if the API is not configured.
        """
        self._check_model()
        try:
            template = self._read_prompt_template("cinematic_director_prompt.txt")
            prompt_text = template.format(
                global_prompt=global_prompt,
                story_history=story_history,
                past_scene_desc=past_scene_desc,
                present_scene_desc=present_scene_desc,
                future_scene_desc=future_scene_desc
            )
            with self._image_scope() as images:
                model_contents = [
                    prompt_text,
                    "[PAST_IMAGE]:", images.enter_context(Image.open(past_keyframe_path)),
                    "[PRESENT_IMAGE]:", images.enter_context(Image.open(present_keyframe_path)),
                    "[FUTURE_IMAGE]:", images.enter_context(Image.open(future_keyframe_path))
                ]
                logger.info("Calling Gemini to generate cinematic decision...")
                response = self.model.generate_content(model_contents)
            logger.info("Gemini responded with (raw cinematic decision): %s", response.text)
            decision_data = robust_json_parser(response.text)
            if "transition_type" not in decision_data or "motion_prompt" not in decision_data:
                raise ValueError("AI response (Cinematographer) is malformed. Missing 'transition_type' or 'motion_prompt'.")
            return decision_data
        except Exception as e:
            # Deliberate best-effort: fall back to a safe default so the
            # caller still receives a usable decision.
            logger.error("The Film Director (Gemini) failed: %s. Using fallback to 'continuous'.", e)
            return {
                "transition_type": "continuous",
                "motion_prompt": f"A smooth, continuous cinematic transition from '{present_scene_desc}' to '{future_scene_desc}'."
            }
| # --- Singleton Instance: created once when this module is imported; if GEMINI_API_KEY is unset its model is None --- | |
| gemini_manager_singleton = GeminiManager() | |