#!/usr/bin/env python3
"""reliable_30fps_numpy.py - PyGame + Flask with NumPy surface arrays.

A background thread renders a bouncing-circle animation onto an off-screen
PyGame surface, JPEG-encodes a scaled-down copy of every frame and publishes
it (base64 text) in the module-level ``shared`` dict.  A Flask app serves the
most recent frame at ``/stream`` and capture statistics at ``/stats``.
"""
import base64
import io
import os
import threading
import time
import traceback

# ===== 1. SETUP =====
# These must be set BEFORE pygame is imported: pygame prints its support
# banner at import time unless PYGAME_HIDE_SUPPORT_PROMPT is already present,
# and SDL reads SDL_VIDEODRIVER when the video subsystem starts.
os.environ['SDL_VIDEODRIVER'] = 'dummy'
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'

import numpy as np  # noqa: E402  (import deliberately follows env setup)
import pygame  # noqa: E402
from flask import Flask, Response, render_template_string  # noqa: E402

PORT = int(os.getenv('PORT', 7860))

# Configuration
WIDTH, HEIGHT = 800, 600
STREAM_WIDTH, STREAM_HEIGHT = 400, 300
TARGET_FPS = 30

# When True the background and circle are painted through a NumPy pixel
# array ("Method 2"); when False plain pygame drawing calls are used.
USE_NUMPY_RENDERING = False

# Response headers that stop browsers/proxies from caching a frame.
NO_CACHE_HEADERS = {
    'Cache-Control': 'no-cache, no-store, must-revalidate',
    'Pragma': 'no-cache',
    'Expires': '0',
}

# Shared state: written by the capture thread, read by the Flask handlers.
shared = {
    "latest_frame": None,          # base64 text of the newest JPEG, or None
    "frame_count": 0,              # frames produced since start-up
    "capture_fps": 0,              # FPS measured over the last ~1 s window
    "streaming": True,             # set to False to stop the capture thread
    "last_frame_time": time.time(),  # wall-clock time of the newest frame
    "frames_this_second": 0,       # frames produced in the current window
}

# ===== 2. CREATE FLASK APP =====
app = Flask(__name__)

# ===== 3. HTML TEMPLATE =====
# NOTE(review): only plain text is visible here; the template's markup
# (tags / script) appears to have been stripped from this copy.  The text is
# reproduced verbatim - restore the original markup before deploying.
HTML = ''' PyGame + NumPy Streaming

PyGame + NumPy Surface Array Streaming

Using NumPy arrays for pixel manipulation

Live Stream
Capture FPS:0.0
Total Frames:0
Stream FPS:0.0
Latency:-- ms
Rendering with NumPy arrays • Port: ''' + str(PORT) + ''' • Target: 30 FPS
'''


# ===== 4. FLASK ROUTES =====
@app.route('/')
def index():
    """Serve the viewer page."""
    return render_template_string(HTML)


@app.route('/stats')
def stats():
    """Return capture statistics as a dict (Flask serialises it to JSON)."""
    return {
        'frame_count': shared['frame_count'],
        'capture_fps': shared['capture_fps'],
        'timestamp': time.time(),
        'has_frame': shared['latest_frame'] is not None,
    }


@app.route('/stream')
def stream():
    """Stream endpoint - returns the latest frame as a JPEG image.

    Responds with an empty ``image/jpeg`` body when no frame has been
    captured yet or the stored frame cannot be decoded.
    """
    # Snapshot once: the capture thread may replace the value at any moment.
    encoded = shared['latest_frame']
    if encoded:
        try:
            image_data = base64.b64decode(encoded)
        except (ValueError, TypeError) as e:
            # binascii.Error is a ValueError; fall through to the empty reply.
            print(f"Stream error: {e}")
        else:
            return Response(
                image_data,
                mimetype='image/jpeg',
                headers=NO_CACHE_HEADERS,
            )

    # Return empty response if no frame (uncached, so clients retry cleanly)
    return Response(b'', mimetype='image/jpeg', headers=NO_CACHE_HEADERS)


# ===== 5. NUMPY SURFACE CAPTURE LOOP =====
def _draw_scene_pygame(screen, circle_x, circle_y, circle_r):
    """Method 1: paint background and circle with pygame drawing calls."""
    screen.fill((25, 25, 45))  # Dark blue background
    pygame.draw.circle(screen, (255, 80, 80), (circle_x, circle_y), circle_r)
    pygame.draw.circle(screen, (255, 255, 255), (circle_x, circle_y),
                       circle_r, 2)


def _draw_scene_numpy(screen, pixel_array, grid_x, grid_y,
                      circle_x, circle_y, circle_r):
    """Method 2: paint a gradient background and circle via a NumPy array.

    ``pixel_array`` is a reusable ``(HEIGHT, WIDTH, 3)`` uint8 buffer and
    ``grid_x`` / ``grid_y`` are the matching ``np.meshgrid`` coordinates.
    """
    gradient = 25 + (grid_x * 0.03 + grid_y * 0.02).astype(np.uint8)
    pixel_array[:, :, 0] = gradient        # Red channel
    pixel_array[:, :, 1] = gradient // 2   # Green channel
    pixel_array[:, :, 2] = 45              # Blue channel

    # Circle by the distance formula (compared squared to avoid the sqrt).
    inside = ((grid_x - circle_x) ** 2 + (grid_y - circle_y) ** 2
              < circle_r ** 2)
    pixel_array[inside] = (255, 80, 80)

    # surfarray indexes arrays as [x][y], so swap the row/column axes.
    pygame.surfarray.blit_array(screen, pixel_array.swapaxes(0, 1))


