"""
json_utils.py - Production-safe JSON serialization for caching and hashing.

Converts objects containing pandas DataFrames, numpy types, datetime objects,
and other non-JSON-native types into deterministic, JSON-safe structures.

Security and performance constraints:
  - DataFrame records are ONLY included for small frames
    (<= DATAFRAME_ROW_THRESHOLD). Large frames emit metadata + content_hash
    only (lightweight, and still sensitive to every cell value).
  - Unknown objects are rendered as ``repr(obj)`` with a ``__type__`` tag.
    We do NOT recurse into arbitrary ``__dict__`` to avoid exposing internal
    state, credentials, or non-deterministic fields.
  - Circular references are detected via an ``id()``-based seen-set and
    replaced with a sentinel rather than causing infinite recursion.
  - Depth is capped at MAX_DEPTH to guard against pathologically nested
    structures.

Determinism guarantees:
  - Dict keys are always stringified.
  - ``json.dumps`` uses ``sort_keys=True, separators=(",",":")`` (compact,
    no whitespace variance).
  - DataFrame columns AND rows are sorted before CSV hashing.
  - Meaningful indices (non-default) are promoted to columns so they
    participate in both sorting and hashing.
  - Sets/frozensets become sorted lists (a string-keyed sort is used when
    the elements are not mutually comparable).
  - NaN / Inf are mapped to stable sentinel strings.
  - A null-byte sentinel is used for NA values in CSV output so that real
    data containing common marker strings cannot collide.
"""

from __future__ import annotations

import hashlib
import json
import logging
import math
from datetime import date, datetime
from typing import Any

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Tunables
# ---------------------------------------------------------------------------

#: DataFrames with more rows than this threshold will NOT include inline
#: records in the serialized output.  The content_hash is always present
#: so cache-key determinism is preserved regardless.
DATAFRAME_ROW_THRESHOLD: int = 100

#: Hard recursion-depth limit.
MAX_DEPTH: int = 30

#: NA sentinel for CSV hashing.
#: Uses a null byte prefix + suffix so it CANNOT collide with real string
#: data (null bytes are invalid in normal text / CSV cell values).
_NA_SENTINEL: str = "\x00NA\x00"

#: Emitted in place of any value nested deeper than MAX_DEPTH.
_MAX_DEPTH_SENTINEL: str = "__max_depth_exceeded__"

#: Emitted in place of a container that appears inside its own descendants.
_CIRCULAR_REF_SENTINEL: str = "__circular_ref__"

#: Returned by the optional-library handlers when *obj* is not one of theirs.
_UNHANDLED: Any = object()

#: Canonical CSV serialization kwargs used by both DataFrame and Series
#: hashing.  These guarantee identical output across Windows/Linux/macOS
#: and across pandas versions.
_CANONICAL_CSV_KWARGS: dict[str, Any] = {
    "index": False,  # emitted AFTER row sorting; index already handled
    "lineterminator": "\n",  # force LF on ALL platforms (Windows default is CRLF)
    "float_format": "%.17g",  # full double-precision roundtrip; avoids repr() variance
    "na_rep": _NA_SENTINEL,  # null-byte sentinel; impossible in real text data
    "date_format": "%Y-%m-%dT%H:%M:%S.%f%z",  # ISO-8601 with timezone, not locale-dependent
}


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _has_meaningful_index(df: Any) -> bool:
    """Return True if the index of *df* carries semantic data.

    A "meaningful" index is one that is NOT a default RangeIndex
    (0, 1, 2, ..., n-1) with no name.  Named RangeIndex, integer indices
    with gaps, string indices, DatetimeIndex, etc. are all meaningful and
    must participate in the hash.

    Only ``.index`` and ``len()`` are used, so a Series is accepted as well
    as a DataFrame.
    """
    import pandas as pd

    idx = df.index
    if not isinstance(idx, pd.RangeIndex):
        return True
    # RangeIndex, but has a name -> user assigned semantic meaning.
    if idx.name is not None:
        return True
    # RangeIndex(0, n, 1) with no name -> default positional index.
    return not (idx.start == 0 and idx.step == 1 and idx.stop == len(df))


