| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| from __future__ import annotations |
| import csv, json, re, warnings |
| from collections import defaultdict |
| from pathlib import Path |
| import tempfile |
|
|
| import numpy as np |
| import pandas as pd |
| import plotly.graph_objects as go |
| import streamlit as st |
| from sklearn.cluster import AgglomerativeClustering |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.metrics import normalized_mutual_info_score, adjusted_rand_score, silhouette_score |
| from sklearn.metrics.pairwise import cosine_distances |
| from sklearn.preprocessing import LabelEncoder |
|
|
| warnings.filterwarnings('ignore') |
|
|
| st.set_page_config(page_title='Metadata Hierarchy β Baseline', page_icon='πΏ', layout='wide') |
| st.title('Metadata Hierarchy Builder β Baseline (Taxonomizer)') |
| st.caption( |
| 'Pure Taxonomizer baseline: TF-IDF text objects + recursive agglomerative ' |
| 'clustering into an abstract-to-concrete taxonomy, with internal-node labels ' |
| 'derived from each clusterβs discriminative terms. No hardcoded domain ' |
| 'patterns, no external APIs, no sentence embeddings β works on any dataset.' |
| ) |
|
|
| |
| |
| |
| LEAF_KEYS = 'variable var field column attribute name code id item indicator question measure concept'.split() |
| GROUP_KEYS = 'task category domain module section table dataset assessment test variant group topic instrument form subscale construct'.split() |
| TEXT_KEYS = 'description definition desc label title question meaning note notes text display full details explanation comment'.split() |
| META_KEYS = 'type dtype data_type datatype unit units format decimal precision values value coding codebook range min max scale'.split() |
|
|
| |
| |
| |
| def safe_name(name: str) -> str: |
| return ''.join(ch if ch.isalnum() or ch in '-_.' else '_' for ch in name) |
|
|
| def try_read_csv(path: Path) -> pd.DataFrame: |
| best, best_score = None, -1 |
| for enc in ['utf-8-sig', 'utf-8', 'latin1']: |
| for sep in [None, ',', '\t', ';', '|']: |
| try: |
| df = pd.read_csv(path, sep=sep, engine='python', encoding=enc) |
| score = df.shape[1] * 10 - float(df.isna().mean().mean()) |
| if score > best_score: |
| best, best_score = df, score |
| except Exception: |
| pass |
| if best is None: |
| raise ValueError(f'Could not read {path.name}') |
| best.columns = [str(c).strip().replace(';', '') for c in best.columns] |
| |
| if len(best) > 0: |
| first = best.iloc[:, 0].astype(str) |
| other_null = best.iloc[:, 1:].isna().mean().mean() if best.shape[1] > 1 else 1.0 |
| if first.str.contains(',').mean() > 0.50 and other_null > 0.70: |
| lines = path.read_text(encoding='utf-8-sig', errors='replace').splitlines() |
| if lines: |
| header = [h.strip().replace(';', '') for h in lines[0].split(',')] |
| rows = [] |
| for line in lines[1:]: |
| line = line.strip().rstrip(';') |
| if not line: |
| continue |
| if line.startswith('"') and line.endswith('"'): |
| line = line[1:-1] |
| try: |
| parts = next(csv.reader([line], quotechar='"')) |
| except Exception: |
| continue |
| if len(parts) >= len(header): |
| rows.append(parts[:len(header)]) |
| if rows: |
| best = pd.DataFrame(rows, columns=header) |
| best.columns = [str(c).strip().replace(';', '') for c in best.columns] |
| return best |
|
|
| def load_any(path: Path) -> pd.DataFrame: |
| s = path.suffix.lower() |
| if s in ['.csv', '.tsv', '.txt']: |
| return try_read_csv(path) |
| if s in ['.xlsx', '.xls']: |
| return pd.read_excel(path) |
| if s == '.json': |
| obj = json.loads(path.read_text(encoding='utf-8', errors='replace')) |
| if isinstance(obj, list): |
| return pd.json_normalize(obj) |
| if isinstance(obj, dict): |
| for v in obj.values(): |
| if isinstance(v, list): |
| return pd.json_normalize(v) |
| raise ValueError(f'Unsupported file type: {s}') |
|
|
| def save_upload(f) -> Path: |
| tmp = Path(tempfile.mkdtemp(prefix='baseline_')) |
| p = tmp / safe_name(f.name) |
| p.write_bytes(f.getbuffer()) |
| return p |
|
|
| |
| |
| |
| def norm(c: str) -> str: |
| return re.sub(r'[^a-z0-9]+', '_', str(c).strip().lower()).strip('_') |
|
|
| def kscore(c: str, keys: list) -> int: |
| nc = norm(c) |
| return sum(1 for k in keys if k in nc) |
|
|
| def profile_columns(df: pd.DataFrame) -> pd.DataFrame: |
| out = [] |
| n = max(len(df), 1) |
| for col in df.columns: |
| s = df[col] |
| non = float(s.notna().mean()) |
| nun = int(s.nunique(dropna=True)) |
| ur = nun / n |
| avg = float(s.dropna().astype(str).map(len).mean()) if s.notna().any() else 0 |
| out.append({ |
| 'column': str(col), |
| 'non_null': round(non, 3), |
| 'unique_values': nun, |
| 'unique_ratio': round(ur, 3), |
| 'avg_length': round(avg, 1), |
| 'leaf_score': 4*kscore(col, LEAF_KEYS) + (3 if 0.5 <= ur <= 1 else 0) + (1 if avg < 80 else 0), |
| 'group_score': 4*kscore(col, GROUP_KEYS) + (3 if 1 < nun < min(n*0.5, 80) else 0) + (1 if avg < 60 else 0), |
| 'text_score': 5*kscore(col, TEXT_KEYS) + (4 if avg > 50 else 0) + (1 if non > 0.5 else 0), |
| 'metadata_score': 4*kscore(col, META_KEYS) + (2 if 1 < nun < min(n*0.8, 100) else 0), |
| }) |
| return pd.DataFrame(out) |
|
|
| def detect_roles(df: pd.DataFrame) -> tuple: |
| prof = profile_columns(df) |
| leaf = prof.sort_values(['leaf_score', 'unique_ratio'], ascending=False).head(1)['column'].tolist() |
| text = (prof[(prof.text_score >= 4) | (prof.avg_length > 80)] |
| .sort_values('text_score', ascending=False)['column'].tolist()) or leaf.copy() |
| group = (prof[(prof.group_score >= 4) & (~prof.column.isin(leaf)) & (prof.unique_values > 1)] |
| .sort_values('group_score', ascending=False)['column'].head(3).tolist()) |
| meta = (prof[(prof.metadata_score >= 4) & (~prof.column.isin(text + leaf + group))] |
| .sort_values('metadata_score', ascending=False)['column'].head(5).tolist()) |
| return {'leaf_cols': leaf, 'group_cols': group, 'text_cols': text, 'metadata_cols': meta}, prof |
|
|
| |
| |
| |
| def sv(x) -> str: |
| return '' if pd.isna(x) else str(x).strip() |
|
|
| def build_canonical(df: pd.DataFrame, cfg: dict, source: str) -> pd.DataFrame: |
| leaf_cols = cfg.get('leaf_cols', []) |
| group_cols = cfg.get('group_cols', []) |
| text_cols = cfg.get('text_cols', []) |
| meta_cols = cfg.get('metadata_cols', []) |
| rows = [] |
| for i, row in df.iterrows(): |
| leaf_parts = [sv(row.get(c, '')) for c in leaf_cols] |
| leaf_parts = [p for p in leaf_parts if p] |
| label = ' / '.join(leaf_parts) if leaf_parts else f'variable_{i+1}' |
| group_parts = [sv(row.get(c, '')) for c in group_cols] |
| group_parts = [p for p in group_parts if p and p.lower() not in ['nan', 'none']] |
| gpath = ' > '.join(group_parts) if group_parts else 'Ungrouped' |
| parts = [] |
| for c in list(dict.fromkeys(group_cols + leaf_cols + text_cols + meta_cols)): |
| v = sv(row.get(c, '')) |
| if v: |
| parts.append(f'{c}: {v}') |
| text = ' | '.join(parts) if parts else label |
| rows.append({ |
| '_source_file': source, |
| '_row_index': int(i), |
| '_leaf_label': label, |
| '_leaf_id': f'{gpath}.{label}' if gpath != 'Ungrouped' else label, |
| '_group_path': gpath, |
| '_text': text, |
| }) |
| can = pd.DataFrame(rows) |
| if can['_leaf_id'].duplicated().any(): |
| cnt: dict = defaultdict(int) |
| ids = [] |
| for lid in can['_leaf_id']: |
| cnt[lid] += 1 |
| ids.append(lid if cnt[lid] == 1 else f'{lid}__{cnt[lid]}') |
| can['_leaf_id'] = ids |
| return can |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| def vectorize_texts(texts: list): |
| """TF-IDF text objects [GON]. Generic English stop-words only.""" |
| vec = TfidfVectorizer(stop_words='english', ngram_range=(1, 2), |
| max_features=2000, min_df=1, sublinear_tf=True) |
| X = vec.fit_transform(texts) |
| return X, vec |
|
|
| def best_k(dist: np.ndarray, n: int, k_min: int = 2, k_max: int = 8) -> int: |
| """Pick the number of clusters that maximises the silhouette score. |
| |
| Fully data-driven β no fixed cluster count. Returns 1 if no split with |
| >=2 clusters is well separated. |
| """ |
| k_hi = min(k_max, n - 1) |
| if k_hi < k_min: |
| return 1 |
| best, best_s = 1, -1.0 |
| for k in range(k_min, k_hi + 1): |
| labels = AgglomerativeClustering(n_clusters=k, metric='precomputed', |
| linkage='average').fit_predict(dist) |
| if len(set(labels)) < 2: |
| continue |
| try: |
| s = silhouette_score(dist, labels, metric='precomputed') |
| except Exception: |
| continue |
| if s > best_s: |
| best_s, best = s, k |
| return best |
|
|
| def discriminative_label(inside: np.ndarray, outside, terms: np.ndarray, |
| used: set, top_n: int = 2) -> str: |
| """Label a cluster by the terms that most separate it from its siblings. |
| |
| inside = mean TF-IDF vector of the cluster's members |
| outside = mean TF-IDF vector of the sibling pool (or 0 if none) |
| """ |
| scores = inside - (outside if outside is not None else 0) |
| picks: list = [] |
| for i in np.argsort(scores)[::-1]: |
| term = terms[i] |
| if len(term) <= 2 or scores[i] <= 0 or term in used: |
| continue |
| picks.append(term) |
| if len(picks) >= top_n: |
| break |
| if not picks: |
| for i in np.argsort(inside)[::-1]: |
| if len(terms[i]) > 2: |
| picks = [terms[i]] |
| break |
| return ' / '.join(p.title() for p in picks) if picks else 'Group' |
|
|
| |
| |
| |
| def _nmap(nodes: list) -> dict: |
| return {int(n['id']): n for n in nodes} |
|
|
| def _next_id(nodes: list) -> int: |
| return max((int(n['id']) for n in nodes), default=0) + 1 |
|
|
| def _add_child(nodes: list, parent_id: int, child_id: int): |
| m = _nmap(nodes) |
| p = m.get(int(parent_id)) |
| if p is None: |
| return |
| rel = list(p.get('related', [])) |
| if int(child_id) not in rel: |
| rel.append(int(child_id)) |
| p['related'] = rel |
|
|
| def _make_agg(nid: int, name: str, desc: str = '') -> dict: |
| return {'id': int(nid), 'name': str(name), 'related': [], |
| 'type': 'aggregation', 'isShown': True, 'desc': desc, 'dtype': 'determine'} |
|
|
| def _leaf_ids(nodes: list, nid: int) -> list: |
| m = _nmap(nodes) |
| out: list = [] |
| def rec(x): |
| n = m.get(int(x)) |
| if not n: |
| return |
| if n.get('type') == 'attribute': |
| out.append(int(x)) |
| return |
| for c in n.get('related', []): |
| rec(int(c)) |
| rec(nid) |
| return list(dict.fromkeys(out)) |
|
|
| def build_hierarchy(can: pd.DataFrame, project: str = 'project', |
| max_depth: int = 3, min_cluster_size: int = 6, |
| branch_max: int = 8) -> list: |
| """Pure Taxonomizer construction [TAX]. |
| |
| Builds an abstract-to-concrete taxonomy by recursively clustering the |
| variables' TF-IDF text objects. At each level the number of clusters is |
| chosen by silhouette; each resulting internal node is labelled with the |
| terms that most discriminate its members from their siblings. No group |
| column, no hardcoded patterns are used in construction β so the recovered |
| structure can be fairly evaluated against the original group column. |
| """ |
| |
| nodes: list = [{'id': 0, 'name': project, 'type': 'root', |
| 'dtype': 'root', 'isShown': True, 'related': [], 'desc': 'Root node'}] |
| row_to_node: list = [] |
| for i, (_, r) in enumerate(can.iterrows(), start=1): |
| nodes.append({'id': i, 'name': r['_leaf_label'], 'dtype': 'determine', |
| 'related': [], 'isShown': True, 'type': 'attribute', |
| 'desc': r['_text'], |
| 'metadata': {'leaf_id': r['_leaf_id'], 'group_path': r['_group_path']}}) |
| row_to_node.append(i) |
| row_to_node = np.array(row_to_node) |
|
|
| |
| texts = (can['_leaf_label'].astype(str) + ' . ' + can['_text'].astype(str)).tolist() |
| X, vec = vectorize_texts(texts) |
| Xd = X.toarray() |
| terms = vec.get_feature_names_out() |
| full_dist = cosine_distances(X).astype(float) |
| np.fill_diagonal(full_dist, 0.0) |
|
|
| |
| def attach_leaves(parent_id: int, idx: np.ndarray): |
| for i in idx: |
| _add_child(nodes, parent_id, int(row_to_node[i])) |
|
|
| def recurse(parent_id: int, idx: np.ndarray, depth: int, used: set): |
| n = len(idx) |
| if n <= min_cluster_size or depth >= max_depth: |
| attach_leaves(parent_id, idx) |
| return |
|
|
| sub = full_dist[np.ix_(idx, idx)] |
| k_cap = min(branch_max, max(2, n // min_cluster_size)) |
| k = best_k(sub, n, k_min=2, k_max=k_cap) |
| if k <= 1: |
| attach_leaves(parent_id, idx) |
| return |
|
|
| labels = AgglomerativeClustering(n_clusters=k, metric='precomputed', |
| linkage='average').fit_predict(sub) |
| pool_Xd = Xd[idx] |
| for c in range(k): |
| mask = labels == c |
| members = idx[mask] |
| if len(members) == 0: |
| continue |
| if len(members) == 1: |
| _add_child(nodes, parent_id, int(row_to_node[members[0]])) |
| continue |
| inside = pool_Xd[mask].mean(axis=0) |
| outside = pool_Xd[~mask].mean(axis=0) if (~mask).any() else None |
| label = discriminative_label(inside, outside, terms, used) |
| nid = _next_id(nodes) |
| nodes.append(_make_agg(nid, label, |
| desc=f'Cluster of {len(members)} variables β ' |
| f'discriminative terms: {label}')) |
| _add_child(nodes, parent_id, nid) |
| recurse(nid, members, depth + 1, used | {label.lower()}) |
|
|
| recurse(0, np.arange(len(can)), 0, set()) |
|
|
| for n in nodes: |
| n['related'] = list(dict.fromkeys(int(x) for x in n.get('related', []))) |
| return nodes |
|
|
| |
| |
| |
| def _parent_map(nodes: list) -> dict: |
| pm: dict = {} |
| for n in nodes: |
| for c in n.get('related', []): |
| if int(c) not in pm: |
| pm[int(c)] = int(n['id']) |
| return pm |
|
|
| |
| |
| |
| def _eval_cluster_assignments(nodes: list, can: pd.DataFrame) -> list[int]: |
| """Return predicted cluster id (depth-1 aggregation ancestor) for each row in can.""" |
| pm = _parent_map(nodes) |
| def depth1(nid: int) -> int: |
| |
| while pm.get(nid, -1) not in (-1, 0): |
| nid = pm[nid] |
| return nid |
| lid_to_nid = {n['metadata']['leaf_id']: int(n['id']) |
| for n in nodes if n.get('type') == 'attribute' and 'metadata' in n} |
| return [depth1(lid_to_nid[lid]) if lid in lid_to_nid else -1 |
| for lid in can['_leaf_id']] |
|
|
| def _purity(y_true, y_pred) -> float: |
| from collections import Counter |
| clusters: dict = {} |
| for t, p in zip(y_true, y_pred): |
| clusters.setdefault(p, []).append(t) |
| correct = sum(Counter(v).most_common(1)[0][1] for v in clusters.values()) |
| return correct / max(len(y_true), 1) |
|
|
| def _structural_stats(nodes: list) -> dict: |
| pm = _parent_map(nodes) |
| def depth_of(nid: int) -> int: |
| d = 0 |
| while nid in pm: |
| nid = pm[nid]; d += 1 |
| return d |
| agg = [n for n in nodes if n.get('type') == 'aggregation'] |
| leafs = [n for n in nodes if n.get('type') == 'attribute'] |
| depths = [depth_of(int(n['id'])) for n in leafs] |
| branches = [len(n.get('related', [])) for n in agg] |
| singletons = sum(1 for b in branches if b == 1) |
| return { |
| 'n_aggregation_nodes': len(agg), |
| 'max_depth': int(max(depths, default=0)), |
| 'avg_leaf_depth': round(float(np.mean(depths)), 2) if depths else 0.0, |
| 'avg_branching_factor': round(float(np.mean(branches)), 2) if branches else 0.0, |
| 'singleton_nodes_%': round(100.0 * singletons / max(len(agg), 1), 1), |
| } |
|
|
| def _wrap(text: str, width: int = 70) -> str: |
| """Wrap long hover text onto multiple <br> lines so it never runs off-screen.""" |
| import textwrap |
| text = str(text).replace('<', '<') |
| lines: list = [] |
| for para in text.split('\n'): |
| wrapped = textwrap.wrap(para, width=width) or [''] |
| lines.extend(wrapped) |
| return '<br>'.join(lines) |
|
|
| def plot_sunburst(nodes: list, max_depth: int = 4) -> go.Figure: |
| pm = _parent_map(nodes) |
| ids, labels, parents, values, hover = [], [], [], [], [] |
| for n in nodes: |
| nid = int(n['id']) |
| lc = len(_leaf_ids(nodes, nid)) |
| ids.append(str(nid)) |
| labels.append(str(n.get('name', ''))[:40]) |
| parents.append('' if nid == 0 else str(pm.get(nid, 0))) |
| values.append(max(1, lc)) |
| desc = _wrap(n.get('desc', '')) |
| hover.append(f'<b>{_wrap(n.get("name",""))}</b><br>Type: {n.get("type","")}' |
| f'<br>Variables: {lc}<br><br>{desc}') |
| fig = go.Figure(go.Sunburst( |
| ids=ids, labels=labels, parents=parents, values=values, |
| branchvalues='total', hovertext=hover, hoverinfo='text', |
| maxdepth=max_depth, insidetextorientation='radial', |
| marker=dict(colorscale='Greens', line=dict(width=1, color='white')), |
| )) |
| fig.update_layout(height=700, margin=dict(l=10, r=10, t=40, b=10), |
| title='Click a sector to drill down β click centre to go back') |
| return fig |
|
|
| def plot_treemap(nodes: list) -> go.Figure: |
| pm = _parent_map(nodes) |
| ids, labels, parents, values, hover = [], [], [], [], [] |
| for n in nodes: |
| nid = int(n['id']) |
| lc = len(_leaf_ids(nodes, nid)) |
| ids.append(str(nid)) |
| labels.append(str(n.get('name', ''))[:40]) |
| parents.append('' if nid == 0 else str(pm.get(nid, 0))) |
| values.append(max(1, lc)) |
| desc = _wrap(n.get('desc', '')) |
| hover.append(f'<b>{_wrap(n.get("name",""))}</b><br>Variables: {lc}<br>{desc}') |
| fig = go.Figure(go.Treemap( |
| ids=ids, labels=labels, parents=parents, values=values, |
| branchvalues='total', hovertext=hover, hoverinfo='text', |
| textinfo='label+value', |
| marker=dict(colorscale='Greens', line=dict(width=1, color='white')), |
| )) |
| fig.update_layout(height=700, margin=dict(l=10, r=10, t=10, b=10)) |
| return fig |
|
|
| |
| |
| |
| with st.sidebar: |
| st.header('1. Upload') |
| uploaded = st.file_uploader( |
| 'Upload a metadata file', |
| type=['csv', 'tsv', 'txt', 'xlsx', 'xls', 'json'], |
| accept_multiple_files=False, |
| ) |
| st.header('2. Taxonomizer settings') |
| tx_max_depth = st.slider('Max taxonomy depth', 2, 5, 3, 1, |
| help='How many abstract-to-concrete levels to build') |
| tx_min_size = st.slider('Min cluster size', 3, 20, 6, 1, |
| help='Clusters smaller than this stop splitting (leaves attach directly)') |
| tx_branch = st.slider('Max branches per node', 3, 12, 8, 1, |
| help='Upper bound on clusters per split; the actual number is chosen by silhouette') |
|
|
| st.header('3. Display') |
| max_items = st.slider('Maximum variables', 25, 1200, 300, 25) |
| group_filter = st.text_input('Group filter (optional)', value='', |
| help='Filter rows whose group path contains this text') |
| display_depth = st.slider('Sunburst depth', 2, 6, 4, 1) |
|
|
| |
| |
| |
| if not uploaded: |
| st.info('Upload a metadata CSV / XLSX / JSON file to begin.') |
| st.markdown(""" |
| ### Baseline algorithm β pure Taxonomizer |
| |
| The simplest of the three approaches β no hardcoded domain patterns, no |
| external APIs, no neural embeddings. Works on any dataset. |
| |
| | Step | Method | Paper | |
| |------|--------|-------| |
| | Text object | Concatenate all metadata fields per variable | Goncalves et al. | |
| | Representation | TF-IDF (generic English stop-words only) | Goncalves et al. | |
| | Hierarchy construction | Recursive agglomerative clustering (cosine), #clusters chosen by silhouette | Taxonomizer (Sultanum et al.) | |
| | Node labelling | Most discriminative terms of each cluster vs its siblings | Taxonomizer / HiExpan | |
| |
| The group column is **not** used for construction, so the recovered taxonomy |
| can be fairly evaluated against it (NMI / ARI / Purity in the Evaluation tab). |
| |
| **Approach 1** adds SBERT embeddings + Wikidata/BioPortal enrichment + HiExpan refinement. |
| |
| **Approach 2** adds NMF/FASTopic aspect discovery + GMM clustering + optional LLM labels. |
| """) |
| st.stop() |
|
|
| path = save_upload(uploaded) |
|
|
| @st.cache_data(show_spinner=False) |
| def _load_profile(path_str: str): |
| df = load_any(Path(path_str)) |
| cfg, prof = detect_roles(df) |
| return df, cfg, prof |
|
|
| with st.spinner('Loading fileβ¦'): |
| df, auto_cfg, prof = _load_profile(str(path)) |
|
|
| st.subheader('Step 1 β File preview') |
| with st.expander(f'π {uploaded.name} ({len(df):,} rows, {len(df.columns)} columns)', |
| expanded=False): |
| st.dataframe(df.head(10), use_container_width=True) |
| score_cols = [c for c in ['column', 'leaf_score', 'group_score', 'text_score', 'metadata_score'] |
| if c in prof.columns] |
| st.dataframe(prof[score_cols].sort_values('leaf_score', ascending=False), |
| use_container_width=True) |
|
|
| st.subheader('Step 2 β Confirm column roles') |
| cols = list(df.columns) |
| with st.expander('Column configuration', expanded=True): |
| left, right = st.columns(2) |
| with left: |
| leaf_cols = st.multiselect('Leaf variable column(s)', cols, |
| default=[c for c in auto_cfg.get('leaf_cols', []) if c in cols], key='leaf') |
| group_cols = st.multiselect('Group/task column(s)', cols, |
| default=[c for c in auto_cfg.get('group_cols', []) if c in cols], key='group') |
| with right: |
| text_cols = st.multiselect('Text/description column(s)', cols, |
| default=[c for c in auto_cfg.get('text_cols', []) if c in cols], key='text') |
| meta_cols = st.multiselect('Metadata/type column(s)', cols, |
| default=[c for c in auto_cfg.get('metadata_cols', []) if c in cols], key='meta') |
|
|
| if not leaf_cols: |
| st.error('Choose at least one leaf variable column.') |
| st.stop() |
|
|
| cfg = {'leaf_cols': leaf_cols, 'group_cols': group_cols, |
| 'text_cols': text_cols, 'metadata_cols': meta_cols} |
|
|
| if st.button('Build baseline hierarchy', type='primary'): |
| with st.spinner('Building hierarchyβ¦'): |
| _can = build_canonical(df, cfg, source=Path(uploaded.name).stem) |
|
|
| if group_filter.strip(): |
| _can = _can[_can['_group_path'].str.contains( |
| group_filter.strip(), case=False, na=False)].copy() |
|
|
| if len(_can) > max_items: |
| _can = _can.head(max_items).copy() |
|
|
| _can = _can.reset_index(drop=True) |
|
|
| if len(_can) < 2: |
| st.error('Need at least 2 variables after filtering.') |
| st.stop() |
|
|
| _pname = Path(uploaded.name).stem |
| _nodes = build_hierarchy(_can, project=_pname, |
| max_depth=tx_max_depth, |
| min_cluster_size=tx_min_size, |
| branch_max=tx_branch) |
|
|
| st.session_state['_bl_nodes'] = _nodes |
| st.session_state['_bl_can'] = _can |
| st.session_state['_bl_project'] = _pname |
|
|
| if '_bl_nodes' not in st.session_state: |
| st.info('Configure columns above then click **Build baseline hierarchy**.') |
| st.stop() |
|
|
| nodes = st.session_state['_bl_nodes'] |
| can = st.session_state['_bl_can'] |
| project_name = st.session_state['_bl_project'] |
|
|
| _sm = _structural_stats(nodes) |
| n_leaves = len([n for n in nodes if n['type'] == 'attribute']) |
| n_internal = len([n for n in nodes if n['type'] == 'aggregation']) |
|
|
| st.divider() |
| c1, c2, c3, c4 = st.columns(4) |
| c1.metric('Variables', n_leaves) |
| c2.metric('Aggregation nodes', n_internal) |
| c3.metric('Max depth', _sm['max_depth']) |
| c4.metric('Avg branching', _sm['avg_branching_factor']) |
|
|
| tabs = st.tabs(['Sunburst', 'Treemap', 'Node detail', 'Canonical table', 'Export', 'π Evaluation']) |
|
|
| with tabs[0]: |
| st.plotly_chart(plot_sunburst(nodes, max_depth=display_depth), use_container_width=True) |
| st.caption('Green = Baseline. Click a sector to drill down; click the centre to go back.') |
|
|
| with tabs[1]: |
| st.plotly_chart(plot_treemap(nodes), use_container_width=True) |
|
|
| with tabs[2]: |
| nm = _nmap(nodes) |
| agg_nodes = [n for n in nodes if n['type'] in ('aggregation', 'root')] |
| options = [f'{n["name"]} [{len(_leaf_ids(nodes, int(n["id"])))} vars]' |
| for n in agg_nodes] |
| if options: |
| sel = st.selectbox('Select a node', options) |
| sel_name = sel.split(' [')[0] |
| sel_node = next((n for n in agg_nodes if n['name'] == sel_name), None) |
| if sel_node: |
| lids = _leaf_ids(nodes, int(sel_node['id'])) |
| leaf_ids_set = {nm[i]['metadata']['leaf_id'] |
| for i in lids if i in nm and 'metadata' in nm[i]} |
| sub = can[can['_leaf_id'].isin(leaf_ids_set)] |
| st.write(f'**{len(lids)} variables** under "{sel_node["name"]}"') |
| st.dataframe(sub[['_leaf_label', '_group_path', '_text']].reset_index(drop=True), |
| use_container_width=True) |
|
|
| with tabs[3]: |
| st.dataframe(can, use_container_width=True) |
|
|
| with tabs[4]: |
| _base = safe_name(project_name) |
| col1, col2 = st.columns(2) |
| with col1: |
| st.download_button( |
| 'Hierarchy JSON', |
| data=json.dumps(nodes, indent=2, ensure_ascii=False).encode('utf-8'), |
| file_name=f'{_base}_baseline_hierarchy.json', |
| mime='application/json', |
| use_container_width=True, |
| ) |
| with col2: |
| st.download_button( |
| 'Canonical CSV', |
| data=can.to_csv(index=False).encode('utf-8'), |
| file_name=f'{_base}_baseline_canonical.csv', |
| mime='text/csv', |
| use_container_width=True, |
| ) |
|
|
| st.divider() |
| |
| _out_dir = Path(__file__).resolve().parent / 'outputs' / 'baseline' |
| st.markdown('### Save to project folder') |
| st.caption( |
| 'The download buttons above go to your browserβs Downloads folder (a browser ' |
| f'restriction). This button instead writes the files into `{_out_dir}` with the ' |
| 'dataset name β convenient for `evaluate_all.py`.' |
| ) |
| if st.button('πΎ Save all to outputs/baseline/', type='primary', |
| use_container_width=True): |
| try: |
| _out_dir.mkdir(parents=True, exist_ok=True) |
| (_out_dir / f'{_base}_baseline_hierarchy.json').write_text( |
| json.dumps(nodes, indent=2, ensure_ascii=False), encoding='utf-8') |
| can.to_csv(_out_dir / f'{_base}_baseline_canonical.csv', index=False) |
| st.success(f'Saved to `{_out_dir}`:\n\n' |
| f'- {_base}_baseline_hierarchy.json\n' |
| f'- {_base}_baseline_canonical.csv') |
| except Exception as _e: |
| st.error(f'Could not save: {_e}') |
|
|
| with tabs[5]: |
| import hierarchy_eval as he |
|
|
| st.subheader('Hierarchy Quality Evaluation') |
| st.caption( |
| 'The group column is a *construction input* (GonΓ§alves text object), so it ' |
| 'cannot serve as ground truth. The primary metrics below are **reference-free** ' |
| 'β they assess the hierarchy itself, with no gold standard.' |
| ) |
|
|
| with st.spinner('Computing reference-free metricsβ¦'): |
| tm = he.traco_metrics(nodes) |
| npmi = he.npmi_coherence(nodes, can['_text'].tolist()) |
|
|
| |
| st.markdown('#### Primary β reference-free hierarchy quality') |
| p1, p2, p3 = st.columns(3) |
| p1.metric('Parentβchild coherence', tm['pc_coherence'], |
| help='TraCo (Wu et al., AAAI 2024). Mean similarity of each node to its parent. ' |
| 'Higher = children correctly nest under their parent theme.') |
| p2.metric('Sibling diversity', tm['sibling_diversity'], |
| help='TraCo (Wu et al., AAAI 2024). Mean distance between sibling nodes. ' |
| 'Higher = siblings are distinct (LOW = redundant/repeated siblings).') |
| p3.metric('NPMI label coherence', npmi, |
| help='Lau et al., EACL 2014. Whether node-label terms genuinely co-occur in the ' |
| 'data. Higher = meaningful labels, not arbitrary term salads.') |
| st.caption(f'Embedding backend: **{tm["encoder"]}**. ' |
| 'Coherence & diversity β [β1, 1]; NPMI β β[β1, 1].') |
|
|
| |
| st.markdown('#### Structural statistics') |
| sm = he.structural_stats(nodes) |
| s1, s2, s3, s4, s5 = st.columns(5) |
| s1.metric('Aggregation nodes', sm['n_aggregation_nodes']) |
| s2.metric('Max leaf depth', sm['max_depth']) |
| s3.metric('Avg leaf depth', sm['avg_leaf_depth']) |
| s4.metric('Avg branching', sm['avg_branching_factor']) |
| s5.metric('Singleton nodes', f"{sm['singleton_nodes_%']}%", |
| help='Aggregation nodes with a single child (sparse-hierarchy indicator)') |
|
|
| |
| st.markdown('#### Secondary β group-structure preservation *(descriptive)*') |
| st.caption( |
| 'β οΈ The group column was an **input** to construction, so these are NOT accuracy ' |
| 'metrics β they only describe how much the discovered hierarchy still reflects the ' |
| 'pre-existing group column. High values are expected and not evidence of quality.' |
| ) |
| gp = he.group_preservation(nodes, can) |
| g1, g2, g3 = st.columns(3) |
| g1.metric('NMI', gp['NMI']); g2.metric('ARI', gp['ARI']); g3.metric('Purity', gp['Purity']) |
|
|