Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import datetime | |
| import logging | |
| import re | |
| from fastapi import FastAPI, Depends, HTTPException, status, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, ConfigDict, Field, field_validator | |
| import rapidfuzz | |
| import pandas as pd | |
| from typing import List, Optional, Dict, Any | |
| # ── Carga del diccionario de normalización ──────────────────────────────────── | |
| _DICT_PATH = os.path.join(os.path.dirname(__file__), "diccionario_normalizacion.json") | |
| try: | |
| with open(_DICT_PATH, "r", encoding="utf-8") as _f: | |
| DICCIONARIO_NORMALIZACION = json.load(_f) | |
| except Exception: | |
| DICCIONARIO_NORMALIZACION = {} | |
| from sqlalchemy.orm import Session | |
| from database import ( | |
| get_db, | |
| DimEstudiante, | |
| DimDocente, | |
| DimModulo, | |
| DimTiempo, | |
| DimOrigenDocumental, | |
| Users, | |
| FactRendimientoAcademico, | |
| FactSituacionFinanciera, | |
| FactEvaluacionDocente, | |
| FactCobranzasProyectadas, | |
| FactMarketingInscripciones, | |
| FactRentabilidadPresupuesto, | |
| LogAuditoriaNlp, | |
| DimCategoriaFinanciera, | |
| ) | |
| from ner_engine import ner_engine | |
| from similarity import find_best_match, get_top_matches | |
| from typing import List, Optional | |
| import jwt | |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
| from passlib.context import CryptContext | |
| SECRET_KEY = os.getenv("JWT_SECRET", "super-secret-local-key") | |
| ALGORITHM = "HS256" | |
| ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 | |
| pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto") | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") | |
| def verify_password(plain_password, hashed_password): | |
| if hashed_password == "$placeholder$": | |
| return plain_password == "admin123" | |
| return pwd_context.verify(plain_password, hashed_password) | |
| def get_password_hash(password): | |
| return pwd_context.hash(password) | |
| def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None): | |
| to_encode = data.copy() | |
| if expires_delta: | |
| expire = datetime.datetime.utcnow() + expires_delta | |
| else: | |
| expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) | |
| to_encode.update({"exp": expire}) | |
| encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| return encoded_jwt | |
| def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): | |
| credentials_exception = HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Could not validate credentials", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| username: str = payload.get("sub") | |
| if username is None: | |
| raise credentials_exception | |
| except jwt.PyJWTError: | |
| raise credentials_exception | |
| user = db.query(Users).filter(Users.username == username).first() | |
| if user is None: | |
| raise credentials_exception | |
| return user | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| app = FastAPI( | |
| title="GiraGroup BI Backend Cloud", | |
| description="API para Tecnologías Emergentes II con BETO y Supabase", | |
| version="1.0.0" | |
| ) | |
| # CORS: sólo permite peticiones desde el frontend registrado | |
| _ALLOWED_ORIGINS = [ | |
| origin.strip() | |
| for origin in os.getenv("CORS_ALLOWED_ORIGINS", "http://localhost:5173,https://giragroup-bi-frontend-tei-jgc45f654-dazz-s-projects.vercel.app,https://giragroup-bi-frontend-tei-ii.vercel.app").split(",") | |
| if origin.strip() | |
| ] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=_ALLOWED_ORIGINS, | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Valores válidos para el CHECK constraint de Supabase | |
| TIPO_DOC_VALIDO = "SHEET" | |
| # Clave interna para el endpoint de diagnóstico (solo debugging, nunca pública) | |
| _DIAG_SECRET = os.getenv("DIAG_SECRET", "") | |
| class ProcessSheetPayload(BaseModel): | |
| # texto_celda: sin caracteres de control ni HTML, permite cualquier longitud y contenido | |
| texto_celda: str = Field(default="Sin nombre", max_length=1000) | |
| nota_detectada: float = Field(default=0.0) | |
| asistencia: float = Field(default=100.0) | |
| incumplimiento_tareas: float = Field(default=0.0) | |
| id_docente: int = Field(default=1) | |
| id_modulo: int = Field(default=1) | |
| id_tiempo: int = Field(default=1) | |
| id_documento: int = Field(default=1) | |
| id_usuario: int = Field(default=1) | |
| codigo_estudiante: Optional[str] = None | |
| programa: Optional[str] = None | |
| modulo: Optional[str] = None | |
| docente: Optional[str] = None | |
| semestre: Optional[str] = None | |
| institucion: Optional[str] = None | |
| tipo_fuente: Optional[str] = None | |
| genero: Optional[str] = None | |
| ciudad: Optional[str] = None | |
| pos_code: Optional[str] = None | |
| estado_inscripcion: Optional[str] = None | |
| estado_academico: Optional[str] = None | |
| # Financial fields | |
| monto_deuda: Optional[float] = 0.0 | |
| cuotas_impagas: Optional[int] = 0 | |
| monto_ejecutado: Optional[float] = 0.0 | |
| monto_meta: Optional[float] = 0.0 | |
| estado_cartera: Optional[str] = None | |
| tipo_alerta: Optional[str] = None | |
| # Marketing fields | |
| leads: Optional[int] = 0 | |
| reservas: Optional[int] = 0 | |
| inscritos: Optional[int] = 0 | |
| costo: Optional[float] = 0.0 | |
| pregunta: Optional[str] = None | |
| puntuacion: Optional[float] = 0.0 | |
| # Custom fields for specific processing logic | |
| gestion: Optional[int] = None | |
| mes: Optional[str] = None | |
| proyecciones_mensuales: Optional[Dict[str, float]] = None | |
| def sanitize_texto(cls, v: str) -> str: | |
| if not v: | |
| return "Sin nombre" | |
| # Eliminar etiquetas HTML, caracteres de control y secuencias peligrosas | |
| v = re.sub(r'<[^>]*>', '', v) # strip HTML tags | |
| v = re.sub(r'[\x00-\x1f\x7f]', '', v) # strip control chars | |
| v = v.strip() | |
| if not v: | |
| return "Sin nombre" | |
| return v | |
| class ProcessSheetPayloadRaw(BaseModel): | |
| texto_celda: str | |
| nota_detectada: float | |
| asistencia: float | |
| incumplimiento_tareas: float | |
| codigo_estudiante: Optional[str] = None | |
| programa: Optional[str] = None | |
| modulo: Optional[str] = None | |
| docente: Optional[str] = None | |
| semestre: Optional[str] = None | |
| institucion: Optional[str] = None | |
| tipo_fuente: Optional[str] = None | |
| genero: Optional[str] = None | |
| ciudad: Optional[str] = None | |
| pos_code: Optional[str] = None | |
| estado_inscripcion: Optional[str] = None | |
| estado_academico: Optional[str] = None | |
| # Financial fields | |
| monto_deuda: Optional[float] = 0.0 | |
| cuotas_impagas: Optional[int] = 0 | |
| # Marketing fields | |
| leads: Optional[int] = 0 | |
| reservas: Optional[int] = 0 | |
| inscritos: Optional[int] = 0 | |
| costo: Optional[float] = 0.0 | |
| # Survey fields | |
| pregunta: Optional[str] = None | |
| puntuacion: Optional[float] = 0.0 | |
| from typing import Union | |
| class BatchPayload(BaseModel): | |
| records: List[ProcessSheetPayloadRaw] | |
| def read_root(): | |
| return { | |
| "status": "healthy", | |
| "service": "GiraGroup BI Backend API Cloud", | |
| "ner_initialized": ner_engine._initialized or ner_engine.pipeline is not None | |
| } | |
| class Token(BaseModel): | |
| access_token: str | |
| token_type: str | |
| role: str | |
| def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): | |
| user = db.query(Users).filter(Users.username == form_data.username).first() | |
| if not user or not verify_password(form_data.password, user.hashed_password): | |
| # Auto-seed the user if it's one of the test users and doesn't exist | |
| test_users = { | |
| "directivo@giragroup.com": {"password": "Directivo@123", "role": "comite_directivo"}, | |
| "academico@giragroup.com": {"password": "Academico@123", "role": "coordinador_academico"}, | |
| "datos@giragroup.com": {"password": "Datos@123", "role": "analista_datos_marketing"}, | |
| "admin@giragroup.com": {"password": "Admin@123", "role": "admin"} | |
| } | |
| if form_data.username in test_users and form_data.password == test_users[form_data.username]["password"]: | |
| if not user: | |
| user = Users( | |
| username=form_data.username, | |
| hashed_password=get_password_hash(form_data.password), | |
| role=test_users[form_data.username]["role"] | |
| ) | |
| db.add(user) | |
| db.commit() | |
| db.refresh(user) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Incorrect username or password", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| access_token_expires = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) | |
| access_token = create_access_token( | |
| data={"sub": user.username, "role": user.role}, expires_delta=access_token_expires | |
| ) | |
| return {"access_token": access_token, "token_type": "bearer", "role": user.role} | |
| def diagnostico_db( | |
| secret: str = Query(default=""), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Endpoint de diagnóstico: protegido por DIAG_SECRET. | |
| En producción, configurar DIAG_SECRET en los Secrets del Space. | |
| Sin la clave correcta devuelve 403. | |
| """ | |
| if not _DIAG_SECRET or secret != _DIAG_SECRET: | |
| raise HTTPException(status_code=403, detail="Acceso denegado al diagnóstico.") | |
| resultados = {} | |
| tablas = { | |
| "dim_estudiante": DimEstudiante, | |
| "dim_docente": DimDocente, | |
| "dim_modulo": DimModulo, | |
| "dim_tiempo": DimTiempo, | |
| "dim_origen_documental": DimOrigenDocumental, | |
| "users": Users, | |
| "fact_rendimiento_academico": FactRendimientoAcademico, | |
| } | |
| for nombre, modelo in tablas.items(): | |
| try: | |
| count = db.query(modelo).count() | |
| resultados[nombre] = {"ok": True, "count": count} | |
| except Exception as e: | |
| resultados[nombre] = {"ok": False, "error": str(e)} | |
| todo_ok = all(v["ok"] for v in resultados.values()) | |
| return { | |
| "conexion": "ok", | |
| "tablas": resultados, | |
| "listo_para_produccion": todo_ok | |
| } | |
| def procesar_registro_tabular(payload: ProcessSheetPayload, db: Session = Depends(get_db)): | |
| try: | |
| # 1. NLP con BETO | |
| entidades = ner_engine.extract_entities(payload.texto_celda) | |
| confianza_ia = sum([e["score"] for e in entidades]) / len(entidades) if entidades else 1.0 | |
| forzar_revision = confianza_ia < 0.60 | |
| # 2. Dimensión Estudiante — Vinculación por Niveles | |
| nombre_resuelto = payload.texto_celda[:200].strip() | |
| estudiante = None | |
| confianza_vinculacion = 0.0 | |
| nivel_vinculacion = 0 | |
| # Nivel 1: ID Único | |
| if payload.codigo_estudiante: | |
| estudiante = db.query(DimEstudiante).filter(DimEstudiante.codigo_estudiante == payload.codigo_estudiante).first() | |
| if estudiante: | |
| confianza_vinculacion = 1.0 | |
| nivel_vinculacion = 1 | |
| if not estudiante: | |
| # Nivel 2 y 3: Fuzzy matching | |
| estudiantes_existentes = db.query(DimEstudiante).all() | |
| best_match, score = find_best_match(nombre_resuelto, estudiantes_existentes) | |
| if best_match and score >= 0.80: | |
| estudiante = best_match | |
| confianza_vinculacion = score | |
| nivel_vinculacion = 2 if payload.programa and payload.semestre else 3 | |
| if not estudiante: | |
| estudiante = DimEstudiante(nombre_completo=nombre_resuelto, codigo_estudiante=payload.codigo_estudiante) | |
| db.add(estudiante) | |
| db.flush() | |
| # 3. Dimensión Docente — columnas reales: nombre_completo, area_especialidad | |
| if not db.query(DimDocente).filter(DimDocente.id_docente == payload.id_docente).first(): | |
| db.add(DimDocente( | |
| id_docente=payload.id_docente, | |
| nombre_completo="Docente Generico", | |
| area_especialidad="Generico" | |
| )) | |
| # 4. Dimensión Módulo — columnas reales: nombre_modulo, nombre_institucion, programa | |
| if not db.query(DimModulo).filter(DimModulo.id_modulo == payload.id_modulo).first(): | |
| db.add(DimModulo( | |
| id_modulo=payload.id_modulo, | |
| nombre_modulo=payload.modulo or "Modulo Generico", | |
| nombre_institucion=payload.institucion or "GiraGroup", | |
| programa=payload.programa or "General" | |
| )) | |
| # 5. Dimensión Tiempo — columnas reales: gestion, semestre, mes | |
| if not db.query(DimTiempo).filter(DimTiempo.id_tiempo == payload.id_tiempo).first(): | |
| db.add(DimTiempo( | |
| id_tiempo=payload.id_tiempo, | |
| gestion=2026, | |
| semestre=1, | |
| mes="Mayo" | |
| )) | |
| # 6. Dimensión Origen Documental — tabla real: dim_origen_documental | |
| # CHECK: tipo_documento IN ('SHEET', 'FORM', 'MOODLE', 'XLSX') | |
| if not db.query(DimOrigenDocumental).filter( | |
| DimOrigenDocumental.id_documento == payload.id_documento | |
| ).first(): | |
| db.add(DimOrigenDocumental( | |
| id_documento=payload.id_documento, | |
| tipo_documento=TIPO_DOC_VALIDO, | |
| nombre_archivo="carga_automatica" | |
| )) | |
| # 7. Usuario — tabla real: users (id, username, hashed_password, role) | |
| if not db.query(Users).filter(Users.id == payload.id_usuario).first(): | |
| db.add(Users( | |
| id=payload.id_usuario, | |
| username=f"sistema_{payload.id_usuario}", | |
| hashed_password="$placeholder$", | |
| role="admin" | |
| )) | |
| db.flush() | |
| # 8. Alertas estratégicas | |
| alertas_disparadas = [] | |
| if payload.nota_detectada <= 70.0: | |
| alertas_disparadas.append("RIESGO_ACADEMICO_CRITICO") | |
| if payload.asistencia < 70.0 or payload.incumplimiento_tareas > 30.0: | |
| alertas_disparadas.append("RIESGO_DESERCION_ALTA") | |
| # 9. Insertar hecho con las FK correctas | |
| nuevo_hecho = FactRendimientoAcademico( | |
| id_estudiante=estudiante.id_estudiante, | |
| id_docente=payload.id_docente, | |
| id_modulo=payload.id_modulo, | |
| id_tiempo=payload.id_tiempo, | |
| id_documento=payload.id_documento, | |
| id_usuario_carga=payload.id_usuario, | |
| nota_final=payload.nota_detectada, | |
| asistencia_pct=payload.asistencia, | |
| incumplimiento_actividades_pct=payload.incumplimiento_tareas, | |
| nivel_confianza_ia=confianza_ia, | |
| requiere_revision=forzar_revision | |
| ) | |
| db.add(nuevo_hecho) | |
| db.commit() | |
| return { | |
| "status": "processed", | |
| "id_estudiante_asignado": estudiante.id_estudiante, | |
| "confianza_modelo_beto": round(confianza_ia, 4), | |
| "confianza_vinculacion": round(confianza_vinculacion, 4), | |
| "nivel_vinculacion": nivel_vinculacion, | |
| "requiere_auditoria_humana": forzar_revision, | |
| "alertas_estrategicas": alertas_disparadas | |
| } | |
| except Exception as err: | |
| db.rollback() | |
| logger.error(f"Error crítico en backend 500: {err}") | |
| raise HTTPException(status_code=500, detail=str(err)) | |
| def analyze_nlp_only(payload: ProcessSheetPayload, db: Session = Depends(get_db)): | |
| """ | |
| Fase 1: Solo ejecuta el modelo NLP (BETO) sobre el texto y devuelve las métricas. | |
| NO inserta en la base de datos. Usado para el Staging area en el Frontend. | |
| """ | |
| entidades = ner_engine.extract_entities(payload.texto_celda) | |
| confianza_ia = sum([e["score"] for e in entidades]) / len(entidades) if entidades else 1.0 | |
| nombre_resuelto = payload.texto_celda[:200].strip() | |
| # 1. Consultar log_auditoria_nlp (MLOps Memory) | |
| log_memoria = db.query(LogAuditoriaNlp).filter( | |
| LogAuditoriaNlp.texto_original == nombre_resuelto | |
| ).order_by(LogAuditoriaNlp.created_at.desc()).first() | |
| candidatos_difusos = [] | |
| regla_aplicada = False | |
| if log_memoria: | |
| # BETO "recuerda" la decisión humana previa | |
| nombre_resuelto = log_memoria.correccion_humana | |
| confianza_ia = 1.0 | |
| forzar_revision = False | |
| regla_aplicada = True | |
| else: | |
| # Fuzzy Matching | |
| estudiantes_existentes = db.query(DimEstudiante).all() | |
| best_match, score = find_best_match(nombre_resuelto, estudiantes_existentes) | |
| # Generar Top 3 candidatos para el dropdown de resolución | |
| from rapidfuzz import fuzz | |
| for est in estudiantes_existentes: | |
| s = fuzz.token_sort_ratio(nombre_resuelto.lower(), est.nombre_completo.lower()) / 100.0 | |
| if s > 0.4: | |
| candidatos_difusos.append({"id": est.id_estudiante, "nombre": est.nombre_completo, "score": round(s, 2)}) | |
| candidatos_difusos = sorted(candidatos_difusos, key=lambda x: x["score"], reverse=True)[:3] | |
| if score > 0.8: | |
| confianza_ia = score | |
| # Enforce range limits strictly for non-financial files | |
| fuera_de_rango = False | |
| if payload.tipo_fuente not in ["FINANCE", "BUDGET", "MARKETING", "SURVEYS"]: | |
| fuera_de_rango = ( | |
| payload.nota_detectada < 0 or payload.nota_detectada > 100 or | |
| payload.asistencia < 0 or payload.asistencia > 100 or | |
| payload.incumplimiento_tareas < 0 or payload.incumplimiento_tareas > 100 | |
| ) | |
| forzar_revision = (confianza_ia < 0.50) or fuera_de_rango | |
| alertas_disparadas = [] | |
| if payload.tipo_fuente not in ["FINANCE", "BUDGET", "MARKETING", "SURVEYS"]: | |
| if fuera_de_rango: | |
| alertas_disparadas.append("ERROR_VALOR_FUERA_RANGO") | |
| if payload.nota_detectada <= 70.0: | |
| alertas_disparadas.append("RIESGO_ACADEMICO_CRITICO") | |
| if payload.asistencia < 70.0 or payload.incumplimiento_tareas > 30.0: | |
| alertas_disparadas.append("RIESGO_DESERCION_ALTA") | |
| return { | |
| "status": "analyzed", | |
| "confianza_modelo_beto": round(confianza_ia, 4), | |
| "requiere_auditoria_humana": forzar_revision, | |
| "alertas_estrategicas": alertas_disparadas, | |
| "entidades_nlp": entidades, | |
| "candidatos_difusos": candidatos_difusos, | |
| "regla_memoria_aplicada": regla_aplicada, | |
| "nombre_resuelto": nombre_resuelto | |
| } | |
| def nlp_quality_check(payload: ProcessSheetPayloadRaw): | |
| """ | |
| Evalúa la calidad del dato sin aplicar clamping (diagnóstico en lugar de corrección silenciosa). | |
| """ | |
| inconsistencias = [] | |
| # Enforce range checks only for non-financial files | |
| if payload.tipo_fuente != "FINANCE": | |
| if payload.nota_detectada > 100 or payload.nota_detectada < 0: | |
| inconsistencias.append({"campo": "nota", "original": payload.nota_detectada, "corregido": max(0, min(100, payload.nota_detectada)), "tipo": "FUERA_RANGO"}) | |
| if payload.asistencia > 100 or payload.asistencia < 0: | |
| inconsistencias.append({"campo": "asistencia", "original": payload.asistencia, "corregido": max(0, min(100, payload.asistencia)), "tipo": "FUERA_RANGO"}) | |
| if payload.incumplimiento_tareas > 100 or payload.incumplimiento_tareas < 0: | |
| inconsistencias.append({"campo": "incumplimiento_tareas", "original": payload.incumplimiento_tareas, "corregido": max(0, min(100, payload.incumplimiento_tareas)), "tipo": "FUERA_RANGO"}) | |
| nombre_limpio = payload.texto_celda.strip() | |
| if not nombre_limpio or nombre_limpio.lower() in ["sin nombre", "desconocido"]: | |
| inconsistencias.append({"campo": "nombre", "original": payload.texto_celda, "corregido": "Estudiante (Sin Nombre)", "tipo": "NOMBRE_VACIO"}) | |
| elif len(nombre_limpio) < 3 or nombre_limpio.replace('.', '').replace(',', '').isdigit(): | |
| inconsistencias.append({"campo": "nombre", "original": payload.texto_celda, "corregido": nombre_limpio, "tipo": "NOMBRE_SOSPECHOSO"}) | |
| return { | |
| "status": "checked", | |
| "inconsistencias": inconsistencias, | |
| "score_calidad": 1.0 if not inconsistencias else max(0.0, 1.0 - (len(inconsistencias) * 0.2)) | |
| } | |
| from typing import List | |
| def procesar_lote_tabular(payloads: List[ProcessSheetPayload], db: Session = Depends(get_db), current_user: Users = Depends(get_current_user)): | |
| """ | |
| Fase 3: Recibe una lista de registros (ya confirmados/editados por el usuario | |
| en la Fase 2) y los inserta masivamente usando SQLAlchemy bulk operations. | |
| """ | |
| if current_user.role not in ["coordinador_academico", "analista_datos_marketing", "admin"]: | |
| raise HTTPException(status_code=403, detail="Acceso denegado: Rol no autorizado para ingesta.") | |
| try: | |
| # Filter payloads by role to prevent RLS violations | |
| filtered_payloads = [] | |
| for p in payloads: | |
| area = getattr(p, 'tipo_fuente', 'ACADEMIC') | |
| if not area: area = 'ACADEMIC' | |
| if current_user.role == "coordinador_academico": | |
| if area in ["ACADEMIC", "SURVEYS"]: | |
| filtered_payloads.append(p) | |
| elif current_user.role == "analista_datos_marketing": | |
| if area in ["MARKETING", "BUDGET", "FINANCE"]: | |
| filtered_payloads.append(p) | |
| else: # admin | |
| filtered_payloads.append(p) | |
| payloads = filtered_payloads | |
| # Cargar dimensiones en memoria para evitar N queries | |
| estudiantes_db = db.query(DimEstudiante).all() | |
| existing_docentes = {d.nombre_completo: d.id_docente for d in db.query(DimDocente).all()} | |
| # For default/generic fallback | |
| docentes_db = set(existing_docentes.values()) | |
| # Load modulos by pos_code and name | |
| modulos_by_pos = {m.pos_code: m.id_modulo for m in db.query(DimModulo).filter(DimModulo.pos_code != None).all()} | |
| modulos_by_name = {m.nombre_modulo: m.id_modulo for m in db.query(DimModulo).all()} | |
| modulos_db = set(modulos_by_name.values()) | |
| tiempos_db_list = db.query(DimTiempo).all() | |
| docs_db = {d.id_documento for d in db.query(DimOrigenDocumental).all()} | |
| users_db = {u.id for u in db.query(Users).all()} | |
| existing_categorias = {c.nombre_categoria: c.id_categoria for c in db.query(DimCategoriaFinanciera).all()} | |
| categorias_db = set(existing_categorias.values()) | |
| # Load existing facts to perform upserts/deduplication | |
| existing_marketing = {(m.id_modulo, m.id_tiempo): m.id_hecho_mkt for m in db.query(FactMarketingInscripciones).all()} | |
| existing_surveys = {(s.id_docente, s.id_modulo, s.id_estudiante, s.id_tiempo, s.pregunta_bloque): s.id_hecho_eval for s in db.query(FactEvaluacionDocente).all()} | |
| existing_budget = {(b.id_categoria, b.id_tiempo): b.id_hecho_rent for b in db.query(FactRentabilidadPresupuesto).all()} | |
| existing_finance = {(f.id_estudiante, f.id_tiempo): f.id_hecho_fin for f in db.query(FactSituacionFinanciera).all()} | |
| existing_cobranzas = {(c.id_estudiante, c.id_tiempo): c.id_hecho_cobro for c in db.query(FactCobranzasProyectadas).all()} | |
| existing_academic = {(a.id_estudiante, a.id_modulo): a.id_hecho_aca for a in db.query(FactRendimientoAcademico).all()} | |
| for payload in payloads: | |
| # Dimensiones requeridas | |
| if payload.docente: | |
| docente_name = payload.docente.strip() | |
| if docente_name not in existing_docentes: | |
| new_id = max(existing_docentes.values() or [0]) + 1 | |
| db.add(DimDocente(id_docente=new_id, nombre_completo=docente_name, area_especialidad="Generico")) | |
| db.flush() | |
| existing_docentes[docente_name] = new_id | |
| docentes_db.add(new_id) | |
| payload.id_docente = existing_docentes[docente_name] | |
| else: | |
| if payload.id_docente not in docentes_db: | |
| db.add(DimDocente(id_docente=payload.id_docente, nombre_completo="Docente Generico", area_especialidad="Generico")) | |
| db.flush() | |
| docentes_db.add(payload.id_docente) | |
| # Resolucion relacional por llave POS-CI | |
| pos_val = payload.pos_code or (payload.programa if payload.programa and payload.programa.startswith("POS-") else None) | |
| id_modulo_val = None | |
| if pos_val: | |
| id_modulo_val = modulos_by_pos.get(pos_val) | |
| if not id_modulo_val and payload.modulo: | |
| id_modulo_val = modulos_by_name.get(payload.modulo) | |
| if not id_modulo_val: | |
| new_id = max(modulos_by_name.values() or [0]) + 1 | |
| new_modulo = DimModulo( | |
| id_modulo=new_id, | |
| nombre_modulo=payload.modulo or f"Modulo {pos_val}" or "Modulo Generico", | |
| nombre_institucion=payload.institucion or "GiraGroup", | |
| programa=payload.programa or "General", | |
| pos_code=pos_val | |
| ) | |
| db.add(new_modulo) | |
| db.flush() | |
| id_modulo_val = new_id | |
| modulos_by_name[new_modulo.nombre_modulo] = new_id | |
| if pos_val: | |
| modulos_by_pos[pos_val] = new_id | |
| modulos_db.add(new_id) | |
| payload.id_modulo = id_modulo_val | |
| # Resolución dinámica de DimTiempo | |
| gestion_val = int(payload.gestion) if getattr(payload, 'gestion', None) else 2026 | |
| mes_val = str(payload.mes).capitalize() if getattr(payload, 'mes', None) else "Mayo" | |
| tiempo_obj = next((t for t in tiempos_db_list if t.gestion == gestion_val and t.mes.lower() == mes_val.lower()), None) | |
| if not tiempo_obj: | |
| new_id = max((t.id_tiempo for t in tiempos_db_list), default=0) + 1 | |
| tiempo_obj = DimTiempo(id_tiempo=new_id, gestion=gestion_val, semestre=1, mes=mes_val) | |
| db.add(tiempo_obj) | |
| db.flush() | |
| tiempos_db_list.append(tiempo_obj) | |
| payload.id_tiempo = tiempo_obj.id_tiempo | |
| if payload.id_documento not in docs_db: | |
| db.add(DimOrigenDocumental(id_documento=payload.id_documento, tipo_documento="SHEET", nombre_archivo="carga_automatica")) | |
| db.flush() | |
| docs_db.add(payload.id_documento) | |
| if payload.id_usuario not in users_db: | |
| db.add(Users(id=payload.id_usuario, username=f"sistema_{payload.id_usuario}", hashed_password="$placeholder$", role="admin")) | |
| db.flush() | |
| users_db.add(payload.id_usuario) | |
| # Para presupuestos asumimos categoría base por defecto (ID 1) si no existe | |
| area_check = getattr(payload, 'tipo_fuente', 'ACADEMIC') | |
| if not area_check: area_check = 'ACADEMIC' | |
| if area_check == 'BUDGET': | |
| cat_name = payload.programa.strip() if getattr(payload, 'programa', None) else "Presupuesto General" | |
| if cat_name not in existing_categorias: | |
| new_id = max(existing_categorias.values() or [0]) + 1 | |
| db.add(DimCategoriaFinanciera(id_categoria=new_id, nombre_categoria=cat_name, tipo="EGRESO")) | |
| db.flush() | |
| existing_categorias[cat_name] = new_id | |
| categorias_db.add(new_id) | |
| for payload in payloads: | |
| area = getattr(payload, 'tipo_fuente', 'ACADEMIC') | |
| if not area: area = 'ACADEMIC' | |
| if area in ['BUDGET', 'FINANCE', 'MARKETING', 'SURVEYS']: | |
| confianza_ia = 0.95 | |
| forzar_revision = False | |
| else: | |
| entidades = ner_engine.extract_entities(payload.texto_celda) | |
| confianza_ia = sum([e["score"] for e in entidades]) / len(entidades) if entidades else 1.0 | |
| forzar_revision = confianza_ia < 0.50 | |
| nombre_resuelto = payload.texto_celda[:200].strip() | |
| estudiante = next((e for e in estudiantes_db if e.codigo_estudiante == payload.codigo_estudiante), None) if payload.codigo_estudiante else None | |
| if not estudiante: | |
| best_match, score = find_best_match(nombre_resuelto, estudiantes_db) | |
| if best_match and score >= 0.80: | |
| estudiante = best_match | |
| if not estudiante: | |
| estudiante = DimEstudiante( | |
| nombre_completo=nombre_resuelto, | |
| codigo_estudiante=payload.codigo_estudiante, | |
| genero=payload.genero, | |
| ciudad=payload.ciudad | |
| ) | |
| db.add(estudiante) | |
| db.flush() | |
| estudiantes_db.append(estudiante) | |
| # Preparar Hechos con lógica de deduplicación / Upsert | |
| if area == "MARKETING": | |
| key = (payload.id_modulo, payload.id_tiempo) | |
| if key in existing_marketing: | |
| db_mkt = db.query(FactMarketingInscripciones).filter_by(id_hecho_mkt=existing_marketing[key]).first() | |
| if db_mkt: | |
| db_mkt.leads = getattr(payload, 'leads', 1) | |
| db_mkt.reservas = getattr(payload, 'reservas', 0) | |
| db_mkt.inscritos = getattr(payload, 'inscritos', 0) | |
| db_mkt.costo_programa = getattr(payload, 'costo', 0) | |
| else: | |
| new_mkt = FactMarketingInscripciones( | |
| id_modulo=payload.id_modulo, id_tiempo=payload.id_tiempo, | |
| leads=getattr(payload, 'leads', 1), reservas=getattr(payload, 'reservas', 0), | |
| inscritos=getattr(payload, 'inscritos', 0), costo_programa=getattr(payload, 'costo', 0) | |
| ) | |
| db.add(new_mkt) | |
| db.flush() | |
| existing_marketing[key] = new_mkt.id_hecho_mkt | |
| elif area == "SURVEYS": | |
| key = (payload.id_docente, payload.id_modulo, estudiante.id_estudiante, payload.id_tiempo, getattr(payload, 'pregunta', 'General')) | |
| if key in existing_surveys: | |
| db_srv = db.query(FactEvaluacionDocente).filter_by(id_hecho_eval=existing_surveys[key]).first() | |
| if db_srv: | |
| db_srv.puntuacion = getattr(payload, 'puntuacion', 5.0) | |
| db_srv.comentario = nombre_resuelto | |
| else: | |
| new_srv = FactEvaluacionDocente( | |
| id_docente=payload.id_docente, id_modulo=payload.id_modulo, | |
| id_estudiante=estudiante.id_estudiante, id_tiempo=payload.id_tiempo, | |
| pregunta_bloque=getattr(payload, 'pregunta', 'General'), | |
| puntuacion=getattr(payload, 'puntuacion', 5.0), comentario=nombre_resuelto | |
| ) | |
| db.add(new_srv) | |
| db.flush() | |
| existing_surveys[key] = new_srv.id_hecho_eval | |
| elif area == "BUDGET": | |
| cat_name = payload.programa.strip() if getattr(payload, 'programa', None) else "Presupuesto General" | |
| cat_id = existing_categorias.get(cat_name, 1) | |
| key = (cat_id, payload.id_tiempo) | |
| if key in existing_budget: | |
| db_budget = db.query(FactRentabilidadPresupuesto).filter_by(id_hecho_rent=existing_budget[key]).first() | |
| if db_budget: | |
| db_budget.monto_ejecutado = getattr(payload, 'monto_ejecutado', getattr(payload, 'costo', 0)) | |
| db_budget.monto_meta = getattr(payload, 'monto_meta', getattr(payload, 'costo', 0)) | |
| db_budget.id_modulo = payload.id_modulo | |
| else: | |
| new_budget = FactRentabilidadPresupuesto( | |
| id_modulo=payload.id_modulo, id_tiempo=payload.id_tiempo, | |
| id_categoria=cat_id, | |
| monto_ejecutado=getattr(payload, 'monto_ejecutado', getattr(payload, 'costo', 0)), | |
| monto_meta=getattr(payload, 'monto_meta', getattr(payload, 'costo', 0)) | |
| ) | |
| db.add(new_budget) | |
| db.flush() | |
| existing_budget[key] = new_budget.id_hecho_rent | |
| elif area == "FINANCE": | |
| if getattr(payload, 'proyecciones_mensuales', None): | |
| for mes_key, monto_esp in payload.proyecciones_mensuales.items(): | |
| parts = str(mes_key).upper().replace('MONTO', '').strip().split() | |
| mes_str = parts[0] if len(parts) > 0 else 'ENERO' | |
| gestion_val = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else getattr(payload, 'gestion', 2024) | |
| if not gestion_val: gestion_val = 2024 | |
| tiempo_proj = next((t for t in tiempos_db_list if t.gestion == gestion_val and t.mes.lower() == mes_str.lower()), None) | |
| if not tiempo_proj: | |
| new_id = max((t.id_tiempo for t in tiempos_db_list), default=0) + 1 | |
| tiempo_proj = DimTiempo(id_tiempo=new_id, gestion=gestion_val, semestre=1, mes=mes_str.capitalize()) | |
| db.add(tiempo_proj) | |
| db.flush() | |
| tiempos_db_list.append(tiempo_proj) | |
| key = (estudiante.id_estudiante, tiempo_proj.id_tiempo) | |
| if key in existing_cobranzas: | |
| db_cobranza = db.query(FactCobranzasProyectadas).filter_by(id_hecho_cobro=existing_cobranzas[key]).first() | |
| if db_cobranza: | |
| db_cobranza.monto_esperado = float(monto_esp) | |
| else: | |
| new_cobranza = FactCobranzasProyectadas( | |
| id_estudiante=estudiante.id_estudiante, | |
| id_tiempo=tiempo_proj.id_tiempo, | |
| monto_esperado=float(monto_esp), | |
| estado_pago="PENDIENTE" | |
| ) | |
| db.add(new_cobranza) | |
| db.flush() | |
| existing_cobranzas[key] = new_cobranza.id_hecho_cobro | |
| else: | |
| key = (estudiante.id_estudiante, payload.id_tiempo) | |
| if key in existing_finance: | |
| db_finance = db.query(FactSituacionFinanciera).filter_by(id_hecho_fin=existing_finance[key]).first() | |
| if db_finance: | |
| db_finance.monto_deuda = getattr(payload, 'monto_deuda', 0) | |
| db_finance.cuotas_impagas = getattr(payload, 'cuotas_impagas', 0) | |
| db_finance.estado_cartera = getattr(payload, 'estado_cartera', 'PENDIENTE') | |
| db_finance.tipo_alerta = getattr(payload, 'tipo_alerta', 'NINGUNA') | |
| else: | |
| new_finance = FactSituacionFinanciera( | |
| id_estudiante=estudiante.id_estudiante, | |
| id_tiempo=payload.id_tiempo, | |
| monto_deuda=getattr(payload, 'monto_deuda', 0), | |
| cuotas_impagas=getattr(payload, 'cuotas_impagas', 0), | |
| estado_cartera=getattr(payload, 'estado_cartera', 'PENDIENTE'), | |
| tipo_alerta=getattr(payload, 'tipo_alerta', 'NINGUNA') | |
| ) | |
| db.add(new_finance) | |
| db.flush() | |
| existing_finance[key] = new_finance.id_hecho_fin | |
| # Also update/insert academic record if estado_academico is provided! | |
| est_aca_val = getattr(payload, 'estado_academico', None) | |
| if est_aca_val: | |
| acad_key = (estudiante.id_estudiante, payload.id_modulo) | |
| if acad_key in existing_academic: | |
| db_academic = db.query(FactRendimientoAcademico).filter_by(id_hecho_aca=existing_academic[acad_key]).first() | |
| if db_academic: | |
| db_academic.estado_academico = est_aca_val | |
| else: | |
| new_academic = FactRendimientoAcademico( | |
| id_estudiante=estudiante.id_estudiante, | |
| id_docente=payload.id_docente, | |
| id_modulo=payload.id_modulo, | |
| id_tiempo=payload.id_tiempo, | |
| id_documento=payload.id_documento, | |
| id_usuario_carga=payload.id_usuario, | |
| nota_final=0.0, | |
| asistencia_pct=100.0, | |
| incumplimiento_actividades_pct=0.0, | |
| nivel_confianza_ia=0.95, | |
| requiere_revision=False, | |
| estado_academico=est_aca_val | |
| ) | |
| db.add(new_academic) | |
| db.flush() | |
| existing_academic[acad_key] = new_academic.id_hecho_aca | |
| else: # ACADEMIC | |
| key = (estudiante.id_estudiante, payload.id_modulo) | |
| if key in existing_academic: | |
| db_academic = db.query(FactRendimientoAcademico).filter_by(id_hecho_aca=existing_academic[key]).first() | |
| if db_academic: | |
| db_academic.id_docente = payload.id_docente | |
| db_academic.id_tiempo = payload.id_tiempo | |
| db_academic.id_documento = payload.id_documento | |
| db_academic.id_usuario_carga = payload.id_usuario | |
| db_academic.nota_final = payload.nota_detectada | |
| db_academic.asistencia_pct = payload.asistencia | |
| db_academic.incumplimiento_actividades_pct = payload.incumplimiento_tareas | |
| db_academic.nivel_confianza_ia = confianza_ia | |
| db_academic.requiere_revision = forzar_revision | |
| db_academic.estado_academico = getattr(payload, 'estado_academico', None) | |
| else: | |
| new_academic = FactRendimientoAcademico( | |
| id_estudiante=estudiante.id_estudiante, id_docente=payload.id_docente, | |
| id_modulo=payload.id_modulo, id_tiempo=payload.id_tiempo, | |
| id_documento=payload.id_documento, id_usuario_carga=payload.id_usuario, | |
| nota_final=payload.nota_detectada, asistencia_pct=payload.asistencia, | |
| incumplimiento_actividades_pct=payload.incumplimiento_tareas, | |
| nivel_confianza_ia=confianza_ia, requiere_revision=forzar_revision, | |
| estado_academico=getattr(payload, 'estado_academico', None) | |
| ) | |
| db.add(new_academic) | |
| db.flush() | |
| existing_academic[key] = new_academic.id_hecho_aca | |
| db.commit() | |
| return {"status": "success", "inserted_count": len(payloads)} | |
| except Exception as err: | |
| db.rollback() | |
| raise HTTPException(status_code=500, detail=str(err)) | |
| def obtener_riesgos_cruzados( | |
| # Clamp explícito de los parámetros: nunca se usa el valor crudo del usuario en la query | |
| limite_nota: float = Query(default=70.0, ge=0.0, le=100.0), | |
| min_cuotas: int = Query(default=2, ge=1, le=20), | |
| db: Session = Depends(get_db) | |
| ): | |
| try: | |
| # Hacer JOIN real con FactSituacionFinanciera (LEFT JOIN para no excluir si no hay finanzas) | |
| resultados = db.query( | |
| DimEstudiante, | |
| FactRendimientoAcademico, | |
| FactSituacionFinanciera | |
| ).join( | |
| FactRendimientoAcademico, DimEstudiante.id_estudiante == FactRendimientoAcademico.id_estudiante | |
| ).outerjoin( | |
| FactSituacionFinanciera, DimEstudiante.id_estudiante == FactSituacionFinanciera.id_estudiante | |
| ).filter(FactRendimientoAcademico.nota_final <= limite_nota).all() | |
| data = [] | |
| for est, fact_aca, fact_fin in resultados: | |
| cuotas = fact_fin.cuotas_impagas if fact_fin else min_cuotas | |
| if cuotas < min_cuotas: | |
| continue | |
| deuda = float(fact_fin.monto_deuda) if fact_fin else 350.0 * cuotas | |
| estado_cartera = fact_fin.estado_cartera if fact_fin else "MORA" | |
| data.append({ | |
| "estudiante": est.nombre_completo, | |
| "codigo": est.codigo_estudiante or f"EST-{est.id_estudiante:06d}", | |
| "rendimiento": { | |
| "nota_actual": float(fact_aca.nota_final), | |
| "estado_academico": "CRÍTICO" | |
| }, | |
| "finanzas": { | |
| "cuotas_mora": cuotas, | |
| "deuda_total": deuda, | |
| "estado_cartera": estado_cartera | |
| }, | |
| "nivel_riesgo_global": "ALTO - CRÍTICO" | |
| }) | |
| return {"status": "success", "data": data} | |
| except Exception as e: | |
| logger.error(f"Fallo en OLAP: {e}") | |
| # Error crudo al frontend para diagnóstico exacto de PostgreSQL | |
| raise HTTPException(status_code=500, detail=f"Error DB: {str(e)}") | |
| class FinancePayload(BaseModel): | |
| nombre: Optional[str] = None | |
| codigo_estudiante: Optional[str] = None | |
| id_estudiante: Optional[int] = None | |
| id_tiempo: int | |
| monto_deuda: float | |
| cuotas_impagas: int | |
| estado_cartera: str | |
| tipo_alerta: str | |
| def procesar_registro_financiero(payload: FinancePayload, db: Session = Depends(get_db), current_user: Users = Depends(get_current_user)): | |
| if current_user.role not in ["analista_datos_marketing", "admin"]: | |
| raise HTTPException(status_code=403, detail="Acceso denegado: Se requiere rol de Analista de Datos.") | |
| try: | |
| id_est = payload.id_estudiante | |
| if not id_est: | |
| from similarity import find_best_match | |
| estudiantes_db = db.query(DimEstudiante).all() | |
| nombre_resuelto = payload.nombre or "Desconocido" | |
| estudiante = next((e for e in estudiantes_db if e.codigo_estudiante == payload.codigo_estudiante), None) if payload.codigo_estudiante else None | |
| if not estudiante: | |
| best_match, score = find_best_match(nombre_resuelto, estudiantes_db, threshold=0.85) | |
| if best_match: | |
| estudiante = best_match | |
| if not estudiante: | |
| estudiante = DimEstudiante(nombre_completo=nombre_resuelto, codigo_estudiante=payload.codigo_estudiante) | |
| db.add(estudiante) | |
| db.flush() | |
| id_est = estudiante.id_estudiante | |
| existing = db.query(FactSituacionFinanciera).filter_by( | |
| id_estudiante=id_est, | |
| id_tiempo=payload.id_tiempo | |
| ).first() | |
| if existing: | |
| existing.monto_deuda = payload.monto_deuda | |
| existing.cuotas_impagas = payload.cuotas_impagas | |
| existing.estado_cartera = payload.estado_cartera | |
| existing.tipo_alerta = payload.tipo_alerta | |
| else: | |
| nuevo_hecho = FactSituacionFinanciera( | |
| id_estudiante=id_est, | |
| id_tiempo=payload.id_tiempo, | |
| monto_deuda=payload.monto_deuda, | |
| cuotas_impagas=payload.cuotas_impagas, | |
| estado_cartera=payload.estado_cartera, | |
| tipo_alerta=payload.tipo_alerta | |
| ) | |
| db.add(nuevo_hecho) | |
| db.commit() | |
| return {"status": "success", "inserted": True} | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| class UnpivotFinancePayload(BaseModel): | |
| id_estudiante: int | |
| raw_data: Dict[str, Any] | |
| def procesar_lote_financiero_unpivot(payloads: List[UnpivotFinancePayload], db: Session = Depends(get_db)): | |
| """ | |
| Recibe un lote de datos financieros con columnas de meses (ej. 'MONTO ENERO 2024') | |
| y utiliza pandas para despivotarlos antes de insertarlos en FactCobranzasProyectadas. | |
| """ | |
| try: | |
| from database import FactCobranzasProyectadas, DimTiempo | |
| # 1. Convertir payloads a DataFrame | |
| df_list = [] | |
| for p in payloads: | |
| row = p.raw_data.copy() | |
| row['id_estudiante'] = p.id_estudiante | |
| df_list.append(row) | |
| if not df_list: | |
| return {"status": "success", "inserted": 0} | |
| df = pd.DataFrame(df_list) | |
| # 2. Identificar columnas de meses (Empiezan con 'MONTO ') | |
| monto_cols = [c for c in df.columns if c.startswith('MONTO ')] | |
| id_cols = [c for c in df.columns if c not in monto_cols] | |
| # 3. Despivotar (Melt) | |
| df_melted = df.melt(id_vars=id_cols, value_vars=monto_cols, var_name='mes_anio', value_name='monto_esperado') | |
| # Filtrar nulos o ceros si no son necesarios | |
| df_melted['monto_esperado'] = pd.to_numeric(df_melted['monto_esperado'], errors='coerce') | |
| df_melted = df_melted.dropna(subset=['monto_esperado']) | |
| # 4. Insertar en base de datos | |
| inserted_count = 0 | |
| for index, row in df_melted.iterrows(): | |
| mes_raw = str(row['mes_anio']).replace('MONTO ', '').strip() # Ej 'ENERO 2024' | |
| parts = mes_raw.split() | |
| gestion = int(parts[1]) if len(parts) > 1 else 2024 | |
| mes_str = parts[0] if len(parts) > 0 else 'Enero' | |
| # Buscar o crear tiempo | |
| tiempo = db.query(DimTiempo).filter(DimTiempo.gestion == gestion, DimTiempo.mes == mes_str).first() | |
| if not tiempo: | |
| tiempo = DimTiempo(gestion=gestion, mes=mes_str) | |
| db.add(tiempo) | |
| db.commit() | |
| db.refresh(tiempo) | |
| nuevo_cobro = FactCobranzasProyectadas( | |
| id_estudiante=row['id_estudiante'], | |
| id_tiempo=tiempo.id_tiempo, | |
| monto_esperado=row['monto_esperado'], | |
| estado_pago='PROYECTADO' | |
| ) | |
| db.add(nuevo_cobro) | |
| inserted_count += 1 | |
| db.commit() | |
| return {"status": "success", "inserted": inserted_count} | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def procesar_lote_encuestas(payloads: List[Dict[str, Any]], db: Session = Depends(get_db)): | |
| """Ruta para encuestas (Placeholder para lógica de NLP sobre comentarios)""" | |
| return {"status": "success", "message": "Ruta de encuestas lista para implementación"} | |
| def procesar_lote_marketing(payloads: List[Dict[str, Any]], db: Session = Depends(get_db)): | |
| """ | |
| Bulk Insert real de datos de Marketing/Ventas en fact_marketing. | |
| Recibe una lista de dicts con raw_data del frontend y los inserta | |
| resolviendo las dimensiones id_modulo e id_tiempo. | |
| """ | |
| try: | |
| from sqlalchemy import func | |
| existing_modulos = {m.nombre_modulo: m.id_modulo for m in db.query(DimModulo).all()} | |
| existing_tiempos = {t.id_tiempo for t in db.query(DimTiempo).all()} | |
| hechos_mkt = [] | |
| for p in payloads: | |
| raw = p.get("raw_data", p) if isinstance(p, dict) else p | |
| # Resolver módulo/programa | |
| programa_raw = raw.get("programa") or raw.get("modulo") or raw.get("data_domain", "Marketing General") | |
| # Aplicar normalización del diccionario | |
| norm_progs = DICCIONARIO_NORMALIZACION.get("programas_cursos", {}) | |
| programa_clean = norm_progs.get(programa_raw, programa_raw) | |
| id_modulo_val = existing_modulos.get(programa_clean) | |
| if not id_modulo_val: | |
| nuevo_modulo = DimModulo( | |
| nombre_modulo=programa_clean, | |
| nombre_institucion=raw.get("institucion", "GiraGroup"), | |
| programa=programa_clean | |
| ) | |
| db.add(nuevo_modulo) | |
| db.flush() | |
| id_modulo_val = nuevo_modulo.id_modulo | |
| existing_modulos[programa_clean] = id_modulo_val | |
| # Resolver tiempo | |
| id_tiempo_val = raw.get("id_tiempo", 1) | |
| if id_tiempo_val not in existing_tiempos: | |
| db.add(DimTiempo(id_tiempo=id_tiempo_val, gestion=2026, semestre=1, mes="Junio")) | |
| db.flush() | |
| existing_tiempos.add(id_tiempo_val) | |
| # Extraer métricas de marketing | |
| leads_val = int(raw.get("leads", 1)) | |
| reservas_val = int(raw.get("reservas", 0)) | |
| inscritos_val = int(raw.get("inscritos", 0)) | |
| costo_val = float(raw.get("costo", raw.get("costo_programa", 0))) | |
| hechos_mkt.append(FactMarketingInscripciones( | |
| id_modulo=id_modulo_val, | |
| id_tiempo=id_tiempo_val, | |
| leads=leads_val, | |
| reservas=reservas_val, | |
| inscritos=inscritos_val, | |
| costo_programa=costo_val | |
| )) | |
| if hechos_mkt: | |
| db.add_all(hechos_mkt) | |
| db.commit() | |
| return {"status": "success", "inserted": len(hechos_mkt)} | |
| except Exception as e: | |
| db.rollback() | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def procesar_lote_financiero(payloads: List[FinancePayload], db: Session = Depends(get_db), current_user: Users = Depends(get_current_user)): | |
| if current_user.role not in ["analista_datos_marketing", "admin"]: | |
| raise HTTPException(status_code=403, detail="Acceso denegado: Se requiere rol de Analista de Datos.") | |
| try: | |
| from similarity import find_best_match | |
| estudiantes_db = db.query(DimEstudiante).all() | |
| # Cargar hechos de situación financiera existentes en memoria | |
| existing_finance = {(f.id_estudiante, f.id_tiempo): f.id_hecho_fin for f in db.query(FactSituacionFinanciera).all()} | |
| for payload in payloads: | |
| # 1. Resolver Estudiante | |
| nombre_resuelto = payload.nombre or "Desconocido" | |
| estudiante = next((e for e in estudiantes_db if e.codigo_estudiante == payload.codigo_estudiante), None) if payload.codigo_estudiante else None | |
| if not estudiante: | |
| best_match, score = find_best_match(nombre_resuelto, estudiantes_db, threshold=0.85) | |
| if best_match: | |
| estudiante = best_match | |
| # Si de plano no existe, lo creamos para que no falle la FK | |
| if not estudiante: | |
| estudiante = DimEstudiante(nombre_completo=nombre_resuelto, codigo_estudiante=payload.codigo_estudiante) | |
| db.add(estudiante) | |
| db.flush() | |
| estudiantes_db.append(estudiante) | |
| # 2. Insertar o actualizar el hecho financiero (Deduplicación / Upsert) | |
| key = (estudiante.id_estudiante, payload.id_tiempo) | |
| if key in existing_finance: | |
| db_finance = db.query(FactSituacionFinanciera).filter_by(id_hecho_fin=existing_finance[key]).first() | |
| if db_finance: | |
| db_finance.monto_deuda = payload.monto_deuda | |
| db_finance.cuotas_impagas = payload.cuotas_impagas | |
| db_finance.estado_cartera = payload.estado_cartera | |
| db_finance.tipo_alerta = payload.tipo_alerta | |
| else: | |
| new_finance = FactSituacionFinanciera( | |
| id_estudiante=estudiante.id_estudiante, | |
| id_tiempo=payload.id_tiempo, | |
| monto_deuda=payload.monto_deuda, | |
| cuotas_impagas=payload.cuotas_impagas, | |
| estado_cartera=payload.estado_cartera, | |
| tipo_alerta=payload.tipo_alerta | |
| ) | |
| db.add(new_finance) | |
| db.flush() | |
| existing_finance[key] = new_finance.id_hecho_fin | |
| db.commit() | |
| return {"status": "success", "inserted_count": len(payloads)} | |
| except Exception as e: | |
| db.rollback() | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| class MLOpsFeedbackPayload(BaseModel): | |
| texto_erroneo: str | |
| prediccion_beto: str | |
| confianza_ia: float | |
| texto_corregido: str | |
| def log_mlops_feedback(payload: MLOpsFeedbackPayload, db: Session = Depends(get_db), current_user: Users = Depends(get_current_user)): | |
| try: | |
| # Update existing pending logs for this text | |
| logs_pendientes = db.query(LogAuditoriaNlp).filter( | |
| LogAuditoriaNlp.texto_original == payload.texto_erroneo, | |
| LogAuditoriaNlp.correccion_humana == "PENDIENTE" | |
| ).all() | |
| if logs_pendientes: | |
| for log in logs_pendientes: | |
| log.correccion_humana = payload.texto_corregido | |
| log.usuario_auditor = current_user.id | |
| else: | |
| # If none pending found, still add as a new memory rule | |
| nuevo_log = LogAuditoriaNlp( | |
| texto_original=payload.texto_erroneo, | |
| prediccion_beto=payload.prediccion_beto, | |
| confianza_ia=payload.confianza_ia, | |
| correccion_humana=payload.texto_corregido, | |
| usuario_auditor=current_user.id | |
| ) | |
| db.add(nuevo_log) | |
| db.commit() | |
| return {"status": "success", "message": "Feedback logged successfully"} | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_dashboard_kpis(db: Session = Depends(get_db)): | |
| try: | |
| # Calcular KPIs desde la BD | |
| from sqlalchemy import func | |
| total_estudiantes = db.query(DimEstudiante).count() | |
| total_documentos = db.query(DimOrigenDocumental).count() | |
| # Rendimiento académico stats | |
| stats_aca = db.query( | |
| func.avg(FactRendimientoAcademico.nivel_confianza_ia).label('avg_conf') | |
| ).first() | |
| avg_conf = float(stats_aca.avg_conf) if stats_aca and stats_aca.avg_conf else 0.0 | |
| # auditorias could be None or something else depending on driver, simpler approach: | |
| auditorias = db.query(FactRendimientoAcademico).filter(FactRendimientoAcademico.requiere_revision == True).count() | |
| total_hechos = db.query(FactRendimientoAcademico).count() | |
| pct_auditoria = (auditorias / total_hechos) if total_hechos > 0 else 0 | |
| calidad_data_score = 0.96 # Hardcode mock if not storing raw inconsistencies in DB, but could derive from auditorias | |
| return { | |
| "status": "success", | |
| "kpis": { | |
| "calidad_datos": round(1.0 - (pct_auditoria * 0.5), 2), | |
| "registros_unificados": total_estudiantes, | |
| "documentos_procesados": total_documentos, | |
| "estudiantes_relacionados": round(1.0 - (total_estudiantes / total_hechos if total_hechos > 0 else 1.0), 2), | |
| "casos_auditoria": round(pct_auditoria, 2), | |
| "confianza_promedio": round(avg_conf, 2), | |
| "total_hechos": total_hechos | |
| } | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ============================================================================= | |
| # ENDPOINTS CMI (CUADRO DE MANDO INTEGRAL) | |
| # ============================================================================= | |
| def get_dashboard_scorecard( | |
| umbral_nota: float = Query(default=70.0, ge=0.0, le=100.0), | |
| min_cuotas: int = Query(default=2, ge=0, le=20), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Perspectiva 1: Scorecard Ejecutivo. | |
| Retorna los 6 KPIs ejecutivos del CMI. | |
| """ | |
| try: | |
| from sqlalchemy import func | |
| total_estudiantes = db.query(DimEstudiante).count() | |
| # 1. Riesgo Multidimensional (Académico + Financiero cruzado) | |
| alumnos_riesgo = 0 | |
| try: | |
| cruzados = db.query(DimEstudiante.id_estudiante).join( | |
| FactRendimientoAcademico, DimEstudiante.id_estudiante == FactRendimientoAcademico.id_estudiante | |
| ).join( | |
| FactSituacionFinanciera, DimEstudiante.id_estudiante == FactSituacionFinanciera.id_estudiante | |
| ).filter( | |
| FactRendimientoAcademico.nota_final <= umbral_nota, | |
| FactSituacionFinanciera.cuotas_impagas >= min_cuotas | |
| ).distinct().count() | |
| alumnos_riesgo = cruzados | |
| except Exception: | |
| db.rollback() | |
| indice_riesgo = (alumnos_riesgo / total_estudiantes * 100) if total_estudiantes > 0 else 0 | |
| # 2. Cartera financiera | |
| cartera = {} | |
| deuda_total = 0 | |
| try: | |
| cartera_q = db.query( | |
| FactSituacionFinanciera.estado_cartera, | |
| func.count(FactSituacionFinanciera.id_hecho_fin).label("cantidad"), | |
| func.sum(FactSituacionFinanciera.monto_deuda).label("total") | |
| ).group_by(FactSituacionFinanciera.estado_cartera).all() | |
| cartera = {r.estado_cartera: {"cantidad": int(r.cantidad), "monto": float(r.total or 0)} for r in cartera_q} | |
| deuda_total = sum(v["monto"] for v in cartera.values()) | |
| except Exception: | |
| db.rollback() | |
| # 3. EBITDA / Rentabilidad (try real table, fallback to computed from cartera) | |
| ebitda_data = {"ingresos_ejecutados": 0, "costos_asociados": 0, "ebitda": 0, "meta_ingresos": 0} | |
| margen_por_programa = [] | |
| try: | |
| rent_data = db.query( | |
| DimModulo.programa, | |
| func.sum(FactRentabilidadPresupuesto.monto_ejecutado).label("ejecutado"), | |
| func.sum(FactRentabilidadPresupuesto.monto_meta).label("meta") | |
| ).join(DimModulo, FactRentabilidadPresupuesto.id_modulo == DimModulo.id_modulo | |
| ).group_by(DimModulo.programa).all() | |
| total_ejecutado = sum(float(r.ejecutado or 0) for r in rent_data) | |
| total_meta = sum(float(r.meta or 0) for r in rent_data) | |
| ebitda_data = { | |
| "ingresos_ejecutados": total_ejecutado, | |
| "costos_asociados": total_ejecutado * 0.65, | |
| "ebitda": total_ejecutado * 0.35, | |
| "meta_ingresos": total_meta, | |
| "cumplimiento_pct": round((total_ejecutado / total_meta * 100), 1) if total_meta > 0 else 0 | |
| } | |
| margen_por_programa = [ | |
| {"programa": r.programa or "General", "ejecutado": float(r.ejecutado or 0), "meta": float(r.meta or 0), | |
| "margen_pct": round(float(r.ejecutado or 0) / float(r.meta or 1) * 100, 1)} | |
| for r in rent_data | |
| ] | |
| except Exception: | |
| db.rollback() | |
| # 4. Tasa de Retención Estudiantil | |
| total_hechos_aca = 0 | |
| estudiantes_activos = 0 | |
| retencion_pct = 0 | |
| try: | |
| total_hechos_aca = db.query(FactRendimientoAcademico).count() | |
| estudiantes_activos = db.query(FactRendimientoAcademico.id_estudiante).distinct().count() | |
| # Students with nota > umbral are "retained" | |
| retenidos = db.query(FactRendimientoAcademico.id_estudiante).filter( | |
| FactRendimientoAcademico.nota_final > umbral_nota | |
| ).distinct().count() | |
| retencion_pct = round((retenidos / estudiantes_activos * 100), 1) if estudiantes_activos > 0 else 0 | |
| except Exception: | |
| db.rollback() | |
| # 5. Satisfacción Global (NPS Docente) | |
| satisfaccion = 0 | |
| try: | |
| avg_nps = db.query(func.avg(FactEvaluacionDocente.puntuacion)).scalar() | |
| satisfaccion = round(float(avg_nps or 0), 1) | |
| except Exception: | |
| db.rollback() | |
| # 6. Integridad MLOps | |
| mlops_integridad = {"validados_pct": 0, "en_auditoria_pct": 0, "total_registros": 0} | |
| try: | |
| total_reg = db.query(FactRendimientoAcademico).count() | |
| en_revision = db.query(FactRendimientoAcademico).filter(FactRendimientoAcademico.requiere_revision == True).count() | |
| avg_conf = db.query(func.avg(FactRendimientoAcademico.nivel_confianza_ia)).scalar() | |
| mlops_integridad = { | |
| "validados_pct": round(((total_reg - en_revision) / total_reg * 100), 1) if total_reg > 0 else 0, | |
| "en_auditoria_pct": round((en_revision / total_reg * 100), 1) if total_reg > 0 else 0, | |
| "total_registros": total_reg, | |
| "confianza_promedio": round(float(avg_conf or 0) * 100, 1) | |
| } | |
| except Exception: | |
| db.rollback() | |
| return { | |
| "status": "success", | |
| "kpis": { | |
| "ebitda": ebitda_data, | |
| "retencion_pct": retencion_pct, | |
| "satisfaccion_global": satisfaccion, | |
| "riesgo_desercion_pct": round(indice_riesgo, 1), | |
| "alumnos_riesgo": alumnos_riesgo, | |
| "total_estudiantes": total_estudiantes, | |
| "deuda_total": deuda_total, | |
| "mlops": mlops_integridad | |
| }, | |
| "cartera": cartera, | |
| "margen_por_programa": margen_por_programa | |
| } | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_dashboard_academica( | |
| umbral_nota: float = Query(default=70.0, ge=0.0, le=100.0), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Perspectiva 2: Gestión Académica. | |
| Riesgo académico, deserción, evaluaciones docentes, distribución por estado. | |
| """ | |
| try: | |
| from sqlalchemy import func | |
| # 1. Aprobación vs Reprobación | |
| totales = db.query(FactRendimientoAcademico).count() | |
| reprobados = db.query(FactRendimientoAcademico).filter(FactRendimientoAcademico.nota_final <= umbral_nota).count() | |
| aprobados = totales - reprobados | |
| # 2. Riesgo Académico Crítico | |
| nota_promedio = 0 | |
| try: | |
| avg_nota = db.query(func.avg(FactRendimientoAcademico.nota_final)).scalar() | |
| nota_promedio = round(float(avg_nota or 0), 1) | |
| except Exception: | |
| db.rollback() | |
| # 3. Riesgo Deserción (Inasistencia > 30% o Incumplimiento > 30%) | |
| riesgo_desercion = 0 | |
| try: | |
| riesgo_desercion = db.query(FactRendimientoAcademico).filter( | |
| (FactRendimientoAcademico.asistencia_pct < 70) | | |
| (FactRendimientoAcademico.incumplimiento_actividades_pct > 30) | |
| ).count() | |
| except Exception: | |
| db.rollback() | |
| # 4. Dispersión Notas vs Asistencia por módulo | |
| dispersion = [] | |
| try: | |
| dispersion_raw = db.query( | |
| DimModulo.nombre_modulo, | |
| func.avg(FactRendimientoAcademico.nota_final).label("nota_promedio"), | |
| func.avg(FactRendimientoAcademico.asistencia_pct).label("asistencia_promedio"), | |
| func.avg(FactRendimientoAcademico.incumplimiento_actividades_pct).label("incumplimiento_promedio"), | |
| func.count(FactRendimientoAcademico.id_hecho_aca).label("total_alumnos") | |
| ).join( | |
| DimModulo, FactRendimientoAcademico.id_modulo == DimModulo.id_modulo | |
| ).group_by(DimModulo.nombre_modulo).all() | |
| dispersion = [ | |
| { | |
| "modulo": r.nombre_modulo, | |
| "nota": round(float(r.nota_promedio or 0), 1), | |
| "asistencia": round(float(r.asistencia_promedio or 0), 1), | |
| "incumplimiento": round(float(r.incumplimiento_promedio or 0), 1), | |
| "alumnos": int(r.total_alumnos) | |
| } for r in dispersion_raw | |
| ] | |
| except Exception: | |
| db.rollback() | |
| # 5. Evaluación Docente (NPS) | |
| nps_data = [] | |
| nps_promedio_global = 0 | |
| try: | |
| docentes_nps = db.query( | |
| DimDocente.nombre_completo, | |
| DimDocente.area_especialidad, | |
| func.avg(FactEvaluacionDocente.puntuacion).label("nps_promedio"), | |
| func.count(FactEvaluacionDocente.id_hecho_eval).label("total_evaluaciones") | |
| ).outerjoin( | |
| FactEvaluacionDocente, DimDocente.id_docente == FactEvaluacionDocente.id_docente | |
| ).group_by(DimDocente.nombre_completo, DimDocente.area_especialidad).all() | |
| nps_data = [ | |
| { | |
| "docente": d.nombre_completo, | |
| "area": d.area_especialidad, | |
| "nps": round(float(d.nps_promedio), 1) if d.nps_promedio else 0, | |
| "evaluaciones": int(d.total_evaluaciones) | |
| } for d in docentes_nps | |
| ] | |
| if nps_data: | |
| nps_promedio_global = round(sum(d["nps"] for d in nps_data if d["nps"] > 0) / max(len([d for d in nps_data if d["nps"] > 0]), 1), 1) | |
| except Exception: | |
| db.rollback() | |
| # 6. Distribución por estado ARCA (derivado de notas y asistencia) | |
| estado_distribucion = [] | |
| try: | |
| # Aprobado-Titulado: nota > 70 y asistencia >= 70 | |
| aprobado_titulado = db.query(FactRendimientoAcademico).filter( | |
| FactRendimientoAcademico.nota_final > umbral_nota, | |
| FactRendimientoAcademico.asistencia_pct >= 70 | |
| ).count() | |
| # Reprobado-Insuficiencia: nota <= 70 y asistencia >= 70 | |
| reprobado_insuf = db.query(FactRendimientoAcademico).filter( | |
| FactRendimientoAcademico.nota_final <= umbral_nota, | |
| FactRendimientoAcademico.asistencia_pct >= 70 | |
| ).count() | |
| # Reprobado-Deserción: asistencia < 50 | |
| reprobado_desercion = db.query(FactRendimientoAcademico).filter( | |
| FactRendimientoAcademico.asistencia_pct < 50 | |
| ).count() | |
| # Reprobado-Congelamiento: nota <= 70 y 50 <= asistencia < 70 | |
| reprobado_congelamiento = db.query(FactRendimientoAcademico).filter( | |
| FactRendimientoAcademico.nota_final <= umbral_nota, | |
| FactRendimientoAcademico.asistencia_pct >= 50, | |
| FactRendimientoAcademico.asistencia_pct < 70 | |
| ).count() | |
| estado_distribucion = [ | |
| {"estado": "Aprobado - Titulado", "cantidad": aprobado_titulado, "color": "#10b981"}, | |
| {"estado": "Reprobado - Insuficiencia", "cantidad": reprobado_insuf, "color": "#f59e0b"}, | |
| {"estado": "Reprobado - Deserción", "cantidad": reprobado_desercion, "color": "#ef4444"}, | |
| {"estado": "Reprobado - Congelamiento", "cantidad": reprobado_congelamiento, "color": "#8b5cf6"} | |
| ] | |
| except Exception: | |
| db.rollback() | |
| return { | |
| "status": "success", | |
| "aprobacion": {"aprobados": aprobados, "reprobados": reprobados, "total": totales}, | |
| "kpis": { | |
| "nota_promedio": nota_promedio, | |
| "riesgo_desercion": riesgo_desercion, | |
| "nps_promedio_global": nps_promedio_global, | |
| "tasa_aprobacion_pct": round((aprobados / totales * 100), 1) if totales > 0 else 0 | |
| }, | |
| "dispersion": dispersion, | |
| "nps_docentes": nps_data, | |
| "estado_distribucion": estado_distribucion | |
| } | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_dashboard_comercial(db: Session = Depends(get_db)): | |
| """ | |
| Perspectiva 3: Comercial y Financiera. | |
| Embudo de marketing y liquidez. | |
| """ | |
| try: | |
| from sqlalchemy import func | |
| # 1. Embudo de Marketing (Mocked if table is missing) | |
| try: | |
| embudo_raw = db.query( | |
| func.sum(FactMarketingInscripciones.leads).label("leads"), | |
| func.sum(FactMarketingInscripciones.reservas).label("reservas"), | |
| func.sum(FactMarketingInscripciones.inscritos).label("inscritos") | |
| ).first() | |
| leads = int(embudo_raw.leads or 0) if embudo_raw else 0 | |
| reservas = int(embudo_raw.reservas or 0) if embudo_raw else 0 | |
| inscritos = int(embudo_raw.inscritos or 0) if embudo_raw else 0 | |
| except Exception: | |
| db.rollback() | |
| leads = reservas = inscritos = 0 | |
| # 2. Liquidez Proyectada (Mocked if table is missing) | |
| try: | |
| liquidez = db.query( | |
| FactCobranzasProyectadas.estado_pago, | |
| func.sum(FactCobranzasProyectadas.monto_esperado).label("monto") | |
| ).group_by(FactCobranzasProyectadas.estado_pago).all() | |
| flujo_caja = {row.estado_pago: float(row.monto or 0) for row in liquidez} | |
| except Exception: | |
| db.rollback() | |
| flujo_caja = {} | |
| return { | |
| "status": "success", | |
| "embudo": [ | |
| {"etapa": "Leads", "cantidad": leads}, | |
| {"etapa": "Reservas", "cantidad": reservas}, | |
| {"etapa": "Inscritos", "cantidad": inscritos} | |
| ], | |
| "liquidez": { | |
| "total_recaudado": sum(flujo_caja.values()), | |
| "estudiantes_al_dia": 0 # This should be derived from FactSituacionFinanciera but just mocking it here if we don't have it | |
| } | |
| } | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_dashboard_calidad(db: Session = Depends(get_db)): | |
| """ | |
| Perspectiva 4: Calidad MLOps | |
| """ | |
| try: | |
| from sqlalchemy import func, case | |
| # Priorizar PENDIENTE y luego ordenar por fecha | |
| logs = db.query(LogAuditoriaNlp).order_by( | |
| case((LogAuditoriaNlp.correccion_humana == 'PENDIENTE', 0), else_=1), | |
| LogAuditoriaNlp.created_at.desc() | |
| ).limit(100).all() | |
| datos_log = [ | |
| { | |
| "id": log.id_log, | |
| "texto_original": log.texto_original, | |
| "prediccion": log.prediccion_beto, | |
| "correccion": log.correccion_humana, | |
| "confianza": float(log.confianza_ia or 0) | |
| } for log in logs | |
| ] | |
| confianza_promedio = db.query(func.avg(FactRendimientoAcademico.nivel_confianza_ia)).scalar() or 0.0 | |
| return { | |
| "status": "success", | |
| "confianza_promedio": float(confianza_promedio), | |
| "logs_auditoria": datos_log | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def anonymize_name(name: str) -> str: | |
| if not name or name.strip() == "": | |
| return "Desconocido" | |
| parts = name.strip().split() | |
| anonymized_parts = [] | |
| for p in parts: | |
| if len(p) > 1: | |
| anonymized_parts.append(p[0] + "***") | |
| else: | |
| anonymized_parts.append(p + "***") | |
| return " ".join(anonymized_parts) | |
| def batch_analyze_nlp( | |
| payload: Union[BatchPayload, List[ProcessSheetPayload]], | |
| db: Session = Depends(get_db) | |
| ): | |
| if isinstance(payload, list): | |
| records = payload | |
| else: | |
| records = payload.records | |
| # ── Deduplicación con Pandas (todas las columnas) ───────────────────── | |
| try: | |
| records_dicts = [r.model_dump() if hasattr(r, 'model_dump') else r.dict() for r in records] | |
| df_records = pd.DataFrame(records_dicts) | |
| original_count = len(df_records) | |
| df_records = df_records.drop_duplicates() | |
| dedup_count = original_count - len(df_records) | |
| if dedup_count > 0: | |
| logger.info(f"drop_duplicates eliminó {dedup_count} registros duplicados exactos de {original_count}") | |
| # ── Normalización con diccionario ───────────────────────────────── | |
| norm_progs = DICCIONARIO_NORMALIZACION.get("programas_cursos", {}) | |
| norm_ciudades = DICCIONARIO_NORMALIZACION.get("departamentos_ciudades", {}) | |
| norm_grupos = DICCIONARIO_NORMALIZACION.get("grupos_sede", {}) | |
| norm_estados = DICCIONARIO_NORMALIZACION.get("estados_financieros", {}) | |
| norm_estados_arca = DICCIONARIO_NORMALIZACION.get("estados_academicos_arca", {}) | |
| if "programa" in df_records.columns: | |
| # Eliminar versiones como "v.3", "2° Versión", "3ª Versión" antes del cruce | |
| df_records["programa"] = df_records["programa"].str.replace(r'(?i)\s*(v\.\d+|\d+[°ª]\s*Versi[oó]n).*$', '', regex=True) | |
| df_records["programa"] = df_records["programa"].replace(norm_progs) | |
| if "modulo" in df_records.columns: | |
| df_records["modulo"] = df_records["modulo"].replace(norm_progs) | |
| if "ciudad" in df_records.columns: | |
| df_records["ciudad"] = df_records["ciudad"].replace(norm_ciudades) | |
| df_records["ciudad"] = df_records["ciudad"].replace(norm_grupos) | |
| if "estado_cartera" in df_records.columns: | |
| df_records["estado_cartera"] = df_records["estado_cartera"].replace(norm_estados) | |
| for col_arca in ["estado_academico", "estado_tutoria", "estado_arca"]: | |
| if col_arca in df_records.columns: | |
| df_records[col_arca] = df_records[col_arca].replace(norm_estados_arca) | |
| # Convert NaN to None to prevent Pydantic validation errors | |
| df_records = df_records.where(pd.notnull(df_records), None) | |
| # Reconstruir records desde DataFrame normalizado | |
| if hasattr(records[0], 'model_validate'): | |
| RecordClass = type(records[0]) | |
| records = [RecordClass.model_validate(row) for row in df_records.to_dict(orient="records")] | |
| else: | |
| records = [type(records[0])(**row) for row in df_records.to_dict(orient="records")] | |
| except Exception as e: | |
| logger.warning(f"drop_duplicates/normalización falló (procesando sin dedup): {e}") | |
| results = [] | |
| estudiantes_existentes = db.query(DimEstudiante).all() | |
| existing_docentes = {d.nombre_completo: d.id_docente for d in db.query(DimDocente).all()} | |
| existing_modulos = {m.nombre_modulo: m.id_modulo for m in db.query(DimModulo).all()} | |
| existing_tiempos = {t.id_tiempo for t in db.query(DimTiempo).all()} | |
| existing_docs = {d.id_documento for d in db.query(DimOrigenDocumental).all()} | |
| existing_users = {u.id for u in db.query(Users).all()} | |
| for record in records: | |
| # 1. Determinar el área y calcular confianza ajustada | |
| area = getattr(record, 'tipo_fuente', 'ACADEMIC') | |
| if not area: | |
| area = 'ACADEMIC' | |
| entidades = [] | |
| if area in ['BUDGET', 'FINANCE', 'MARKETING', 'SURVEYS']: | |
| confianza_ia = 0.95 | |
| else: | |
| entidades = ner_engine.extract_entities(record.texto_celda) | |
| if entidades: | |
| confianza_ia = sum([e["score"] for e in entidades]) / len(entidades) | |
| else: | |
| confianza_ia = 0.40 | |
| nombre_resuelto = record.texto_celda[:200].strip() | |
| # Consultar log_auditoria_nlp primero | |
| log_memoria = db.query(LogAuditoriaNlp).filter( | |
| LogAuditoriaNlp.texto_original == nombre_resuelto | |
| ).order_by(LogAuditoriaNlp.created_at.desc()).first() | |
| estudiante = None | |
| requiere_revision = False | |
| if log_memoria and log_memoria.correccion_humana != "PENDIENTE": | |
| nombre_resuelto = log_memoria.correccion_humana | |
| confianza_ia = 1.0 | |
| best_match, _ = find_best_match(nombre_resuelto, estudiantes_existentes) | |
| if best_match: | |
| estudiante = best_match | |
| else: | |
| estudiante = DimEstudiante(nombre_completo=nombre_resuelto, codigo_estudiante=record.codigo_estudiante) | |
| db.add(estudiante) | |
| db.flush() | |
| estudiantes_existentes.append(estudiante) | |
| else: | |
| # Skip fuzzy match for non-academic/non-finance areas where "student" name isn't critical | |
| if area in ['ACADEMIC', 'FINANCE']: | |
| best_match, score = find_best_match(nombre_resuelto, estudiantes_existentes) | |
| if best_match and score >= 0.8: | |
| estudiante = best_match | |
| if score < confianza_ia: | |
| confianza_ia = score | |
| else: | |
| estudiante = DimEstudiante(nombre_completo=nombre_resuelto, codigo_estudiante=record.codigo_estudiante, genero=record.genero, ciudad=record.ciudad) | |
| db.add(estudiante) | |
| db.flush() | |
| estudiantes_existentes.append(estudiante) | |
| else: | |
| # Mock student for MARKETING/SURVEYS if none exists to satisfy foreign keys | |
| estudiante = estudiantes_existentes[0] if estudiantes_existentes else DimEstudiante(nombre_completo="Anonimo") | |
| if not estudiantes_existentes: | |
| db.add(estudiante) | |
| db.flush() | |
| estudiantes_existentes.append(estudiante) | |
| candidatos_difusos = get_top_matches(nombre_resuelto, estudiantes_existentes, top_k=5) if requiere_revision or (confianza_ia < 0.50 and area in ['ACADEMIC', 'FINANCE']) else [] | |
| # Calculo de alertas | |
| alertas = [] | |
| if area == 'ACADEMIC': | |
| if getattr(record, 'nota_detectada', 100) <= 70.0: | |
| alertas.append("RIESGO_ACADEMICO_CRITICO") | |
| if getattr(record, 'asistencia', 100) < 70.0 or getattr(record, 'incumplimiento_tareas', 0) > 30.0: | |
| alertas.append("RIESGO_DESERCION_ALTA") | |
| # Ensure dimensions exist using in-memory cache to prevent lock contention | |
| docente_name = getattr(record, 'docente', None) or "Docente Generico" | |
| id_docente_val = existing_docentes.get(docente_name) | |
| if not id_docente_val: | |
| nuevo_docente = DimDocente(nombre_completo=docente_name, area_especialidad="General") | |
| db.add(nuevo_docente) | |
| db.flush() | |
| id_docente_val = nuevo_docente.id_docente | |
| existing_docentes[docente_name] = id_docente_val | |
| modulo_name = getattr(record, 'modulo', None) or "Modulo Generico" | |
| id_modulo_val = existing_modulos.get(modulo_name) | |
| if not id_modulo_val: | |
| nuevo_modulo = DimModulo(nombre_modulo=modulo_name, nombre_institucion=getattr(record, 'institucion', None) or "GiraGroup", programa=getattr(record, 'programa', None) or "General") | |
| db.add(nuevo_modulo) | |
| db.flush() | |
| id_modulo_val = nuevo_modulo.id_modulo | |
| existing_modulos[modulo_name] = id_modulo_val | |
| id_tiempo_val = getattr(record, 'id_tiempo', 1) | |
| if id_tiempo_val not in existing_tiempos: | |
| db.add(DimTiempo(id_tiempo=id_tiempo_val, gestion=2026, semestre=1, mes="Mayo")) | |
| existing_tiempos.add(id_tiempo_val) | |
| id_documento_val = getattr(record, 'id_documento', 1) | |
| if id_documento_val not in existing_docs: | |
| db.add(DimOrigenDocumental(id_documento=id_documento_val, tipo_documento="SHEET", nombre_archivo="carga_automatica")) | |
| existing_docs.add(id_documento_val) | |
| id_usuario_val = getattr(record, 'id_usuario', 1) | |
| if id_usuario_val not in existing_users: | |
| db.add(Users(id=id_usuario_val, username=f"sistema_{id_usuario_val}", hashed_password="$placeholder$", role="admin")) | |
| existing_users.add(id_usuario_val) | |
| db.flush() | |
| requiere_revision = False | |
| if confianza_ia < 0.60 and area in ['ACADEMIC', 'FINANCE']: | |
| requiere_revision = True | |
| if not log_memoria: | |
| log = LogAuditoriaNlp( | |
| texto_original=nombre_resuelto, | |
| prediccion_beto=nombre_resuelto, | |
| confianza_ia=confianza_ia, | |
| correccion_humana="PENDIENTE", | |
| usuario_auditor=id_usuario_val | |
| ) | |
| db.add(log) | |
| db.flush() | |
| # Insert into Constellation Schema ALWAYS based on area | |
| # ELIMINADO: La inserción a las tablas de hechos ahora OCURRE ÚNICAMENTE en /api/v1/ingesta/bulk | |
| # para evitar duplicación de datos entre el análisis y la confirmación final. | |
| results.append({ | |
| "anonymized_name": anonymize_name(nombre_resuelto), | |
| "nombre_resuelto": nombre_resuelto, | |
| "confianza_ia": round(float(confianza_ia), 4), | |
| "alertas": alertas, | |
| "requiere_revision": requiere_revision, | |
| "status": "pending_human_review" if requiere_revision else "inserted", | |
| "candidatos_difusos": candidatos_difusos, | |
| "area_asignada": area | |
| }) | |
| try: | |
| db.commit() | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| return { | |
| "status": "success", | |
| "processed_count": len(records), | |
| "results": results | |
| } |