#!/usr/bin/env python3
# reliable_30fps_numpy.py - PyGame + Flask with NumPy surface arrays
import pygame
import numpy as np
import time
import threading
import base64
import os
import io
from flask import Flask, Response, render_template_string
# ===== 1. SETUP =====
# Configure SDL/pygame through the environment; pygame.init() itself is only
# called later, inside the capture thread.
# NOTE(review): pygame is already imported above this point, so
# PYGAME_HIDE_SUPPORT_PROMPT is presumably set too late to silence the
# import-time banner -- confirm, and move it ahead of the import if needed.
os.environ.update(
    SDL_VIDEODRIVER='dummy',
    PYGAME_HIDE_SUPPORT_PROMPT='1',
)

# TCP port for the Flask server; the PORT environment variable overrides it.
PORT = int(os.environ.get('PORT', 7860))

# Configuration
# Size of the off-screen surface the animation is drawn on.
WIDTH = 800
HEIGHT = 600
# Size each frame is scaled down to before it is JPEG-encoded.
STREAM_WIDTH = 400
STREAM_HEIGHT = 300
# Frame rate the capture loop paces itself to.
TARGET_FPS = 30

# Shared state: written by the capture thread, read by the Flask handlers.
shared = dict(
    latest_frame=None,             # base64 text of the newest JPEG, None until one exists
    frame_count=0,                 # total frames produced so far
    capture_fps=0,                 # measured capture rate, refreshed about once a second
    streaming=True,                # capture loop keeps running while this is True
    last_frame_time=time.time(),   # wall-clock time of the most recent frame
    frames_this_second=0,          # incremented by the capture loop for every frame
)
# ===== 2. CREATE FLASK APP =====
# Application object the @app.route handlers below register on; main() serves
# it with app.run().
app = Flask(__name__)
# ===== 3. HTML TEMPLATE =====
# Page text returned by index(). PORT is spliced in by string concatenation
# when the module is imported, so the value is fixed for the process lifetime.
# NOTE(review): the literal below contains plain text only -- no markup and no
# script -- so nothing on this page requests /stream or /stats. Presumably the
# HTML tags were lost somewhere; restore them from the original template.
# NOTE(review): "Target: 30 FPS" is hard-coded rather than taken from
# TARGET_FPS, so the two can drift apart.
HTML = '''
PyGame + NumPy Streaming
PyGame + NumPy Surface Array Streaming
Using NumPy arrays for pixel manipulation
Capture FPS:0.0
Total Frames:0
Stream FPS:0.0
Latency:-- ms
Rendering with NumPy arrays • Port: ''' + str(PORT) + ''' • Target: 30 FPS
'''
# ===== 4. FLASK ROUTES =====
@app.route('/')
def index():
    """Serve the landing page.

    Returns:
        The module-level ``HTML`` string rendered through
        ``render_template_string``; no template variables are supplied.
    """
    return render_template_string(HTML)
@app.route('/stats')
def stats():
    """Report the capture thread's current counters.

    Returns:
        A dict with the total ``frame_count``, the measured ``capture_fps``,
        the server's current wall-clock ``timestamp`` and ``has_frame``
        (True once at least one frame has been published). The dict is
        returned as-is; Flask is expected to turn it into a JSON response
        (dict return values need Flask 1.1+ -- verify the deployed version).
    """
    return {
        'frame_count': shared['frame_count'],
        'capture_fps': shared['capture_fps'],
        'timestamp': time.time(),
        'has_frame': shared['latest_frame'] is not None,
    }
