Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # bouncing_balls_pygame_flask.py - PyGame renders, Flask displays | |
import os
import threading
import time

# ===== 1. SETUP =====
# Both variables are set *before* pygame is imported. pygame prints its
# support banner while it is being imported, so PYGAME_HIDE_SUPPORT_PROMPT
# has no effect when assigned afterwards. SDL_VIDEODRIVER='dummy' selects
# SDL's headless video backend so the script can run without a display.
os.environ['SDL_VIDEODRIVER'] = 'dummy'
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'

import numpy as np  # noqa: E402  (deliberately after the env setup above)
import pygame  # noqa: E402
from flask import Flask, Response  # noqa: E402

# Port comes from the environment when provided, otherwise 7860.
PORT = int(os.getenv('PORT', 7860))
# Frame size in pixels; the canvas in the served page uses the same 400x300.
WIDTH, HEIGHT = 400, 300
# NOTE(review): the leading symbol in this message looks like a mis-encoded
# emoji; it is kept exactly as found -- confirm against the original file.
print(f"π Starting PyGame + Flask on port {PORT}")

# ===== 2. SHARED MEMORY =====
# Black frame served until the render thread publishes its first image.
initial_frame = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)

# State shared between the render thread (writer) and the Flask views
# (readers). "pixels" and "frame_count" are accessed under "lock".
shared_state = {
    "pixels": initial_frame,  # NumPy array (HEIGHT, WIDTH, 3) = (300, 400, 3)
    "frame_count": 0,         # number of frames published so far
    "streaming": True,        # the render loop runs while this is truthy
    "lock": threading.Lock(),
}
# ===== 3. FLASK APP =====
app = Flask(__name__)

# Single-page client. Its script polls /frame for raw RGB bytes, expands them
# to RGBA on a 400x300 canvas, and polls /stats for the frame counter.
#
# The script requests the next frame only after the previous request has
# settled, so slow responses cannot pile up, and it rejects non-2xx responses
# and wrongly sized payloads instead of painting them as pixels.
HTML = '''<!DOCTYPE html>
<html>
<head>
<title>PyGame Bouncing Balls</title>
<style>
body { margin: 0; padding: 20px; background: #111; color: white; font-family: Arial; text-align: center; }
canvas { border: 3px solid #0af; background: black; display: block; margin: 20px auto; }
.stats { background: #222; padding: 15px; border-radius: 8px; display: inline-block; margin: 10px; }
</style>
</head>
<body>
<h1>π± PyGame Bouncing Balls</h1>
<canvas id="canvas" width="400" height="300"></canvas>
<div class="stats">
<div>Frames: <span id="frameCount">0</span></div>
<div>Status: <span id="status">Loading...</span></div>
</div>
<script>
const canvas = document.getElementById('canvas');
const ctx = canvas.getContext('2d');
const statusEl = document.getElementById('status');
const frameCountEl = document.getElementById('frameCount');

// Pause between the end of one frame request and the start of the next.
const FRAME_INTERVAL_MS = 33;
// One RGB triple per canvas pixel.
const EXPECTED_BYTES = canvas.width * canvas.height * 3;
// Allocated once and reused for every frame.
const imageData = ctx.createImageData(canvas.width, canvas.height);

function showError(error) {
    console.error('Error:', error);
    statusEl.textContent = 'Error';
    statusEl.style.color = 'red';
}

// fetch() resolves even for 404/500, so turn HTTP errors into rejections.
function fetchOk(url) {
    return fetch(url).then(response => {
        if (!response.ok) {
            throw new Error(url + ' returned HTTP ' + response.status);
        }
        return response;
    });
}

function updateStats() {
    fetchOk('/stats')
        .then(r => r.json())
        .then(data => {
            frameCountEl.textContent = data.frame_count;
            statusEl.textContent = 'Streaming β';
            statusEl.style.color = '#0af';
        })
        .catch(showError);
}

function loadFrame() {
    fetchOk('/frame')
        .then(response => response.arrayBuffer())
        .then(buffer => {
            if (buffer.byteLength !== EXPECTED_BYTES) {
                throw new Error('Expected ' + EXPECTED_BYTES + ' bytes, got ' + buffer.byteLength);
            }
            // Convert RGB to RGBA
            const rgba = imageData.data;
            const rgb = new Uint8Array(buffer);
            for (let i = 0, j = 0; i < rgba.length; i += 4, j += 3) {
                rgba[i] = rgb[j];
                rgba[i + 1] = rgb[j + 1];
                rgba[i + 2] = rgb[j + 2];
                rgba[i + 3] = 255;
            }
            ctx.putImageData(imageData, 0, 0);
            // Update stats
            updateStats();
        })
        .catch(showError)
        // Schedule the next poll only once this one has settled (~30 FPS cap).
        .finally(() => setTimeout(loadFrame, FRAME_INTERVAL_MS));
}

loadFrame();
</script>
</body>
</html>'''
@app.route('/')
def index():
    """Serve the viewer page at the site root.

    The route registration is required: without it Flask never maps ``/``
    to this view and the page cannot be loaded.
    """
    return HTML
@app.route('/stats')
def stats():
    """Return the current frame counter for the page's ``/stats`` poll.

    The counter is read under the shared lock. The view returns a plain
    dict, which Flask serialises as a JSON response (Flask 1.1 and later).
    """
    with shared_state['lock']:
        frame_count = shared_state['frame_count']
    return {'frame_count': frame_count}
