| from geopandas import GeoDataFrame |
| from networkx import MultiDiGraph |
| import pandas as pd |
| import numpy as np |
| import osmnx as ox |
| from shapely.geometry import LineString, MultiLineString |
| from sklearn.neighbors import BallTree |
| import requests |
| from sklearn.cluster import KMeans |
| from datetime import datetime |
|
|
def filter_by_direction(selected_road: GeoDataFrame, road_direction: str) -> GeoDataFrame:
    """Keep only the rows whose 'bearing' falls in the half-circle for a direction.

    The 'bearing' column is compared against 0/90/180/270/360, i.e. it is
    treated as a compass bearing in degrees. The accepted ranges are:

    * 'North': bearing >= 270 or bearing <= 90
    * 'South': 90 < bearing < 270
    * 'East':  0 <= bearing <= 180
    * 'West':  180 < bearing < 360

    Rows whose bearing is NaN satisfy none of the comparisons and are
    therefore dropped for every direction.

    Args:
        selected_road: Table of road edges with a numeric 'bearing' column.
        road_direction: One of 'North', 'South', 'East', 'West' (case-sensitive).

    Returns:
        The subset of ``selected_road`` matching the direction.

    Raises:
        ValueError: If ``road_direction`` is not one of the four names.
    """
    # Reject unknown directions before touching the data.
    if road_direction not in ('North', 'South', 'East', 'West'):
        raise ValueError(f"Invalid road_direction: {road_direction}. Must be one of: North, South, East, West.")

    bearing = selected_road['bearing']

    if road_direction == 'North':
        # The northern half wraps around 0/360, hence the OR of two ranges.
        keep = (bearing >= 270) | (bearing <= 90)
    elif road_direction == 'South':
        keep = (bearing > 90) & (bearing < 270)
    elif road_direction == 'East':
        keep = (bearing >= 0) & (bearing <= 180)
    else:  # 'West'
        keep = (bearing > 180) & (bearing < 360)

    return selected_road[keep]
|
|
| def add_weather_to_df(df: pd.DataFrame, num_clusters: int = 4 , api_key = 'FLMEW5QEEB8WT8YGUJXF6KAPK', time: datetime | None = None) -> pd.DataFrame: |
| if df.empty: |
| df['weather'] = None |
| return df |
| |
| if time is None: |
| time = datetime.now() |
| |
| coords = df[['Latitude', 'Longitude']].dropna().values |
| kmeans = KMeans(n_clusters=min(num_clusters, len(coords)), random_state=42) |
| df['weather_cluster'] = kmeans.fit_predict(coords) |
|
|
| weather_data = {} |
| date_str = time.strftime("%Y-%m-%d") |
| target_hour = time.strftime("%H:%M:%S") |
|
|
| for cluster_id in range(kmeans.n_clusters): |
| lat, lon = kmeans.cluster_centers_[cluster_id] |
| url = f"https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{lat},{lon}/{date_str}" |
| params = { |
| "key": api_key, |
| "unitGroup": "metric", |
| "contentType": "json" |
| } |
|
|
| try: |
| response = requests.get(url=url, params=params) |
| response.raise_for_status() |
| data = response.json() |
| hours = data.get("days", [{}])[0].get("hours", []) |
|
|
| def hour_diff(hour_entry): |
| try: |
| return abs(datetime.strptime(hour_entry["datetime"], "%H:%M:%S") - datetime.strptime(target_hour, "%H:%M:%S")) |
| except: |
| return datetime.max |
|
|
| if hours: |
| best_match = min(hours, key=hour_diff) |
| weather = best_match.get("conditions", "Unknown") |
| weather_time = best_match.get("datetime", None) |
| else: |
| weather = "Unknown" |
| weather_time = None |
|
|
| except Exception as e: |
| print(f"Weather api error for cluster {cluster_id}: {e}") |
| weather = "Unknown" |
| weather_time = None |
| |
| weather_data[cluster_id] = { |
| "conditions": weather, |
| "datetime": weather_time |
| } |
|
|
| df['time'] = time |
| df['weather'] = df['weather_cluster'].map(lambda x: weather_data[x]["conditions"]) |
| df['weather_time'] = df['weather_cluster'].map(lambda x: weather_data[x]["datetime"]) |
| df.drop(columns=['weather_cluster'], inplace=True) |
| return df |
|
|
def get_coordinates_from_network(G: MultiDiGraph, road_name: str, road_direction: str) -> pd.DataFrame:
    """Flatten one direction of a motorway into a table of its geometry vertices.

    Edges of ``G`` are restricted to highway types 'motorway' and
    'motorway_link', then to those whose 'ref' contains ``road_name``
    (case-insensitive; missing refs never match), then passed through
    ``filter_by_direction``, which reads the edges' 'bearing' column.
    Every vertex of every remaining LineString / MultiLineString becomes one
    output row; edges with any other geometry type are skipped.

    NOTE(review): ``road_name`` goes to ``Series.str.contains`` unchanged, so
    it is a substring (and, by pandas' default, regex) match; "A1" would also
    select "A10". Confirm this is intended.

    Args:
        G: Road network graph with 'highway', 'ref' and 'bearing' edge attributes.
        road_name: Road reference to search for in the 'ref' attribute.
        road_direction: 'North', 'South', 'East' or 'West'.

    Returns:
        DataFrame with columns Longitude, Latitude, lanes, maxspeed,
        road_name (the edge's 'name' attribute), ref and direction. The
        columns are present even when no edge matches.

    Raises:
        ValueError: If ``road_direction`` is invalid (from ``filter_by_direction``).
    """
    columns = ["Longitude", "Latitude", "lanes", "maxspeed", "road_name", "ref", "direction"]

    edges = ox.graph_to_gdfs(G, nodes=False, edges=True)
    edges_motorway = edges[edges['highway'].isin(['motorway', 'motorway_link'])]
    selected_road = edges_motorway[
        edges_motorway['ref'].str.contains(road_name, na=False, case=False)
    ]
    selected_road = filter_by_direction(selected_road, road_direction)

    rows = []
    for _, row in selected_road.iterrows():
        geometry = row.geometry
        if isinstance(geometry, LineString):
            coords = geometry.coords
        elif isinstance(geometry, MultiLineString):
            coords = [pt for line in geometry.geoms for pt in line.coords]
        else:
            continue

        # Per-edge attributes, shared by all of the edge's vertices. The edge
        # name gets its own local so the ``road_name`` argument is not
        # overwritten and the summary below reports the requested road.
        edge_attributes = {
            "lanes": row.get("lanes", None),
            "maxspeed": row.get("maxspeed", None),
            "road_name": row.get("name", None),
            "ref": row.get("ref", None),
            "direction": road_direction,
        }

        # ``*_`` tolerates geometries that carry a third (z) ordinate.
        for lon, lat, *_ in coords:
            rows.append({"Longitude": lon, "Latitude": lat, **edge_attributes})

    # Explicit columns keep the schema stable when nothing matched.
    road_df = pd.DataFrame(rows, columns=columns)
    print(f"Total points in {road_name} - {road_direction}: {len(road_df)}")
    return road_df
