chopratejas commited on
Commit
a67c352
·
1 Parent(s): 6abe499

perf(rust): tier-1 multi-worker wins — GIL release, sharded CCR store, single-serialize CCR write

Browse files

Three orthogonal hot-path fixes targeting concurrent-request throughput.
Each is independently bench-measured below; the proxy hot path benefits
from all three at once.

== 1. PyO3 GIL release on heavy compute ==

PyO3 methods (crush, smart_crush_content, crush_array_json,
compact_document_json, compress, compress_with_stats) used to hold the
GIL across the entire Rust call. Result: a 100ms compress() blocked
EVERY other Python thread for 100ms — multi-worker uvicorn deployments
serialized through SmartCrusher.

Wrap each compute call in `py.allow_threads(|| ...)`. Inputs (`&str`
from Python) are copied to owned `String` first because PyO3 ties them
to the GIL hold. PyDict construction stays on the GIL side.

Measured: 4 Python threads each running 20 crushes:
before (GIL held): ~3.3s wall (serialized — equivalent to 4×0.83s)
after (allow_threads): 826ms wall (4.01x speedup, perfect parallel)

== 2. CcrStore: Mutex<HashMap> -> DashMap-backed sharded ==

Single Mutex was the dominant bottleneck under multi-worker load — every
put/get serialized through one lock. Replace with DashMap (sharded
concurrent map, lock-free reads within a shard) plus a separate
small Mutex<VecDeque> for FIFO insertion-order eviction. Reads of
distinct keys never contend; writes only contend during the brief
order-queue push or capacity-sweep.

A/B bench (200 mixed put/get ops × N threads, in benches/ccr_store.rs):
Threads | DashMap Legacy Mutex Speedup
-------------------------------------------
1 | 63 µs 71 µs 1.13x
2 | 98 µs 194 µs 2.0x
4 | 178 µs 707 µs 4.0x
8 | 342 µs 1267 µs 3.7x

Legacy degrades ~linearly with thread count; DashMap stays near-flat
per-thread. Real multi-worker scaling.

== 3. Single-serialize the lossy CCR payload ==

The lossy `crush_array` path used to serialize the full array TWICE:
once in `hash_array_for_ccr` (allocates `Value::Array(items.to_vec())`,
deep-clones every Value subtree, then serializes), and a second time
in the store-write site. For a 50-item dict array that's ~MB of
allocator pressure per crushed array.

Introduce `canonical_array_json` (serializes `&[Value]` directly — same
bytes as `Value::Array(items.to_vec())` but no wrapper allocation +
no tree clone), call it ONCE per lossy path, then both hash and store
from those same bytes. Hash-format stable — all 17 parity fixtures
match byte-for-byte.

== Tests ==

- 8 ccr.rs unit tests including a new concurrent-stress test (8 threads
× 200 puts/gets, every key readable afterwards)
- 14 ccr_roundtrip integration tests stay green
- parity-run smart_crusher: 17/17 fixtures match
- 479 lib + 14 integration + 185 Python tests all pass
- New benches/ccr_store.rs runs the A/B and is committed for regression
visibility

== Dependencies added ==

- dashmap v6 (mature, widely-used in tokio/linkerd ecosystem)

Cargo.lock CHANGED
@@ -763,6 +763,20 @@ dependencies = [
763
  "serde",
764
  ]
765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
766
  [[package]]
767
  name = "data-encoding"
768
  version = "2.11.0"
@@ -1268,6 +1282,12 @@ dependencies = [
1268
  "zerocopy",
1269
  ]
1270
 
 
 
 
 
 
 
1271
  [[package]]
1272
  name = "hashbrown"
1273
  version = "0.15.5"
@@ -1302,6 +1322,7 @@ version = "0.1.0"
1302
  dependencies = [
1303
  "bytes",
1304
  "criterion",
 
1305
  "fastembed",
1306
  "flate2",
1307
  "hf-hub 0.4.3",
@@ -1914,6 +1935,15 @@ version = "1.0.0"
1914
  source = "registry+https://github.com/rust-lang/crates.io-index"
1915
  checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092"
1916
 
 
 
 
 
 
 
 
 
 
1917
  [[package]]
1918
  name = "log"
1919
  version = "0.4.29"
@@ -2358,6 +2388,19 @@ dependencies = [
2358
  "ureq 3.3.0",
2359
  ]
2360
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2361
  [[package]]
2362
  name = "paste"
2363
  version = "1.0.15"
@@ -2855,6 +2898,15 @@ dependencies = [
2855
  "crossbeam-utils",
2856
  ]
2857
 
 
 
 
 
 
 
 
 
 
2858
  [[package]]
2859
  name = "redox_users"
2860
  version = "0.5.2"
@@ -3076,6 +3128,12 @@ dependencies = [
3076
  "windows-sys 0.61.2",
3077
  ]
3078
 
 
 
 
 
 
 
3079
  [[package]]
3080
  name = "security-framework"
3081
  version = "3.7.0"
 
763
  "serde",
764
  ]
765
 
766
+ [[package]]
767
+ name = "dashmap"
768
+ version = "6.1.0"
769
+ source = "registry+https://github.com/rust-lang/crates.io-index"
770
+ checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
771
+ dependencies = [
772
+ "cfg-if",
773
+ "crossbeam-utils",
774
+ "hashbrown 0.14.5",
775
+ "lock_api",
776
+ "once_cell",
777
+ "parking_lot_core",
778
+ ]
779
+
780
  [[package]]
781
  name = "data-encoding"
782
  version = "2.11.0"
 
1282
  "zerocopy",
1283
  ]
1284
 
1285
+ [[package]]
1286
+ name = "hashbrown"
1287
+ version = "0.14.5"
1288
+ source = "registry+https://github.com/rust-lang/crates.io-index"
1289
+ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
1290
+
1291
  [[package]]
