| import os |
| import numpy as np |
| import pandas as pd |
| import geopandas as gpd |
| import osmnx as ox |
| import folium |
| from shapely.geometry import LineString, MultiLineString, Point |
| from shapely import ops |
| from sklearn.neighbors import BallTree |
| from typing import Tuple, List |
| from .utils import get_coordinates_from_network, sort_gps_by_greedy_path, add_weather_to_df |
| from .mock_predictor import MockTrafficPredictor |
| from geopy.distance import geodesic |
| import re |
| import math |
| from datetime import datetime |
| import sys |
| import os |
| |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| from model_v3.predict_road import RoadPredictor |
|
|
| |
| PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| MODEL_PATH = os.path.join(PROJECT_ROOT, "model_v3", "final_lstm.pt") |
| ENCODER_PATH = os.path.join(PROJECT_ROOT, "model_v3", "final_encoder.pkl") |
|
|
| |
| if not os.path.exists(MODEL_PATH): |
| raise FileNotFoundError(f"Model file not found at: {MODEL_PATH}") |
| if not os.path.exists(ENCODER_PATH): |
| raise FileNotFoundError(f"Encoder file not found at: {ENCODER_PATH}") |
|
|
| DIST_THRESHOLD_METERS_MAX = 1200 |
| DIST_THRESHOLD_METERS_MIN = 10 |
|
|
|
|
| class RoadMapManager: |
|
|
| def __init__(self, city: str,bbox: Tuple[float,float,float,float], base_data_dir: str = "data"): |
| self.city = city |
| self.bbox = bbox |
| self.base_data_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", base_data_dir)) |
| self.city_path = os.path.join(self.base_data_dir, self.city) |
| self.coordinates_path = os.path.join(self.city_path, 'coordinates') |
| self.roads_path = os.path.join(self.city_path, 'roads') |
| self.road_network_path = os.path.join(self.city_path, 'maps') |
| self.visualizations_path = os.path.join(self.city_path, 'visualizations') |
|
|
| self.roads = dict() |
|
|
| self._validate_structure() |
| self._load_road_network(self.bbox) |
|
|
| |
| def _validate_structure(self): |
| for path in [self.coordinates_path, self.roads_path, self.road_network_path, self.visualizations_path]: |
| os.makedirs(path, exist_ok=True) |
| |
| @staticmethod |
| def split_road_name_direction(road_name: str) -> Tuple[str, str]: |
| parts = road_name.split() |
| return " ".join(parts[:-1]), parts[-1] |
| |
| def set_roads(self, roads: List[str]): |
| os.makedirs(self.coordinates_path, exist_ok=True) |
|
|
| for road in roads: |
| road_name, direction = self.split_road_name_direction(road) |
| file_name = f"{road_name} {direction}.csv" |
| file_path = os.path.join(self.coordinates_path,file_name) |
| |
| if os.path.exists(file_path): |
| print(f"DataFrame for {road_name} - {direction} already exists") |
| df = pd.read_csv(file_path) |
| else: |
| print(f"Downloading DataFrame for {road_name} - {direction}") |
| df = get_coordinates_from_network(self.road_network, road_name, direction) |
| df.to_csv(file_path, index=False) |
|
|
| self.roads[(road_name,direction)] = df |
|
|
| def get_roads(self): |
| """ |
| Used for testing |
| """ |
| for (road_name, direction), df in self.roads.items(): |
| print(f"road name: {road_name} - {direction}") |
| print(df.head()) |
| print("\n" + "-"*40 + "\n") |
|
|
| def _load_road_network(self, bbox: Tuple[float,float,float,float]): |
| network_filename = f"{self.city.replace(' ', '_')}_network.graphml" |
| network_path = os.path.join(self.road_network_path, network_filename) |
|
|
| if os.path.exists(network_path): |
| print("map already exists") |
| self.road_network = ox.load_graphml(network_path) |
| else: |
| print("Downloading map") |
| self.road_network = ox.graph_from_bbox( |
| bbox=bbox, |
| network_type='drive' |
| ) |
|
|
| self.road_network = ox.bearing.add_edge_bearings(self.road_network) |
|
|
| ox.save_graphml(self.road_network, filepath=network_path) |
|
|
| def apply_prediction_data(self, predict_time: datetime | None = None): |
| """ |
| Needed data to predict: |
| Gather data about weather in current day in time for each point |
| Speed limit - from the road network |
| Road name, Direction - key of the coordinates dict |
| Cooridnate from the coordinate df |
| Lanes - either from coordinate if given else from road network |
| Time - input from user |
| """ |
| predictions = {} |
| road_predictor = RoadPredictor(MODEL_PATH, ENCODER_PATH) |
| for (road_name, direction) in self.roads.keys(): |
| road_under = road_name.replace(" ", "_") |
| df = pd.read_csv(os.path.join(self.roads_path, f"{road_under}_{direction.lower()}.csv.gz"), compression='gzip') |
| predictions[(road_name, direction)] = road_predictor.predict_road_speeds(df, road_name, direction, predict_time) |
| |
| |
| self._map_predictions_to_roads(predictions) |
|
|
| for (road_name, direction), df in self.roads.items(): |
| print(df.head()) |
| df = sort_gps_by_greedy_path(df) |
| self.roads[(road_name, direction)] = df |
|
|
| def _map_predictions_to_roads(self, predictions: dict): |
| """ |
| Map predicted speeds to the closest points in self.roads coordinates. |
| |
| Args: |
| predictions: Dictionary with (road_name, direction) keys and prediction DataFrames as values |
| """ |
| from sklearn.neighbors import BallTree |
| |
| for (road_name, direction), road_df in self.roads.items(): |
| if (road_name, direction) not in predictions: |
| print(f"No predictions found for {road_name} {direction}") |
| continue |
| |
| pred_df = predictions[(road_name, direction)] |
| |
| if pred_df.empty: |
| print(f"Empty predictions for {road_name} {direction}") |
| continue |
| |
| |
| road_coords = road_df[['Latitude', 'Longitude']].values |
| |
| |
| pred_coords = pred_df[['Latitude', 'Longitude']].values |
| |
| |
| |
| road_coords_rad = np.radians(road_coords) |
| pred_coords_rad = np.radians(pred_coords) |
| |
| tree = BallTree(pred_coords_rad, metric='haversine') |
| |
| |
| distances, indices = tree.query(road_coords_rad, k=1) |
| |
| |
| distances_meters = distances.flatten() * 6371000 |
| |
| |
| closest_pred_speeds = pred_df.iloc[indices.flatten()]['predicted_speed'].values |
| |
| |
| if 'real_speed' in pred_df.columns: |
| closest_real_speeds = pred_df.iloc[indices.flatten()]['real_speed'].values |
| road_df['real_speed'] = closest_real_speeds |
| else: |
| road_df['real_speed'] = None |
| |
| |
| road_df['predicted_speed'] = closest_pred_speeds |
| road_df['prediction_distance_m'] = distances_meters |
| |
| |
| road_df['speed'] = road_df['predicted_speed'] |
| |
| |
| max_distance_threshold = 1000 |
| far_points = distances_meters > max_distance_threshold |
| |
| if far_points.any(): |
| print(f"Warning: {far_points.sum()} points in {road_name} {direction} are >{max_distance_threshold}m from predictions") |
| |
| road_df.loc[far_points, 'predicted_speed'] = road_df.loc[~far_points, 'predicted_speed'].mean() |
| road_df.loc[far_points, 'speed'] = road_df.loc[~far_points, 'speed'].mean() |
| |
| print(f"Mapped predictions for {road_name} {direction}: " |
| f"{len(road_df)} points, avg distance: {distances_meters.mean():.1f}m") |
|
|
| def get_prediction_statistics(self) -> dict: |
| """ |
| Get statistics about the prediction mapping for all roads. |
| |
| Returns: |
| Dictionary with statistics for each road |
| """ |
| stats = {} |
| |
| for (road_name, direction), road_df in self.roads.items(): |
| if 'predicted_speed' not in road_df.columns: |
| continue |
| |
| stats[(road_name, direction)] = { |
| 'total_points': len(road_df), |
| 'avg_predicted_speed': road_df['predicted_speed'].mean(), |
| 'min_predicted_speed': road_df['predicted_speed'].min(), |
| 'max_predicted_speed': road_df['predicted_speed'].max(), |
| 'avg_distance_to_prediction': road_df.get('prediction_distance_m', pd.Series([0])).mean(), |
| 'max_distance_to_prediction': road_df.get('prediction_distance_m', pd.Series([0])).max(), |
| 'points_with_predictions': road_df['predicted_speed'].notna().sum() |
| } |
| |
| return stats |
|
|
| def print_prediction_summary(self): |
| """Print a summary of prediction statistics for all roads.""" |
| stats = self.get_prediction_statistics() |
| |
| if not stats: |
| print("No prediction statistics available. Run apply_prediction_data() first.") |
| return |
| |
| print("\n" + "="*80) |
| print("PREDICTION MAPPING SUMMARY") |
| print("="*80) |
| |
| for (road_name, direction), stat in stats.items(): |
| print(f"\n{road_name} {direction}:") |
| print(f" Points: {stat['points_with_predictions']}/{stat['total_points']}") |
| print(f" Speed: {stat['avg_predicted_speed']:.1f} mph (range: {stat['min_predicted_speed']:.1f}-{stat['max_predicted_speed']:.1f})") |
| print(f" Avg distance to prediction: {stat['avg_distance_to_prediction']:.1f}m") |
| print(f" Max distance to prediction: {stat['max_distance_to_prediction']:.1f}m") |
|
|
| |
| |
| def draw_map(self): |
|
|
| def get_color(speed, max_speed): |
| if speed >= 0.85 * max_speed: |
| return '#00FF00' |
| elif speed >= 0.55 * max_speed: |
| return '#FFA500' |
| else: |
| return '#FF0000' |
| |
| center_lon = (self.bbox[0] + self.bbox[2]) / 2 |
| center_lat = (self.bbox[1] + self.bbox[3]) / 2 |
|
|
| m = folium.Map( |
| location=[center_lat, center_lon], |
| zoom_start=13, |
| tiles='CartoDB dark_matter' |
| ) |
|
|
| for (road_name, direction), df in self.roads.items(): |
| for i in range(len(df) - 1): |
| lat1, lon1, speed1 = df.loc[i, ['Latitude', 'Longitude', 'speed']] |
| lat2, lon2, speed2 = df.loc[i+1, ['Latitude', 'Longitude', 'speed']] |
| raw_speed = df.loc[i, 'maxspeed'] |
| match = re.search(r'\d+', str(raw_speed)) |
| if match: |
| max_speed = float(match.group()) |
| else: |
| max_speed = 60 |
|
|
| dist = geodesic((lat1, lon1), (lat2, lon2)).meters |
| if dist > DIST_THRESHOLD_METERS_MAX or dist < DIST_THRESHOLD_METERS_MIN: |
| continue |
|
|
| avg_speed = (speed1 + speed2) / 2 |
| color = get_color(avg_speed,max_speed) |
|
|
| folium.PolyLine( |
| locations=[(lat1, lon1), (lat2, lon2)], |
| color=color, |
| weight=1, |
| opacity=0.9 |
| ).add_to(m) |
|
|
| output_path = os.path.join(self.visualizations_path, "sorted_path.html") |
| m.save(output_path) |
| print("Saved map with distance filtering to 'sorted_path.html'") |
|
|
|
|
|
|
|
|
| def draw_map_offset(self): |
|
|
| def get_color(speed, max_speed): |
| if speed >= 0.85 * max_speed: |
| return '#00FF00' |
| elif speed >= 0.55 * max_speed: |
| return '#FFA500' |
| else: |
| return '#FF0000' |
|
|
| def get_maxspeed(raw_speed): |
| match = re.search(r'\d+', str(raw_speed)) |
| return float(match.group()) if match else 60 |
|
|
| def apply_offset(lat, lon, bearing, direction): |
| """Offset lat/lon a little perpendicular to bearing, based on direction.""" |
| offset_meters = -600 if direction.lower() in ["north", "east"] else 600 |
|
|
| |
| angle_rad = math.radians((bearing + 90) % 360) |
| delta_lat = offset_meters * math.cos(angle_rad) / 111111 |
| delta_lon = offset_meters * math.sin(angle_rad) / (111111 * math.cos(math.radians(lat))) |
|
|
| return lat + delta_lat, lon + delta_lon |
|
|
| |
| center_lon = (self.bbox[0] + self.bbox[2]) / 2 |
| center_lat = (self.bbox[1] + self.bbox[3]) / 2 |
|
|
| m = folium.Map( |
| location=[center_lat, center_lon], |
| zoom_start=13, |
| tiles='CartoDB dark_matter' |
| ) |
|
|
| |
| road_groups = {} |
| for (road_name, direction), df in self.roads.items(): |
| road_groups.setdefault(road_name, {})[direction] = df |
|
|
| for road_name, direction_map in road_groups.items(): |
| for direction, df in direction_map.items(): |
| for i in range(len(df) - 1): |
| lat1, lon1, speed1 = df.loc[i, ['Latitude', 'Longitude', 'speed']] |
| lat2, lon2, speed2 = df.loc[i + 1, ['Latitude', 'Longitude', 'speed']] |
| raw_speed = df.loc[i, 'maxspeed'] |
| max_speed = get_maxspeed(raw_speed) |
| bearing = df.loc[i, 'bearing'] if 'bearing' in df.columns else 0 |
|
|
| dist = geodesic((lat1, lon1), (lat2, lon2)).meters |
| if dist > DIST_THRESHOLD_METERS_MAX or dist < DIST_THRESHOLD_METERS_MIN: |
| continue |
|
|
| avg_speed = (speed1 + speed2) / 2 |
| color = get_color(avg_speed, max_speed) |
|
|
| |
| has_opposite = len(direction_map) > 1 |
| if has_opposite: |
| lat1, lon1 = apply_offset(lat1, lon1, bearing, direction) |
| lat2, lon2 = apply_offset(lat2, lon2, bearing, direction) |
|
|
| folium.PolyLine( |
| locations=[(lat1, lon1), (lat2, lon2)], |
| color=color, |
| weight=2, |
| opacity=0.95 |
| ).add_to(m) |
|
|
| output_path = os.path.join(self.visualizations_path, "direction_offset_map.html") |
| m.save(output_path) |
| print("✅ Saved map with directional offsets to 'direction_offset_map.html'") |
| return m |
|
|
| def draw_map_with_real_speed(self): |
| """ |
| Draw map using real speed data instead of predicted speed. |
| """ |
| def get_color(speed, max_speed): |
| if speed >= 0.85 * max_speed: |
| return '#00FF00' |
| elif speed >= 0.55 * max_speed: |
| return '#FFA500' |
| else: |
| return '#FF0000' |
|
|
| def get_maxspeed(raw_speed): |
| match = re.search(r'\d+', str(raw_speed)) |
| return float(match.group()) if match else 60 |
|
|
| def apply_offset(lat, lon, bearing, direction): |
| """Offset lat/lon a little perpendicular to bearing, based on direction.""" |
| offset_meters = -600 if direction.lower() in ["north", "east"] else 600 |
|
|
| |
| angle_rad = math.radians((bearing + 90) % 360) |
| delta_lat = offset_meters * math.cos(angle_rad) / 111111 |
| delta_lon = offset_meters * math.sin(angle_rad) / (111111 * math.cos(math.radians(lat))) |
|
|
| return lat + delta_lat, lon + delta_lon |
|
|
| |
| center_lon = (self.bbox[0] + self.bbox[2]) / 2 |
| center_lat = (self.bbox[1] + self.bbox[3]) / 2 |
|
|
| m = folium.Map( |
| location=[center_lat, center_lon], |
| zoom_start=13, |
| tiles='CartoDB dark_matter' |
| ) |
|
|
| |
| road_groups = {} |
| for (road_name, direction), df in self.roads.items(): |
| road_groups.setdefault(road_name, {})[direction] = df |
|
|
| for road_name, direction_map in road_groups.items(): |
| for direction, df in direction_map.items(): |
| for i in range(len(df) - 1): |
| lat1, lon1 = df.loc[i, ['Latitude', 'Longitude']] |
| lat2, lon2 = df.loc[i + 1, ['Latitude', 'Longitude']] |
| |
| |
| if 'real_speed' in df.columns and pd.notna(df.loc[i, 'real_speed']): |
| speed1 = df.loc[i, 'real_speed'] |
| speed2 = df.loc[i + 1, 'real_speed'] if i + 1 < len(df) and pd.notna(df.loc[i + 1, 'real_speed']) else speed1 |
| else: |
| speed1 = df.loc[i, 'speed'] |
| speed2 = df.loc[i + 1, 'speed'] |
| |
| raw_speed = df.loc[i, 'maxspeed'] |
| max_speed = get_maxspeed(raw_speed) |
| bearing = df.loc[i, 'bearing'] if 'bearing' in df.columns else 0 |
|
|
| dist = geodesic((lat1, lon1), (lat2, lon2)).meters |
| if dist > DIST_THRESHOLD_METERS_MAX or dist < DIST_THRESHOLD_METERS_MIN: |
| continue |
|
|
| avg_speed = (speed1 + speed2) / 2 |
| color = get_color(avg_speed, max_speed) |
|
|
| |
| has_opposite = len(direction_map) > 1 |
| if has_opposite: |
| lat1, lon1 = apply_offset(lat1, lon1, bearing, direction) |
| lat2, lon2 = apply_offset(lat2, lon2, bearing, direction) |
|
|
| folium.PolyLine( |
| locations=[(lat1, lon1), (lat2, lon2)], |
| color=color, |
| weight=2, |
| opacity=0.95 |
| ).add_to(m) |
|
|
| output_path = os.path.join(self.visualizations_path, "real_speed_map.html") |
| m.save(output_path) |
| print("✅ Saved map with real speed data to 'real_speed_map.html'") |
| return m |
|
|
| def draw_side_by_side_maps(self): |
| """ |
| Create side-by-side maps showing both predicted and real speeds. |
| Returns a tuple of (predicted_map, real_map) for use in Streamlit. |
| """ |
| |
| predicted_map = self.draw_map_offset() |
| |
| |
| real_map = self.draw_map_with_real_speed() |
| |
| return predicted_map, real_map |
|
|
|
|
| """ |
| mock_predictor = MockTrafficPredictor({ |
| 'I 405 North': 'moderate', |
| 'I 405 South': 'free', |
| 'US 101 North': 'busy', |
| 'US 101 South': 'free', |
| 'I 5 North': 'busy', |
| 'I 5 South': 'free', |
| 'I 10 East': 'moderate', |
| 'I 10 West': 'moderate', |
| 'I 110 North': 'busy', |
| 'I 110 South': 'busy', |
| 'CA 110 North': 'busy', |
| 'CA 110 South': 'busy', |
| 'CA 170 North': 'moderate', |
| 'CA 170 South': 'free', |
| 'CA 118 East': 'free', |
| 'CA 118 West': 'free', |
| 'CA 134 East': 'moderate', |
| 'CA 134 West': 'free', |
| 'CA 2 North': 'moderate', |
| 'CA 2 South': 'moderate', |
| 'I 605 North': 'busy', |
| 'I 605': 'free', |
| 'I 210 East' : 'free', |
| 'I 210 West' : 'busy' |
| }) |
| |
| if predict_time is None: |
| predict_time = datetime.now() |
| |
| for (road_name, direction), df in self.roads.items(): |
| #self.roads[(road_name, direction)] = add_weather_to_df(self.roads[(road_name, direction)], time = predict_time) |
| print(f"Mocking for {road_name} - {direction}") |
| df = mock_predictor.predict(df) |
| print(df.head()) |
| df = sort_gps_by_greedy_path(df) |
| self.roads[(road_name, direction)] = df |
| |
| """ |