flaskdockernew / app0.py
MySafeCode's picture
Rename app.py to app0.py
98c2bcf verified
Raw
History Blame
11.7 kB
#!/usr/bin/env python3
# reliable_30fps_numpy.py - PyGame + Flask with NumPy surface arrays
import pygame
import numpy as np
import time
import threading
import base64
import os
import io
from flask import Flask, Response, render_template_string
# ===== 1. SETUP =====
# Select SDL's "dummy" video driver and ask pygame to hide its support banner.
# NOTE(review): pygame is already imported above this point; confirm that
# PYGAME_HIDE_SUPPORT_PROMPT still has an effect when set after the import.
os.environ.update({
    'SDL_VIDEODRIVER': 'dummy',
    'PYGAME_HIDE_SUPPORT_PROMPT': '1',
})

# TCP port for the Flask server; read from $PORT, falling back to 7860.
PORT = int(os.environ.get('PORT', 7860))

# Configuration
WIDTH = 800            # render surface width in pixels
HEIGHT = 600           # render surface height in pixels
STREAM_WIDTH = 400     # width of the down-scaled frame that is streamed
STREAM_HEIGHT = 300    # height of the down-scaled frame that is streamed
TARGET_FPS = 30        # frame rate the capture loop paces itself to

# Shared state: written by the capture thread, read by the Flask handlers.
shared = dict(
    latest_frame=None,             # base64 text of the newest JPEG, or None
    frame_count=0,                 # frames produced since start-up
    capture_fps=0,                 # most recent measured capture rate
    streaming=True,                # set to False to stop the capture loop
    last_frame_time=time.time(),   # wall-clock time of the latest frame
    frames_this_second=0,          # incremented once per captured frame
)
# ===== 2. CREATE FLASK APP =====
# Module-level Flask application: the @app.route handlers in this file
# register on it, and main() starts it with app.run().
app = Flask(__name__)
# ===== 3. HTML TEMPLATE =====
# Single-page viewer served by index(). The page re-requests /stream on a
# timer (with a cache-busting query string) and polls /stats for the counters.
# The footer is assembled with string concatenation rather than an f-string so
# the CSS/JS braces need no escaping; PORT and TARGET_FPS are baked in at
# import time. Separators use the ASCII-safe &bull; entity so they cannot be
# garbled by a source-file re-encoding.
HTML = '''
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>PyGame + NumPy Streaming</title>
    <style>
        body { font-family: Arial; background: #0f172a; color: white; padding: 20px; margin: 0; }
        .container { max-width: 800px; margin: 0 auto; text-align: center; }
        h1 { color: #60a5fa; }
        .image-container {
            width: 400px;
            height: 300px;
            margin: 20px auto;
            border: 3px solid #60a5fa;
            background: #000;
            overflow: hidden;
        }
        #streamImg {
            width: 100%;
            height: 100%;
            object-fit: contain;
            image-rendering: pixelated;
        }
        .stats {
            display: inline-block;
            background: #1e293b;
            padding: 20px;
            border-radius: 10px;
            margin: 10px;
            text-align: left;
        }
        .stat-row { margin: 10px 0; display: flex; justify-content: space-between; min-width: 200px; }
        .stat-value { font-weight: bold; color: #60a5fa; }
        .info { color: #94a3b8; margin-top: 20px; font-size: 0.9em; }
    </style>
</head>
<body>
    <div class="container">
        <h1>PyGame + NumPy Surface Array Streaming</h1>
        <p class="info">Using NumPy arrays for pixel manipulation</p>
        <div class="image-container">
            <img id="streamImg" src="/stream" alt="Live Stream">
        </div>
        <div class="stats">
            <div class="stat-row"><span>Capture FPS:</span><span class="stat-value" id="captureFps">0.0</span></div>
            <div class="stat-row"><span>Total Frames:</span><span class="stat-value" id="totalFrames">0</span></div>
            <div class="stat-row"><span>Stream FPS:</span><span class="stat-value" id="streamFps">0.0</span></div>
            <div class="stat-row"><span>Latency:</span><span class="stat-value" id="latency">-- ms</span></div>
        </div>
        <div class="info">
            Rendering with NumPy arrays &bull; Port: ''' + str(PORT) + ''' &bull; Target: ''' + str(TARGET_FPS) + ''' FPS
        </div>
    </div>
    <script>
        let streamFrameCount = 0;
        let lastStreamUpdate = Date.now();
        let lastFrameTimestamp = 0;
        function updateStream() {
            const now = Date.now();
            const img = document.getElementById('streamImg');
            // Update image with anti-cache timestamp
            img.src = '/stream?_=' + now;
            // Calculate stream FPS
            streamFrameCount++;
            if (now - lastStreamUpdate >= 1000) {
                const streamFps = streamFrameCount / ((now - lastStreamUpdate) / 1000);
                document.getElementById('streamFps').textContent = streamFps.toFixed(1);
                streamFrameCount = 0;
                lastStreamUpdate = now;
            }
            // Get stats
            fetch('/stats')
                .then(r => r.json())
                .then(data => {
                    document.getElementById('captureFps').textContent = data.capture_fps.toFixed(1);
                    document.getElementById('totalFrames').textContent = data.frame_count;
                    if (lastFrameTimestamp > 0) {
                        const latency = now - (data.timestamp * 1000);
                        document.getElementById('latency').textContent = Math.round(latency) + ' ms';
                    }
                    lastFrameTimestamp = data.timestamp * 1000;
                });
        }
        // Start polling at 30 FPS
        setInterval(updateStream, 33);
        updateStream();
    </script>
</body>
</html>
'''
# ===== 4. FLASK ROUTES =====
@app.route('/')
def index():
    """Serve the viewer page by rendering the module-level HTML template."""
    page = render_template_string(HTML)
    return page