def _prepare_for_hash(df: Any) -> Any:
    """Return a new frame built from *df* with deterministic row/column order.

    Strategy:
      1. If the index is meaningful (named, non-default), promote it to
         regular column(s) via ``reset_index()`` so it participates in both
         sorting and hashing.
      2. Stringify the column labels and order the columns alphabetically.
         Columns are moved *by position* and relabelled afterwards; looking
         the stringified labels up with ``reindex`` would replace every
         non-string-labelled column (e.g. the default ``0, 1, ...``) with NaN.
      3. Sort rows by ALL column values (stable).
         - Primary: ``sort_values`` on all columns.
         - Fallback: if mixed/unhashable types prevent comparison, cast to
           string for sort-key derivation only.
      4. Reset to a clean positional index (not included in CSV).

    *df* itself is never modified: every step below produces a new object.

    Returns a DataFrame ready for ``to_csv(**_CANONICAL_CSV_KWARGS)``.
    """
    work = df

    # Step 1: promote meaningful index to columns.
    if _has_meaningful_index(work):
        work = work.reset_index()

    # Step 2: alphabetical column order, ties broken by original position.
    labels = [str(label) for label in work.columns]
    order = sorted(range(len(labels)), key=labels.__getitem__)
    sorted_cols = [labels[pos] for pos in order]
    work = work.iloc[:, order]

    # Step 3: sort rows deterministically by all column values.
    if not work.empty:
        # Temporary positional labels keep the sort well-defined even when
        # two labels collide after stringification (e.g. 1 and "1").
        positions = list(range(len(sorted_cols)))
        work.columns = positions
        try:
            work = work.sort_values(
                by=positions, kind="mergesort", na_position="last"
            )
        except TypeError:
            # Mixed / unhashable types -- fall back to string-based sort.
            str_keys = work.astype(str).apply(tuple, axis=1)
            work = work.iloc[str_keys.argsort(kind="mergesort").to_numpy()]

    # Step 4: clean positional index (not emitted in CSV) + final labels.
    work = work.reset_index(drop=True)
    work.columns = sorted_cols
    return work


def _csv_digest(prepared: Any) -> str:
    """Return the SHA-256 hex digest of *prepared* rendered as canonical CSV.

    *prepared* is an already-ordered DataFrame or Series; it is serialized
    with ``_CANONICAL_CSV_KWARGS`` and encoded as UTF-8 before hashing.
    """
    csv_bytes = prepared.to_csv(**_CANONICAL_CSV_KWARGS).encode("utf-8")
    return hashlib.sha256(csv_bytes).hexdigest()


def _stable_dataframe_hash(df: Any) -> str:
    """Return a SHA-256 hex digest of the DataFrame content.

    Cross-platform and cross-version stability is achieved by:
      - Promoting meaningful indices to columns before sorting.
      - Sorting rows by ALL column values (not just by index position).
      - Sorting columns alphabetically.
      - Using ``lineterminator='\\n'`` to prevent Windows CRLF variance.
      - Using ``float_format='%.17g'`` for consistent double precision.
      - Using a null-byte NA sentinel so real data cannot collide.
      - Using ``index=False`` after sorting (position is no longer data).
      - Using ISO-8601 date formatting to avoid locale/tz drift.
    """
    return _csv_digest(_prepare_for_hash(df))


def _stable_series_hash(series: Any) -> str:
    """Return a SHA-256 hex digest of a Series, using canonical CSV params.

    The Series is sorted by value (not index) for deterministic ordering.
    If the index is meaningful, its values are included by converting the
    Series to a DataFrame (index promoted to a column) and hashing that.
    """
    if _has_meaningful_index(series):
        # Promote index -> column, hash as DataFrame.
        return _stable_dataframe_hash(series.reset_index())

    # Sort by values for deterministic order.
    try:
        ordered = series.sort_values(kind="mergesort", na_position="last")
    except TypeError:
        # Mixed types -- order by the string form of each value instead.
        ranks = series.astype(str).argsort(kind="mergesort").to_numpy()
        ordered = series.iloc[ranks]
    return _csv_digest(ordered)


