"""
Data loading and management for VPR plotting system.

This module handles loading experiment data from JSON files and provides
a clean interface for accessing results, statistics, and visualization data.
"""

import csv
import json
import math
import os

import numpy as np

# Earth radius in meters, used by the haversine great-circle distance.
_EARTH_RADIUS_M = 6371000

# Tolerance used when the experiment info does not provide a usable one.
_DEFAULT_TOLERANCE = 25

# Keys under which visualization_data.json may store its list of methods,
# in lookup priority order (the first key present wins).
_VISUALIZATION_METHOD_KEYS = ('successful_methods', 'method results', 'methods')

# CSV files probed, in order, when looking for GPS coordinates of a dataset.
_GPS_FILE_CANDIDATES = (
    'summary_data_log.csv',
    'full_data_log.csv',
    'gps_data.csv',
    'coordinates.csv',
)


def print_info(message):
    """Utility function to print informational messages"""
    print(f"[INFO] {message}")


def print_key(message):
    """Utility function to print key messages"""
    print(f"[KEY] {message}")


class ExperimentDataLoader:
    """Load and manage VPR experiment data from JSON files.

    Constructing a loader immediately reads every known result file from
    ``output_dir``. Missing or unreadable files are reported through
    ``print_key`` and leave the corresponding attribute at its initial value;
    nothing is raised.

    Attributes:
        output_dir: Directory the result files are read from.
        results_summary: Parsed ``results_summary.json`` or ``None``.
        visualization_data: Parsed ``visualization_data.json`` or ``None``.
        statistical_comparison: Parsed ``statistical_comparison.json`` or ``None``.
        advanced_analysis: Parsed ``advanced_analysis.json`` or ``None``.
        distance_matrices: ``{method_key: numpy array}`` of distance matrices.
        similarity_scores: ``{method_key: numpy array}`` of similarity scores.
        gps_data: GPS metadata dict, or ``None`` when the experiment does not
            use meter-based distances.
    """

    def __init__(self, output_dir):
        self.output_dir = output_dir
        self.results_summary = None
        self.visualization_data = None
        self.statistical_comparison = None
        self.advanced_analysis = None
        self.distance_matrices = {}
        self.similarity_scores = {}
        self.gps_data = None

        # Load all available data; remember whether the mandatory file loaded
        # so callers do not have to read everything a second time to find out.
        self._results_loaded = self._load_results_summary()
        self._load_optional_data()
        self._extract_gps_data()

    def load_all_data(self):
        """(Re)load all available experiment data.

        Returns:
            bool: True when the mandatory results summary was loaded. Optional
            files never affect the return value.
        """
        self._results_loaded = self._load_results_summary()
        self._load_optional_data()
        # Derived GPS metadata must follow the freshly loaded summary.
        self._extract_gps_data()
        return self._results_loaded

    # ------------------------------------------------------------------
    # File loading
    # ------------------------------------------------------------------

    @staticmethod
    def _read_json(filepath):
        """Parse a JSON file, decoding it as UTF-8 regardless of locale."""
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _load_optional_data(self):
        """Load every optional file; none of them is required to exist.

        The visualization data must be loaded before the per-method files
        because the method keys used to build their file names come from it.
        """
        self._load_visualization_data()
        self._load_statistical_comparison()
        self._load_advanced_analysis()
        self._load_matrix_data()
        self._load_similarity_data()

    def _load_results_summary(self):
        """Load the main results summary.

        Returns:
            bool: True on success, False when the file is missing or invalid.
        """
        filepath = os.path.join(self.output_dir, "results_summary.json")
        if not os.path.exists(filepath):
            print_key(f"Error: Results summary not found: {filepath}")
            return False

        try:
            self.results_summary = self._read_json(filepath)
        except Exception as e:  # best-effort loading: report, never raise
            print_key(f"Error loading results summary: {e}")
            return False

        print_info("✓ Results summary loaded")
        return True

    def _load_optional_json(self, attr_name, filename, loaded_message, warning_label):
        """Load one optional JSON file into the attribute ``attr_name``.

        A missing file is silently ignored. A file that cannot be read or
        parsed produces a warning and leaves the attribute untouched.
        """
        filepath = os.path.join(self.output_dir, filename)
        if not os.path.exists(filepath):
            return

        try:
            setattr(self, attr_name, self._read_json(filepath))
        except Exception as e:  # best-effort loading: report, never raise
            print_key(f"Warning: Could not load {warning_label}: {e}")
            return

        print_info(loaded_message)

    def _load_visualization_data(self):
        """Load visualization data"""
        self._load_optional_json(
            'visualization_data',
            "visualization_data.json",
            "✓ Visualization data loaded",
            "visualization data",
        )

    def _load_statistical_comparison(self):
        """Load statistical comparison data"""
        self._load_optional_json(
            'statistical_comparison',
            "statistical_comparison.json",
            "✓ Statistical comparison data loaded",
            "statistical comparison",
        )

    def _load_advanced_analysis(self):
        """Load advanced analysis data"""
        self._load_optional_json(
            'advanced_analysis',
            "advanced_analysis.json",
            "✓ Advanced analysis data loaded",
            "advanced analysis",
        )

    def _load_method_arrays(self, target, file_prefix, payload_key, label):
        """Load ``<file_prefix>_<method_key>.json`` for every known method.

        Each file is expected to hold ``{payload_key: {'data': [...]}}``; the
        nested ``data`` value is converted to a numpy array and stored in
        ``target[method_key]``. Missing files are skipped silently.
        """
        for method_key in self.get_method_keys():
            filepath = os.path.join(self.output_dir, f"{file_prefix}_{method_key}.json")
            if not os.path.exists(filepath):
                continue

            try:
                data = self._read_json(filepath)
                target[method_key] = np.array(data[payload_key]['data'])
            except Exception as e:  # best-effort loading: report, never raise
                print_key(f"Warning: Could not load {label} for {method_key}: {e}")
                continue

            print_info(f"✓ {label.capitalize()} loaded for {method_key}")

    def _load_matrix_data(self):
        """Load distance matrix data for each method"""
        self._load_method_arrays(
            self.distance_matrices, "distance_matrix", 'distance_matrix', "distance matrix"
        )

    def _load_similarity_data(self):
        """Load similarity score data for each method"""
        self._load_method_arrays(
            self.similarity_scores, "similarity_scores", 'similarity_scores', "similarity scores"
        )

    # ------------------------------------------------------------------
    # GPS handling
    # ------------------------------------------------------------------

    @staticmethod
    def _uses_meter_distances(exp_info):
        """Return True when the experiment info declares meter-based distances.

        Either the ``use_meter_distances`` flag or
        ``distance_calculation == 'meters'`` is sufficient.
        """
        return (
            bool(exp_info.get('use_meter_distances', False))
            or exp_info.get('distance_calculation', '') == 'meters'
        )

    def _extract_gps_data(self):
        """Initialise ``self.gps_data`` from the loaded results summary.

        ``gps_data`` is reset first so that it always reflects the current
        summary. Only metadata is created here; coordinates are read lazily
        by ``_load_gps_coordinates_from_dataset``.
        """
        self.gps_data = None
        if not self.results_summary:
            return

        exp_info = self.get_experiment_info()
        if not self._uses_meter_distances(exp_info):
            print_info("Experiment does not use GPS-based distances")
            return

        dataset_name = exp_info.get('dataset', '')
        if not dataset_name:
            print_key("Warning: No dataset name found in experiment info")
            return

        self.gps_data = {
            'use_meter_distances': True,
            'available': True,
            'dataset': dataset_name,
            'distance_unit': 'meters'
        }
        print_info("✓ GPS data structure initialized (coordinates embedded in predictions)")

    def _find_dataset_path(self, dataset_name):
        """Return the first existing candidate directory for the dataset, or None."""
        parent_dir = os.path.dirname(self.output_dir)
        possible_paths = [
            os.path.join(parent_dir, 'data', dataset_name),
            os.path.join(os.path.dirname(parent_dir), 'data', dataset_name),
            os.path.join('data', dataset_name)
        ]
        for path in possible_paths:
            if os.path.exists(path):
                return path
        return None

    @staticmethod
    def _read_gps_csv(gps_path, gps_file):
        """Read ``(lat, lon)`` pairs from one CSV file.

        The latitude/longitude columns are located by searching the header
        names for the substrings ``latitude`` / ``longitude`` (case
        insensitive; the last matching column wins). Rows that are too short
        or hold non-numeric values are skipped.

        Returns:
            list | None: The coordinate tuples (possibly empty), or ``None``
            when the file has fewer than two rows or lacks the GPS columns.
        """
        # newline='' is what the csv module requires for correct parsing;
        # undecodable bytes are replaced so one odd character does not make
        # the whole file unusable.
        with open(gps_path, 'r', encoding='utf-8', errors='replace', newline='') as f:
            rows = list(csv.reader(f))

        if len(rows) < 2:
            return None

        lon_idx = None
        lat_idx = None
        for i, col in enumerate(rows[0]):
            name = col.lower()
            if 'longitude' in name:
                lon_idx = i
            elif 'latitude' in name:
                lat_idx = i

        if lon_idx is None or lat_idx is None:
            print_key(f"Warning: Could not find GPS columns in {gps_file}")
            return None

        last_needed = max(lon_idx, lat_idx)
        coords = []
        for parts in rows[1:]:
            if len(parts) <= last_needed:
                continue
            try:
                coords.append((float(parts[lat_idx]), float(parts[lon_idx])))
            except ValueError:
                continue
        return coords

    def _load_gps_coordinates_from_dataset(self):
        """Load actual GPS coordinates from dataset files.

        On success ``gps_data['database_coords']`` and
        ``gps_data['query_coords']`` are filled with ``(lat, lon)`` tuples.
        """
        if not self.gps_data or not self.gps_data.get('available'):
            return

        dataset_name = self.gps_data.get('dataset', '')
        if not dataset_name:
            return

        dataset_path = self._find_dataset_path(dataset_name)
        if not dataset_path:
            print_key(f"Warning: Could not find dataset directory for {dataset_name}")
            return

        for gps_file in _GPS_FILE_CANDIDATES:
            gps_path = os.path.join(dataset_path, gps_file)
            if not os.path.exists(gps_path):
                continue

            try:
                coords = self._read_gps_csv(gps_path, gps_file)
            except Exception as e:  # best-effort loading: report, try next file
                print_key(f"Warning: Could not load GPS file {gps_file}: {e}")
                continue

            if not coords:
                continue

            # NOTE(review): this assumes the first half of the rows are
            # reference/database images and the second half query images;
            # confirm against the dataset layout.
            split_point = len(coords) // 2
            self.gps_data['database_coords'] = coords[:split_point]
            self.gps_data['query_coords'] = coords[split_point:]
            print_info(f"✓ GPS coordinates loaded from {gps_file}: {len(self.gps_data['database_coords'])} database, {len(self.gps_data['query_coords'])} query")
            return

        print_key(f"Warning: No valid GPS files found in {dataset_path}")

    @staticmethod
    def _haversine_distance(lat1, lon1, lat2, lon2):
        """Return the great-circle distance in meters between two lat/lon points (degrees)."""
        lat1_rad, lon1_rad = math.radians(lat1), math.radians(lon1)
        lat2_rad, lon2_rad = math.radians(lat2), math.radians(lon2)
        dlat, dlon = lat2_rad - lat1_rad, lon2_rad - lon1_rad
        a = (
            math.sin(dlat / 2) ** 2
            + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2) ** 2
        )
        # Rounding can push `a` marginally outside [0, 1], which would make
        # sqrt/asin raise; clamp it.
        a = min(1.0, max(0.0, a))
        return _EARTH_RADIUS_M * 2 * math.asin(math.sqrt(a))

    def _gps_tolerance(self):
        """Return the correctness threshold in meters for GPS-based checks.

        The experiment tolerance is used only when the experiment measures
        distances in meters and the value is numeric; otherwise the default
        of 25 applies.
        """
        if not self.has_gps_data():
            return _DEFAULT_TOLERANCE
        tolerance = self.get_tolerance()
        if isinstance(tolerance, bool) or not isinstance(tolerance, (int, float)):
            return _DEFAULT_TOLERANCE
        return tolerance

    def _enhance_predictions_with_gps(self, predictions, method_key):
        """Enhance similarity-based predictions with GPS coordinates.

        Args:
            predictions: List of dicts holding ``query_index`` and
                ``predicted_index``.
            method_key: Method name, used only in warning messages.

        Returns:
            list: ``predictions`` itself when no coordinates are available,
            otherwise a new list of copies. Copies whose indices fall inside
            the coordinate lists gain ``distance_error`` (meters),
            ``is_correct`` and ``gps_coordinates``; the others are returned
            unchanged.
        """
        if not self.gps_data or not predictions:
            return predictions

        database_coords = self.gps_data.get('database_coords', [])
        query_coords = self.gps_data.get('query_coords', [])
        if not database_coords or not query_coords:
            return predictions

        tolerance = self._gps_tolerance()
        enhanced_predictions = []

        for pred in predictions:
            enhanced_pred = pred.copy()
            query_idx = pred.get('query_index', 0)
            predicted_idx = pred.get('predicted_index', 0)

            try:
                # Lower bounds are checked too: a negative index would
                # otherwise silently wrap around to the end of the list.
                indices_valid = (
                    0 <= query_idx < len(query_coords)
                    and 0 <= predicted_idx < len(database_coords)
                )
                if indices_valid:
                    query_coord = query_coords[query_idx]
                    predicted_coord = database_coords[predicted_idx]

                    # Ground truth is the database entry with the same index
                    # as the query, falling back to the predicted entry when
                    # the query index is beyond the database. No nearest
                    # neighbour search is performed.
                    gt_idx = query_idx if query_idx < len(database_coords) else predicted_idx
                    gt_coord = database_coords[gt_idx]

                    distance_error = self._haversine_distance(
                        query_coord[0], query_coord[1],
                        predicted_coord[0], predicted_coord[1]
                    )

                    enhanced_pred.update({
                        'distance_error': distance_error,
                        'is_correct': distance_error < tolerance,
                        'gps_coordinates': {
                            'query_lat': query_coord[0],
                            'query_lon': query_coord[1],
                            'predicted_lat': predicted_coord[0],
                            'predicted_lon': predicted_coord[1],
                            'ground_truth_lat': gt_coord[0],
                            'ground_truth_lon': gt_coord[1]
                        }
                    })
            except Exception as e:  # keep the plain prediction on any failure
                print_key(f"Warning: Could not add GPS coordinates for {method_key} prediction {query_idx}: {e}")

            enhanced_predictions.append(enhanced_pred)

        return enhanced_predictions

    # ------------------------------------------------------------------
    # Accessors
    # ------------------------------------------------------------------

    def _visualization_methods(self):
        """Return the method list stored in the visualization data.

        The list may live under any of the keys in
        ``_VISUALIZATION_METHOD_KEYS``; the first key present is used. An
        empty list is returned when no visualization data is loaded.
        """
        if not self.visualization_data:
            return []
        for key in _VISUALIZATION_METHOD_KEYS:
            if key in self.visualization_data:
                return self.visualization_data[key]
        return []

    def get_method_keys(self):
        """Get list of available method keys.

        Keys come from the visualization data and from the similarity scores
        loaded so far. The result is sorted so the order is deterministic.
        """
        keys = set()
        for method in self._visualization_methods():
            keys.add(method.get('method_key', 'unknown'))
        keys.update(self.similarity_scores.keys())
        return sorted(keys, key=str)

    def get_method_config(self, method_key):
        """Get configuration for a specific method"""
        if (self.results_summary and
                'method_results' in self.results_summary and
                method_key in self.results_summary['method_results']):
            return self.results_summary['method_results'][method_key]
        return None

    def get_experiment_info(self):
        """Get general experiment information"""
        if self.results_summary and 'experiment_info' in self.results_summary:
            return self.results_summary['experiment_info']
        return {}

    def get_distance_matrix(self, method_key):
        """Get distance matrix for a method"""
        return self.distance_matrices.get(method_key)

    def get_similarity_scores(self, method_key):
        """Get similarity scores for a method"""
        return self.similarity_scores.get(method_key)

    @staticmethod
    def _unpack_similarity_entry(method_key, similarity_data):
        """Split a ``similarity_scores`` entry into ``(config, matrix)``.

        Entries written by ``_load_similarity_data`` are plain numpy arrays.
        A raw dict shaped like the JSON file
        (``{'config': ..., 'similarity_scores': {'data': ...}}``) is accepted
        as well. ``matrix`` is ``None`` when a dict carries no score data.
        """
        default_config = {
            'name': method_key,
            'description': str(method_key).title() + ' (from similarity scores)',
            'color': 'blue'  # Default color
        }

        if isinstance(similarity_data, dict):
            config = similarity_data.get('config', default_config)
            payload = similarity_data.get('similarity_scores')
            if isinstance(payload, dict) and 'data' in payload:
                return config, np.array(payload['data'])
            return config, None

        return default_config, np.asarray(similarity_data)

    @staticmethod
    def _predictions_from_similarity(method_key, sim_matrix):
        """Build one prediction per query from a similarity matrix.

        Columns are treated as queries and rows as database entries; the
        prediction for a query is the row with the highest score in its
        column. Matrices that are missing, empty or not 2-D yield no
        predictions.
        """
        if sim_matrix is None or sim_matrix.ndim != 2 or sim_matrix.size == 0:
            return []

        best_rows = np.argmax(sim_matrix, axis=0)
        return [
            {
                'query_index': query_idx,
                'predicted_index': int(predicted_idx),
                'method_key': method_key
            }
            for query_idx, predicted_idx in enumerate(best_rows)
        ]

    def get_method_data(self):
        """Get method data with consistent key handling, combining visualization and similarity data.

        Returns:
            list: The method entries from the visualization data, followed by
            one synthesised entry (``method_key``, ``config``, ``predictions``)
            for every method that only has similarity scores.
        """
        method_data = []
        method_data.extend(self._visualization_methods())

        existing_method_keys = {
            method.get('method_key', 'unknown') for method in method_data
        }

        for method_key, similarity_data in self.similarity_scores.items():
            if method_key in existing_method_keys:
                continue

            config, sim_matrix = self._unpack_similarity_entry(method_key, similarity_data)
            predictions = self._predictions_from_similarity(method_key, sim_matrix)

            method_data.append({
                'method_key': method_key,
                'config': config,
                'predictions': self._enhance_predictions_with_gps(predictions, method_key)
            })
            print_info(f"✓ Added {method_key} from similarity scores")

        return method_data

    def get_statistical_comparison_data(self):
        """Get statistical comparison data"""
        return self.statistical_comparison

    def get_advanced_analysis_data(self, method_key=None):
        """Get advanced analysis data.

        Args:
            method_key: When given, only the entry for that method under the
                ``methods`` key is returned (``None`` if absent).
        """
        if self.advanced_analysis is None:
            return None

        if method_key:
            methods = self.advanced_analysis.get('methods', {})
            return methods.get(method_key)

        return self.advanced_analysis

    def has_gps_data(self):
        """Check if experiment used GPS-based distances"""
        return self._uses_meter_distances(self.get_experiment_info())

    def get_tolerance(self):
        """Get experiment tolerance value"""
        exp_info = self.get_experiment_info()
        return exp_info.get('tolerance', _DEFAULT_TOLERANCE)

    def get_distance_unit(self):
        """Get distance unit (meters or frames)"""
        if self.has_gps_data():
            return 'meters'
        return 'frames'

    def get_data_dict(self):
        """Get data in the format expected by plotting functions"""
        # Read the coordinate files only once; repeated calls reuse them.
        if (self.gps_data and self.gps_data.get('available')
                and 'database_coords' not in self.gps_data):
            self._load_gps_coordinates_from_dataset()

        return {
            'output_dir': self.output_dir,
            'results_summary': self.results_summary,
            'visualization_data': self.visualization_data,
            'statistical_comparison': self.statistical_comparison,
            'advanced_analysis': self.advanced_analysis,
            'distance_matrices': self.distance_matrices,
            'similarity_scores': self.similarity_scores,
            'gps_data': self.gps_data,
            'method_data': self.get_method_data(),
            'experiment_info': self.get_experiment_info(),
            'method_keys': self.get_method_keys(),
            'has_gps_data': self.has_gps_data(),
            'tolerance': self.get_tolerance(),
            'distance_unit': self.get_distance_unit()
        }


def load_experiment_data(output_dir):
    """
    Load experiment data from output directory

    Args:
        output_dir: Directory containing experiment results

    Returns:
        dict: Dictionary containing all loaded data, or None if loading failed
    """
    if not os.path.exists(output_dir):
        print_key(f"Error: Output directory does not exist: {output_dir}")
        return None

    # The constructor already loads every file, so its outcome is checked
    # instead of reading the whole directory a second time.
    loader = ExperimentDataLoader(output_dir)
    if not loader._results_loaded:
        print_key("Failed to load required experiment data")
        return None

    return loader.get_data_dict()