@app.route('/stats')
def stats():
    """Report the capture counters together with the server's current time.

    Returns a dict with the total frame count, the last measured capture
    rate, the server wall-clock time of this call, and whether at least one
    frame has been published yet.
    """
    payload = {}
    payload['frame_count'] = shared['frame_count']
    payload['capture_fps'] = shared['capture_fps']
    payload['timestamp'] = time.time()
    payload['has_frame'] = shared['latest_frame'] is not None
    return payload
@app.route('/stream')
def stream():
    """Return the most recently captured frame as a JPEG response.

    The capture loop stores each frame in ``shared['latest_frame']`` as
    base64 text; this handler decodes it back to raw JPEG bytes. When no
    frame is available yet, or the stored text cannot be decoded, an empty
    ``image/jpeg`` body is returned instead.

    Every response, including the empty fallback, carries no-cache headers:
    the page's initial ``<img src="/stream">`` request has no cache-busting
    query string, so an empty placeholder must not be stored by a cache.
    """
    no_cache_headers = {
        'Cache-Control': 'no-cache, no-store, must-revalidate',
        'Pragma': 'no-cache',
        'Expires': '0'
    }

    # Read the shared slot exactly once so the emptiness check and the decode
    # operate on the same frame while the capture thread keeps replacing it.
    encoded_frame = shared['latest_frame']

    image_data = b''
    if encoded_frame:
        try:
            # Decode base64 back to the raw JPEG bytes
            image_data = base64.b64decode(encoded_frame)
        except (ValueError, TypeError) as e:
            # binascii.Error is a ValueError subclass; fall back to empty body
            print(f"Stream error: {e}")
            image_data = b''

    return Response(image_data, mimetype='image/jpeg', headers=no_cache_headers)
