import os, random, hashlib, json, re, unicodedata import torch import pandas as pd from flask import Flask, request, jsonify, session, send_from_directory from flask_cors import CORS from werkzeug.middleware.proxy_fix import ProxyFix from sentence_transformers import SentenceTransformer, util import sqlite3 from datetime import datetime try: from dotenv import load_dotenv load_dotenv() except ImportError: pass try: from groq import Groq as _Groq except ImportError: _Groq = None BASE_DIR = os.path.abspath(os.path.dirname(__file__)) DB_PATH = os.path.join(BASE_DIR, "platform.db") EMB_DIR = os.path.join(BASE_DIR, "embeddings") DIST_DIR = os.path.join(BASE_DIR, "frontend", "dist") os.makedirs(EMB_DIR, exist_ok=True) TOP_K_RECOMMANDATIONS = 5 app = Flask(__name__, static_folder=DIST_DIR, static_url_path="") # ── Trust the HF Spaces reverse-proxy headers (X-Forwarded-Proto etc.) ──────── app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) app.secret_key = os.environ.get("SECRET_KEY", "change-me-in-production-32chars!!") # ── Session-cookie settings ─────────────────────────────────────────────────── # HF Spaces serves over HTTPS and embeds the app in an iframe, so we need # SameSite=None + Secure so cookies are sent in cross-site iframe contexts. app.config["SESSION_COOKIE_SECURE"] = True # only sent over HTTPS app.config["SESSION_COOKIE_SAMESITE"] = "None" # required for iframe embedding app.config["SESSION_COOKIE_HTTPONLY"] = True # CORS: dev localhost origins + any explicit override via env var _cors_origins = os.environ.get( "CORS_ORIGINS", "http://localhost:5173,http://localhost:5174,http://localhost:5175,http://localhost:5176" ).split(",") CORS(app, supports_credentials=True, origins=_cors_origins) # ────────────────────────────────────────────── # Sentence-transformer model (shared, load once) # ────────────────────────────────────────────── print("🔄 Chargement du modèle…") try: model = SentenceTransformer( "OrdalieTech/Solon-embeddings-mini-beta-1.1", device="cpu", trust_remote_code=True ) print("✓ Modèle principal (Solon) chargé") except Exception as e: print(f"⚠️ Modèle principal échoué ({e}), bascule sur secours…") model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2", device="cpu") print("✓ Modèle de secours chargé") # ────────────────────────────────────────────── # LLaMA mini fallback via Groq # ────────────────────────────────────────────── _GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "") _GROQ_MODEL = os.environ.get("GROQ_MODEL", "llama-3.1-8b-instant") _groq_client = _Groq(api_key=_GROQ_API_KEY) if (_Groq and _GROQ_API_KEY) else None if _groq_client: print(f"✓ LLaMA mini (Groq / {_GROQ_MODEL}) prêt") else: print("ℹ️ GROQ_API_KEY non défini — fallback LLM désactivé") def _llm_fallback(question: str, org_id: int | None = None) -> str: """Call LLaMA mini via Groq when similarity is too low for a DB match.""" if not _groq_client: return _organisations_message() # Build lightweight org context for the prompt org_context = "" try: with get_db() as conn: if org_id: rows = conn.execute( "SELECT name FROM organisations WHERE id=?", (org_id,) ).fetchall() else: rows = conn.execute("SELECT name FROM organisations").fetchall() org_names = [r["name"] for r in rows] if org_names: org_context = ( "Les organisations disponibles sur cette plateforme sont : " + ", ".join(org_names) + "." ) except Exception: pass system_prompt = ( "Tu es globalAdministration-Chatbot, un assistant virtuel spécialisé dans les services administratifs " "et bancaires en Algérie (banques, poste, sécurité sociale, mairie, etc.). " "Ton nom est globalAdministration-Chatbot, ne mentionne jamais d'autre nom. " + org_context + " " "Réponds de manière concise, utile et en français. " "Si la question sort totalement de ce domaine, oriente poliment l'utilisateur " "vers les services disponibles." ) try: chat = _groq_client.chat.completions.create( model=_GROQ_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": question}, ], max_tokens=512, temperature=0.4, ) return chat.choices[0].message.content.strip() except Exception as e: print(f"⚠️ Groq LLM error: {e}") return _organisations_message() # ────────────────────────────────────────────── # Database helpers # ────────────────────────────────────────────── def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): with get_db() as conn: conn.executescript(""" CREATE TABLE IF NOT EXISTS organisations ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, slug TEXT NOT NULL UNIQUE, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, org_id INTEGER REFERENCES organisations(id), username TEXT NOT NULL UNIQUE, password TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'admin', created_at TEXT DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS services ( id INTEGER PRIMARY KEY AUTOINCREMENT, org_id INTEGER NOT NULL REFERENCES organisations(id), name TEXT NOT NULL, description TEXT, link TEXT, latitude REAL, longitude REAL, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, org_id INTEGER NOT NULL REFERENCES organisations(id), processus TEXT, procedure TEXT, intent TEXT, sub_intent TEXT, question TEXT NOT NULL, response TEXT NOT NULL, service TEXT, service_link TEXT, latitude REAL, longitude REAL, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); """) print("✓ Base de données initialisée") def hash_password(pw: str) -> str: return hashlib.sha256(pw.encode()).hexdigest() # ────────────────────────────────────────────── # Text normalisation # ────────────────────────────────────────────── def normalise(text: str) -> str: """Lowercase, strip accents, collapse punctuation/whitespace.""" text = text.lower().strip() # remove accents text = unicodedata.normalize("NFD", text) text = "".join(c for c in text if unicodedata.category(c) != "Mn") # collapse punctuation to space text = re.sub(r"[^\w\s]", " ", text) text = re.sub(r"\s+", " ", text).strip() return text def clean_question(q: str) -> str: """Strip the trailing synthetic index number (e.g. ' 42') from dataset questions.""" return re.sub(r'\s+\d+\s*$', '', (q or '').strip()) def build_index_text(row) -> str: """ Index text for embedding. For this dataset, questions follow the template 'Comment faire [Procedure] [Processus] N', so: - strip the trailing number to reduce noise - append Processus and Procedure (short, highly discriminating) so queries like 'bloquer paiement' rank 'Paiement/Blocage' entries highest """ q = clean_question(row["question"] or "") proc = (row["processus"] or "").strip() procd = (row["procedure"] or "").strip() parts = [q] if proc: parts.append(proc) if procd: parts.append(procd) return " | ".join(parts) # ────────────────────────────────────────────── # Embedding cache (per-org, keyed on row_ids) # ────────────────────────────────────────────── _emb_cache: dict[int, tuple[torch.Tensor, list[int], int]] = {} def get_org_embeddings(org_id: int): """Return (tensor, row_ids_list) for an org, recompute if stale.""" with get_db() as conn: rows = conn.execute( "SELECT id, question, processus, procedure, intent, sub_intent, response " "FROM entries WHERE org_id=? ORDER BY id", (org_id,) ).fetchall() if not rows: return None, [] row_ids = [r["id"] for r in rows] texts = [build_index_text(r) for r in rows] cache_key = hash(tuple(row_ids)) if org_id in _emb_cache and _emb_cache[org_id][2] == cache_key: return _emb_cache[org_id][0], _emb_cache[org_id][1] # try to load from disk emb_file = os.path.join(EMB_DIR, f"org_{org_id}.pt") meta_file = os.path.join(EMB_DIR, f"org_{org_id}_meta.json") if os.path.exists(emb_file) and os.path.exists(meta_file): with open(meta_file) as f: meta = json.load(f) if meta.get("row_ids") == row_ids and meta.get("v", 1) >= 4: emb = torch.load(emb_file, map_location="cpu", weights_only=True) _emb_cache[org_id] = (emb, row_ids, cache_key) return emb, row_ids # recompute print(f"🔨 Recalcul embeddings org {org_id} ({len(rows)} entrées)…") emb = model.encode(texts, convert_to_tensor=True, normalize_embeddings=True, batch_size=64, show_progress_bar=False) torch.save(emb, emb_file) with open(meta_file, "w") as f: json.dump({"row_ids": row_ids, "v": 4}, f) _emb_cache[org_id] = (emb, row_ids, cache_key) return emb, row_ids def invalidate_org_cache(org_id: int): _emb_cache.pop(org_id, None) for ext in [".pt", "_meta.json"]: p = os.path.join(EMB_DIR, f"org_{org_id}{ext}") if os.path.exists(p): os.remove(p) # ────────────────────────────────────────────── # Core search # ────────────────────────────────────────────── # Confidence thresholds – raw cosine similarity (0-1 scale) _CONF_THRESHOLD = 0.52 # minimum raw cosine to show a direct answer def _display_confidence(raw_cosine: float) -> float: """Convert raw cosine similarity to a user-friendly display percentage. Mapping: raw < threshold → 0.0 (fallback path will be taken anyway) raw = threshold (0.52) → 50.0 % raw = 0.70 → ~59 % raw = 0.90 → ~71 % raw = 1.00 → 100 % This ensures every shown answer reads ≥ 50 % while unmatched queries read 0 %, making the two cases visually distinct. """ c = max(0.0, min(1.0, raw_cosine)) if c < _CONF_THRESHOLD: return 0.0 return round((c - _CONF_THRESHOLD) / (1.0 - _CONF_THRESHOLD) * 50.0 + 50.0, 1) # Keep legacy names so existing threshold comparisons still work CONFIDENCE_GOOD = int(_CONF_THRESHOLD * 100) # 40 CONFIDENCE_LOW = int(_CONF_THRESHOLD * 100) # 40 def _organisations_message() -> str: """Build a human-readable list of available organisations and their domains.""" with get_db() as conn: org_rows = conn.execute( "SELECT o.name, GROUP_CONCAT(DISTINCT e.processus) AS processus_list " "FROM organisations o " "LEFT JOIN entries e ON e.org_id = o.id " "GROUP BY o.id ORDER BY o.name" ).fetchall() if not org_rows: return "Je n'ai pas trouvé de réponse précise à votre question." lines = [] for o in org_rows: procs = [ p.strip() for p in (o["processus_list"] or "").split(",") if p.strip() and p.strip().lower() not in ("none", "nan", "null", "") ] unique_procs = list(dict.fromkeys(procs))[:5] if unique_procs: lines.append(f"• {o['name']} : {', '.join(unique_procs)}") else: lines.append(f"• {o['name']}") return ( "Je n'ai pas compris votre question. " "Voici les organisations disponibles et leurs domaines :\n\n" + "\n".join(lines) + "\n\nPosez-moi une question sur l'une de ces organisations pour que je puisse vous aider." ) _GREETINGS = re.compile( r"^\s*(bonjour|bonsoir|salut|salam|hello|hi|hey|coucou|bonne?\s*journ[ée]e?|" r"bonne?\s*soir[ée]e?|bonne?\s*nuit|[aà]\s*bient[oô]t|au\s*revoir|bye|" r"merci|shukran|s[îi]l\s*(vous|te)\s*pla[iî]t|svp|bonne?\s*chance|" r"comment\s+[çc]a\s+va|[çc]a\s+va|good\s*(morning|evening|night|afternoon))\s*[!?.]*\s*$", re.IGNORECASE, ) _GREETING_REPLY = ( "Bonjour ! 👋 Je suis globalAdministration-Chatbot, votre assistant virtuel.\n\n" "Je peux répondre à vos questions sur les services proposés par nos organisations partenaires " "(banques, poste, sécurité sociale, mairie…).\n\n" "Comment puis-je vous aider aujourd'hui ?" ) # French stopwords — excluded when deciding whether a query has meaningful content _STOPWORDS_FR = { "les", "des", "une", "est", "que", "qui", "dans", "sur", "par", "pour", "avec", "son", "ses", "leur", "leurs", "aux", "ces", "cet", "cette", "vous", "nous", "ils", "elles", "tout", "tous", "toute", "toutes", "mais", "car", "donc", "puis", "aussi", "tres", "bien", "plus", "non", "pas", "plus", "moi", "toi", "lui", "elle", "soi", "eux", # question helpers (the real semantic word comes after these) "comment", "combien", "quand", "pourquoi", "quoi", "quel", "quelle", "quels", "quelles", "faire", "fais", "fait", "peut", "peux", "dois", "doit", "avoir", "etre", "avez", "avons", "sont", "serait", "sera", "ont", "une", "mon", "ton", "notre", "votre", } def process_question(question: str, org_id: int | None = None): """Search across all orgs (or a specific org) and return the best match.""" with get_db() as conn: if org_id: orgs = conn.execute("SELECT id FROM organisations WHERE id=?", (org_id,)).fetchall() else: orgs = conn.execute("SELECT id FROM organisations").fetchall() # ── Intercept greetings / social phrases ──────────────────────────────── if _GREETINGS.match(question.strip()): return { "response": _GREETING_REPLY, "confidence": 100, "matched": "—", "intent": "Salutation", "org": None, "org_id": None, "recs": [], "service": None, "link": None, "lat": None, "lon": None, } # Reject queries that are too short or contain no real word (≥ 3 chars) norm_q = normalise(question) real_words = [w for w in norm_q.split() if len(w) >= 3 and w not in _STOPWORDS_FR] if len(norm_q) < 4 or not real_words: return { "response": _llm_fallback(question, org_id), "confidence": 0, "matched": "—", "intent": "LLM", "org": None, "org_id": None, "recs": [], "service": None, "link": None, "lat": None, "lon": None, } # Encode both raw and normalised variants for better recall emb_raw = model.encode(question, convert_to_tensor=True, normalize_embeddings=True) emb_norm = model.encode(norm_q, convert_to_tensor=True, normalize_embeddings=True) # Use the mean of the two embeddings as the query vector emb_q = torch.nn.functional.normalize(emb_raw + emb_norm, dim=0) best_score = -1.0 best_entry = None best_org_id = None all_candidates: list[tuple[float, int, int]] = [] # (score, entry_id, org_id) for org in orgs: oid = org["id"] emb_base, row_ids = get_org_embeddings(oid) if emb_base is None: continue scores = util.pytorch_cos_sim(emb_q, emb_base)[0] n_cands = min(TOP_K_RECOMMANDATIONS * 3, len(row_ids)) top_idx = torch.topk(scores, n_cands).indices.tolist() for idx in top_idx: all_candidates.append((float(scores[idx]), row_ids[idx], oid)) idx_best = int(torch.argmax(scores)) s = float(scores[idx_best]) if s > best_score: best_score = s best_entry = row_ids[idx_best] best_org_id = oid confidence = _display_confidence(best_score) # ── fetch helper ──────────────────────────────────────────────────────── def fetch_entry(eid: int): with get_db() as conn: return conn.execute( "SELECT e.*, o.name AS org_name FROM entries e " "JOIN organisations o ON o.id = e.org_id WHERE e.id=?", (eid,) ).fetchone() def clean(v): s = str(v).strip() if v is not None else None return None if not s or s.lower() in ("nan", "none", "null", "") else s # ── Recommendations from the same org, diverse intents ────────────────── same_org = [(sc, eid) for sc, eid, oid in all_candidates if oid == best_org_id] same_org.sort(key=lambda x: x[0], reverse=True) rec_ids: list[int] = [] seen_ids: set[int] = {best_entry} if best_entry else set() intent_counts: dict[str, int] = {} # intent -> count (max 2 per intent) for sc, eid in same_org: if eid in seen_ids: continue r = fetch_entry(eid) if not r: continue intent = (r["intent"] or "").strip().lower() if intent_counts.get(intent, 0) >= 2: continue rec_ids.append(eid) seen_ids.add(eid) intent_counts[intent] = intent_counts.get(intent, 0) + 1 if len(rec_ids) >= TOP_K_RECOMMANDATIONS: break recs = [] for eid in rec_ids: r = fetch_entry(eid) if r: recs.append({ "question": r["question"], "intent": r["intent"] or "", "processus": r["processus"] or "", "org": r["org_name"], }) # ── No match → LLaMA mini fallback ───────────────────────────────────── # confidence == 0 iff raw_score < _CONF_THRESHOLD (see _display_confidence) if confidence == 0 or best_entry is None: llm_response = _llm_fallback(question, org_id) return { "response": llm_response, "confidence": confidence, "matched": "—", "intent": "LLM", "org": None, "org_id": None, "recs": recs, "service": None, "link": None, "lat": None, "lon": None, } row = fetch_entry(best_entry) # ── Direct answer ──────────────────────────────────────────────────────── response_text = row["response"] return { "response": response_text, "confidence": confidence, "matched": clean_question(row["question"]), "intent": row["intent"] or "", "org": row["org_name"], "org_id": row["org_id"], "recs": recs, "service": clean(row["service"]), "link": clean(row["service_link"]), "lat": row["latitude"], "lon": row["longitude"], } # ────────────────────────────────────────────── # Auth helpers # ────────────────────────────────────────────── def current_user(): uid = session.get("user_id") if not uid: return None with get_db() as conn: return conn.execute( "SELECT u.*, o.name as org_name, o.slug as org_slug " "FROM users u LEFT JOIN organisations o ON o.id=u.org_id WHERE u.id=?", (uid,) ).fetchone() def login_required(fn): from functools import wraps @wraps(fn) def wrapper(*args, **kwargs): if not current_user(): return jsonify({"error": "Non authentifié"}), 401 return fn(*args, **kwargs) return wrapper def superadmin_required(fn): from functools import wraps @wraps(fn) def wrapper(*args, **kwargs): u = current_user() if not u: return jsonify({"error": "Non authentifié"}), 401 if u["role"] != "superadmin": return jsonify({"error": "Accès refusé"}), 403 return fn(*args, **kwargs) return wrapper # ══════════════════════════════════════════════ # PUBLIC ROUTES # ══════════════════════════════════════════════ @app.route("/ask", methods=["POST"]) def ask(): data = request.get_json(force=True) question = (data.get("question") or "").strip() if not question: return jsonify({"error": "Question vide"}), 400 return jsonify(process_question(question)) @app.route("/api/services", methods=["GET"]) def get_services(): org_id = request.args.get("org_id", type=int) with get_db() as conn: if org_id: rows = conn.execute( "SELECT e.service, e.latitude, e.longitude, e.processus, e.service_link, o.name as org_name " "FROM entries e JOIN organisations o ON o.id=e.org_id " "WHERE e.org_id=? AND e.service IS NOT NULL AND e.latitude IS NOT NULL AND e.longitude IS NOT NULL", (org_id,) ).fetchall() else: rows = conn.execute( "SELECT e.service, e.latitude, e.longitude, e.processus, e.service_link, o.name as org_name " "FROM entries e JOIN organisations o ON o.id=e.org_id " "WHERE e.service IS NOT NULL AND e.latitude IS NOT NULL AND e.longitude IS NOT NULL" ).fetchall() services = [] seen = set() for r in rows: name = (r["service"] or "").strip() if not name or name in seen: continue try: services.append({ "service": name, "lat": float(r["latitude"]), "lon": float(r["longitude"]), "processus": r["processus"] or "", "link": r["service_link"] or "", "org": r["org_name"], }) seen.add(name) except Exception: pass return jsonify({"status": "success", "services": services}) @app.route("/api/organisations", methods=["GET"]) def list_organisations(): with get_db() as conn: rows = conn.execute( "SELECT o.id, o.name, o.slug, o.created_at, " "COUNT(DISTINCT e.id) as entry_count, " "COUNT(DISTINCT s.id) as service_count " "FROM organisations o " "LEFT JOIN entries e ON e.org_id = o.id " "LEFT JOIN services s ON s.org_id = o.id " "GROUP BY o.id ORDER BY o.name" ).fetchall() return jsonify([dict(r) for r in rows]) @app.route("/api/organisations//services", methods=["GET"]) def org_services(org_id): with get_db() as conn: org = conn.execute("SELECT id, name FROM organisations WHERE id=?", (org_id,)).fetchone() if not org: return jsonify({"error": "Organisation introuvable"}), 404 rows = conn.execute( "SELECT * FROM services WHERE org_id=? ORDER BY name", (org_id,) ).fetchall() return jsonify({"org": dict(org), "services": [dict(r) for r in rows]}) # ══════════════════════════════════════════════ # AUTH ROUTES # ══════════════════════════════════════════════ @app.route("/api/auth/login", methods=["POST"]) def login(): data = request.get_json(force=True) username = (data.get("username") or "").strip() password = (data.get("password") or "").strip() if not username or not password: return jsonify({"error": "Identifiants manquants"}), 400 with get_db() as conn: user = conn.execute( "SELECT u.*, o.name as org_name, o.slug as org_slug " "FROM users u LEFT JOIN organisations o ON o.id=u.org_id " "WHERE u.username=? AND u.password=?", (username, hash_password(password)) ).fetchone() if not user: return jsonify({"error": "Identifiants incorrects"}), 401 session["user_id"] = user["id"] return jsonify({ "id": user["id"], "username": user["username"], "role": user["role"], "org_id": user["org_id"], "org_name": user["org_name"], "org_slug": user["org_slug"], }) @app.route("/api/auth/logout", methods=["POST"]) def logout(): session.clear() return jsonify({"ok": True}) @app.route("/api/auth/me", methods=["GET"]) def me(): u = current_user() if not u: return jsonify({"error": "Non authentifié"}), 401 return jsonify({ "id": u["id"], "username": u["username"], "role": u["role"], "org_id": u["org_id"], "org_name": u["org_name"], "org_slug": u["org_slug"], }) # ══════════════════════════════════════════════ # ADMIN ROUTES (org-admin only) # ══════════════════════════════════════════════ @app.route("/api/admin/entries", methods=["GET"]) @login_required def admin_list_entries(): u = current_user() with get_db() as conn: rows = conn.execute( "SELECT * FROM entries WHERE org_id=? ORDER BY id DESC", (u["org_id"],) ).fetchall() return jsonify([dict(r) for r in rows]) @app.route("/api/admin/entries", methods=["POST"]) @login_required def admin_add_entry(): u = current_user() d = request.get_json(force=True) required = ("question", "response") for field in required: if not (d.get(field) or "").strip(): return jsonify({"error": f"Champ obligatoire manquant : {field}"}), 400 def lat_lon(v): try: return float(str(v).replace(',', '.')) except Exception: return None with get_db() as conn: cur = conn.execute( """INSERT INTO entries (org_id, processus, procedure, intent, sub_intent, question, response, service, service_link, latitude, longitude) VALUES (?,?,?,?,?,?,?,?,?,?,?)""", ( u["org_id"], (d.get("processus") or "").strip() or None, (d.get("procedure") or "").strip() or None, (d.get("intent") or "").strip() or None, (d.get("sub_intent") or "").strip() or None, d["question"].strip(), d["response"].strip(), (d.get("service") or "").strip() or None, (d.get("service_link") or "").strip() or None, lat_lon(d.get("latitude")), lat_lon(d.get("longitude")), ) ) new_id = cur.lastrowid invalidate_org_cache(u["org_id"]) return jsonify({"ok": True, "id": new_id}), 201 @app.route("/api/admin/entries/", methods=["PUT"]) @login_required def admin_update_entry(entry_id): u = current_user() d = request.get_json(force=True) with get_db() as conn: row = conn.execute( "SELECT id FROM entries WHERE id=? AND org_id=?", (entry_id, u["org_id"]) ).fetchone() if not row: return jsonify({"error": "Entrée introuvable"}), 404 def lat_lon(v): try: return float(str(v).replace(',', '.')) except: return None conn.execute( """UPDATE entries SET processus=?, procedure=?, intent=?, sub_intent=?, question=?, response=?, service=?, service_link=?, latitude=?, longitude=? WHERE id=?""", ( (d.get("processus") or "").strip() or None, (d.get("procedure") or "").strip() or None, (d.get("intent") or "").strip() or None, (d.get("sub_intent") or "").strip() or None, (d.get("question") or "").strip(), (d.get("response") or "").strip(), (d.get("service") or "").strip() or None, (d.get("service_link") or "").strip() or None, lat_lon(d.get("latitude")), lat_lon(d.get("longitude")), entry_id, ) ) invalidate_org_cache(u["org_id"]) return jsonify({"ok": True}) @app.route("/api/admin/entries/", methods=["DELETE"]) @login_required def admin_delete_entry(entry_id): u = current_user() with get_db() as conn: row = conn.execute( "SELECT id FROM entries WHERE id=? AND org_id=?", (entry_id, u["org_id"]) ).fetchone() if not row: return jsonify({"error": "Entrée introuvable"}), 404 conn.execute("DELETE FROM entries WHERE id=?", (entry_id,)) invalidate_org_cache(u["org_id"]) return jsonify({"ok": True}) @app.route("/api/admin/entries/import", methods=["POST"]) @login_required def admin_import_csv(): """Accept a CSV upload and bulk-insert rows for this org.""" u = current_user() if "file" not in request.files: return jsonify({"error": "Aucun fichier fourni"}), 400 f = request.files["file"] try: try: df = pd.read_csv(f, sep=";", encoding="latin-1") except Exception: f.seek(0) df = pd.read_csv(f, sep=None, engine="python", encoding="utf-8") df.columns = df.columns.str.strip() except Exception as e: return jsonify({"error": f"Erreur lecture CSV : {e}"}), 400 def col(row, *names): for n in names: if n in df.columns: v = str(row[n]).strip() return None if v.lower() in ('nan','none','null','') else v return None def lat_lon(v): try: return float(str(v).replace(',','.')) except: return None inserted = 0 with get_db() as conn: for _, row in df.iterrows(): q = col(row, "Question") r = col(row, "Response") if not q or not r: continue conn.execute( """INSERT INTO entries (org_id,processus,procedure,intent,sub_intent, question,response,service,service_link,latitude,longitude) VALUES (?,?,?,?,?,?,?,?,?,?,?)""", ( u["org_id"], col(row,"Processus"), col(row,"Procedure"), col(row,"Intent"), col(row,"SubIntent"), q, r, col(row,"Service"), col(row,"ServiceLink"), lat_lon(col(row,"Latitude")), lat_lon(col(row,"Longitude")), ) ) inserted += 1 invalidate_org_cache(u["org_id"]) return jsonify({"ok": True, "inserted": inserted}) # ── Org-admin: Services CRUD ───────────────────────────────────────────────── @app.route("/api/admin/services", methods=["GET"]) @login_required def admin_list_services(): u = current_user() with get_db() as conn: rows = conn.execute( "SELECT * FROM services WHERE org_id=? ORDER BY name", (u["org_id"],) ).fetchall() return jsonify([dict(r) for r in rows]) @app.route("/api/admin/services", methods=["POST"]) @login_required def admin_add_service(): u = current_user() d = request.get_json(force=True) name = (d.get("name") or "").strip() if not name: return jsonify({"error": "Le nom du service est obligatoire"}), 400 def lat_lon(v): try: return float(str(v).replace(',', '.')) except: return None with get_db() as conn: cur = conn.execute( "INSERT INTO services (org_id, name, description, link, latitude, longitude) VALUES (?,?,?,?,?,?)", (u["org_id"], name, (d.get("description") or "").strip() or None, (d.get("link") or "").strip() or None, lat_lon(d.get("latitude")), lat_lon(d.get("longitude"))) ) new_id = cur.lastrowid return jsonify({"ok": True, "id": new_id}), 201 @app.route("/api/admin/services/", methods=["PUT"]) @login_required def admin_update_service(service_id): u = current_user() d = request.get_json(force=True) with get_db() as conn: row = conn.execute( "SELECT id FROM services WHERE id=? AND org_id=?", (service_id, u["org_id"]) ).fetchone() if not row: return jsonify({"error": "Service introuvable"}), 404 def lat_lon(v): try: return float(str(v).replace(',', '.')) except: return None conn.execute( "UPDATE services SET name=?, description=?, link=?, latitude=?, longitude=? WHERE id=?", ( (d.get("name") or "").strip(), (d.get("description") or "").strip() or None, (d.get("link") or "").strip() or None, lat_lon(d.get("latitude")), lat_lon(d.get("longitude")), service_id, ) ) return jsonify({"ok": True}) @app.route("/api/admin/services/", methods=["DELETE"]) @login_required def admin_delete_service(service_id): u = current_user() with get_db() as conn: row = conn.execute( "SELECT id FROM services WHERE id=? AND org_id=?", (service_id, u["org_id"]) ).fetchone() if not row: return jsonify({"error": "Service introuvable"}), 404 conn.execute("DELETE FROM services WHERE id=?", (service_id,)) return jsonify({"ok": True}) # ══════════════════════════════════════════════ # SUPER-ADMIN: register org + admin user # ══════════════════════════════════════════════ @app.route("/api/auth/register", methods=["POST"]) def public_register(): """ Public self-registration: any organisation can sign up. Creates the org + first admin user in one step. """ d = request.get_json(force=True) org_name = (d.get("org_name") or "").strip() org_slug = (d.get("org_slug") or "").strip().lower().replace(" ", "-") username = (d.get("username") or "").strip() password = (d.get("password") or "").strip() if not all([org_name, org_slug, username, password]): return jsonify({"error": "Tous les champs sont obligatoires"}), 400 if len(password) < 6: return jsonify({"error": "Le mot de passe doit contenir au moins 6 caractères"}), 400 if not org_slug.replace("-", "").isalnum(): return jsonify({"error": "L'identifiant d'organisation ne peut contenir que des lettres, chiffres et tirets"}), 400 try: with get_db() as conn: cur = conn.execute( "INSERT INTO organisations (name, slug) VALUES (?,?)", (org_name, org_slug) ) org_id = cur.lastrowid conn.execute( "INSERT INTO users (org_id, username, password, role) VALUES (?,?,?,?)", (org_id, username, hash_password(password), "admin") ) except sqlite3.IntegrityError as e: msg = str(e) if "organisations.name" in msg or "organisations.slug" in msg: return jsonify({"error": "Ce nom ou identifiant d'organisation est déjà utilisé"}), 409 if "users.username" in msg: return jsonify({"error": "Ce nom d'utilisateur est déjà pris"}), 409 return jsonify({"error": "Conflit de données"}), 409 return jsonify({"ok": True, "org_id": org_id}), 201 @app.route("/api/superadmin/register", methods=["POST"]) def superadmin_register(): """ Protected by a static token passed in header X-Admin-Token. Used to create new organisations and their first admin user. """ token = request.headers.get("X-Admin-Token", "") expected = os.environ.get("SUPERADMIN_TOKEN", "superadmin-secret-token") if token != expected: return jsonify({"error": "Non autorisé"}), 403 d = request.get_json(force=True) org_name = (d.get("org_name") or "").strip() org_slug = (d.get("org_slug") or "").strip().lower().replace(" ", "-") username = (d.get("username") or "").strip() password = (d.get("password") or "").strip() if not all([org_name, org_slug, username, password]): return jsonify({"error": "Champs manquants"}), 400 try: with get_db() as conn: cur = conn.execute( "INSERT INTO organisations (name, slug) VALUES (?,?)", (org_name, org_slug) ) org_id = cur.lastrowid conn.execute( "INSERT INTO users (org_id, username, password, role) VALUES (?,?,?,?)", (org_id, username, hash_password(password), "admin") ) except sqlite3.IntegrityError as e: return jsonify({"error": f"Conflit : {e}"}), 409 return jsonify({"ok": True, "org_id": org_id}), 201 # ══════════════════════════════════════════════ # SUPERADMIN DASHBOARD ROUTES (role=superadmin) # ══════════════════════════════════════════════ @app.route("/api/superadmin/organisations", methods=["GET"]) @superadmin_required def superadmin_list_orgs(): with get_db() as conn: rows = conn.execute( "SELECT o.id, o.name, o.slug, o.created_at, " "COUNT(DISTINCT e.id) as entry_count, " "COUNT(DISTINCT s.id) as service_count, " "COUNT(DISTINCT u.id) as user_count " "FROM organisations o " "LEFT JOIN entries e ON e.org_id = o.id " "LEFT JOIN services s ON s.org_id = o.id " "LEFT JOIN users u ON u.org_id = o.id " "GROUP BY o.id ORDER BY o.name" ).fetchall() return jsonify([dict(r) for r in rows]) @app.route("/api/superadmin/organisations", methods=["POST"]) @superadmin_required def superadmin_create_org(): d = request.get_json(force=True) org_name = (d.get("org_name") or "").strip() org_slug = (d.get("org_slug") or "").strip().lower().replace(" ", "-") username = (d.get("username") or "").strip() password = (d.get("password") or "").strip() if not all([org_name, org_slug, username, password]): return jsonify({"error": "Tous les champs sont obligatoires"}), 400 if len(password) < 6: return jsonify({"error": "Mot de passe trop court (min 6 caractères)"}), 400 try: with get_db() as conn: cur = conn.execute( "INSERT INTO organisations (name, slug) VALUES (?,?)", (org_name, org_slug) ) org_id = cur.lastrowid conn.execute( "INSERT INTO users (org_id, username, password, role) VALUES (?,?,?,?)", (org_id, username, hash_password(password), "admin") ) except sqlite3.IntegrityError as e: return jsonify({"error": f"Conflit : {e}"}), 409 return jsonify({"ok": True, "org_id": org_id}), 201 @app.route("/api/superadmin/organisations/", methods=["DELETE"]) @superadmin_required def superadmin_delete_org(org_id): with get_db() as conn: row = conn.execute("SELECT id FROM organisations WHERE id=?", (org_id,)).fetchone() if not row: return jsonify({"error": "Organisation introuvable"}), 404 for oid in [org_id]: invalidate_org_cache(oid) conn.executescript(f""" DELETE FROM services WHERE org_id={org_id}; DELETE FROM entries WHERE org_id={org_id}; DELETE FROM users WHERE org_id={org_id}; DELETE FROM organisations WHERE id={org_id}; """) return jsonify({"ok": True}) @app.route("/api/superadmin/entries", methods=["GET"]) @superadmin_required def superadmin_list_entries(): with get_db() as conn: rows = conn.execute( "SELECT e.*, o.name as org_name FROM entries e " "JOIN organisations o ON o.id = e.org_id ORDER BY e.id DESC" ).fetchall() return jsonify([dict(r) for r in rows]) @app.route("/api/superadmin/entries/", methods=["DELETE"]) @superadmin_required def superadmin_delete_entry(entry_id): with get_db() as conn: row = conn.execute("SELECT org_id FROM entries WHERE id=?", (entry_id,)).fetchone() if not row: return jsonify({"error": "Introuvable"}), 404 org_id = row["org_id"] conn.execute("DELETE FROM entries WHERE id=?", (entry_id,)) invalidate_org_cache(org_id) return jsonify({"ok": True}) @app.route("/api/superadmin/services", methods=["GET"]) @superadmin_required def superadmin_list_services(): with get_db() as conn: rows = conn.execute( "SELECT s.*, o.name as org_name FROM services s " "JOIN organisations o ON o.id = s.org_id ORDER BY o.name, s.name" ).fetchall() return jsonify([dict(r) for r in rows]) @app.route("/api/superadmin/services/", methods=["DELETE"]) @superadmin_required def superadmin_delete_service(service_id): with get_db() as conn: row = conn.execute("SELECT id FROM services WHERE id=?", (service_id,)).fetchone() if not row: return jsonify({"error": "Introuvable"}), 404 conn.execute("DELETE FROM services WHERE id=?", (service_id,)) return jsonify({"ok": True}) # ────────────────────────────────────────────── # Serve React SPA (catch-all for client-side routing) # ────────────────────────────────────────────── def _spa_index(): idx = os.path.join(DIST_DIR, "index.html") if os.path.exists(idx): return send_from_directory(DIST_DIR, "index.html") return ( "

Frontend not built

" "

Run cd frontend && npm run build first.

", 503, ) @app.route("/", defaults={"path": ""}) @app.route("/") def serve_spa(path: str): """Serve the built React app. API routes are registered before this.""" if path: target = os.path.join(DIST_DIR, path) if os.path.exists(target) and os.path.isfile(target): return send_from_directory(DIST_DIR, path) return _spa_index() @app.errorhandler(404) def not_found(_e): """Catch 404s so React Router handles client-side routes.""" return _spa_index() # ────────────────────────────────────────────── # Bootstrap # ────────────────────────────────────────────── init_db() if __name__ == "__main__": print("🚀 Serveur lancé sur http://127.0.0.1:7860") app.run(host="0.0.0.0", port=7860, debug=False)