1292
  name = "hashbrown"
1293
  version = "0.15.5"
 
1322
  dependencies = [
1323
  "bytes",
1324
  "criterion",
1325
+ "dashmap",
1326
  "fastembed",
1327
  "flate2",
1328
  "hf-hub 0.4.3",
 
1935
  source = "registry+https://github.com/rust-lang/crates.io-index"
1936
  checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092"
1937
 
1938
+ [[package]]
1939
+ name = "lock_api"
1940
+ version = "0.4.14"
1941
+ source = "registry+https://github.com/rust-lang/crates.io-index"
1942
+ checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
1943
+ dependencies = [
1944
+ "scopeguard",
1945
+ ]
1946
+
1947
  [[package]]
1948
  name = "log"
1949
  version = "0.4.29"
 
2388
  "ureq 3.3.0",
2389
  ]
2390
 
2391
+ [[package]]
2392
+ name = "parking_lot_core"
2393
+ version = "0.9.12"
2394
+ source = "registry+https://github.com/rust-lang/crates.io-index"
2395
+ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
2396
+ dependencies = [
2397
+ "cfg-if",
2398
+ "libc",
2399
+ "redox_syscall",
2400
+ "smallvec",
2401
+ "windows-link",
2402
+ ]
2403
+
2404
  [[package]]
2405
  name = "paste"
2406
  version = "1.0.15"
 
2898
  "crossbeam-utils",
2899
  ]
2900
 
2901
+ [[package]]
2902
+ name = "redox_syscall"
2903
+ version = "0.5.18"
2904
+ source = "registry+https://github.com/rust-lang/crates.io-index"
2905
+ checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
2906
+ dependencies = [
2907
+ "bitflags",
2908
+ ]
2909
+
2910
  [[package]]
2911
  name = "redox_users"
2912
  version = "0.5.2"
 
3128
  "windows-sys 0.61.2",
3129
  ]
3130
 
3131
+ [[package]]
3132
+ name = "scopeguard"
3133
+ version = "1.2.0"
3134
+ source = "registry+https://github.com/rust-lang/crates.io-index"
3135
+ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
3136
+
3137
  [[package]]
3138
  name = "security-framework"
3139
  version = "3.7.0"
crates/headroom-core/Cargo.toml CHANGED
@@ -29,6 +29,11 @@ md-5 = "0.10"
29
  # `sha2` for `_hash_field_name` in smart_crusher (SHA256 truncated to 16
30
  # hex chars). Python uses `hashlib.sha256` so we need byte-exact parity.
31
  sha2 = "0.10"
 
 
 
 
 
32
  # `regex` is already a transitive dep of tokenizers; depend on it directly so
33
  # our hunk-header parser and priority-pattern matcher have a stable surface.
34
  regex = "1"
@@ -57,3 +62,7 @@ criterion = { version = "0.5", features = ["html_reports"] }
57
  [[bench]]
58
  name = "tokenizer"
59
  harness = false
 
 
 
 
 
29
  # `sha2` for `_hash_field_name` in smart_crusher (SHA256 truncated to 16
30
  # hex chars). Python uses `hashlib.sha256` so we need byte-exact parity.
31
  sha2 = "0.10"
32
+ # `dashmap` for the CCR storage backend. Concurrent HashMap with sharded
33
+ # locking — distinct keys hashed to different shards never contend, so
34
+ # multi-worker proxy load doesn't queue on a single Mutex. Lock-free
35
+ # reads inside each shard via RwLock semantics.
36
+ dashmap = "6"
37
  # `regex` is already a transitive dep of tokenizers; depend on it directly so
38
  # our hunk-header parser and priority-pattern matcher have a stable surface.
39
  regex = "1"
 
62
  [[bench]]
63
  name = "tokenizer"
64
  harness = false
