# baseline.py — Metadata Hierarchy Builder — Baseline (Taxonomizer) # # Pure Taxonomizer baseline — NO hardcoded, domain-specific patterns. # The only lexical resource is a generic English stop-word list (standard # information-retrieval practice, not dataset-specific). # # Pipeline (dataset-only, no external APIs, no sentence-transformers): # 1. Load metadata file (CSV / TSV / XLSX / JSON) # 2. Detect column roles (leaf / group / text / meta) # 3. Build canonical schema (_leaf_id, _leaf_label, _group_path, _text) # 4. Represent each variable as a TF-IDF text object # 5. Recursively cluster variables (agglomerative, cosine distance) into an # abstract-to-concrete taxonomy; internal-node labels are the most # discriminative terms of each cluster — derived from the data, not hardcoded # 6. Visualise (Sunburst / Treemap) # 7. Export VIANNA-compatible JSON + canonical CSV # # Papers: # [TAX] Taxonomizer (Sultanum et al.) — leaf=attribute, internal=abstract group # built bottom-up by recursively clustering item feature vectors and # labelling each internal node with its members' shared/discriminative terms # [GON] Goncalves et al. — TF-IDF text objects + cosine distance # [HIE] HiExpan (adapted) — discriminative-term node labelling from __future__ import annotations import csv, json, re, warnings from collections import defaultdict from pathlib import Path import tempfile import numpy as np import pandas as pd import plotly.graph_objects as go import streamlit as st from sklearn.cluster import AgglomerativeClustering from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics import normalized_mutual_info_score, adjusted_rand_score, silhouette_score from sklearn.metrics.pairwise import cosine_distances from sklearn.preprocessing import LabelEncoder warnings.filterwarnings('ignore') st.set_page_config(page_title='Metadata Hierarchy — Baseline', page_icon='🌿', layout='wide') st.title('Metadata Hierarchy Builder — Baseline (Taxonomizer)') st.caption( 'Pure Taxonomizer baseline: TF-IDF text objects + recursive agglomerative ' 'clustering into an abstract-to-concrete taxonomy, with internal-node labels ' 'derived from each cluster’s discriminative terms. No hardcoded domain ' 'patterns, no external APIs, no sentence embeddings — works on any dataset.' ) # ───────────────────────────────────────────────────────────────────────────── # CONSTANTS # ───────────────────────────────────────────────────────────────────────────── LEAF_KEYS = 'variable var field column attribute name code id item indicator question measure concept'.split() GROUP_KEYS = 'task category domain module section table dataset assessment test variant group topic instrument form subscale construct'.split() TEXT_KEYS = 'description definition desc label title question meaning note notes text display full details explanation comment'.split() META_KEYS = 'type dtype data_type datatype unit units format decimal precision values value coding codebook range min max scale'.split() # ───────────────────────────────────────────────────────────────────────────── # FILE LOADING # ───────────────────────────────────────────────────────────────────────────── def safe_name(name: str) -> str: return ''.join(ch if ch.isalnum() or ch in '-_.' else '_' for ch in name) def try_read_csv(path: Path) -> pd.DataFrame: best, best_score = None, -1 for enc in ['utf-8-sig', 'utf-8', 'latin1']: for sep in [None, ',', '\t', ';', '|']: try: df = pd.read_csv(path, sep=sep, engine='python', encoding=enc) score = df.shape[1] * 10 - float(df.isna().mean().mean()) if score > best_score: best, best_score = df, score except Exception: pass if best is None: raise ValueError(f'Could not read {path.name}') best.columns = [str(c).strip().replace(';', '') for c in best.columns] # Repair comma-packed rows (AI-Mind format) if len(best) > 0: first = best.iloc[:, 0].astype(str) other_null = best.iloc[:, 1:].isna().mean().mean() if best.shape[1] > 1 else 1.0 if first.str.contains(',').mean() > 0.50 and other_null > 0.70: lines = path.read_text(encoding='utf-8-sig', errors='replace').splitlines() if lines: header = [h.strip().replace(';', '') for h in lines[0].split(',')] rows = [] for line in lines[1:]: line = line.strip().rstrip(';') if not line: continue if line.startswith('"') and line.endswith('"'): line = line[1:-1] try: parts = next(csv.reader([line], quotechar='"')) except Exception: continue if len(parts) >= len(header): rows.append(parts[:len(header)]) if rows: best = pd.DataFrame(rows, columns=header) best.columns = [str(c).strip().replace(';', '') for c in best.columns] return best def load_any(path: Path) -> pd.DataFrame: s = path.suffix.lower() if s in ['.csv', '.tsv', '.txt']: return try_read_csv(path) if s in ['.xlsx', '.xls']: return pd.read_excel(path) if s == '.json': obj = json.loads(path.read_text(encoding='utf-8', errors='replace')) if isinstance(obj, list): return pd.json_normalize(obj) if isinstance(obj, dict): for v in obj.values(): if isinstance(v, list): return pd.json_normalize(v) raise ValueError(f'Unsupported file type: {s}') def save_upload(f) -> Path: tmp = Path(tempfile.mkdtemp(prefix='baseline_')) p = tmp / safe_name(f.name) p.write_bytes(f.getbuffer()) return p # ───────────────────────────────────────────────────────────────────────────── # ROLE DETECTION [GON] # ───────────────────────────────────────────────────────────────────────────── def norm(c: str) -> str: return re.sub(r'[^a-z0-9]+', '_', str(c).strip().lower()).strip('_') def kscore(c: str, keys: list) -> int: nc = norm(c) return sum(1 for k in keys if k in nc) def profile_columns(df: pd.DataFrame) -> pd.DataFrame: out = [] n = max(len(df), 1) for col in df.columns: s = df[col] non = float(s.notna().mean()) nun = int(s.nunique(dropna=True)) ur = nun / n avg = float(s.dropna().astype(str).map(len).mean()) if s.notna().any() else 0 out.append({ 'column': str(col), 'non_null': round(non, 3), 'unique_values': nun, 'unique_ratio': round(ur, 3), 'avg_length': round(avg, 1), 'leaf_score': 4*kscore(col, LEAF_KEYS) + (3 if 0.5 <= ur <= 1 else 0) + (1 if avg < 80 else 0), 'group_score': 4*kscore(col, GROUP_KEYS) + (3 if 1 < nun < min(n*0.5, 80) else 0) + (1 if avg < 60 else 0), 'text_score': 5*kscore(col, TEXT_KEYS) + (4 if avg > 50 else 0) + (1 if non > 0.5 else 0), 'metadata_score': 4*kscore(col, META_KEYS) + (2 if 1 < nun < min(n*0.8, 100) else 0), }) return pd.DataFrame(out) def detect_roles(df: pd.DataFrame) -> tuple: prof = profile_columns(df) leaf = prof.sort_values(['leaf_score', 'unique_ratio'], ascending=False).head(1)['column'].tolist() text = (prof[(prof.text_score >= 4) | (prof.avg_length > 80)] .sort_values('text_score', ascending=False)['column'].tolist()) or leaf.copy() group = (prof[(prof.group_score >= 4) & (~prof.column.isin(leaf)) & (prof.unique_values > 1)] .sort_values('group_score', ascending=False)['column'].head(3).tolist()) meta = (prof[(prof.metadata_score >= 4) & (~prof.column.isin(text + leaf + group))] .sort_values('metadata_score', ascending=False)['column'].head(5).tolist()) return {'leaf_cols': leaf, 'group_cols': group, 'text_cols': text, 'metadata_cols': meta}, prof # ───────────────────────────────────────────────────────────────────────────── # CANONICAL SCHEMA [GON] # ───────────────────────────────────────────────────────────────────────────── def sv(x) -> str: return '' if pd.isna(x) else str(x).strip() def build_canonical(df: pd.DataFrame, cfg: dict, source: str) -> pd.DataFrame: leaf_cols = cfg.get('leaf_cols', []) group_cols = cfg.get('group_cols', []) text_cols = cfg.get('text_cols', []) meta_cols = cfg.get('metadata_cols', []) rows = [] for i, row in df.iterrows(): leaf_parts = [sv(row.get(c, '')) for c in leaf_cols] leaf_parts = [p for p in leaf_parts if p] label = ' / '.join(leaf_parts) if leaf_parts else f'variable_{i+1}' group_parts = [sv(row.get(c, '')) for c in group_cols] group_parts = [p for p in group_parts if p and p.lower() not in ['nan', 'none']] gpath = ' > '.join(group_parts) if group_parts else 'Ungrouped' parts = [] for c in list(dict.fromkeys(group_cols + leaf_cols + text_cols + meta_cols)): v = sv(row.get(c, '')) if v: parts.append(f'{c}: {v}') text = ' | '.join(parts) if parts else label rows.append({ '_source_file': source, '_row_index': int(i), '_leaf_label': label, '_leaf_id': f'{gpath}.{label}' if gpath != 'Ungrouped' else label, '_group_path': gpath, '_text': text, }) can = pd.DataFrame(rows) if can['_leaf_id'].duplicated().any(): cnt: dict = defaultdict(int) ids = [] for lid in can['_leaf_id']: cnt[lid] += 1 ids.append(lid if cnt[lid] == 1 else f'{lid}__{cnt[lid]}') can['_leaf_id'] = ids return can # ───────────────────────────────────────────────────────────────────────────── # TAXONOMIZER CORE [TAX + GON] # # Everything here is data-driven: TF-IDF over the variable text objects, cosine # distance, agglomerative clustering with the number of clusters chosen by # silhouette, and internal-node labels taken from each cluster's most # discriminative terms. The ONLY lexical resource is the generic English # stop-word list (standard IR practice — not dataset-specific). # ───────────────────────────────────────────────────────────────────────────── def vectorize_texts(texts: list): """TF-IDF text objects [GON]. Generic English stop-words only.""" vec = TfidfVectorizer(stop_words='english', ngram_range=(1, 2), max_features=2000, min_df=1, sublinear_tf=True) X = vec.fit_transform(texts) return X, vec def best_k(dist: np.ndarray, n: int, k_min: int = 2, k_max: int = 8) -> int: """Pick the number of clusters that maximises the silhouette score. Fully data-driven — no fixed cluster count. Returns 1 if no split with >=2 clusters is well separated. """ k_hi = min(k_max, n - 1) if k_hi < k_min: return 1 best, best_s = 1, -1.0 for k in range(k_min, k_hi + 1): labels = AgglomerativeClustering(n_clusters=k, metric='precomputed', linkage='average').fit_predict(dist) if len(set(labels)) < 2: continue try: s = silhouette_score(dist, labels, metric='precomputed') except Exception: continue if s > best_s: best_s, best = s, k return best def discriminative_label(inside: np.ndarray, outside, terms: np.ndarray, used: set, top_n: int = 2) -> str: """Label a cluster by the terms that most separate it from its siblings. inside = mean TF-IDF vector of the cluster's members outside = mean TF-IDF vector of the sibling pool (or 0 if none) """ scores = inside - (outside if outside is not None else 0) picks: list = [] for i in np.argsort(scores)[::-1]: term = terms[i] if len(term) <= 2 or scores[i] <= 0 or term in used: continue picks.append(term) if len(picks) >= top_n: break if not picks: # degenerate: fall back to highest raw mean term for i in np.argsort(inside)[::-1]: if len(terms[i]) > 2: picks = [terms[i]] break return ' / '.join(p.title() for p in picks) if picks else 'Group' # ───────────────────────────────────────────────────────────────────────────── # HIERARCHY CONSTRUCTION [TAX + GON] # ───────────────────────────────────────────────────────────────────────────── def _nmap(nodes: list) -> dict: return {int(n['id']): n for n in nodes} def _next_id(nodes: list) -> int: return max((int(n['id']) for n in nodes), default=0) + 1 def _add_child(nodes: list, parent_id: int, child_id: int): m = _nmap(nodes) p = m.get(int(parent_id)) if p is None: return rel = list(p.get('related', [])) if int(child_id) not in rel: rel.append(int(child_id)) p['related'] = rel def _make_agg(nid: int, name: str, desc: str = '') -> dict: return {'id': int(nid), 'name': str(name), 'related': [], 'type': 'aggregation', 'isShown': True, 'desc': desc, 'dtype': 'determine'} def _leaf_ids(nodes: list, nid: int) -> list: m = _nmap(nodes) out: list = [] def rec(x): n = m.get(int(x)) if not n: return if n.get('type') == 'attribute': out.append(int(x)) return for c in n.get('related', []): rec(int(c)) rec(nid) return list(dict.fromkeys(out)) def build_hierarchy(can: pd.DataFrame, project: str = 'project', max_depth: int = 3, min_cluster_size: int = 6, branch_max: int = 8) -> list: """Pure Taxonomizer construction [TAX]. Builds an abstract-to-concrete taxonomy by recursively clustering the variables' TF-IDF text objects. At each level the number of clusters is chosen by silhouette; each resulting internal node is labelled with the terms that most discriminate its members from their siblings. No group column, no hardcoded patterns are used in construction — so the recovered structure can be fairly evaluated against the original group column. """ # ── leaf attribute nodes (ids 1..N) ────────────────────────────────────── nodes: list = [{'id': 0, 'name': project, 'type': 'root', 'dtype': 'root', 'isShown': True, 'related': [], 'desc': 'Root node'}] row_to_node: list = [] for i, (_, r) in enumerate(can.iterrows(), start=1): nodes.append({'id': i, 'name': r['_leaf_label'], 'dtype': 'determine', 'related': [], 'isShown': True, 'type': 'attribute', 'desc': r['_text'], 'metadata': {'leaf_id': r['_leaf_id'], 'group_path': r['_group_path']}}) row_to_node.append(i) row_to_node = np.array(row_to_node) # ── TF-IDF text objects + full cosine distance matrix [GON] ─────────────── texts = (can['_leaf_label'].astype(str) + ' . ' + can['_text'].astype(str)).tolist() X, vec = vectorize_texts(texts) Xd = X.toarray() terms = vec.get_feature_names_out() full_dist = cosine_distances(X).astype(float) np.fill_diagonal(full_dist, 0.0) # ── recursive clustering ───────────────────────────────────────────────── def attach_leaves(parent_id: int, idx: np.ndarray): for i in idx: _add_child(nodes, parent_id, int(row_to_node[i])) def recurse(parent_id: int, idx: np.ndarray, depth: int, used: set): n = len(idx) if n <= min_cluster_size or depth >= max_depth: attach_leaves(parent_id, idx) return sub = full_dist[np.ix_(idx, idx)] k_cap = min(branch_max, max(2, n // min_cluster_size)) k = best_k(sub, n, k_min=2, k_max=k_cap) if k <= 1: attach_leaves(parent_id, idx) return labels = AgglomerativeClustering(n_clusters=k, metric='precomputed', linkage='average').fit_predict(sub) pool_Xd = Xd[idx] for c in range(k): mask = labels == c members = idx[mask] if len(members) == 0: continue if len(members) == 1: # don't create singleton internal nodes _add_child(nodes, parent_id, int(row_to_node[members[0]])) continue inside = pool_Xd[mask].mean(axis=0) outside = pool_Xd[~mask].mean(axis=0) if (~mask).any() else None label = discriminative_label(inside, outside, terms, used) nid = _next_id(nodes) nodes.append(_make_agg(nid, label, desc=f'Cluster of {len(members)} variables — ' f'discriminative terms: {label}')) _add_child(nodes, parent_id, nid) recurse(nid, members, depth + 1, used | {label.lower()}) recurse(0, np.arange(len(can)), 0, set()) for n in nodes: n['related'] = list(dict.fromkeys(int(x) for x in n.get('related', []))) return nodes # ───────────────────────────────────────────────────────────────────────────── # VISUALISATION # ───────────────────────────────────────────────────────────────────────────── def _parent_map(nodes: list) -> dict: pm: dict = {} for n in nodes: for c in n.get('related', []): if int(c) not in pm: pm[int(c)] = int(n['id']) return pm # ───────────────────────────────────────────────────────────────────────────── # EVALUATION HELPERS # ───────────────────────────────────────────────────────────────────────────── def _eval_cluster_assignments(nodes: list, can: pd.DataFrame) -> list[int]: """Return predicted cluster id (depth-1 aggregation ancestor) for each row in can.""" pm = _parent_map(nodes) def depth1(nid: int) -> int: # Walk up until our parent is root (id==0) or we have no parent while pm.get(nid, -1) not in (-1, 0): nid = pm[nid] return nid lid_to_nid = {n['metadata']['leaf_id']: int(n['id']) for n in nodes if n.get('type') == 'attribute' and 'metadata' in n} return [depth1(lid_to_nid[lid]) if lid in lid_to_nid else -1 for lid in can['_leaf_id']] def _purity(y_true, y_pred) -> float: from collections import Counter clusters: dict = {} for t, p in zip(y_true, y_pred): clusters.setdefault(p, []).append(t) correct = sum(Counter(v).most_common(1)[0][1] for v in clusters.values()) return correct / max(len(y_true), 1) def _structural_stats(nodes: list) -> dict: pm = _parent_map(nodes) def depth_of(nid: int) -> int: d = 0 while nid in pm: nid = pm[nid]; d += 1 return d agg = [n for n in nodes if n.get('type') == 'aggregation'] leafs = [n for n in nodes if n.get('type') == 'attribute'] depths = [depth_of(int(n['id'])) for n in leafs] branches = [len(n.get('related', [])) for n in agg] singletons = sum(1 for b in branches if b == 1) return { 'n_aggregation_nodes': len(agg), 'max_depth': int(max(depths, default=0)), 'avg_leaf_depth': round(float(np.mean(depths)), 2) if depths else 0.0, 'avg_branching_factor': round(float(np.mean(branches)), 2) if branches else 0.0, 'singleton_nodes_%': round(100.0 * singletons / max(len(agg), 1), 1), } def _wrap(text: str, width: int = 70) -> str: """Wrap long hover text onto multiple
lines so it never runs off-screen.""" import textwrap text = str(text).replace('<', '<') lines: list = [] for para in text.split('\n'): wrapped = textwrap.wrap(para, width=width) or [''] lines.extend(wrapped) return '
'.join(lines) def plot_sunburst(nodes: list, max_depth: int = 4) -> go.Figure: pm = _parent_map(nodes) ids, labels, parents, values, hover = [], [], [], [], [] for n in nodes: nid = int(n['id']) lc = len(_leaf_ids(nodes, nid)) ids.append(str(nid)) labels.append(str(n.get('name', ''))[:40]) parents.append('' if nid == 0 else str(pm.get(nid, 0))) values.append(max(1, lc)) desc = _wrap(n.get('desc', '')) hover.append(f'{_wrap(n.get("name",""))}
Type: {n.get("type","")}' f'
Variables: {lc}

{desc}') fig = go.Figure(go.Sunburst( ids=ids, labels=labels, parents=parents, values=values, branchvalues='total', hovertext=hover, hoverinfo='text', maxdepth=max_depth, insidetextorientation='radial', marker=dict(colorscale='Greens', line=dict(width=1, color='white')), )) fig.update_layout(height=700, margin=dict(l=10, r=10, t=40, b=10), title='Click a sector to drill down — click centre to go back') return fig def plot_treemap(nodes: list) -> go.Figure: pm = _parent_map(nodes) ids, labels, parents, values, hover = [], [], [], [], [] for n in nodes: nid = int(n['id']) lc = len(_leaf_ids(nodes, nid)) ids.append(str(nid)) labels.append(str(n.get('name', ''))[:40]) parents.append('' if nid == 0 else str(pm.get(nid, 0))) values.append(max(1, lc)) desc = _wrap(n.get('desc', '')) hover.append(f'{_wrap(n.get("name",""))}
Variables: {lc}
{desc}') fig = go.Figure(go.Treemap( ids=ids, labels=labels, parents=parents, values=values, branchvalues='total', hovertext=hover, hoverinfo='text', textinfo='label+value', marker=dict(colorscale='Greens', line=dict(width=1, color='white')), )) fig.update_layout(height=700, margin=dict(l=10, r=10, t=10, b=10)) return fig # ───────────────────────────────────────────────────────────────────────────── # SIDEBAR # ───────────────────────────────────────────────────────────────────────────── with st.sidebar: st.header('1. Upload') uploaded = st.file_uploader( 'Upload a metadata file', type=['csv', 'tsv', 'txt', 'xlsx', 'xls', 'json'], accept_multiple_files=False, ) st.header('2. Taxonomizer settings') tx_max_depth = st.slider('Max taxonomy depth', 2, 5, 3, 1, help='How many abstract-to-concrete levels to build') tx_min_size = st.slider('Min cluster size', 3, 20, 6, 1, help='Clusters smaller than this stop splitting (leaves attach directly)') tx_branch = st.slider('Max branches per node', 3, 12, 8, 1, help='Upper bound on clusters per split; the actual number is chosen by silhouette') st.header('3. Display') max_items = st.slider('Maximum variables', 25, 1200, 300, 25) group_filter = st.text_input('Group filter (optional)', value='', help='Filter rows whose group path contains this text') display_depth = st.slider('Sunburst depth', 2, 6, 4, 1) # ───────────────────────────────────────────────────────────────────────────── # MAIN # ───────────────────────────────────────────────────────────────────────────── if not uploaded: st.info('Upload a metadata CSV / XLSX / JSON file to begin.') st.markdown(""" ### Baseline algorithm — pure Taxonomizer The simplest of the three approaches — no hardcoded domain patterns, no external APIs, no neural embeddings. Works on any dataset. | Step | Method | Paper | |------|--------|-------| | Text object | Concatenate all metadata fields per variable | Goncalves et al. | | Representation | TF-IDF (generic English stop-words only) | Goncalves et al. | | Hierarchy construction | Recursive agglomerative clustering (cosine), #clusters chosen by silhouette | Taxonomizer (Sultanum et al.) | | Node labelling | Most discriminative terms of each cluster vs its siblings | Taxonomizer / HiExpan | The group column is **not** used for construction, so the recovered taxonomy can be fairly evaluated against it (NMI / ARI / Purity in the Evaluation tab). **Approach 1** adds SBERT embeddings + Wikidata/BioPortal enrichment + HiExpan refinement. **Approach 2** adds NMF/FASTopic aspect discovery + GMM clustering + optional LLM labels. """) st.stop() path = save_upload(uploaded) @st.cache_data(show_spinner=False) def _load_profile(path_str: str): df = load_any(Path(path_str)) cfg, prof = detect_roles(df) return df, cfg, prof with st.spinner('Loading file…'): df, auto_cfg, prof = _load_profile(str(path)) st.subheader('Step 1 — File preview') with st.expander(f'📄 {uploaded.name} ({len(df):,} rows, {len(df.columns)} columns)', expanded=False): st.dataframe(df.head(10), use_container_width=True) score_cols = [c for c in ['column', 'leaf_score', 'group_score', 'text_score', 'metadata_score'] if c in prof.columns] st.dataframe(prof[score_cols].sort_values('leaf_score', ascending=False), use_container_width=True) st.subheader('Step 2 — Confirm column roles') cols = list(df.columns) with st.expander('Column configuration', expanded=True): left, right = st.columns(2) with left: leaf_cols = st.multiselect('Leaf variable column(s)', cols, default=[c for c in auto_cfg.get('leaf_cols', []) if c in cols], key='leaf') group_cols = st.multiselect('Group/task column(s)', cols, default=[c for c in auto_cfg.get('group_cols', []) if c in cols], key='group') with right: text_cols = st.multiselect('Text/description column(s)', cols, default=[c for c in auto_cfg.get('text_cols', []) if c in cols], key='text') meta_cols = st.multiselect('Metadata/type column(s)', cols, default=[c for c in auto_cfg.get('metadata_cols', []) if c in cols], key='meta') if not leaf_cols: st.error('Choose at least one leaf variable column.') st.stop() cfg = {'leaf_cols': leaf_cols, 'group_cols': group_cols, 'text_cols': text_cols, 'metadata_cols': meta_cols} if st.button('Build baseline hierarchy', type='primary'): with st.spinner('Building hierarchy…'): _can = build_canonical(df, cfg, source=Path(uploaded.name).stem) if group_filter.strip(): _can = _can[_can['_group_path'].str.contains( group_filter.strip(), case=False, na=False)].copy() if len(_can) > max_items: _can = _can.head(max_items).copy() _can = _can.reset_index(drop=True) if len(_can) < 2: st.error('Need at least 2 variables after filtering.') st.stop() _pname = Path(uploaded.name).stem _nodes = build_hierarchy(_can, project=_pname, max_depth=tx_max_depth, min_cluster_size=tx_min_size, branch_max=tx_branch) st.session_state['_bl_nodes'] = _nodes st.session_state['_bl_can'] = _can st.session_state['_bl_project'] = _pname if '_bl_nodes' not in st.session_state: st.info('Configure columns above then click **Build baseline hierarchy**.') st.stop() nodes = st.session_state['_bl_nodes'] can = st.session_state['_bl_can'] project_name = st.session_state['_bl_project'] _sm = _structural_stats(nodes) n_leaves = len([n for n in nodes if n['type'] == 'attribute']) n_internal = len([n for n in nodes if n['type'] == 'aggregation']) st.divider() c1, c2, c3, c4 = st.columns(4) c1.metric('Variables', n_leaves) c2.metric('Aggregation nodes', n_internal) c3.metric('Max depth', _sm['max_depth']) c4.metric('Avg branching', _sm['avg_branching_factor']) tabs = st.tabs(['Sunburst', 'Treemap', 'Node detail', 'Canonical table', 'Export', '📊 Evaluation']) with tabs[0]: st.plotly_chart(plot_sunburst(nodes, max_depth=display_depth), use_container_width=True) st.caption('Green = Baseline. Click a sector to drill down; click the centre to go back.') with tabs[1]: st.plotly_chart(plot_treemap(nodes), use_container_width=True) with tabs[2]: nm = _nmap(nodes) agg_nodes = [n for n in nodes if n['type'] in ('aggregation', 'root')] options = [f'{n["name"]} [{len(_leaf_ids(nodes, int(n["id"])))} vars]' for n in agg_nodes] if options: sel = st.selectbox('Select a node', options) sel_name = sel.split(' [')[0] sel_node = next((n for n in agg_nodes if n['name'] == sel_name), None) if sel_node: lids = _leaf_ids(nodes, int(sel_node['id'])) leaf_ids_set = {nm[i]['metadata']['leaf_id'] for i in lids if i in nm and 'metadata' in nm[i]} sub = can[can['_leaf_id'].isin(leaf_ids_set)] st.write(f'**{len(lids)} variables** under "{sel_node["name"]}"') st.dataframe(sub[['_leaf_label', '_group_path', '_text']].reset_index(drop=True), use_container_width=True) with tabs[3]: st.dataframe(can, use_container_width=True) with tabs[4]: _base = safe_name(project_name) col1, col2 = st.columns(2) with col1: st.download_button( 'Hierarchy JSON', data=json.dumps(nodes, indent=2, ensure_ascii=False).encode('utf-8'), file_name=f'{_base}_baseline_hierarchy.json', mime='application/json', use_container_width=True, ) with col2: st.download_button( 'Canonical CSV', data=can.to_csv(index=False).encode('utf-8'), file_name=f'{_base}_baseline_canonical.csv', mime='text/csv', use_container_width=True, ) st.divider() # ── Save directly into the project's outputs/baseline/ folder ────────────── _out_dir = Path(__file__).resolve().parent / 'outputs' / 'baseline' st.markdown('### Save to project folder') st.caption( 'The download buttons above go to your browser’s Downloads folder (a browser ' f'restriction). This button instead writes the files into `{_out_dir}` with the ' 'dataset name — convenient for `evaluate_all.py`.' ) if st.button('💾 Save all to outputs/baseline/', type='primary', use_container_width=True): try: _out_dir.mkdir(parents=True, exist_ok=True) (_out_dir / f'{_base}_baseline_hierarchy.json').write_text( json.dumps(nodes, indent=2, ensure_ascii=False), encoding='utf-8') can.to_csv(_out_dir / f'{_base}_baseline_canonical.csv', index=False) st.success(f'Saved to `{_out_dir}`:\n\n' f'- {_base}_baseline_hierarchy.json\n' f'- {_base}_baseline_canonical.csv') except Exception as _e: st.error(f'Could not save: {_e}') with tabs[5]: import hierarchy_eval as he st.subheader('Hierarchy Quality Evaluation') st.caption( 'The group column is a *construction input* (Gonçalves text object), so it ' 'cannot serve as ground truth. The primary metrics below are **reference-free** ' '— they assess the hierarchy itself, with no gold standard.' ) with st.spinner('Computing reference-free metrics…'): tm = he.traco_metrics(nodes) npmi = he.npmi_coherence(nodes, can['_text'].tolist()) # ── PRIMARY: reference-free hierarchy quality ───────────────────────────── st.markdown('#### Primary — reference-free hierarchy quality') p1, p2, p3 = st.columns(3) p1.metric('Parent–child coherence', tm['pc_coherence'], help='TraCo (Wu et al., AAAI 2024). Mean similarity of each node to its parent. ' 'Higher = children correctly nest under their parent theme.') p2.metric('Sibling diversity', tm['sibling_diversity'], help='TraCo (Wu et al., AAAI 2024). Mean distance between sibling nodes. ' 'Higher = siblings are distinct (LOW = redundant/repeated siblings).') p3.metric('NPMI label coherence', npmi, help='Lau et al., EACL 2014. Whether node-label terms genuinely co-occur in the ' 'data. Higher = meaningful labels, not arbitrary term salads.') st.caption(f'Embedding backend: **{tm["encoder"]}**. ' 'Coherence & diversity ∈ [−1, 1]; NPMI ∈ ≈[−1, 1].') # ── Structural metrics ──────────────────────────────────────────────────── st.markdown('#### Structural statistics') sm = he.structural_stats(nodes) s1, s2, s3, s4, s5 = st.columns(5) s1.metric('Aggregation nodes', sm['n_aggregation_nodes']) s2.metric('Max leaf depth', sm['max_depth']) s3.metric('Avg leaf depth', sm['avg_leaf_depth']) s4.metric('Avg branching', sm['avg_branching_factor']) s5.metric('Singleton nodes', f"{sm['singleton_nodes_%']}%", help='Aggregation nodes with a single child (sparse-hierarchy indicator)') # ── SECONDARY: group preservation (caveated) ────────────────────────────── st.markdown('#### Secondary — group-structure preservation *(descriptive)*') st.caption( '⚠️ The group column was an **input** to construction, so these are NOT accuracy ' 'metrics — they only describe how much the discovered hierarchy still reflects the ' 'pre-existing group column. High values are expected and not evidence of quality.' ) gp = he.group_preservation(nodes, can) g1, g2, g3 = st.columns(3) g1.metric('NMI', gp['NMI']); g2.metric('ARI', gp['ARI']); g3.metric('Purity', gp['Purity'])