def _float_to_json(value: float) -> Any:
    """Map NaN / +Inf / -Inf to sentinel strings; return other floats as-is."""
    if math.isnan(value):
        return "__NaN__"
    if math.isinf(value):
        return "__Inf__" if value > 0 else "__-Inf__"
    return value


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def make_json_safe(
    obj: Any,
    *,
    _depth: int = 0,
    _seen: set[int] | None = None,
) -> Any:
    """Recursively convert *obj* into a JSON-serializable structure.

    Supported conversions
    ---------------------
    ========================  ================================================
    Source type               Serialized form
    ========================  ================================================
    ``pd.DataFrame`` (small)  ``{__type__, columns, shape, dtypes,
                              content_hash, records}``
    ``pd.DataFrame`` (large)  ``{__type__, columns, shape, dtypes,
                              content_hash}`` -- **no records**
    ``pd.Series``             ``{__type__, name, length, content_hash}``
                              plus ``values`` when small
    ``pd.Timestamp``          ISO-format string
    ``pd.Categorical``        plain list
    ``np.integer``            ``int``
    ``np.floating``           ``float`` (NaN/Inf -> sentinels)
    ``np.bool_``              ``bool``
    ``np.ndarray``            nested list
    ``np.generic``            native Python scalar via ``.item()``
    ``datetime`` / ``date``   ISO-format string
    ``set`` / ``frozenset``   sorted list
    ``bytes`` / ``bytearray`` ``{__type__, hex, length}``
    ``complex``               ``[real, imag]`` (NaN/Inf -> sentinels)
    Unknown objects           ``{__type__, repr}`` -- **no __dict__**
    ========================  ================================================

    Parameters
    ----------
    obj : Any
        The object to convert.
    _depth : int
        Internal recursion depth; callers should leave the default.
    _seen : set[int] | None
        Internal set of ``id()`` values of the containers currently being
        converted; callers should leave the default.

    Returns
    -------
    Any
        A JSON-serializable equivalent of *obj*.  Values nested deeper than
        ``MAX_DEPTH`` become ``"__max_depth_exceeded__"`` and circular
        references become ``"__circular_ref__"``.
    """
    # -- depth guard --
    if _depth > MAX_DEPTH:
        return _MAX_DEPTH_SENTINEL

    # -- circular reference guard --
    if _seen is None:
        _seen = set()

    # Only containers / iterable objects are tracked by id; scalars cannot
    # contain themselves and small ints/strings may share ids.
    tracked = isinstance(obj, (dict, list, set, frozenset)) or (
        not isinstance(
            obj, (str, bytes, bytearray, int, float, bool, type(None))
        )
        and hasattr(obj, "__iter__")
    )
    if not tracked:
        return _convert(obj, _depth=_depth, _seen=_seen)

    obj_id = id(obj)
    if obj_id in _seen:
        return _CIRCULAR_REF_SENTINEL
    _seen.add(obj_id)
    try:
        return _convert(obj, _depth=_depth, _seen=_seen)
    finally:
        # Pop from seen-set so the same container in a *different* branch
        # of the tree is not falsely flagged.  (Only true circular refs
        # -- where the object appears in its own descendant chain -- are
        # caught.)
        _seen.discard(obj_id)


def _convert(
    obj: Any,
    *,
    _depth: int,
    _seen: set[int],
) -> Any:
    """Core conversion dispatch (called inside the seen-set guard)."""
    next_depth = _depth + 1

    # -- primitives (fast path) --
    if obj is None or isinstance(obj, (bool, int, str)):
        return obj
    if isinstance(obj, float):
        return _float_to_json(obj)

    # -- stdlib containers --
    if isinstance(obj, dict):
        return {
            str(key): make_json_safe(value, _depth=next_depth, _seen=_seen)
            for key, value in obj.items()
        }
    if isinstance(obj, (list, tuple)):
        return [
            make_json_safe(item, _depth=next_depth, _seen=_seen)
            for item in obj
        ]
    if isinstance(obj, (set, frozenset)):
        converted = [
            make_json_safe(item, _depth=next_depth, _seen=_seen)
            for item in obj
        ]
        # sorted() requires homogeneous comparable types; fall back to
        # str-keyed sort when elements are mixed.
        try:
            return sorted(converted)
        except TypeError:
            return sorted(converted, key=str)

    # -- datetime / date (pd.Timestamp subclasses datetime) --
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()

    # -- bytes --
    if isinstance(obj, (bytes, bytearray)):
        return {"__type__": "bytes", "hex": obj.hex(), "length": len(obj)}

    # -- complex: both parts get the same NaN/Inf treatment as floats --
    if isinstance(obj, complex):
        return [_float_to_json(obj.real), _float_to_json(obj.imag)]

    # -- optional libraries (import-guarded inside each handler) --
    for handler in (_convert_pandas, _convert_numpy):
        converted_obj = handler(obj, next_depth=next_depth, seen=_seen)
        if converted_obj is not _UNHANDLED:
            return converted_obj

    # ------------------------------------------------------------------
    # Unknown objects -- SAFE FALLBACK (no __dict__ serialization)
    #
    # Rationale for NOT recursing into __dict__:
    #   1. Non-deterministic: internal state may include timestamps,
    #      random IDs, thread-locals, or mutable caches.
    #   2. Security risk: credentials, API keys, or tokens could leak
    #      into cache keys / stored values.
    #   3. Fragile: private attributes change across library versions
    #      causing silent cache invalidation or collisions.
    #
    # NOTE(review): the default ``object.__repr__`` embeds a memory
    # address, so objects without a custom ``__repr__`` will not hash
    # identically across runs.
    # ------------------------------------------------------------------
    return {
        "__type__": type(obj).__qualname__,
        "repr": repr(obj),
    }