@app.route('/stream')
def stream():
    """Serve the most recently captured frame as a single JPEG image.

    Returns:
        A ``Response`` with mimetype ``image/jpeg``. Its body is the decoded
        bytes of ``shared['latest_frame']``; when no frame has been published
        yet, or the stored value cannot be decoded, the body is empty. Both
        variants carry headers that forbid caching, so a client that polls
        this URL always receives the current frame.
    """
    no_cache_headers = {
        'Cache-Control': 'no-cache, no-store, must-revalidate',
        'Pragma': 'no-cache',
        'Expires': '0',
    }

    # Read the shared slot once so the truth test and the decode are
    # guaranteed to look at the same value.
    encoded_frame = shared['latest_frame']
    if encoded_frame:
        try:
            # Decode base64 and serve
            image_data = base64.b64decode(encoded_frame)
        except (ValueError, TypeError) as e:
            # binascii.Error (malformed base64) is a ValueError subclass;
            # TypeError covers a value that is neither str nor bytes-like.
            print(f"Stream error: {e}")
        else:
            return Response(
                image_data,
                mimetype='image/jpeg',
                headers=no_cache_headers,
            )

    # Return empty response if no frame
    return Response(b'', mimetype='image/jpeg', headers=no_cache_headers)
# ===== 5. NUMPY SURFACE CAPTURE LOOP =====
def _encode_surface_as_base64_jpeg(surface):
    """Return ``surface`` JPEG-encoded and base64-wrapped as a UTF-8 string."""
    with io.BytesIO() as buffer:
        pygame.image.save(surface, buffer, 'JPEG')
        return base64.b64encode(buffer.getvalue()).decode('utf-8')