def _draw_overlay(screen, font):
    """Draw the frame counter and measured FPS in the top-left corner."""
    text = font.render(f"Frame: {shared['frame_count']}", True,
                       (100, 255, 100))
    screen.blit(text, (10, 10))
    fps_text = font.render(f"FPS: {shared['capture_fps']:.1f}", True,
                           (100, 255, 100))
    screen.blit(fps_text, (10, 50))


def _encode_frame(screen):
    """Scale *screen* to the stream size and return it as base64 JPEG text."""
    scaled = pygame.transform.smoothscale(screen,
                                          (STREAM_WIDTH, STREAM_HEIGHT))
    buffer = io.BytesIO()
    pygame.image.save(scaled, buffer, 'JPEG')
    return base64.b64encode(buffer.getvalue()).decode('utf-8')


def numpy_capture_loop():
    """Render, encode and publish frames until ``shared["streaming"]`` is False.

    Runs in a background thread.  Each iteration advances the animation,
    draws the scene, stores the encoded frame in ``shared["latest_frame"]``,
    updates the counters and sleeps off the rest of the frame budget.
    """
    try:
        pygame.init()
        print("✅ PyGame initialized for NumPy rendering")
    except Exception as e:
        print(f"❌ PyGame init error: {e}")
        return

    frame_budget = 1.0 / TARGET_FPS

    try:
        screen = pygame.Surface((WIDTH, HEIGHT))
        # Created once: building a Font object every frame is wasted work.
        font = pygame.font.Font(None, 36)

        # The pixel buffer and coordinate grids cost several MB, so they are
        # only allocated when the NumPy renderer is actually selected.
        pixel_array = grid_x = grid_y = None
        if USE_NUMPY_RENDERING:
            pixel_array = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)
            grid_x, grid_y = np.meshgrid(np.arange(WIDTH), np.arange(HEIGHT))

        # Animation variables
        circle_x, circle_y = WIDTH // 2, HEIGHT // 2
        circle_r = 30
        speed_x, speed_y = 4, 3

        # Monotonic clock for intervals; immune to wall-clock adjustments.
        fps_window_start = time.perf_counter()
        frames_in_window = 0

        while shared["streaming"]:
            frame_start = time.perf_counter()

            # === UPDATE POSITION ===
            circle_x += speed_x
            circle_y += speed_y
            if circle_x - circle_r < 0 or circle_x + circle_r > WIDTH:
                speed_x *= -1
            if circle_y - circle_r < 0 or circle_y + circle_r > HEIGHT:
                speed_y *= -1

            # === DRAW ===
            if USE_NUMPY_RENDERING:
                _draw_scene_numpy(screen, pixel_array, grid_x, grid_y,
                                  circle_x, circle_y, circle_r)
            else:
                _draw_scene_pygame(screen, circle_x, circle_y, circle_r)
            # Text goes on last so neither renderer can paint over it.
            _draw_overlay(screen, font)

            # === SCALE AND SAVE FRAME ===
            shared["latest_frame"] = _encode_frame(screen)

            # === UPDATE COUNTERS ===
            shared["frame_count"] += 1
            shared["frames_this_second"] += 1
            frames_in_window += 1

            now = time.perf_counter()
            window = now - fps_window_start
            if window >= 1.0:
                shared["capture_fps"] = frames_in_window / window
                frames_in_window = 0
                shared["frames_this_second"] = 0
                fps_window_start = now

            shared["last_frame_time"] = time.time()

            # === FRAME RATE CONTROL ===
            remaining = frame_budget - (time.perf_counter() - frame_start)
            if remaining > 0:
                time.sleep(remaining)

    except Exception as e:
        print(f"❌ Capture loop error: {e}")
        traceback.print_exc()
    finally:
        pygame.quit()
        print("🛑 NumPy capture loop stopped")


# ===== 6. MAIN FUNCTION =====
def main():
    """Start the capture thread, then run the Flask server until interrupted."""
    print("=" * 60)
    print("🚀 PyGame + NumPy Surface Array Streaming")
    print("=" * 60)
    print(f"Port: {PORT}")
    print(f"Render: {WIDTH}x{HEIGHT} → Stream: {STREAM_WIDTH}x{STREAM_HEIGHT}")
    print(f"Target FPS: {TARGET_FPS}")
    print("Using NumPy for pixel manipulation")
    print("=" * 60)

    # Start capture thread
    print("Starting NumPy capture thread...")
    capture_thread = threading.Thread(target=numpy_capture_loop, daemon=True)
    capture_thread.start()

    # Wait (up to ~3 s) for the first frame
    print("Waiting for first frame...")
    for i in range(30):
        if shared['latest_frame']:
            print(f"✅ First frame ready after {i*0.1:.1f}s")
            break
        time.sleep(0.1)
    else:
        # Best effort: the server still starts; /stream replies empty for now.
        print("⚠️ No frame captured yet; starting server anyway")

    print(f"Starting Flask server on port {PORT}...")
    print("=" * 60)

    try:
        app.run(
            host='0.0.0.0',
            port=PORT,
            debug=False,
            threaded=True,
            use_reloader=False,
        )
    except KeyboardInterrupt:
        print("\n👋 Interrupted")
    except Exception as e:
        print(f"❌ Server error: {e}")
    finally:
        # Ask the capture loop to exit and give it a moment to clean up.
        shared['streaming'] = False
        capture_thread.join(timeout=2)
        print(f"\n📊 Final: {shared['frame_count']} frames @ {shared['capture_fps']:.1f} FPS")
        print("✅ Shutdown complete")


if __name__ == "__main__":
    main()