#!/usr/bin/env python3
# shared_memory_working.py - Working shared memory
import os
import threading
import time

# ===== 1. SETUP =====
# These must be in the environment BEFORE pygame is imported:
# PYGAME_HIDE_SUPPORT_PROMPT is only consulted while the pygame package is
# being imported, so setting it afterwards (as this file used to) does nothing.
# SDL_VIDEODRIVER=dummy lets SDL run on a machine without a display.
os.environ['SDL_VIDEODRIVER'] = 'dummy'
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'

import numpy as np  # noqa: E402  (deliberately after the environment setup)
import pygame  # noqa: E402
from flask import Flask, Response  # noqa: E402

# Port the HTTP server listens on; overridable through the PORT env variable.
PORT = int(os.getenv('PORT', 7860))
# Frame dimensions in pixels; every frame is WIDTH * HEIGHT * 3 bytes of RGB.
WIDTH, HEIGHT = 400, 300
print(f"🚀 Starting on port {PORT}")
# ===== 2. SHARED MEMORY WITH DEFAULT DATA =====
# The placeholder frame is built at import time, before any thread exists,
# so a reader of shared_state always finds pixel data.
initial_frame = np.full((HEIGHT, WIDTH, 3), (100, 150, 200), dtype=np.uint8)  # Light blue
initial_bytes = initial_frame.tobytes()

# State shared between the render thread and the Flask request handlers.
shared_state = dict(
    raw_pixels=initial_bytes,   # latest frame as raw RGB bytes; never empty
    frame_count=0,              # number of frames published so far
    streaming=True,             # the render loop runs while this is True
    lock=threading.Lock(),      # guards raw_pixels / frame_count
    ready=True,                 # data is available from the very start
)
# ===== 3. FLASK APP =====
# Application object: the @app.route decorators below register their handlers
# on it, and main() starts it with app.run().
app = Flask(__name__)
# ===== 4. SIMPLE HTML WITH VISIBLE TEST =====
# Page served at "/". It polls /frame, expands the raw row-major RGB bytes to
# RGBA and paints them onto a canvas. The __WIDTH__/__HEIGHT__ placeholders
# are substituted below so the page always matches the server's frame size
# (plain str.replace avoids having to escape every JavaScript brace).
HTML = '''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Shared Memory WORKING</title>
</head>
<body>
<h1>🎯 SHARED MEMORY TEST - MUST SHOW IMAGE</h1>
<p>Canvas should show colors below:</p>
<canvas id="canvas" width="__WIDTH__" height="__HEIGHT__" style="background: #000;"></canvas>
<p>Server Status: <span id="status">Connected</span></p>
<p>Frames Received: <span id="frames">0</span></p>
<p>Last Update: <span id="updated">Never</span></p>
<p>If you see a colored canvas above, it's working!</p>
<p>If it's black, check browser console (F12) for errors.</p>
<script>
const WIDTH = __WIDTH__;
const HEIGHT = __HEIGHT__;
const ctx = document.getElementById("canvas").getContext("2d");
const image = ctx.createImageData(WIDTH, HEIGHT);
const statusEl = document.getElementById("status");
const framesEl = document.getElementById("frames");
const updatedEl = document.getElementById("updated");
let received = 0;

async function poll() {
  try {
    const resp = await fetch("/frame", {cache: "no-store"});
    if (!resp.ok) {
      throw new Error("HTTP " + resp.status);
    }
    const rgb = new Uint8Array(await resp.arrayBuffer());
    if (rgb.length !== WIDTH * HEIGHT * 3) {
      throw new Error("Unexpected frame size: " + rgb.length + " bytes");
    }
    // The server sends packed RGB; canvas ImageData needs RGBA.
    const rgba = image.data;
    for (let src = 0, dst = 0; src < rgb.length; src += 3, dst += 4) {
      rgba[dst] = rgb[src];
      rgba[dst + 1] = rgb[src + 1];
      rgba[dst + 2] = rgb[src + 2];
      rgba[dst + 3] = 255;
    }
    ctx.putImageData(image, 0, 0);
    received += 1;
    const serverFrame = resp.headers.get("X-Frame-Number");
    statusEl.textContent = "Connected";
    framesEl.textContent = received + (serverFrame === null ? "" : " (server frame " + serverFrame + ")");
    updatedEl.textContent = new Date().toLocaleTimeString();
  } catch (err) {
    console.error("Frame fetch failed:", err);
    statusEl.textContent = "Error: " + err.message;
  } finally {
    setTimeout(poll, 33);
  }
}

poll();
</script>
</body>
</html>
'''.replace('__WIDTH__', str(WIDTH)).replace('__HEIGHT__', str(HEIGHT))
@app.route('/')
def index():
    """Serve the landing page (the module-level HTML string) unchanged."""
    return HTML