# ===== 5. NUMPY SURFACE CAPTURE LOOP =====
def numpy_capture_loop():
    """Render the bouncing-circle animation and publish each frame as a JPEG.

    Runs until ``shared["streaming"]`` becomes false. Each iteration draws a
    frame on an off-screen surface, scales it to the stream resolution,
    encodes it as JPEG and stores the base64 text in
    ``shared["latest_frame"]``. It also maintains ``frame_count``,
    ``frames_this_second``, ``capture_fps`` (recomputed about once a second)
    and ``last_frame_time`` in ``shared``, then sleeps off the remainder of
    the frame budget derived from ``TARGET_FPS``.

    Errors are printed rather than raised: an init failure returns early, and
    an exception inside the loop is logged with a traceback. ``pygame.quit()``
    always runs once initialisation has succeeded.
    """
    try:
        pygame.init()
        print("✅ PyGame initialized for NumPy rendering")
    except Exception as e:
        print(f"❌ PyGame init error: {e}")
        return

    # Off-screen render target (no window is opened)
    screen = pygame.Surface((WIDTH, HEIGHT))

    # Animation variables
    circle_x, circle_y = WIDTH // 2, HEIGHT // 2
    circle_r = 30
    speed_x, speed_y = 4, 3

    fps_update_time = time.time()
    frames_since_update = 0
    target_time = 1.0 / TARGET_FPS  # per-frame time budget in seconds

    try:
        # The font never changes, so build it once instead of once per frame.
        # Kept inside the try so a font failure is logged like any other
        # capture error.
        font = pygame.font.Font(None, 36)

        while shared["streaming"]:
            start_time = time.time()

            # === UPDATE POSITION ===
            circle_x += speed_x
            circle_y += speed_y
            # Reverse direction when the circle's edge crosses a border
            if circle_x - circle_r < 0 or circle_x + circle_r > WIDTH:
                speed_x *= -1
            if circle_y - circle_r < 0 or circle_y + circle_r > HEIGHT:
                speed_y *= -1

            # === METHOD 1: Traditional PyGame drawing (simpler) ===
            screen.fill((25, 25, 45))  # Dark blue background

            # Draw bouncing circle (filled body plus a 2px white outline)
            center = (int(circle_x), int(circle_y))
            pygame.draw.circle(screen, (255, 80, 80), center, circle_r)
            pygame.draw.circle(screen, (255, 255, 255), center, circle_r, 2)

            # Draw frame counter
            text = font.render(f"Frame: {shared['frame_count']}", True, (100, 255, 100))
            screen.blit(text, (10, 10))

            # Draw FPS
            fps_text = font.render(f"FPS: {shared['capture_fps']:.1f}", True, (100, 255, 100))
            screen.blit(fps_text, (10, 50))

            # === OPTIONAL: METHOD 2: NumPy pixel manipulation ===
            # Disabled by default. To enable, create these once before the
            # loop:
            #
            #     pixel_array = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)
            #     X, Y = np.meshgrid(np.arange(WIDTH), np.arange(HEIGHT))
            #
            # and run this per frame in place of METHOD 1:
            #
            #     # Clear with gradient using NumPy
            #     gradient = 25 + (X * 0.03 + Y * 0.02).astype(np.uint8)
            #     pixel_array[:, :, 0] = gradient        # Red channel
            #     pixel_array[:, :, 1] = gradient // 2   # Green channel
            #     pixel_array[:, :, 2] = 45              # Blue channel
            #     # Draw circle using NumPy (distance formula)
            #     distance = np.sqrt((X - circle_x)**2 + (Y - circle_y)**2)
            #     circle_mask = distance < circle_r
            #     pixel_array[circle_mask, 0] = 255      # Red
            #     pixel_array[circle_mask, 1] = 80       # Green
            #     pixel_array[circle_mask, 2] = 80       # Blue
            #     # Convert NumPy array to PyGame surface
            #     pygame.surfarray.blit_array(screen, pixel_array.swapaxes(0, 1))
            #
            # NOTE(review): surfarray appears to index arrays as [x, y], so
            # the (HEIGHT, WIDTH, 3) array is transposed before blitting -
            # verify against the pygame.surfarray docs when enabling this.

            # === SCALE AND SAVE FRAME ===
            # Scale down for streaming
            scaled = pygame.transform.smoothscale(screen, (STREAM_WIDTH, STREAM_HEIGHT))

            # Encode as JPEG into an in-memory buffer
            buffer = io.BytesIO()
            pygame.image.save(scaled, buffer, 'JPEG')

            # Publish as base64 text; the /stream handler decodes it again
            shared["latest_frame"] = base64.b64encode(buffer.getvalue()).decode('utf-8')

            # === UPDATE COUNTERS ===
            shared["frame_count"] += 1
            frames_since_update += 1
            shared["frames_this_second"] += 1

            # Recompute the measured FPS roughly once per second
            current_time = time.time()
            window = current_time - fps_update_time
            if window >= 1.0:
                shared["capture_fps"] = frames_since_update / window
                frames_since_update = 0
                fps_update_time = current_time
            shared["last_frame_time"] = current_time

            # === FRAME RATE CONTROL ===
            # Sleep only for whatever is left of this frame's budget
            elapsed = time.time() - start_time
            if elapsed < target_time:
                time.sleep(target_time - elapsed)
    except Exception as e:
        print(f"❌ Capture loop error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Release pygame whichever way the loop ended
        pygame.quit()
        print("🛑 NumPy capture loop stopped")
# ===== 6. MAIN FUNCTION =====
def main():
    """Start the capture thread, then serve the Flask app until it stops.

    Prints a start-up banner, launches ``numpy_capture_loop`` on a daemon
    thread and waits up to three seconds for the first frame, reporting
    whether one arrived. It then runs the Flask server on ``0.0.0.0:PORT``.
    When the server returns or fails, the capture loop is told to stop via
    ``shared['streaming']``, the thread is joined with a two-second timeout,
    and the final counters are printed.
    """
    first_frame_polls = 30        # number of checks for the first frame
    first_frame_interval = 0.1    # seconds between checks

    rule = "=" * 60
    print(rule)
    print("🚀 PyGame + NumPy Surface Array Streaming")
    print(rule)
    print(f"Port: {PORT}")
    print(f"Render: {WIDTH}x{HEIGHT} → Stream: {STREAM_WIDTH}x{STREAM_HEIGHT}")
    print(f"Target FPS: {TARGET_FPS}")
    print("Using NumPy for pixel manipulation")
    print(rule)

    # Start capture thread (daemon, so it cannot keep the process alive)
    print("Starting NumPy capture thread...")
    capture_thread = threading.Thread(target=numpy_capture_loop, daemon=True)
    capture_thread.start()

    # Wait for first frame
    print("Waiting for first frame...")
    for i in range(first_frame_polls):
        if shared['latest_frame']:
            print(f"✅ First frame ready after {i * first_frame_interval:.1f}s")
            break
        time.sleep(first_frame_interval)
    else:
        # Make the timeout visible; /stream serves an empty body until a
        # frame shows up.
        waited = first_frame_polls * first_frame_interval
        print(f"⚠️ No frame after {waited:.1f}s - starting server anyway")

    print(f"Starting Flask server on port {PORT}...")
    print(rule)
    try:
        app.run(
            host='0.0.0.0',
            port=PORT,
            debug=False,
            threaded=True,
            use_reloader=False
        )
    except KeyboardInterrupt:
        print("\n👋 Interrupted")
    except Exception as e:
        print(f"❌ Server error: {e}")
    finally:
        # Ask the capture loop to exit and give it a moment to finish
        shared['streaming'] = False
        capture_thread.join(timeout=2)
        print(f"\n📊 Final: {shared['frame_count']} frames @ {shared['capture_fps']:.1f} FPS")
        print("✅ Shutdown complete")


if __name__ == "__main__":
    main()