"""
Video Processing Utilities

Frame extraction, video saving, and control signal processing
"""

import numpy as np
from PIL import Image, ImageFilter
from typing import List, Union, Optional
from pathlib import Path
import tempfile


def load_video_frames(
    video_path: str,
    max_frames: Optional[int] = None,
    resize: Optional[tuple] = None
) -> List[Image.Image]:
    """
    Load video frames from file

    Args:
        video_path: Path to video file
        max_frames: Maximum frames to load (None or 0 = all)
        resize: Optional (width, height) to resize frames

    Returns:
        List of PIL Images
    """
    import av

    frames = []
    # The context manager closes the container even if decoding or
    # resizing raises part-way through.
    with av.open(video_path) as container:
        for frame in container.decode(video=0):
            img = frame.to_image()
            if resize:
                img = img.resize(resize, Image.Resampling.LANCZOS)
            frames.append(img)
            if max_frames and len(frames) >= max_frames:
                break

    print(f"Loaded {len(frames)} frames from {video_path}")
    return frames


def save_video(
    frames: Union[List[Image.Image], List[np.ndarray]],
    output_path: str,
    fps: int = 16
):
    """
    Save frames as video file

    Args:
        frames: List of PIL Images or numpy arrays (must not be empty)
        output_path: Output path (.mp4)
        fps: Frames per second

    Raises:
        ValueError: If ``frames`` is empty
    """
    import imageio

    # len() rather than truthiness so a stacked ndarray of frames still works
    if len(frames) == 0:
        raise ValueError("save_video() requires at least one frame")

    # Convert PIL Images to numpy arrays; arrays pass through untouched.
    # Checked per frame so a mixed list is handled too.
    arrays = [np.array(f) if isinstance(f, Image.Image) else f for f in frames]

    # Ensure output directory exists
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    # Save video; the writer is closed even if appending a frame fails
    with imageio.get_writer(output_path, fps=fps, codec='libx264') as writer:
        for frame in arrays:
            writer.append_data(frame)

    print(f"Saved {len(frames)} frames to {output_path} at {fps} fps")


def extract_edges(
    image: Image.Image,
    low_threshold: int = 50,
    high_threshold: int = 150
) -> Image.Image:
    """
    Extract Canny edges from image

    Args:
        image: Input PIL Image
        low_threshold: Canny low threshold
        high_threshold: Canny high threshold

    Returns:
        Edge image (edge mask replicated across 3 RGB channels)
    """
    import cv2

    # Convert to numpy
    img_array = np.array(image.convert("RGB"))

    # Convert to grayscale
    gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

    # Apply Canny edge detection
    edges = cv2.Canny(gray, low_threshold, high_threshold)

    # Convert back to RGB (3 channel)
    edges_rgb = cv2.cvtColor(edges, cv2.COLOR_GRAY2RGB)

    return Image.fromarray(edges_rgb)


def extract_depth_map(image: Image.Image) -> Image.Image:
    """
    Extract approximate depth map using gradient magnitude

    Note: For production, use a proper depth estimation model

    Args:
        image: Input PIL Image

    Returns:
        Depth map image (3-channel RGB); all black when the input has
        no gradients (e.g. a uniform image)
    """
    import cv2

    img_array = np.array(image.convert("L"))

    # Compute gradient (Sobel)
    sobelx = cv2.Sobel(img_array, cv2.CV_64F, 1, 0, ksize=3)
    sobely = cv2.Sobel(img_array, cv2.CV_64F, 0, 1, ksize=3)

    # Magnitude
    magnitude = np.sqrt(sobelx**2 + sobely**2)

    # Normalize to 0-255. A flat image has zero gradient everywhere;
    # skip the division there to avoid 0/0 -> NaN before the uint8 cast.
    peak = magnitude.max()
    if peak > 0:
        magnitude = magnitude / peak * 255
    magnitude = magnitude.astype(np.uint8)

    # Convert to RGB
    depth_rgb = cv2.cvtColor(magnitude, cv2.COLOR_GRAY2RGB)

    return Image.fromarray(depth_rgb)