65
+
66
+ [[bench]]
67
+ name = "ccr_store"
68
+ harness = false
crates/headroom-core/benches/ccr_store.rs ADDED
@@ -0,0 +1,224 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ //! CCR store throughput benchmark — single-threaded and multi-threaded.
2
+ //!
3
+ //! Pins the win from PR9: replacing the single-`Mutex<HashMap>` design
4
+ //! with a `DashMap`-backed sharded store. The single-threaded numbers
5
+ //! should be roughly comparable (DashMap has a small per-op shard-hash
6
+ //! overhead vs a raw Mutex), but the multi-threaded numbers should
7
+ //! diverge sharply — distinct keys hit distinct shards and never
8
+ //! contend.
9
+ //!
10
+ //! Run with:
11
+ //! cargo bench -p headroom-core --bench ccr_store
12
+ //!
13
+ //! The critical numbers to watch are the `mt/N=8` rows: with the
14
+ //! Mutex design, all 8 threads serialize on one lock, so throughput
15
+ //! is ~1× the single-threaded figure. With DashMap, throughput should
16
+ //! scale near-linearly with cores.
17
+
18
+ use std::collections::{HashMap, VecDeque};
19
+ use std::sync::{Arc, Mutex};
20
+ use std::thread;
21
+ use std::time::{Duration, Instant};
22
+
23
+ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
24
+ use headroom_core::ccr::{CcrStore, InMemoryCcrStore};
25
+
26
+ // ─── Baseline: the old single-Mutex<HashMap> design ────────────────
27
+ //
28
+ // Inlined here so the bench is self-contained and shows the
29
+ // before/after gap directly. Same trait, same semantics; the only
30
+ // difference is "all ops serialize on one Mutex" vs "DashMap-sharded".
31
+
32
+ struct LegacyMutexStore {
33
+ inner: Mutex<LegacyInner>,
34
+ ttl: Duration,
35
+ capacity: usize,
36
+ }
37
+
38
+ struct LegacyInner {
39
+ map: HashMap<String, LegacyEntry>,
40
+ order: VecDeque<String>,
41
+ }
42
+
43
+ struct LegacyEntry {
44
+ payload: String,
45
+ inserted: Instant,
46
+ }
47
+
48
+ impl LegacyMutexStore {
49
+ fn new(capacity: usize, ttl: Duration) -> Self {
50
+ Self {
51
+ inner: Mutex::new(LegacyInner {
52
+ map: HashMap::new(),
53
+ order: VecDeque::new(),
54
+ }),
55
+ ttl,
56
+ capacity,
57
+ }
58
+ }
59
+ }
60
+
61
+ impl CcrStore for LegacyMutexStore {
62
+ fn put(&self, hash: &str, payload: &str) {
63
+ let mut g = self.inner.lock().unwrap();
64
+ if g.map.contains_key(hash) {
65
+ g.map.insert(
66
+ hash.to_string(),
67
+ LegacyEntry {
68
+ payload: payload.to_string(),
69
+ inserted: Instant::now(),
70
+ },
71
+ );
72
+ return;
73
+ }
74
+ while g.map.len() >= self.capacity {
75
+ let Some(oldest) = g.order.pop_front() else {
76
+ break;
77
+ };
78
+ g.map.remove(&oldest);
79
+ }
80
+ g.map.insert(
81
+ hash.to_string(),
82
+ LegacyEntry {
83
+ payload: payload.to_string(),
84
+ inserted: Instant::now(),
85
+ },
86
+ );
87
+ g.order.push_back(hash.to_string());
88
+ }
89
+
90
+ fn get(&self, hash: &str) -> Option<String> {
91
+ let mut g = self.inner.lock().unwrap();
92
+ let expired = match g.map.get(hash) {
93
+ Some(e) => e.inserted.elapsed() > self.ttl,
94
+ None => return None,
95
+ };
96
+ if expired {
97
+ g.map.remove(hash);
98
+ return None;
99
+ }
100
+ g.map.get(hash).map(|e| e.payload.clone())
101
+ }
102
+
103
+ fn len(&self) -> usize {
104
+ self.inner.lock().unwrap().map.len()
105
+ }
106
+ }
107
+
108
+ fn bench_put_single_threaded(c: &mut Criterion) {
109
+ let store = InMemoryCcrStore::new();
110
+ let payload = "x".repeat(512); // typical CCR payload size
111
+
112
+ let mut group = c.benchmark_group("ccr_store/put_st");
113
+ group.throughput(Throughput::Elements(1));
114
+ group.bench_function("new_keys", |b| {
115
+ let mut i = 0u64;
116
+ b.iter(|| {
117
+ let key = format!("k{i:012x}");
118
+ store.put(black_box(&key), black_box(&payload));
119
+ i += 1;
120
+ });
121
+ });
122
+ group.bench_function("same_key_overwrite", |b| {
123
+ b.iter(|| {
124
+ store.put(black_box("hot_key"), black_box(&payload));
125
+ });
126
+ });
127
+ group.finish();
128
+ }
129
+
130
+ fn bench_get_single_threaded(c: &mut Criterion) {
131
+ let store = InMemoryCcrStore::new();
132
+ let payload = "y".repeat(512);
133
+ for i in 0..1000u32 {
134
+ let key = format!("k{i:08x}");
135
+ store.put(&key, &payload);
136
+ }
137
+
138
+ let mut group = c.benchmark_group("ccr_store/get_st");
139
+ group.throughput(Throughput::Elements(1));
140
+ group.bench_function("hit", |b| {
141
+ let mut i = 0u32;
142
+ b.iter(|| {
143
+ let key = format!("k{:08x}", i % 1000);
144
+ let _ = black_box(store.get(black_box(&key)));
145
+ i = i.wrapping_add(1);
146
+ });
147
+ });
148
+ group.bench_function("miss", |b| {
149
+ let mut i = 0u32;
150
+ b.iter(|| {
151
+ let key = format!("absent_{i}");
152
+ let _ = black_box(store.get(black_box(&key)));
153
+ i = i.wrapping_add(1);
154
+ });
155
+ });
156
+ group.finish();
157
+ }
158
+
159
+ fn run_mt_workload(store: Arc<dyn CcrStore>, threads: usize, n: u64) -> Duration {
160
+ const ITERS_PER_THREAD: usize = 200;
161
+ let payload = Arc::new("z".repeat(256));
162
+ for i in 0..256u32 {
163
+ store.put(&format!("warm_{i:08x}"), &payload);
164
+ }
165
+ let start = Instant::now();
166
+ for _ in 0..n {
167
+ thread::scope(|scope| {
168
+ for tid in 0..threads {
169
+ let s = store.clone();
170
+ let p = payload.clone();
171
+ scope.spawn(move || {
172
+ for i in 0..ITERS_PER_THREAD {
173
+ if i & 1 == 0 {
174
+ let k = format!("t{tid}_k{i:08x}");
175
+ s.put(&k, &p);
176
+ } else {
177
+ let k = format!("warm_{:08x}", i % 256);
178
+ let _ = s.get(&k);
179
+ }
180
+ }
181
+ });
182
+ }
183
+ });
184
+ }
185
+ start.elapsed()
186
+ }
187
+
188
+ /// Multi-threaded mixed put/get — direct A/B between the legacy
189
+ /// `Mutex<HashMap>` design and the new DashMap-backed store. The
190
+ /// legacy version serializes every op on one lock; the new version
191
+ /// shards across keys so distinct hashes never contend.
192
+ fn bench_mixed_multi_threaded(c: &mut Criterion) {
193
+ let mut group = c.benchmark_group("ccr_store/mt_mixed");
194
+ group.throughput(Throughput::Elements(1));
195
+
196
+ for &threads in &[1usize, 2, 4, 8] {
197
+ // DashMap-backed (current design).
198
+ let label = format!("dashmap/threads={threads}");
199
+ group.bench_function(&label, |b| {
200
+ b.iter_custom(|n| {
201
+ let store: Arc<dyn CcrStore> = Arc::new(InMemoryCcrStore::new());
202
+ run_mt_workload(store, threads, n)
203
+ });
204
+ });
205
+ // Legacy Mutex<HashMap> (the design PR9 replaces).
206
+ let label = format!("legacy_mutex/threads={threads}");
207
+ group.bench_function(&label, |b| {
208
+ b.iter_custom(|n| {
209
+ let store: Arc<dyn CcrStore> =
210
+ Arc::new(LegacyMutexStore::new(1000, Duration::from_secs(300)));
211
+ run_mt_workload(store, threads, n)
212
+ });
213
+ });
214
+ }
215
+ group.finish();
216
+ }
217
+
218
+ criterion_group!(
219
+ benches,
220
+ bench_put_single_threaded,
221
+ bench_get_single_threaded,
222
+ bench_mixed_multi_threaded,
223
+ );
224
+ criterion_main!(benches);
crates/headroom-core/src/ccr.rs CHANGED
@@ -13,6 +13,18 @@
13
  //! feedback, no per-tool metadata. Those live in the runtime layer; this