@app.route('/frame')
def get_frame():
    """Return the latest frame as raw RGB pixel data.

    The page script fetches ``/frame`` and reads the body as consecutive
    R, G, B bytes, one triple per canvas pixel, so the bytes of the shared
    array are sent without any container format.
    """
    # Only the snapshot needs the lock; the response is built after
    # releasing it so the render thread is not held up.
    with shared_state['lock']:
        # Convert NumPy array to raw bytes
        pixel_bytes = shared_state['pixels'].tobytes()
    return Response(
        pixel_bytes,
        mimetype='application/octet-stream',
        headers={'Cache-Control': 'no-cache'},
    )
# ===== 4. PYGAME RENDER THREAD =====
def pygame_render():
    """Render bouncing balls off-screen and publish each frame.

    Runs until ``shared_state['streaming']`` becomes falsy. Every iteration
    moves three balls, draws them and a frame-counter label onto an
    off-screen surface, then stores a (HEIGHT, WIDTH, 3) uint8 copy of the
    image in ``shared_state['pixels']`` and increments
    ``shared_state['frame_count']``, both under the shared lock.

    Returns early, after printing the error, if ``pygame.init()`` raises.
    """
    # NOTE(review): the leading symbols in the messages below look like
    # mis-encoded emoji; they are kept exactly as found.
    print("π¬ Starting PyGame render thread...")
    try:
        pygame.init()
    except Exception as e:
        print(f"β PyGame init failed: {e}")
        return
    print("β PyGame initialized")

    try:
        # Create PyGame surface
        surface = pygame.Surface((WIDTH, HEIGHT))
        # The label font never changes, so build it once rather than per frame.
        font = pygame.font.Font(None, 24)

        # Create 3 bouncing balls
        balls = [
            {'x': 100, 'y': 150, 'dx': 3, 'dy': 2, 'radius': 20, 'color': (255, 50, 50)},
            {'x': 200, 'y': 100, 'dx': -2, 'dy': 3, 'radius': 15, 'color': (50, 255, 50)},
            {'x': 300, 'y': 200, 'dx': 4, 'dy': -1, 'radius': 25, 'color': (50, 50, 255)},
        ]
        print("β Created 3 bouncing balls")

        while shared_state['streaming']:
            # Clear screen
            surface.fill((20, 20, 40))

            # Update and draw each ball
            for ball in balls:
                # Update position
                ball['x'] += ball['dx']
                ball['y'] += ball['dy']

                # Bounce off walls
                if ball['x'] - ball['radius'] < 0 or ball['x'] + ball['radius'] > WIDTH:
                    ball['dx'] = -ball['dx']
                if ball['y'] - ball['radius'] < 0 or ball['y'] + ball['radius'] > HEIGHT:
                    ball['dy'] = -ball['dy']

                centre = (int(ball['x']), int(ball['y']))
                # Draw ball
                pygame.draw.circle(surface, ball['color'], centre, ball['radius'])
                # Draw outline
                pygame.draw.circle(surface, (255, 255, 255), centre, ball['radius'], 2)

            # Add frame counter
            with shared_state['lock']:
                frame_num = shared_state['frame_count']
            text = font.render(f"Frame: {frame_num}", True, (255, 255, 255))
            surface.blit(text, (10, 10))

            # Convert surface to NumPy array.
            # array3d() returns an independent copy indexed [x][y], i.e.
            # shape (WIDTH, HEIGHT, 3). Two reasons not to use pixels3d():
            #   * its array is a view that keeps the surface locked for as
            #     long as it is alive, which would still be the case at the
            #     next iteration's blit above;
            #   * the consumers (the initial frame and the page script) use
            #     row-major (HEIGHT, WIDTH, 3), so the axes must be swapped or
            #     the streamed image comes out transposed.
            frame = np.ascontiguousarray(
                pygame.surfarray.array3d(surface).transpose(1, 0, 2),
                dtype=np.uint8,
            )

            # Update shared memory
            with shared_state['lock']:
                shared_state['pixels'] = frame
                shared_state['frame_count'] += 1

            # Control frame rate (30 FPS)
            time.sleep(1/30)
    finally:
        # Shut pygame down even if rendering raised.
        pygame.quit()
        print("π PyGame render stopped")
# ===== 5. MAIN =====
def main():
    """Print the startup banner, launch the renderer, then serve with Flask.

    The renderer runs in a daemon thread; ``app.run`` is called in the
    calling thread and blocks there.
    """
    rule = "=" * 60
    banner = (
        rule,
        "π― PyGame + Flask Bouncing Balls",
        rule,
        f"π‘ Port: {PORT}",
        f"π¨ Resolution: {WIDTH}x{HEIGHT}",
        rule,
    )
    for line in banner:
        print(line)

    # Kick off the PyGame renderer in the background.
    threading.Thread(target=pygame_render, daemon=True).start()

    # Brief pause before the web server starts.
    time.sleep(1)

    # Hand control to Flask.
    app.run(
        host='0.0.0.0',
        port=PORT,
        threaded=True,
        use_reloader=False,
    )


if __name__ == "__main__":
    main()