|
|
|
|
def sort_gps_by_greedy_path(df: pd.DataFrame) -> pd.DataFrame:
    """
    Greedy nearest-neighbor sorting of GPS coordinates.

    Starting from the first row, repeatedly moves to the closest not-yet-used
    row (great-circle distance) until every row has been placed. Ties are
    broken in favour of the lowest original row position. Rows whose
    coordinates are NaN are treated as infinitely far away, so they are only
    picked once no finite-distance row is left.

    Args:
        df (pd.DataFrame): DataFrame with 'Latitude' and 'Longitude' columns
            (degrees).

    Returns:
        pd.DataFrame: Reordered DataFrame with a fresh 0..n-1 index. An empty
        input yields an empty result.
    """
    n_points = len(df)
    if n_points == 0:
        # Nothing to order; avoids indexing row 0 of an empty frame.
        return df.reset_index(drop=True)

    coords_rad = np.radians(df[['Latitude', 'Longitude']].to_numpy(dtype=float))
    lat, lon = coords_rad[:, 0], coords_rad[:, 1]
    cos_lat = np.cos(lat)

    visited = np.zeros(n_points, dtype=bool)
    path = []
    current_idx = 0

    for _ in range(n_points):
        visited[current_idx] = True
        path.append(current_idx)
        if len(path) == n_points:
            break

        # Haversine term; the distance is 2*arcsin(sqrt(term)), which is
        # monotonic in the term, so the term alone is enough to rank.
        gap = (
            np.sin((lat - lat[current_idx]) / 2) ** 2
            + cos_lat[current_idx] * cos_lat * np.sin((lon - lon[current_idx]) / 2) ** 2
        )
        gap[np.isnan(gap)] = np.inf

        # Rank only the unvisited rows. A single O(n) scan per step replaces
        # a full sorted k=n neighbour query.
        remaining = np.flatnonzero(~visited)
        current_idx = int(remaining[np.argmin(gap[remaining])])

    return df.iloc[path].reset_index(drop=True)
|
|
|
|