def numpy_capture_loop():
    """Render the bouncing-circle animation off-screen and publish each frame.

    Runs until ``shared["streaming"]`` becomes false. Every iteration it

    * advances the circle and reverses a velocity component when the circle
      crosses the corresponding edge of the WIDTH x HEIGHT surface,
    * draws the background, the circle and the frame/FPS text overlay,
    * scales the surface to STREAM_WIDTH x STREAM_HEIGHT, JPEG-encodes it and
      stores the base64 text in ``shared["latest_frame"]``,
    * updates ``frame_count``, ``frames_this_second``, ``capture_fps`` (about
      once per second) and ``last_frame_time`` in ``shared``,
    * sleeps for whatever is left of the 1 / TARGET_FPS frame budget.

    ``frames_this_second`` is reset whenever ``capture_fps`` is refreshed, so
    it counts the frames of the current measurement window.

    Any exception raised while rendering is printed with its traceback and
    ends the loop; ``pygame.quit()`` is always called on the way out. The
    function returns nothing.
    """
    try:
        pygame.init()
        print("✅ PyGame initialized for NumPy rendering")
    except Exception as e:
        print(f"❌ PyGame init error: {e}")
        return

    # Animation variables
    circle_x, circle_y = WIDTH // 2, HEIGHT // 2
    circle_r = 30
    speed_x, speed_y = 4, 3

    frame_budget = 1.0 / TARGET_FPS
    text_color = (100, 255, 100)

    try:
        screen = pygame.Surface((WIDTH, HEIGHT))
        # Built once: constructing a Font on every frame is loop-invariant work.
        font = pygame.font.Font(None, 36)

        # perf_counter is monotonic, so pacing and the FPS figure are not
        # disturbed by wall-clock adjustments.
        fps_window_start = time.perf_counter()
        frames_in_window = 0

        while shared["streaming"]:
            frame_start = time.perf_counter()

            # === UPDATE POSITION ===
            circle_x += speed_x
            circle_y += speed_y
            if circle_x - circle_r < 0 or circle_x + circle_r > WIDTH:
                speed_x = -speed_x
            if circle_y - circle_r < 0 or circle_y + circle_r > HEIGHT:
                speed_y = -speed_y

            # === DRAW ===
            center = (circle_x, circle_y)
            screen.fill((25, 25, 45))  # Dark blue background
            pygame.draw.circle(screen, (255, 80, 80), center, circle_r)
            pygame.draw.circle(screen, (255, 255, 255), center, circle_r, 2)

            frame_label = font.render(
                f"Frame: {shared['frame_count']}", True, text_color
            )
            screen.blit(frame_label, (10, 10))
            fps_label = font.render(
                f"FPS: {shared['capture_fps']:.1f}", True, text_color
            )
            screen.blit(fps_label, (10, 50))

            # NOTE: a NumPy renderer can replace the fill/circle calls above:
            # fill a uint8 pixel array (e.g. a gradient plus a distance-based
            # circle mask) and push it with pygame.surfarray.blit_array().
            # surfarray indexes arrays as [x, y], so the array must have shape
            # (WIDTH, HEIGHT, 3), not (HEIGHT, WIDTH, 3).

            # === SCALE AND SAVE FRAME ===
            scaled = pygame.transform.smoothscale(
                screen, (STREAM_WIDTH, STREAM_HEIGHT)
            )
            shared["latest_frame"] = _encode_surface_as_base64_jpeg(scaled)

            # === UPDATE COUNTERS ===
            shared["frame_count"] += 1
            shared["frames_this_second"] += 1
            frames_in_window += 1

            now = time.perf_counter()
            window = now - fps_window_start
            if window >= 1.0:
                shared["capture_fps"] = frames_in_window / window
                frames_in_window = 0
                shared["frames_this_second"] = 0
                fps_window_start = now
            # Wall-clock on purpose: this value is shared with other readers.
            shared["last_frame_time"] = time.time()

            # === FRAME RATE CONTROL ===
            remaining = frame_budget - (time.perf_counter() - frame_start)
            if remaining > 0:
                time.sleep(remaining)
    except Exception as e:
        print(f"❌ Capture loop error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        pygame.quit()
        print("🛑 NumPy capture loop stopped")
# ===== 6. MAIN FUNCTION =====
def main():
    """Start the capture thread, wait briefly for a frame, then run Flask.

    Prints a start-up banner, launches ``numpy_capture_loop`` in a daemon
    thread and polls up to 3 seconds for the first frame. The wait ends early
    if the thread exits without producing one, and a warning is printed if no
    frame arrived. The Flask server is then started on ``0.0.0.0:PORT`` in
    either case.

    When the server stops (normally, on Ctrl-C, or on an error, which is
    printed), the capture loop is asked to stop via ``shared['streaming']``,
    the thread is joined for at most 2 seconds and final statistics are
    printed. The function returns nothing.
    """
    banner = "=" * 60
    print(banner)
    print("🚀 PyGame + NumPy Surface Array Streaming")
    print(banner)
    print(f"Port: {PORT}")
    print(f"Render: {WIDTH}x{HEIGHT} → Stream: {STREAM_WIDTH}x{STREAM_HEIGHT}")
    print(f"Target FPS: {TARGET_FPS}")
    print("Using NumPy for pixel manipulation")
    print(banner)

    # Start capture thread
    print("Starting NumPy capture thread...")
    capture_thread = threading.Thread(target=numpy_capture_loop, daemon=True)
    capture_thread.start()

    # Wait for first frame
    poll_interval = 0.1
    max_polls = 30
    print("Waiting for first frame...")
    for i in range(max_polls):
        if shared['latest_frame']:
            print(f"✅ First frame ready after {i * poll_interval:.1f}s")
            break
        if not capture_thread.is_alive():
            # The loop returned or failed; waiting longer cannot help.
            print("⚠️ Capture thread exited before producing a frame")
            break
        time.sleep(poll_interval)
    else:
        print(
            f"⚠️ No frame after {max_polls * poll_interval:.1f}s; "
            "starting server anyway"
        )

    print(f"Starting Flask server on port {PORT}...")
    print(banner)
    try:
        app.run(
            host='0.0.0.0',
            port=PORT,
            debug=False,
            threaded=True,
            use_reloader=False,
        )
    except KeyboardInterrupt:
        print("\n👋 Interrupted")
    except Exception as e:
        print(f"❌ Server error: {e}")
    finally:
        # Signal the capture loop and give it a moment to shut pygame down.
        shared['streaming'] = False
        capture_thread.join(timeout=2)
        print(
            f"\n📊 Final: {shared['frame_count']} frames "
            f"@ {shared['capture_fps']:.1f} FPS"
        )
        print("✅ Shutdown complete")
# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()