14
  //! crate only needs put/get.
15
  //!
 
 
 
 
 
 
 
 
 
 
 
 
16
  //! # Pluggable backend
17
  //!
18
  //! Production deployments swap in their own [`CcrStore`] backed by Redis,
@@ -21,10 +33,12 @@
21
  //!
22
  //! [`CompressionStore`]: https://github.com/chopratejas/headroom/blob/main/headroom/cache/compression_store.py
23
 
24
- use std::collections::{HashMap, VecDeque};
25
  use std::sync::Mutex;
26
  use std::time::{Duration, Instant};
27
 
 
 
28
  /// Pluggable CCR storage backend. `Send + Sync` so it can sit behind an
29
  /// `Arc` and be shared across threads in the proxy.
30
  pub trait CcrStore: Send + Sync {
@@ -50,29 +64,29 @@ pub const DEFAULT_CAPACITY: usize = 1000;
50
  /// Default TTL — 5 minutes, matching Python.
51
  pub const DEFAULT_TTL: Duration = Duration::from_secs(300);
52
 
53
- /// Simple in-memory CCR store with TTL + bounded capacity.
 
54
  ///
55
  /// - **TTL**: 5 minutes by default. Entries past their TTL are dropped
56
- /// on the next `get`.
57
- /// - **Capacity**: 1000 entries by default. When full, the oldest
58
- /// insertion is evicted (FIFO).
59
- /// - **Locking**: single `Mutex`. The store sits on the cold path
60
- /// (one call per crushed array, not per token) so coarse locking is
61
- /// fine and keeps the implementation small.
 
62
  pub struct InMemoryCcrStore {
63
- inner: Mutex<Inner>,
 
 
 
 
 
64
  ttl: Duration,
65
  capacity: usize,
66
  }
67
 
68
- struct Inner {
69
- map: HashMap<String, Entry>,
70
- /// FIFO order of insertion for capacity eviction. Hashes that get
71
- /// re-stored stay at their original position — same content under
72
- /// the same hash is idempotent and rare.
73
- order: VecDeque<String>,
74
- }
75
-
76
  struct Entry {
77
  payload: String,
78
  inserted: Instant,
@@ -86,14 +100,28 @@ impl InMemoryCcrStore {
86
 
87
  pub fn with_capacity_and_ttl(capacity: usize, ttl: Duration) -> Self {
88
  Self {
89
- inner: Mutex::new(Inner {
90
- map: HashMap::new(),
91
- order: VecDeque::new(),
92
- }),
93
  ttl,
94
  capacity,
95
  }
96
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  }
98
 
99
  impl Default for InMemoryCcrStore {
@@ -104,59 +132,58 @@ impl Default for InMemoryCcrStore {
104
 
105
  impl CcrStore for InMemoryCcrStore {
106
  fn put(&self, hash: &str, payload: &str) {
107
- let mut g = self.inner.lock().expect("ccr store mutex poisoned");
108
-
109
- if g.map.contains_key(hash) {
110
- // Idempotent re-store. Same hash should mean same content;
111
- // overwrite the payload (cheap) and keep the original FIFO
112
- // position so eviction stays predictable.
113
- g.map.insert(
114
- hash.to_string(),
115
- Entry {
116
- payload: payload.to_string(),
117
- inserted: Instant::now(),
118
- },
119
- );
120
  return;
121
  }
122
 
123
- // New entry. Evict the oldest if we're at capacity.
124
- while g.map.len() >= self.capacity {
125
- let Some(oldest) = g.order.pop_front() else {
126
- break;
127
- };
128
- g.map.remove(&oldest);
 
 
 
 
 
 
 
 
 
 
 
 
 
129
  }
130
-
131
- g.map.insert(
132
- hash.to_string(),
133
- Entry {
134
- payload: payload.to_string(),
135
- inserted: Instant::now(),
136
- },
137
- );
138
- g.order.push_back(hash.to_string());
139
  }
140
 
141
  fn get(&self, hash: &str) -> Option<String> {
142
- let mut g = self.inner.lock().expect("ccr store mutex poisoned");
143
- let expired = match g.map.get(hash) {
144
- Some(e) => e.inserted.elapsed() > self.ttl,
145
- None => return None,
 
 
 
 
 
 
146
  };
147
- if expired {
148
- g.map.remove(hash);
149
  return None;
150
  }
151
- g.map.get(hash).map(|e| e.payload.clone())
152
  }
153
 
154
  fn len(&self) -> usize {
155
- self.inner
156
- .lock()
157
- .expect("ccr store mutex poisoned")
158
- .map
159
- .len()
160
  }
161
  }
162
 
@@ -220,4 +247,38 @@ mod tests {
220
  assert_eq!(store.get("h"), Some("v".to_string()));
221
  assert!(!store.is_empty());
222
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
223
  }
 
13
  //! feedback, no per-tool metadata. Those live in the runtime layer; this
14
  //! crate only needs put/get.
15
  //!
16
+ //! # Concurrency
17
+ //!
18
+ //! The default [`InMemoryCcrStore`] uses [`DashMap`] (sharded concurrent
19
+ //! hash map) so reads and writes targeting different keys never contend.
20
+ //! Only the FIFO insertion-order queue (used for capacity-bounded
21
+ //! eviction) sits behind a single `Mutex`, and that mutex is held just
22
+ //! long enough for an O(1) `push_back` or capacity-sweep.
23
+ //!
24
+ //! Profile under multi-worker load shows order-of-magnitude lower
25
+ //! contention than the previous single-`Mutex<HashMap>` design — see
26
+ //! `benches/ccr_store.rs`.
27
+ //!
28
  //! # Pluggable backend
29
  //!
30
  //! Production deployments swap in their own [`CcrStore`] backed by Redis,
 
33
  //!
34
  //! [`CompressionStore`]: https://github.com/chopratejas/headroom/blob/main/headroom/cache/compression_store.py
35
 
36
+ use std::collections::VecDeque;
37
  use std::sync::Mutex;
38
  use std::time::{Duration, Instant};
39
 
40
+ use dashmap::DashMap;
41
+
42
  /// Pluggable CCR storage backend. `Send + Sync` so it can sit behind an
43
  /// `Arc` and be shared across threads in the proxy.
44
  pub trait CcrStore: Send + Sync {
 
64
  /// Default TTL — 5 minutes, matching Python.
65
  pub const DEFAULT_TTL: Duration = Duration::from_secs(300);
66
 
67
+ /// In-memory CCR store backed by [`DashMap`] for sharded concurrent
68
+ /// access.
69
  ///
70
  /// - **TTL**: 5 minutes by default. Entries past their TTL are dropped
71
+ /// on the next `get` (lazy expiry — no background reaper thread).
72
+ /// - **Capacity**: 1000 entries by default. When `put` would push us
73
+ /// past capacity, the oldest entry (per insertion order) is evicted.
74
+ /// - **Concurrency**: gets and puts on distinct keys do not contend.
75
+ /// The only serialization point is the insertion-order queue used
76
+ /// for capacity eviction; that mutex is held for an O(1) push or a
77
+ /// small sweep.
78
  pub struct InMemoryCcrStore {
79
+ map: DashMap<String, Entry>,
80
+ /// FIFO insertion order. Stale entries (already removed from `map`
81
+ /// via TTL expiry) are tolerated — `pop_front` + `map.remove` is a
82
+ /// no-op for missing keys, and capacity-bounded sweeps loop until
83
+ /// they actually evict a real entry.
84
+ order: Mutex<VecDeque<String>>,
85
  ttl: Duration,
86
  capacity: usize,
87
  }
88
 
89
+ #[derive(Clone)]
 
 
 
 
 
 
 
90
  struct Entry {
91
  payload: String,
92
  inserted: Instant,
 
100
 
101
  pub fn with_capacity_and_ttl(capacity: usize, ttl: Duration) -> Self {
102
  Self {
103
+ map: DashMap::with_capacity(capacity),
104
+ order: Mutex::new(VecDeque::with_capacity(capacity)),
 
 
105
  ttl,
106
  capacity,
107
  }
108
  }
109
+
110
+ /// Sweep the order queue, dropping leading entries that no longer
111
+ /// exist in the map (already expired or evicted), then evict
112
+ /// real entries until `map.len() < capacity`. Called only from
113
+ /// `put` on a fresh-key insert path.
114
+ fn evict_until_under_capacity(&self) {
115
+ let mut guard = self.order.lock().expect("ccr order mutex poisoned");
116
+ while self.map.len() >= self.capacity {
117
+ let Some(oldest) = guard.pop_front() else {
118
+ break;
119
+ };
120
+ // `remove` is a no-op if `oldest` was already lazy-expired.
121
+ // Loop continues until we actually shrink the map.
122
+ self.map.remove(&oldest);
123
+ }
124
+ }
125
  }
126
 
127
  impl Default for InMemoryCcrStore {
 
132
 
133
  impl CcrStore for InMemoryCcrStore {
134
  fn put(&self, hash: &str, payload: &str) {
135
+ // Idempotent re-store fast-path: same hash overwrite payload
136
+ // in place, leave the order queue alone. Common when the same
137
+ // tool output flows through multiple times in a session.
138
+ if let Some(mut existing) = self.map.get_mut(hash) {
139
+ existing.payload = payload.to_string();
140
+ existing.inserted = Instant::now();
 
 
 
 
 
 
 
141
  return;
142
  }
143
 
144
+ // New entry. Cap-bound first (may sweep a few stale order
145
+ // entries), then insert and append to the FIFO queue.
146
+ if self.map.len() >= self.capacity {
147
+ self.evict_until_under_capacity();
148
+ }
149
+ let entry = Entry {
150
+ payload: payload.to_string(),
151
+ inserted: Instant::now(),
152
+ };
153
+ let prev = self.map.insert(hash.to_string(), entry);
154
+ if prev.is_none() {
155
+ // Truly new key — record in FIFO order. (If `prev.is_some()`
156
+ // it means another thread re-inserted between our get_mut
157
+ // miss and this insert; treat that as a fast-path overwrite
158
+ // and skip the queue append to avoid duplicates.)
159
+ self.order
160
+ .lock()
161
+ .expect("ccr order mutex poisoned")
162
+ .push_back(hash.to_string());
163
  }
 
 
 
 
 
 
 
 
 
164
  }
165
 
166
  fn get(&self, hash: &str) -> Option<String> {
167
+ // Read path: shard read-lock, check TTL, clone payload out.
168
+ // No global lock involvement at all — distinct hashes hash to
169
+ // distinct shards and never contend.
170
+ let expired_at = {
171
+ let entry = self.map.get(hash)?;
172
+ if entry.inserted.elapsed() > self.ttl {
173
+ Some(()) // signal expired; drop guard before we remove
174
+ } else {
175
+ return Some(entry.payload.clone());
176
+ }
177
  };
178
+ if expired_at.is_some() {
179
+ self.map.remove(hash);
180
  return None;
181
  }
182
+ None
183
  }
184
 
185
  fn len(&self) -> usize {
186
+ self.map.len()
 
 
 
 
187
  }
188
  }
189
 
 
247
  assert_eq!(store.get("h"), Some("v".to_string()));
248
  assert!(!store.is_empty());
249
  }
250
+
251
+ #[test]
252
+ fn concurrent_puts_and_gets_do_not_corrupt() {
253
+ // Smoke test for the concurrent design — N threads each do
254
+ // P puts and P gets against distinct keys. Every key written
255
+ // must be readable afterwards.
256
+ use std::sync::Arc;
257
+ use std::thread;
258
+
259
+ let store = Arc::new(InMemoryCcrStore::with_capacity_and_ttl(10_000, DEFAULT_TTL));
260
+ let n_threads = 8;
261
+ let per_thread = 200;
262
+
263
+ let mut handles = Vec::new();
264
+ for tid in 0..n_threads {
265
+ let s = store.clone();
266
+ handles.push(thread::spawn(move || {
267
+ for i in 0..per_thread {
268
+ let key = format!("t{tid}_k{i}");
269
+ let val = format!("v{tid}_{i}");
270
+ s.put(&key, &val);
271
+ }
272
+ for i in 0..per_thread {
273
+ let key = format!("t{tid}_k{i}");
274
+ let got = s.get(&key);
275
+ assert_eq!(got, Some(format!("v{tid}_{i}")));
276
+ }
277
+ }));
278
+ }
279
+ for h in handles {
280
+ h.join().unwrap();
281
+ }
282
+ assert_eq!(store.len(), n_threads * per_thread);
283
+ }
284
  }
crates/headroom-core/src/transforms/smart_crusher/crusher.rs CHANGED
@@ -667,11 +667,15 @@ impl SmartCrusher {
667
  // serves the original back via retrieval tool calls.
668
  let dropped_count = items.len().saturating_sub(result.len());
669
  let (ccr_hash, dropped_summary) = if dropped_count > 0 {
670
- let h = hash_array_for_ccr(items);
 
 
 
 
 
 
671
  let marker = format!("<<ccr:{h} {dropped_count}_rows_offloaded>>");
672
  if let Some(store) = &self.ccr_store {
673
- let canonical =
674
- serde_json::to_string(&Value::Array(items.to_vec())).unwrap_or_default();
675
  store.put(&h, &canonical);
676
  }
677
  (Some(h), marker)
@@ -898,14 +902,22 @@ fn estimate_array_bytes(item_strings: &[String]) -> usize {
898
  payload + separators + 2
899
  }
900
 
901
- /// 12-char SHA-256 hex prefix of the canonical JSON serialization of
902
- /// `[v0, v1, ...]`. Used as the CCR retrieval key when the lossy path
903
- /// drops rows. Same input → same hash, so the runtime can cache the
904
- /// original by-hash and retrieval is deterministic.
905
- fn hash_array_for_ccr(items: &[Value]) -> String {
 
 
 
 
 
 
 
 
 
906
  use sha2::{Digest, Sha256};
907
  let mut h = Sha256::new();
908
- let canonical = serde_json::to_string(&Value::Array(items.to_vec())).unwrap_or_default();
909
  h.update(canonical.as_bytes());
910
  h.finalize()
911
  .iter()
@@ -914,6 +926,15 @@ fn hash_array_for_ccr(items: &[Value]) -> String {
914
  .collect()
915
  }
916
 
 
 
 
 
 
 
 
 
 
917
  // ─── PR5 walker-integration helpers (string handling) ──────────────────────
918
  //
919
  // Parse-as-JSON-container, marker formatting, and humanize-bytes used to
 
667
  // serves the original back via retrieval tool calls.
668
  let dropped_count = items.len().saturating_sub(result.len());
669
  let (ccr_hash, dropped_summary) = if dropped_count > 0 {
670
+ // Serialize the original array exactly ONCE. The hash is
671
+ // taken over those bytes, and (if a store is configured) the
672
+ // same bytes get stored — eliminating a redundant tree clone
673
+ // (`items.to_vec()`) and a redundant `serde_json::to_string`
674
+ // pass that the previous version did per dropped array.
675
+ let canonical = canonical_array_json(items);
676
+ let h = hash_canonical(&canonical);
677
  let marker = format!("<<ccr:{h} {dropped_count}_rows_offloaded>>");
678
  if let Some(store) = &self.ccr_store {
 
 
679
  store.put(&h, &canonical);
680
  }
681
  (Some(h), marker)
 
902
  payload + separators + 2
903
  }
904
 
905
+ /// Serialize `[v0, v1, ...]` once into the canonical JSON form used by
906
+ /// the CCR retrieval contract. `serde_json` writes a slice of `Value` as
907
+ /// the same bytes it would write for `Value::Array(items.to_vec())`, so
908
+ /// we skip the array-wrapper allocation and the deep tree clone it
909
+ /// requires. Used by both the hash (input) and the store payload (write).
910
+ fn canonical_array_json(items: &[Value]) -> String {
911
+ serde_json::to_string(items).unwrap_or_default()
912
+ }
913
+
914
+ /// 12-char SHA-256 hex prefix of an already-serialized canonical JSON
915
+ /// string. Caller is responsible for producing the canonical form via
916
+ /// [`canonical_array_json`] (or another byte-equal serializer) — the
917
+ /// hash is over the bytes, so a stable serializer is the contract.
918
+ fn hash_canonical(canonical: &str) -> String {
919
  use sha2::{Digest, Sha256};
920
  let mut h = Sha256::new();
 
921
  h.update(canonical.as_bytes());
922
  h.finalize()
923
  .iter()
 
926
  .collect()
927
  }
928
 
929
+ /// Convenience: canonical-serialize `items` and hash the result. Kept
930
+ /// for sites (e.g. tests) that don't also need the canonical bytes for
931
+ /// storage. Production lossy path inlines `canonical_array_json` +
932
+ /// `hash_canonical` so the bytes are reused for the store payload.
933
+ #[cfg(test)]
934
+ fn hash_array_for_ccr(items: &[Value]) -> String {
935
+ hash_canonical(&canonical_array_json(items))
936
+ }
937
+
938
  // ─── PR5 walker-integration helpers (string handling) ──────────────────────
939
  //
940
  // Parse-as-JSON-container, marker formatting, and humanize-bytes used to
crates/headroom-py/src/lib.rs CHANGED
@@ -384,11 +384,18 @@ impl PyDiffCompressor {
384
 
385
  /// `compress(content: str, context: str = "") -> DiffCompressionResult`.
386
  /// Argument order and keyword names match the Python implementation.
 
 
 
 
 
 
387
  #[pyo3(signature = (content, context = ""))]
388
- fn compress(&self, content: &str, context: &str) -> PyDiffCompressionResult {
389
- PyDiffCompressionResult {
390
- inner: self.inner.compress(content, context),
391
- }
 
392
  }
393
 
394
  /// `compress_with_stats(content, context="") -> (result, stats)`.
@@ -398,10 +405,14 @@ impl PyDiffCompressor {
398
  #[pyo3(signature = (content, context = ""))]
399
  fn compress_with_stats(
400
  &self,
 
401
  content: &str,
402
  context: &str,
403
  ) -> (PyDiffCompressionResult, PyDiffCompressorStats) {
404
- let (result, stats) = self.inner.compress_with_stats(content, context);
 
 
 
405
  (
406
  PyDiffCompressionResult { inner: result },
407
  PyDiffCompressorStats { inner: stats },
@@ -644,20 +655,36 @@ impl PySmartCrusher {
644
 
645
  /// `crush(content, query="", bias=1.0) -> CrushResult`. Argument
646
  /// order and keyword names mirror the Python implementation.
 
 
 
 
 
 
647
  #[pyo3(signature = (content, query = "", bias = 1.0))]
648
- fn crush(&self, content: &str, query: &str, bias: f64) -> PyCrushResult {
649
- PyCrushResult {
650
- inner: self.inner.crush(content, query, bias),
651
- }
 
652
  }
653
 
654
  /// `smart_crush_content(content, query="", bias=1.0) -> (str, bool, str)`.
655
  /// Mirrors Python's `_smart_crush_content` — used by
656
  /// `smart_crush_tool_output` convenience function and direct
657
- /// callers that want the tuple form.
 
658
  #[pyo3(signature = (content, query = "", bias = 1.0))]
659
- fn smart_crush_content(&self, content: &str, query: &str, bias: f64) -> (String, bool, String) {
660
- self.inner.smart_crush_content(content, query, bias)
 
 
 
 
 
 
 
 
661
  }
662
 
663
  /// Crush a JSON array directly and return the structured result.
@@ -684,25 +711,39 @@ impl PySmartCrusher {
684
  query: &str,
685
  bias: f64,
686
  ) -> Bound<'py, PyDict> {
687
- // Errors here surface as Python `RuntimeError` via pyo3's panic
688
- // catcher callers are expected to pass valid array-shaped JSON.
689
- let parsed: serde_json::Value = serde_json::from_str(items_json)
690
- .unwrap_or_else(|e| panic!("items_json must be JSON: {e}"));
691
- let items = match parsed {
692
- serde_json::Value::Array(a) => a,
693
- other => panic!("items_json must be a JSON array, got {}", type_name(&other)),
694
- };
695
- let result = self.inner.crush_array(&items, query, bias);
696
- let kept_json = serde_json::to_string(&serde_json::Value::Array(result.items))
697
- .expect("serialize kept items");
 
 
 
 
 
 
 
 
 
 
 
 
 
 
698
  build_crush_array_dict(
699
  py,
700
  kept_json,
701
- result.ccr_hash,
702
- result.dropped_summary,
703
- result.strategy_info,
704
- result.compacted,
705
- result.compaction_kind,
706
  )
707
  }
708
 
@@ -719,15 +760,20 @@ impl PySmartCrusher {
719
  /// pass without per-array lossy crushing — useful when the caller
720
  /// wants document-shape compaction (forms, configs, mixed records)
721
  /// rather than statistical row drop.
722
- fn compact_document_json(&self, doc_json: &str) -> String {
723
- let parsed: serde_json::Value =
724
- serde_json::from_str(doc_json).unwrap_or_else(|e| panic!("doc_json must be JSON: {e}"));
725
- let mut dc = DocumentCompactor::new();
726
- if let Some(store) = self.inner.ccr_store() {
727
- dc = dc.with_ccr_store(store.clone());
728
- }
729
- let out = dc.compact(parsed);
730
- serde_json::to_string(&out).expect("serialize compacted document")
 
 
 
 
 
731
  }
732
 
733
  /// Look up an original payload by CCR hash.
 
384
 
385
  /// `compress(content: str, context: str = "") -> DiffCompressionResult`.
386
  /// Argument order and keyword names match the Python implementation.
387
+ ///
388
+ /// Releases the GIL across the Rust compress call so concurrent
389
+ /// Python threads (uvicorn workers, asyncio tasks) can keep
390
+ /// running while we hash + parse + filter the diff. The
391
+ /// `&str` inputs are copied to owned `String`s first because
392
+ /// PyO3 ties their lifetime to the GIL hold.
393
  #[pyo3(signature = (content, context = ""))]
394
+ fn compress(&self, py: Python<'_>, content: &str, context: &str) -> PyDiffCompressionResult {
395
+ let content = content.to_string();
396
+ let context = context.to_string();
397
+ let inner = py.allow_threads(|| self.inner.compress(&content, &context));
398
+ PyDiffCompressionResult { inner }
399
  }
400
 
401
  /// `compress_with_stats(content, context="") -> (result, stats)`.
 
405
  #[pyo3(signature = (content, context = ""))]
406
  fn compress_with_stats(
407
  &self,
408
+ py: Python<'_>,
409
  content: &str,
410
  context: &str,
411
  ) -> (PyDiffCompressionResult, PyDiffCompressorStats) {
412
+ let content = content.to_string();
413
+ let context = context.to_string();
414
+ let (result, stats) =
415
+ py.allow_threads(|| self.inner.compress_with_stats(&content, &context));
416
  (
417
  PyDiffCompressionResult { inner: result },
418
  PyDiffCompressorStats { inner: stats },
 
655
 
656
  /// `crush(content, query="", bias=1.0) -> CrushResult`. Argument
657
  /// order and keyword names mirror the Python implementation.
658
+ ///
659
+ /// Releases the GIL across the Rust crush call. Concurrent Python
660
+ /// threads in the proxy keep running during the JSON parse +
661
+ /// recursive process_value + per-array compression work. `&str`
662
+ /// inputs are copied to owned `String`s up-front since PyO3 ties
663
+ /// their lifetime to the GIL hold.
664
  #[pyo3(signature = (content, query = "", bias = 1.0))]
665
+ fn crush(&self, py: Python<'_>, content: &str, query: &str, bias: f64) -> PyCrushResult {
666
+ let content = content.to_string();
667
+ let query = query.to_string();
668
+ let inner = py.allow_threads(|| self.inner.crush(&content, &query, bias));
669
+ PyCrushResult { inner }
670
  }
671
 
672
  /// `smart_crush_content(content, query="", bias=1.0) -> (str, bool, str)`.
673
  /// Mirrors Python's `_smart_crush_content` — used by
674
  /// `smart_crush_tool_output` convenience function and direct
675
+ /// callers that want the tuple form. Releases the GIL across the
676
+ /// compute (same rationale as `crush`).
677
  #[pyo3(signature = (content, query = "", bias = 1.0))]
678
+ fn smart_crush_content(
679
+ &self,
680
+ py: Python<'_>,
681
+ content: &str,
682
+ query: &str,
683
+ bias: f64,
684
+ ) -> (String, bool, String) {
685
+ let content = content.to_string();
686
+ let query = query.to_string();
687
+ py.allow_threads(|| self.inner.smart_crush_content(&content, &query, bias))
688
  }
689
 
690
  /// Crush a JSON array directly and return the structured result.
 
711
  query: &str,
712
  bias: f64,
713
  ) -> Bound<'py, PyDict> {
714
+ // GIL-release pattern: own the inputs, do all heavy compute
715
+ // (JSON parse, crush, re-serialize) without the GIL, then
716
+ // re-acquire to build the PyDict from the owned outputs.
717
+ let items_json = items_json.to_string();
718
+ let query = query.to_string();
719
+ let (kept_json, ccr_hash, dropped_summary, strategy_info, compacted, compaction_kind) = py
720
+ .allow_threads(|| {
721
+ let parsed: serde_json::Value = serde_json::from_str(&items_json)
722
+ .unwrap_or_else(|e| panic!("items_json must be JSON: {e}"));
723
+ let items = match parsed {
724
+ serde_json::Value::Array(a) => a,
725
+ other => panic!("items_json must be a JSON array, got {}", type_name(&other)),
726
+ };
727
+ let result = self.inner.crush_array(&items, &query, bias);
728
+ let kept_json = serde_json::to_string(&serde_json::Value::Array(result.items))
729
+ .expect("serialize kept items");
730
+ (
731
+ kept_json,
732
+ result.ccr_hash,
733
+ result.dropped_summary,
734
+ result.strategy_info,
735
+ result.compacted,
736
+ result.compaction_kind,
737
+ )
738
+ });
739
  build_crush_array_dict(
740
  py,
741
  kept_json,
742
+ ccr_hash,
743
+ dropped_summary,
744
+ strategy_info,
745
+ compacted,
746
+ compaction_kind,
747
  )
748
  }
749
 
 
760
  /// pass without per-array lossy crushing — useful when the caller
761
  /// wants document-shape compaction (forms, configs, mixed records)
762
  /// rather than statistical row drop.
763
+ fn compact_document_json(&self, py: Python<'_>, doc_json: &str) -> String {
764
+ // Heavy: JSON parse + recursive walker + tabular compaction +
765
+ // re-serialize. None of it touches Python; release the GIL.
766
+ let doc_json = doc_json.to_string();
767
+ py.allow_threads(|| {
768
+ let parsed: serde_json::Value = serde_json::from_str(&doc_json)
769
+ .unwrap_or_else(|e| panic!("doc_json must be JSON: {e}"));
770
+ let mut dc = DocumentCompactor::new();
771
+ if let Some(store) = self.inner.ccr_store() {
772
+ dc = dc.with_ccr_store(store.clone());
773
+ }
774
+ let out = dc.compact(parsed);
775
+ serde_json::to_string(&out).expect("serialize compacted document")
776
+ })
777
  }
778
 
779
  /// Look up an original payload by CCR hash.