Adzacam
feat: import Field and field_validator from pydantic in app.py
095f386
Raw
History Blame
32.6 kB
import os
import datetime
import logging
import re
from fastapi import FastAPI, Depends, HTTPException, status, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ConfigDict, Field, field_validator
import rapidfuzz
import pandas as pd
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from database import (
get_db,
DimEstudiante,
DimDocente,
DimModulo,
DimTiempo,
DimOrigenDocumental,
Users,
FactRendimientoAcademico,
FactSituacionFinanciera,
LogAuditoriaNlp,
)
from ner_engine import ner_engine
from similarity import find_best_match
from typing import List, Optional
import jwt
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
SECRET_KEY = os.getenv("JWT_SECRET", "super-secret-local-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def verify_password(plain_password, hashed_password):
if hashed_password == "$placeholder$":
return plain_password == "admin123"
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.datetime.utcnow() + expires_delta
else:
expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
user = db.query(Users).filter(Users.username == username).first()
if user is None:
raise credentials_exception
return user
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(
title="GiraGroup BI Backend Cloud",
description="API para Tecnologías Emergentes II con BETO y Supabase",
version="1.0.0"
)
# CORS: sólo permite peticiones desde el frontend registrado
_ALLOWED_ORIGINS = [
origin.strip()
for origin in os.getenv("CORS_ALLOWED_ORIGINS", "http://localhost:5173,https://giragroup-bi-frontend-tei-jgc45f654-dazz-s-projects.vercel.app,https://giragroup-bi-frontend-tei-ii.vercel.app").split(",")
if origin.strip()
]
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
# Valores válidos para el CHECK constraint de Supabase
TIPO_DOC_VALIDO = "SHEET"
# Clave interna para el endpoint de diagnóstico (solo debugging, nunca pública)
_DIAG_SECRET = os.getenv("DIAG_SECRET", "")
class ProcessSheetPayload(BaseModel):
# texto_celda: máximo 300 caracteres, sin caracteres de control ni HTML
texto_celda: str = Field(..., min_length=1, max_length=300)
nota_detectada: float = Field(..., ge=0.0, le=100.0)
asistencia: float = Field(..., ge=0.0, le=100.0)
incumplimiento_tareas: float = Field(..., ge=0.0, le=100.0)
id_docente: int = Field(..., ge=1, le=9999)
id_modulo: int = Field(..., ge=1, le=9999)
id_tiempo: int = Field(..., ge=1, le=9999)
id_documento: int = Field(..., ge=1, le=9999)
id_usuario: int = Field(..., ge=1, le=9999)
codigo_estudiante: Optional[str] = None
programa: Optional[str] = None
modulo: Optional[str] = None
semestre: Optional[str] = None
institucion: Optional[str] = None
@field_validator('texto_celda')
@classmethod
def sanitize_texto(cls, v: str) -> str:
# Eliminar etiquetas HTML, caracteres de control y secuencias peligrosas
v = re.sub(r'<[^>]*>', '', v) # strip HTML tags
v = re.sub(r'[\x00-\x1f\x7f]', '', v) # strip control chars
v = v.strip()
if not v:
raise ValueError('texto_celda no puede estar vacío después de sanitizar')
return v
class ProcessSheetPayloadRaw(BaseModel):
texto_celda: str
nota_detectada: float
asistencia: float
incumplimiento_tareas: float
codigo_estudiante: Optional[str] = None
programa: Optional[str] = None
modulo: Optional[str] = None
semestre: Optional[str] = None
institucion: Optional[str] = None
@app.get("/")
def read_root():
return {
"status": "healthy",
"service": "GiraGroup BI Backend API Cloud",
"ner_initialized": ner_engine._initialized or ner_engine.pipeline is not None
}
class Token(BaseModel):
access_token: str
token_type: str
role: str
@app.post("/api/v1/auth/login", response_model=Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(Users).filter(Users.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
# Auto-seed the user if it's one of the test users and doesn't exist
test_users = {
"directivo@giragroup.com": {"password": "Directivo@123", "role": "comite_directivo"},
"academico@giragroup.com": {"password": "Academico@123", "role": "coordinador_academico"},
"datos@giragroup.com": {"password": "Datos@123", "role": "analista_datos_marketing"},
"admin@giragroup.com": {"password": "Admin@123", "role": "admin"}
}
if form_data.username in test_users and form_data.password == test_users[form_data.username]["password"]:
if not user:
user = Users(
username=form_data.username,
hashed_password=get_password_hash(form_data.password),
role=test_users[form_data.username]["role"]
)
db.add(user)
db.commit()
db.refresh(user)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "role": user.role}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer", "role": user.role}
@app.get("/api/v1/diagnostico")
def diagnostico_db(
secret: str = Query(default=""),
db: Session = Depends(get_db)
):
"""
Endpoint de diagnóstico: protegido por DIAG_SECRET.
En producción, configurar DIAG_SECRET en los Secrets del Space.
Sin la clave correcta devuelve 403.
"""
if not _DIAG_SECRET or secret != _DIAG_SECRET:
raise HTTPException(status_code=403, detail="Acceso denegado al diagnóstico.")
resultados = {}
tablas = {
"dim_estudiante": DimEstudiante,
"dim_docente": DimDocente,
"dim_modulo": DimModulo,
"dim_tiempo": DimTiempo,
"dim_origen_documental": DimOrigenDocumental,
"users": Users,
"fact_rendimiento_academico": FactRendimientoAcademico,
}
for nombre, modelo in tablas.items():
try:
count = db.query(modelo).count()
resultados[nombre] = {"ok": True, "count": count}
except Exception as e:
resultados[nombre] = {"ok": False, "error": str(e)}
todo_ok = all(v["ok"] for v in resultados.values())
return {
"conexion": "ok",
"tablas": resultados,
"listo_para_produccion": todo_ok
}
@app.post("/api/v1/ingesta/tabular", status_code=status.HTTP_201_CREATED)
def procesar_registro_tabular(payload: ProcessSheetPayload, db: Session = Depends(get_db)):
try:
# 1. NLP con BETO
entidades = ner_engine.extract_entities(payload.texto_celda)
confianza_ia = sum([e["score"] for e in entidades]) / len(entidades) if entidades else 1.0
forzar_revision = confianza_ia < 0.60
# 2. Dimensión Estudiante — Vinculación por Niveles
nombre_resuelto = payload.texto_celda[:200].strip()
estudiante = None
confianza_vinculacion = 0.0
nivel_vinculacion = 0
# Nivel 1: ID Único
if payload.codigo_estudiante:
estudiante = db.query(DimEstudiante).filter(DimEstudiante.codigo_estudiante == payload.codigo_estudiante).first()
if estudiante:
confianza_vinculacion = 1.0
nivel_vinculacion = 1
if not estudiante:
# Nivel 2 y 3: Fuzzy matching
estudiantes_existentes = db.query(DimEstudiante).all()
best_match, score = find_best_match(nombre_resuelto, estudiantes_existentes)
if best_match and score >= 0.80:
estudiante = best_match
confianza_vinculacion = score
nivel_vinculacion = 2 if payload.programa and payload.semestre else 3
if not estudiante:
estudiante = DimEstudiante(nombre_completo=nombre_resuelto, codigo_estudiante=payload.codigo_estudiante)
db.add(estudiante)
db.flush()
# 3. Dimensión Docente — columnas reales: nombre_completo, area_especialidad
if not db.query(DimDocente).filter(DimDocente.id_docente == payload.id_docente).first():
db.add(DimDocente(
id_docente=payload.id_docente,
nombre_completo="Docente Generico",
area_especialidad="Generico"
))
# 4. Dimensión Módulo — columnas reales: nombre_modulo, nombre_institucion, programa
if not db.query(DimModulo).filter(DimModulo.id_modulo == payload.id_modulo).first():
db.add(DimModulo(
id_modulo=payload.id_modulo,
nombre_modulo=payload.modulo or "Modulo Generico",
nombre_institucion=payload.institucion or "GiraGroup",
programa=payload.programa or "General"
))
# 5. Dimensión Tiempo — columnas reales: gestion, semestre, mes
if not db.query(DimTiempo).filter(DimTiempo.id_tiempo == payload.id_tiempo).first():
db.add(DimTiempo(
id_tiempo=payload.id_tiempo,
gestion=2026,
semestre=1,
mes="Mayo"
))
# 6. Dimensión Origen Documental — tabla real: dim_origen_documental
# CHECK: tipo_documento IN ('SHEET', 'FORM', 'MOODLE', 'XLSX')
if not db.query(DimOrigenDocumental).filter(
DimOrigenDocumental.id_documento == payload.id_documento
).first():
db.add(DimOrigenDocumental(
id_documento=payload.id_documento,
tipo_documento=TIPO_DOC_VALIDO,
nombre_archivo="carga_automatica"
))
# 7. Usuario — tabla real: users (id, username, hashed_password, role)
if not db.query(Users).filter(Users.id == payload.id_usuario).first():
db.add(Users(
id=payload.id_usuario,
username=f"sistema_{payload.id_usuario}",
hashed_password="$placeholder$",
role="admin"
))
db.flush()
# 8. Alertas estratégicas
alertas_disparadas = []
if payload.nota_detectada <= 70.0:
alertas_disparadas.append("RIESGO_ACADEMICO_CRITICO")
if payload.asistencia < 70.0 or payload.incumplimiento_tareas > 30.0:
alertas_disparadas.append("RIESGO_DESERCION_ALTA")
# 9. Insertar hecho con las FK correctas
nuevo_hecho = FactRendimientoAcademico(
id_estudiante=estudiante.id_estudiante,
id_docente=payload.id_docente,
id_modulo=payload.id_modulo,
id_tiempo=payload.id_tiempo,
id_documento=payload.id_documento,
id_usuario_carga=payload.id_usuario,
nota_final=payload.nota_detectada,
asistencia_pct=payload.asistencia,
incumplimiento_actividades_pct=payload.incumplimiento_tareas,
nivel_confianza_ia=confianza_ia,
requiere_revision=forzar_revision
)
db.add(nuevo_hecho)
db.commit()
return {
"status": "processed",
"id_estudiante_asignado": estudiante.id_estudiante,
"confianza_modelo_beto": round(confianza_ia, 4),
"confianza_vinculacion": round(confianza_vinculacion, 4),
"nivel_vinculacion": nivel_vinculacion,
"requiere_auditoria_humana": forzar_revision,
"alertas_estrategicas": alertas_disparadas
}
except Exception as err:
db.rollback()
logger.error(f"Error crítico en backend 500: {err}")
raise HTTPException(status_code=500, detail=str(err))
@app.post("/api/v1/nlp/analyze")
def analyze_nlp_only(payload: ProcessSheetPayload, db: Session = Depends(get_db)):
"""
Fase 1: Solo ejecuta el modelo NLP (BETO) sobre el texto y devuelve las métricas.
NO inserta en la base de datos. Usado para el Staging area en el Frontend.
"""
entidades = ner_engine.extract_entities(payload.texto_celda)
confianza_ia = sum([e["score"] for e in entidades]) / len(entidades) if entidades else 1.0
nombre_resuelto = payload.texto_celda[:200].strip()
# 1. Consultar log_auditoria_nlp (MLOps Memory)
log_memoria = db.query(LogAuditoriaNlp).filter(
LogAuditoriaNlp.texto_original == nombre_resuelto
).order_by(LogAuditoriaNlp.created_at.desc()).first()
candidatos_difusos = []
regla_aplicada = False
if log_memoria:
# BETO "recuerda" la decisión humana previa
nombre_resuelto = log_memoria.correccion_humana
confianza_ia = 1.0
forzar_revision = False
regla_aplicada = True
else:
# Fuzzy Matching
estudiantes_existentes = db.query(DimEstudiante).all()
best_match, score = find_best_match(nombre_resuelto, estudiantes_existentes)
# Generar Top 3 candidatos para el dropdown de resolución
from rapidfuzz import fuzz
for est in estudiantes_existentes:
s = fuzz.token_sort_ratio(nombre_resuelto.lower(), est.nombre_completo.lower()) / 100.0
if s > 0.4:
candidatos_difusos.append({"id": est.id_estudiante, "nombre": est.nombre_completo, "score": round(s, 2)})
candidatos_difusos = sorted(candidatos_difusos, key=lambda x: x["score"], reverse=True)[:3]
if score > 0.8:
confianza_ia = score
forzar_revision = confianza_ia < 0.60
alertas_disparadas = []
if payload.nota_detectada <= 70.0:
alertas_disparadas.append("RIESGO_ACADEMICO_CRITICO")
if payload.asistencia < 70.0 or payload.incumplimiento_tareas > 30.0:
alertas_disparadas.append("RIESGO_DESERCION_ALTA")
return {
"status": "analyzed",
"confianza_modelo_beto": round(confianza_ia, 4),
"requiere_auditoria_humana": forzar_revision,
"alertas_estrategicas": alertas_disparadas,
"entidades_nlp": entidades,
"candidatos_difusos": candidatos_difusos,
"regla_memoria_aplicada": regla_aplicada,
"nombre_resuelto": nombre_resuelto
}
@app.post("/api/v1/nlp/quality-check")
def nlp_quality_check(payload: ProcessSheetPayloadRaw):
"""
Evalúa la calidad del dato sin aplicar clamping (diagnóstico en lugar de corrección silenciosa).
"""
inconsistencias = []
if payload.nota_detectada > 100 or payload.nota_detectada < 0:
inconsistencias.append({"campo": "nota", "original": payload.nota_detectada, "corregido": max(0, min(100, payload.nota_detectada)), "tipo": "FUERA_RANGO"})
if payload.asistencia > 100 or payload.asistencia < 0:
inconsistencias.append({"campo": "asistencia", "original": payload.asistencia, "corregido": max(0, min(100, payload.asistencia)), "tipo": "FUERA_RANGO"})
if payload.incumplimiento_tareas > 100 or payload.incumplimiento_tareas < 0:
inconsistencias.append({"campo": "incumplimiento_tareas", "original": payload.incumplimiento_tareas, "corregido": max(0, min(100, payload.incumplimiento_tareas)), "tipo": "FUERA_RANGO"})
nombre_limpio = payload.texto_celda.strip()
if not nombre_limpio or nombre_limpio.lower() in ["sin nombre", "desconocido"]:
inconsistencias.append({"campo": "nombre", "original": payload.texto_celda, "corregido": "Estudiante (Sin Nombre)", "tipo": "NOMBRE_VACIO"})
elif len(nombre_limpio) < 3 or nombre_limpio.replace('.', '').replace(',', '').isdigit():
inconsistencias.append({"campo": "nombre", "original": payload.texto_celda, "corregido": nombre_limpio, "tipo": "NOMBRE_SOSPECHOSO"})
return {
"status": "checked",
"inconsistencias": inconsistencias,
"score_calidad": 1.0 if not inconsistencias else max(0.0, 1.0 - (len(inconsistencias) * 0.2))
}
from typing import List
@app.post("/api/v1/ingesta/bulk", status_code=status.HTTP_201_CREATED)
def procesar_lote_tabular(payloads: List[ProcessSheetPayload], db: Session = Depends(get_db), current_user: Users = Depends(get_current_user)):
"""
Fase 3: Recibe una lista de registros (ya confirmados/editados por el usuario
en la Fase 2) y los inserta masivamente en una sola transacción.
"""
if current_user.role not in ["coordinador_academico", "admin"]:
raise HTTPException(status_code=403, detail="Acceso denegado: Se requiere rol de Coordinador Académico.")
try:
for payload in payloads:
# Re-evaluamos rápidamente para obtener la misma métrica
entidades = ner_engine.extract_entities(payload.texto_celda)
confianza_ia = sum([e["score"] for e in entidades]) / len(entidades) if entidades else 1.0
forzar_revision = confianza_ia < 0.60
# Dimensiones
nombre_resuelto = payload.texto_celda[:200].strip()
estudiante = None
confianza_vinculacion = 0.0
if payload.codigo_estudiante:
estudiante = db.query(DimEstudiante).filter(DimEstudiante.codigo_estudiante == payload.codigo_estudiante).first()
if estudiante: confianza_vinculacion = 1.0
if not estudiante:
# Fuzzy match optimization (in bulk it can be slow, but okay for MVP)
estudiantes_existentes = db.query(DimEstudiante).all()
best_match, score = find_best_match(nombre_resuelto, estudiantes_existentes)
if best_match and score >= 0.80:
estudiante = best_match
confianza_vinculacion = score
if not estudiante:
estudiante = DimEstudiante(nombre_completo=nombre_resuelto, codigo_estudiante=payload.codigo_estudiante)
db.add(estudiante)
db.flush()
if not db.query(DimDocente).filter(DimDocente.id_docente == payload.id_docente).first():
db.add(DimDocente(id_docente=payload.id_docente, nombre_completo="Docente Generico", area_especialidad="Generico"))
if not db.query(DimModulo).filter(DimModulo.id_modulo == payload.id_modulo).first():
db.add(DimModulo(id_modulo=payload.id_modulo, nombre_modulo=payload.modulo or "Modulo Generico", nombre_institucion=payload.institucion or "GiraGroup", programa=payload.programa or "General"))
if not db.query(DimTiempo).filter(DimTiempo.id_tiempo == payload.id_tiempo).first():
db.add(DimTiempo(id_tiempo=payload.id_tiempo, gestion=2026, semestre=1, mes="Mayo"))
if not db.query(DimOrigenDocumental).filter(DimOrigenDocumental.id_documento == payload.id_documento).first():
db.add(DimOrigenDocumental(id_documento=payload.id_documento, tipo_documento="SHEET", nombre_archivo="carga_automatica"))
if not db.query(Users).filter(Users.id == payload.id_usuario).first():
db.add(Users(id=payload.id_usuario, username=f"sistema_{payload.id_usuario}", hashed_password="$placeholder$", role="admin"))
db.flush() # Importante: flush para poder usar los IDs recién creados
# Hecho (La data de payload ya viene corregida por ti desde el frontend)
nuevo_hecho = FactRendimientoAcademico(
id_estudiante=estudiante.id_estudiante,
id_docente=payload.id_docente,
id_modulo=payload.id_modulo,
id_tiempo=payload.id_tiempo,
id_documento=payload.id_documento,
id_usuario_carga=payload.id_usuario,
nota_final=payload.nota_detectada,
asistencia_pct=payload.asistencia,
incumplimiento_actividades_pct=payload.incumplimiento_tareas,
nivel_confianza_ia=confianza_ia,
requiere_revision=forzar_revision
)
db.add(nuevo_hecho)
# Al final, guardamos todo junto
db.commit()
return {"status": "success", "inserted_count": len(payloads)}
except Exception as err:
db.rollback()
raise HTTPException(status_code=500, detail=str(err))
@app.get("/api/v1/riesgos/cruzado")
def obtener_riesgos_cruzados(
# Clamp explícito de los parámetros: nunca se usa el valor crudo del usuario en la query
limite_nota: float = Query(default=70.0, ge=0.0, le=100.0),
min_cuotas: int = Query(default=2, ge=1, le=20),
db: Session = Depends(get_db)
):
try:
# Hacer JOIN real con FactSituacionFinanciera (LEFT JOIN para no excluir si no hay finanzas)
resultados = db.query(
DimEstudiante,
FactRendimientoAcademico,
FactSituacionFinanciera
).join(
FactRendimientoAcademico, DimEstudiante.id_estudiante == FactRendimientoAcademico.id_estudiante
).outerjoin(
FactSituacionFinanciera, DimEstudiante.id_estudiante == FactSituacionFinanciera.id_estudiante
).filter(FactRendimientoAcademico.nota_final <= limite_nota).all()
data = []
for est, fact_aca, fact_fin in resultados:
cuotas = fact_fin.cuotas_impagas if fact_fin else min_cuotas
if cuotas < min_cuotas:
continue
deuda = float(fact_fin.monto_deuda) if fact_fin else 350.0 * cuotas
estado_cartera = fact_fin.estado_cartera if fact_fin else "MORA"
data.append({
"estudiante": est.nombre_completo,
"codigo": est.codigo_estudiante or f"EST-{est.id_estudiante:06d}",
"rendimiento": {
"nota_actual": float(fact_aca.nota_final),
"estado_academico": "CRÍTICO"
},
"finanzas": {
"cuotas_mora": cuotas,
"deuda_total": deuda,
"estado_cartera": estado_cartera
},
"nivel_riesgo_global": "ALTO - CRÍTICO"
})
return {"status": "success", "data": data}
except Exception as e:
logger.error(f"Fallo en OLAP: {e}")
# Error crudo al frontend para diagnóstico exacto de PostgreSQL
raise HTTPException(status_code=500, detail=f"Error DB: {str(e)}")
class FinancePayload(BaseModel):
id_estudiante: int
id_tiempo: int
monto_deuda: float
cuotas_impagas: int
estado_cartera: str
tipo_alerta: str
@app.post("/api/v1/ingesta/financiera", status_code=status.HTTP_201_CREATED)
def procesar_registro_financiero(payload: FinancePayload, db: Session = Depends(get_db), current_user: Users = Depends(get_current_user)):
if current_user.role not in ["analista_datos_marketing", "admin"]:
raise HTTPException(status_code=403, detail="Acceso denegado: Se requiere rol de Analista de Datos.")
try:
nuevo_hecho = FactSituacionFinanciera(
id_estudiante=payload.id_estudiante,
id_tiempo=payload.id_tiempo,
monto_deuda=payload.monto_deuda,
cuotas_impagas=payload.cuotas_impagas,
estado_cartera=payload.estado_cartera,
tipo_alerta=payload.tipo_alerta
)
db.add(nuevo_hecho)
db.commit()
return {"status": "success", "inserted": True}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
class UnpivotFinancePayload(BaseModel):
id_estudiante: int
raw_data: Dict[str, Any]
@app.post("/api/v1/ingest/finance_unpivot", status_code=status.HTTP_201_CREATED)
def procesar_lote_financiero_unpivot(payloads: List[UnpivotFinancePayload], db: Session = Depends(get_db)):
"""
Recibe un lote de datos financieros con columnas de meses (ej. 'MONTO ENERO 2024')
y utiliza pandas para despivotarlos antes de insertarlos en FactCobranzasProyectadas.
"""
try:
from database import FactCobranzasProyectadas, DimTiempo
# 1. Convertir payloads a DataFrame
df_list = []
for p in payloads:
row = p.raw_data.copy()
row['id_estudiante'] = p.id_estudiante
df_list.append(row)
if not df_list:
return {"status": "success", "inserted": 0}
df = pd.DataFrame(df_list)
# 2. Identificar columnas de meses (Empiezan con 'MONTO ')
monto_cols = [c for c in df.columns if c.startswith('MONTO ')]
id_cols = [c for c in df.columns if c not in monto_cols]
# 3. Despivotar (Melt)
df_melted = df.melt(id_vars=id_cols, value_vars=monto_cols, var_name='mes_anio', value_name='monto_esperado')
# Filtrar nulos o ceros si no son necesarios
df_melted['monto_esperado'] = pd.to_numeric(df_melted['monto_esperado'], errors='coerce')
df_melted = df_melted.dropna(subset=['monto_esperado'])
# 4. Insertar en base de datos
inserted_count = 0
for index, row in df_melted.iterrows():
mes_raw = str(row['mes_anio']).replace('MONTO ', '').strip() # Ej 'ENERO 2024'
parts = mes_raw.split()
gestion = int(parts[1]) if len(parts) > 1 else 2024
mes_str = parts[0] if len(parts) > 0 else 'Enero'
# Buscar o crear tiempo
tiempo = db.query(DimTiempo).filter(DimTiempo.gestion == gestion, DimTiempo.mes == mes_str).first()
if not tiempo:
tiempo = DimTiempo(gestion=gestion, mes=mes_str)
db.add(tiempo)
db.commit()
db.refresh(tiempo)
nuevo_cobro = FactCobranzasProyectadas(
id_estudiante=row['id_estudiante'],
id_tiempo=tiempo.id_tiempo,
monto_esperado=row['monto_esperado'],
estado_pago='PROYECTADO'
)
db.add(nuevo_cobro)
inserted_count += 1
db.commit()
return {"status": "success", "inserted": inserted_count}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/ingest/surveys", status_code=status.HTTP_201_CREATED)
def procesar_lote_encuestas(payloads: List[Dict[str, Any]], db: Session = Depends(get_db)):
"""Ruta para encuestas (Placeholder para lógica de NLP sobre comentarios)"""
return {"status": "success", "message": "Ruta de encuestas lista para implementación"}
@app.post("/api/v1/ingest/marketing", status_code=status.HTTP_201_CREATED)
def procesar_lote_marketing(payloads: List[Dict[str, Any]], db: Session = Depends(get_db)):
"""Ruta para OKRs de Marketing (Placeholder)"""
return {"status": "success", "message": "Ruta de marketing lista para implementación"}
@app.post("/api/v1/ingesta/financiera/bulk", status_code=status.HTTP_201_CREATED)
def procesar_lote_financiero(payloads: List[FinancePayload], db: Session = Depends(get_db), current_user: Users = Depends(get_current_user)):
if current_user.role not in ["analista_datos_marketing", "admin"]:
raise HTTPException(status_code=403, detail="Acceso denegado: Se requiere rol de Analista de Datos.")
try:
for payload in payloads:
nuevo_hecho = FactSituacionFinanciera(
id_estudiante=payload.id_estudiante,
id_tiempo=payload.id_tiempo,
monto_deuda=payload.monto_deuda,
cuotas_impagas=payload.cuotas_impagas,
estado_cartera=payload.estado_cartera,
tipo_alerta=payload.tipo_alerta
)
db.add(nuevo_hecho)
db.commit()
return {"status": "success", "inserted_count": len(payloads)}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
class MLOpsFeedbackPayload(BaseModel):
texto_erroneo: str
prediccion_beto: str
confianza_ia: float
texto_corregido: str
@app.post("/api/v1/mlops/feedback", status_code=status.HTTP_201_CREATED)
def log_mlops_feedback(payload: MLOpsFeedbackPayload, db: Session = Depends(get_db), current_user: Users = Depends(get_current_user)):
try:
nuevo_log = LogAuditoriaNlp(
texto_original=payload.texto_erroneo,
prediccion_beto=payload.prediccion_beto,
confianza_ia=payload.confianza_ia,
correccion_humana=payload.texto_corregido,
usuario_auditor=current_user.id
)
db.add(nuevo_log)
db.commit()
return {"status": "success", "message": "Feedback logged successfully"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/v1/dashboard/kpis")
def get_dashboard_kpis(db: Session = Depends(get_db)):
try:
# Calcular KPIs desde la BD
from sqlalchemy import func
total_estudiantes = db.query(DimEstudiante).count()
total_documentos = db.query(DimOrigenDocumental).count()
# Rendimiento académico stats
stats_aca = db.query(
func.avg(FactRendimientoAcademico.nivel_confianza_ia).label('avg_conf')
).first()
avg_conf = float(stats_aca.avg_conf) if stats_aca and stats_aca.avg_conf else 0.0
# auditorias could be None or something else depending on driver, simpler approach:
auditorias = db.query(FactRendimientoAcademico).filter(FactRendimientoAcademico.requiere_revision == True).count()
total_hechos = db.query(FactRendimientoAcademico).count()
pct_auditoria = (auditorias / total_hechos) if total_hechos > 0 else 0
calidad_data_score = 0.96 # Hardcode mock if not storing raw inconsistencies in DB, but could derive from auditorias
return {
"status": "success",
"kpis": {
"calidad_datos": round(1.0 - (pct_auditoria * 0.5), 2),
"registros_unificados": total_estudiantes,
"documentos_procesados": total_documentos,
"estudiantes_relacionados": round(1.0 - (total_estudiantes / total_hechos if total_hechos > 0 else 1.0), 2),
"casos_auditoria": round(pct_auditoria, 2),
"confianza_promedio": round(avg_conf, 2),
"total_hechos": total_hechos
}
}
except Exception as e:
logger.error(f"Fallo KPI: {e}")
raise HTTPException(status_code=500, detail=str(e))