def compute_frame_diff(frame1: Image.Image, frame2: Image.Image) -> float:
    """
    Compute pixel difference between two frames

    Both frames are converted to RGB first and must have the same size.

    Args:
        frame1: First frame
        frame2: Second frame

    Returns:
        Mean absolute difference (0-255 scale)
    """
    arr1 = np.array(frame1.convert("RGB")).astype(float)
    arr2 = np.array(frame2.convert("RGB")).astype(float)
    # float() so the result is a plain Python float, as annotated
    return float(np.mean(np.abs(arr1 - arr2)))


def compute_temporal_smoothness(frames: List[Image.Image]) -> dict:
    """
    Compute temporal smoothness metrics for video frames

    Args:
        frames: List of video frames

    Returns:
        dict with keys "mean_diff", "max_diff", "std_diff" (statistics of
        the mean absolute difference between consecutive frames) and
        "num_frames". With fewer than 2 frames all diff metrics are 0.
    """
    if len(frames) < 2:
        # Same keys as the normal path so callers can index unconditionally
        return {
            "mean_diff": 0.0,
            "max_diff": 0.0,
            "std_diff": 0.0,
            "num_frames": len(frames)
        }

    diffs = [
        compute_frame_diff(prev, curr)
        for prev, curr in zip(frames, frames[1:])
    ]

    return {
        "mean_diff": float(np.mean(diffs)),
        "max_diff": float(np.max(diffs)),
        "std_diff": float(np.std(diffs)),
        "num_frames": len(frames)
    }


def compute_ssim(image1: Image.Image, image2: Image.Image) -> float:
    """
    Compute Structural Similarity Index (SSIM) between two images

    Both images are compared in grayscale. If the sizes differ, image2 is
    resized to image1's size first.

    Args:
        image1: First image
        image2: Second image

    Returns:
        SSIM value (higher is more similar, 1.0 for identical images;
        can be negative for strongly dissimilar images)
    """
    from skimage.metrics import structural_similarity

    # Resize if different sizes
    if image2.size != image1.size:
        image2 = image2.resize(image1.size, Image.Resampling.LANCZOS)

    arr1 = np.array(image1.convert("L"))
    arr2 = np.array(image2.convert("L"))

    return float(structural_similarity(arr1, arr2))


def create_test_video(
    num_frames: int = 16,
    width: int = 320,
    height: int = 240,
    output_path: Optional[str] = None
) -> str:
    """
    Create a simple test video with moving shapes

    Args:
        num_frames: Number of frames
        width: Video width
        height: Video height
        output_path: Output path (a temporary .mp4 file is created if None)

    Returns:
        Path to created video
    """
    from PIL import ImageDraw

    frames = []
    radius = 30
    y = height // 2
    for i in range(num_frames):
        # Create frame with solid background
        img = Image.new("RGB", (width, height), color=(50, 50, 80))
        draw = ImageDraw.Draw(img)

        # Moving circle
        x = int(width * (i / num_frames))
        draw.ellipse(
            [x - radius, y - radius, x + radius, y + radius],
            fill=(255, 200, 100)
        )

        # Static rectangle
        draw.rectangle([20, 20, 80, 80], fill=(100, 200, 100))

        frames.append(img)

    if output_path is None:
        # tempfile.mktemp() is deprecated and race-prone: reserve the file
        # atomically instead and let the video writer overwrite it.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            output_path = tmp.name

    save_video(frames, output_path, fps=16)
    return output_path


def get_video_info(video_path: str) -> dict:
    """
    Get video metadata

    Args:
        video_path: Path to video file

    Returns:
        dict with "width", "height", "fps", "duration_s", "num_frames" and
        "codec" of the first video stream; "fps", "duration_s" and
        "num_frames" are 0 when the stream does not report them
    """
    import av

    # The context manager closes the container even if a lookup raises
    with av.open(video_path) as container:
        stream = container.streams.video[0]

        info = {
            "width": stream.width,
            "height": stream.height,
            "fps": float(stream.average_rate) if stream.average_rate else 0,
            "duration_s": float(stream.duration * stream.time_base) if stream.duration else 0,
            "num_frames": stream.frames if stream.frames else 0,
            "codec": stream.codec_context.name
        }

    return info