@app.route('/test')
def test():
    """Health-check endpoint.

    Returns a dict with a fixed 'ok' status, the current frame counter,
    the byte length of the stored frame (0 when there is none) and the
    server's wall-clock time.
    """
    # Snapshot the shared values under the lock; build the payload afterwards.
    with shared_state['lock']:
        frames = shared_state['frame_count']
        pixels = shared_state['raw_pixels']

    size = len(pixels) if pixels else 0
    return dict(
        status='ok',
        frame_count=frames,
        data_size=size,
        time=time.time(),
    )
@app.route('/frame')
def get_frame():
    """Return the most recent frame as raw RGB bytes.

    The body is HEIGHT * WIDTH * 3 bytes in row-major order. The frame
    counter at the time of the snapshot is sent in the ``X-Frame-Number``
    header, and caching is disabled so every poll gets fresh data.

    Returns:
        A Flask ``Response`` with mimetype ``application/octet-stream``.
    """
    # Hold the lock only for the snapshot; logging and response building
    # happen outside it so the render thread is never blocked on I/O.
    with shared_state['lock']:
        data = shared_state['raw_pixels']
        frame_num = shared_state['frame_count']

    print(f"📤 Serving frame {frame_num}, {len(data) if data else 0} bytes")

    if data is None:
        # This should never happen, but just in case
        print("⚠️ WARNING: No frame data, sending fallback")
        # Build the red frame directly in uint8. Multiplying a uint8 array by
        # a Python list promotes it to a wider integer type, which would make
        # tobytes() several times larger than a real frame.
        fallback = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)
        fallback[..., 0] = 255  # Red
        data = fallback.tobytes()

    # Content-Type comes from `mimetype`; no need to repeat it in the headers.
    return Response(
        data,
        mimetype='application/octet-stream',
        headers={
            'Cache-Control': 'no-cache, no-store, must-revalidate',
            'Pragma': 'no-cache',
            'Expires': '0',
            'X-Frame-Number': str(frame_num),
        },
    )
# ===== 5. RENDER THREAD - UPDATES SHARED MEMORY =====
def _build_background():
    """Return the static gradient backdrop as a (WIDTH, HEIGHT, 3) uint8 array."""
    # pygame.surfarray indexes pixels as [x, y], so image rows are on axis 1.
    red = np.array(
        [50 + int(row / HEIGHT * 100) for row in range(HEIGHT)],
        dtype=np.uint8,
    )
    background = np.empty((WIDTH, HEIGHT, 3), dtype=np.uint8)
    background[:, :, 0] = red        # Red grows from top to bottom
    background[:, :, 1] = red // 2   # Green is half of red
    background[:, :, 2] = 100        # Blue is constant
    return background