def _convert_pandas(obj: Any, *, next_depth: int, seen: set[int]) -> Any:
    """Convert pandas objects; return ``_UNHANDLED`` for anything else.

    Only the import itself is guarded, so an ImportError raised while
    hashing is not mistaken for "pandas is not installed".
    """
    try:
        import pandas as pd
    except ImportError:
        return _UNHANDLED

    if isinstance(obj, pd.DataFrame):
        return _convert_dataframe(obj, next_depth=next_depth, seen=seen)
    if isinstance(obj, pd.Series):
        return _convert_series(obj, next_depth=next_depth, seen=seen)
    if isinstance(obj, pd.Timestamp):
        # Normally already covered by the datetime branch in _convert.
        return obj.isoformat()
    if isinstance(obj, pd.Categorical):
        return make_json_safe(obj.tolist(), _depth=next_depth, _seen=seen)
    return _UNHANDLED


def _convert_dataframe(df: Any, *, next_depth: int, seen: set[int]) -> Any:
    """Serialize a DataFrame as metadata + content hash (+ small records)."""
    # Prepared once and shared by the hash and the inline records.
    prepared = _prepare_for_hash(df)
    nrows, ncols = df.shape
    result: dict[str, Any] = {
        "__type__": "DataFrame",
        "shape": [nrows, ncols],
        # Labels are stringified before sorting so mixed-type labels
        # (e.g. 0 and "a") cannot raise on comparison.
        "columns": sorted(str(label) for label in df.columns),
        "dtypes": {
            str(col): str(dtype)
            for col, dtype in sorted(
                df.dtypes.items(), key=lambda item: str(item[0])
            )
        },
        "content_hash": _csv_digest(prepared),
    }

    # Only inline full records for small DataFrames.
    if nrows <= DATAFRAME_ROW_THRESHOLD:
        result["records"] = make_json_safe(
            prepared.to_dict(orient="records"),
            _depth=next_depth,
            _seen=seen,
        )
    else:
        logger.debug(
            "DataFrame with %d rows exceeds threshold (%d); "
            "records omitted from serialization, content_hash retained.",
            nrows,
            DATAFRAME_ROW_THRESHOLD,
        )
    return result


def _convert_series(series: Any, *, next_depth: int, seen: set[int]) -> Any:
    """Serialize a Series as name + length + content hash (+ small values)."""
    length = len(series)
    result: dict[str, Any] = {
        "__type__": "Series",
        "name": str(series.name) if series.name is not None else None,
        "length": length,
        "content_hash": _stable_series_hash(series),
    }
    # Lightweight: no full values dump for large Series.
    if length <= DATAFRAME_ROW_THRESHOLD:
        result["values"] = make_json_safe(
            series.tolist(), _depth=next_depth, _seen=seen
        )
    return result


def _convert_numpy(obj: Any, *, next_depth: int, seen: set[int]) -> Any:
    """Convert numpy scalars/arrays; return ``_UNHANDLED`` for anything else."""
    try:
        import numpy as np
    except ImportError:
        return _UNHANDLED

    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return _float_to_json(float(obj))
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        return make_json_safe(obj.tolist(), _depth=next_depth, _seen=seen)
    if isinstance(obj, np.generic):
        return make_json_safe(obj.item(), _depth=next_depth, _seen=seen)
    return _UNHANDLED


# ---------------------------------------------------------------------------
# Deterministic hashing
# ---------------------------------------------------------------------------


def deterministic_json_hash(
    obj: Any,
    *,
    algorithm: str = "sha256",
    prefix: str = "",
) -> str:
    """Return a deterministic hex-digest hash of *obj*.

    Pipeline:
      1. ``make_json_safe(obj)`` -- normalise to JSON-safe primitives.
      2. ``json.dumps(..., sort_keys=True, separators=(",",":"))`` --
         canonical compact JSON with deterministic key ordering.
      3. ``hashlib.new(algorithm, ...).hexdigest()`` -- hash the bytes.

    The same ``make_json_safe`` transform is used here as in
    ``CacheManager.set()``, so a cache key derived from a payload will
    always correspond to the transformation applied when storing that
    payload.

    Parameters
    ----------
    obj : Any
        Arbitrary (possibly non-JSON-serializable) object to hash.
    algorithm : str
        Hash algorithm name accepted by :mod:`hashlib`.
        Default ``"sha256"``; ``"md5"`` accepted for backward compat.
    prefix : str
        Optional string prepended to the hex digest.

    Returns
    -------
    str
        ``prefix + hex_digest``
    """
    safe = make_json_safe(obj)
    canonical = json.dumps(
        safe, sort_keys=True, separators=(",", ":")
    ).encode("utf-8")
    digest = hashlib.new(algorithm, canonical).hexdigest()
    return f"{prefix}{digest}"