import pygame
import time
import threading
import os
from flask import Flask, Response, render_template_string
from io import BytesIO
from PIL import Image
os.environ['SDL_VIDEODRIVER'] = 'dummy'
pygame.init()
WIDTH, HEIGHT = 400, 300
shared = {
"frame_count": 0,
"streaming": True,
"x_position": 200,
"last_frame_bytes": None # Store frame as bytes in memory
}
def game_loop():
"""PyGame draws, PIL converts to web-ready JPEG"""
screen = pygame.Surface((WIDTH, HEIGHT))
x, y = 200, 150
speed = 5
while shared["streaming"]:
start = time.time()
# Update position
x += speed
if x < 30 or x > WIDTH-30:
speed *= -1
# PyGame: Draw to surface
screen.fill((25, 25, 45))
pygame.draw.circle(screen, (255, 80, 80), (int(x), int(y)), 20)
pygame.draw.circle(screen, (255, 255, 255), (int(x), int(y)), 20, 2)
# Convert PyGame surface to PIL Image
frame_str = pygame.image.tostring(screen, 'RGB')
img = Image.frombytes('RGB', (WIDTH, HEIGHT), frame_str)
# PIL: Optimize for web (small, fast JPEG)
buf = BytesIO()
img.save(buf, format='JPEG', quality=85, optimize=True)
# Store frame in memory (no filesystem!)
shared["frame_count"] += 1
shared["x_position"] = x
shared["last_frame_bytes"] = buf.getvalue()
# Log occasionally
if shared["frame_count"] % 10 == 0:
print(f"Frame {shared['frame_count']}: X={x}")
# Control FPS
elapsed = time.time() - start
if elapsed < 1.0/30:
time.sleep(1.0/30 - elapsed)
app = Flask(__name__)
HTML = '''
🎮 PyGame + PIL Stream
Frame: 0
Position: X=200
FPS: 0
'''
@app.route('/')
def index():
return render_template_string(HTML)
@app.route('/stats')
def stats():
"""Return game state as JSON"""
return {
'frame_count': shared['frame_count'],
'x': shared['x_position'],
'time': time.time()
}
@app.route('/frame')
def frame():
"""Serve the latest frame as JPEG"""
if shared["last_frame_bytes"]:
return Response(
shared["last_frame_bytes"],
mimetype='image/jpeg',
headers={
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0',
'Last-Modified': time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime())
}
)
# Create a default frame if none exists yet
screen = pygame.Surface((WIDTH, HEIGHT))
screen.fill((25, 25, 45))
pygame.draw.circle(screen, (255, 80, 80), (200, 150), 20)
frame_str = pygame.image.tostring(screen, 'RGB')
img = Image.frombytes('RGB', (WIDTH, HEIGHT), frame_str)
buf = BytesIO()
img.save(buf, format='JPEG')
return Response(buf.getvalue(), mimetype='image/jpeg')
if __name__ == "__main__":
print("🚀 Starting PyGame + PIL stream...")
# Create initial frame
screen = pygame.Surface((WIDTH, HEIGHT))
screen.fill((25, 25, 45))
pygame.draw.circle(screen, (255, 80, 80), (200, 150), 20)
frame_str = pygame.image.tostring(screen, 'RGB')
img = Image.frombytes('RGB', (WIDTH, HEIGHT), frame_str)
buf = BytesIO()
img.save(buf, format='JPEG')
shared["last_frame_bytes"] = buf.getvalue()
# Start game loop
thread = threading.Thread(target=game_loop, daemon=True)
thread.start()
# Wait a moment for first update
time.sleep(0.5)
# Start server
app.run(
host='0.0.0.0',
port=int(os.environ.get('PORT', 7860)),
debug=False,
threaded=True
)