def render_thread():
    """Render animation frames and publish them into ``shared_state``.

    Runs until ``shared_state['streaming']`` becomes false. Each iteration
    draws a gradient background, a bouncing ball and (when a font is
    available) a clock plus frame counter. The frame is then stored as
    row-major HEIGHT x WIDTH x 3 RGB bytes in ``shared_state['raw_pixels']``
    and ``shared_state['frame_count']`` is incremented, both under the lock.

    Errors inside one iteration are printed and the loop carries on after a
    short pause, so a single bad frame does not stop the stream.
    """
    print("🎬 Starting render thread...")
    try:
        pygame.init()
        print("✅ PyGame initialized")
    except Exception as e:
        print(f"❌ PyGame init failed: {e}")
        # Continue anyway - we already have initial frame

    surface = pygame.Surface((WIDTH, HEIGHT))
    background = _build_background()  # loop-invariant, computed once

    # Create the font once instead of on every frame. Text is optional:
    # if the font module is unusable we still stream the animation.
    try:
        font = pygame.font.Font(None, 24)
    except Exception as e:
        print(f"⚠️ Font unavailable, rendering without text: {e}")
        font = None

    ball_radius = 20
    colors = [(255, 100, 100), (100, 255, 100), (100, 100, 255)]
    white = (255, 255, 255)

    # Simple animation
    x, y = WIDTH // 2, HEIGHT // 2
    dx, dy = 3, 2
    color_cycle = 0

    while shared_state['streaming']:
        try:
            # Clear with gradient. blit_array copies into the surface without
            # leaving it locked, so the text blits below are allowed (holding
            # a pixels3d view would keep the surface locked and make
            # Surface.blit raise).
            pygame.surfarray.blit_array(surface, background)

            # Update position, bouncing off the edges and cycling the colour
            x += dx
            y += dy
            if x < ball_radius or x > WIDTH - ball_radius:
                dx = -dx
                color_cycle = (color_cycle + 1) % len(colors)
            if y < ball_radius or y > HEIGHT - ball_radius:
                dy = -dy
                color_cycle = (color_cycle + 1) % len(colors)

            # Draw ball (pygame clips it at the surface edges)
            pygame.draw.circle(surface, colors[color_cycle], (x, y), ball_radius)

            with shared_state['lock']:
                frame_num = shared_state['frame_count']

            # Add text
            if font is not None:
                time_text = font.render(time.strftime("%H:%M:%S"), True, white)
                frame_text = font.render(f"Frame: {frame_num}", True, white)
                surface.blit(time_text, (10, 10))
                surface.blit(frame_text, (10, 40))

            # array3d returns a (WIDTH, HEIGHT, 3) copy; swap the first two
            # axes so the bytes are row-major like the initial frame.
            frame = pygame.surfarray.array3d(surface)
            raw_bytes = frame.transpose(1, 0, 2).tobytes()

            # Update shared memory
            with shared_state['lock']:
                shared_state['raw_pixels'] = raw_bytes
                shared_state['frame_count'] += 1

            if frame_num % 30 == 0:
                print(f"📸 Rendered frame {frame_num}")

            time.sleep(1/30)  # 30 FPS
        except Exception as e:
            # Best-effort rendering: report, back off briefly and keep going.
            print(f"⚠️ Render error: {e}")
            time.sleep(0.1)

    print("🛑 Render thread stopped")
# ===== 6. MAIN =====
def main():
    """Print a startup banner, launch the renderer and run the web server.

    The render thread is a daemon. When the Flask server returns or raises,
    the streaming flag is cleared and the thread is given up to one second
    to finish.
    """
    banner = "=" * 60
    print(banner)
    print("🎯 SHARED MEMORY SERVER STARTING")
    print(banner)
    print(f"📡 Port: {PORT}")
    print(f"🎨 Frame size: {WIDTH}x{HEIGHT}x3 = {WIDTH*HEIGHT*3} bytes")
    print(f"📊 Initial data ready: {len(shared_state['raw_pixels'])} bytes")
    print(banner)

    # Background producer of frames
    worker = threading.Thread(target=render_thread, daemon=True)
    worker.start()

    # Let the renderer get going before requests can arrive
    time.sleep(0.5)

    print("🌐 Starting Flask server...")
    server_options = dict(
        host='0.0.0.0',
        port=PORT,
        debug=False,
        threaded=True,
        use_reloader=False,
    )
    try:
        app.run(**server_options)
    except Exception as err:
        print(f"❌ Server error: {err}")
    finally:
        # Tell the render loop to exit, then wait briefly for it
        shared_state['streaming'] = False
        worker.join(timeout=1)
        print("✅ Server stopped")


if __name__ == "__main__":
    main()