"""Portfolio management engine for biotech multi-project operations.""" from __future__ import annotations from datetime import date, datetime, timedelta, timezone from typing import Any, Dict, List, Optional import csv import io import json import sqlite3 import uuid class PortfolioEngine: """Persist portfolio, project, and milestone entities for OmniBiMol.""" def __init__(self, db_path: str = "omnibimol_portfolio.db") -> None: self.db_path = db_path self.init_db() def init_db(self) -> None: """Create the portfolio schema used by the Streamlit app.""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS portfolios ( id TEXT PRIMARY KEY, name TEXT NOT NULL, owner TEXT, description TEXT, reviewer TEXT, comments TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS projects ( id TEXT PRIMARY KEY, portfolio_id TEXT NOT NULL, name TEXT NOT NULL, indication TEXT, modality TEXT, stage TEXT, owner TEXT, reviewer TEXT, status TEXT, comments TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY(portfolio_id) REFERENCES portfolios(id) ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS milestones ( id TEXT PRIMARY KEY, project_id TEXT NOT NULL, title TEXT NOT NULL, type TEXT NOT NULL, due_date TEXT, owner TEXT, reviewer TEXT, status TEXT NOT NULL, criteria_json TEXT NOT NULL, comments TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY(project_id) REFERENCES projects(id) ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS activity_log ( id TEXT PRIMARY KEY, entity_type TEXT NOT NULL, entity_id TEXT NOT NULL, event_type TEXT NOT NULL, payload_json TEXT NOT NULL, timestamp TEXT NOT NULL, actor TEXT ) """ ) cursor.execute("CREATE INDEX IF NOT EXISTS idx_project_portfolio ON projects(portfolio_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_milestone_project ON milestones(project_id)") conn.commit() def create_portfolio( self, name: str, owner: str = "", description: str = "", reviewer: str = "", comments: str = "", ) -> Dict[str, Any]: ts = self._now() row = { "id": self._id("pf"), "name": name.strip(), "owner": owner.strip(), "description": description.strip(), "reviewer": reviewer.strip(), "comments": comments.strip(), "created_at": ts, "updated_at": ts, } with sqlite3.connect(self.db_path) as conn: conn.execute( """ INSERT INTO portfolios (id, name, owner, description, reviewer, comments, created_at, updated_at) VALUES (:id, :name, :owner, :description, :reviewer, :comments, :created_at, :updated_at) """, row, ) conn.commit() self._log_activity("portfolio", str(row["id"]), "created", row, owner) return row def list_portfolios(self) -> List[Dict[str, Any]]: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row rows = conn.execute("SELECT * FROM portfolios ORDER BY updated_at DESC, name ASC").fetchall() return [dict(r) for r in rows] def create_project( self, portfolio_id: str, name: str, indication: str = "", modality: str = "", stage: str = "discovery", owner: str = "", status: str = "active", reviewer: str = "", comments: str = "", ) -> Dict[str, Any]: ts = self._now() row = { "id": self._id("prj"), "portfolio_id": portfolio_id, "name": name.strip(), "indication": indication.strip(), "modality": modality.strip(), "stage": stage.strip(), "owner": owner.strip(), "reviewer": reviewer.strip(), "status": status.strip(), "comments": comments.strip(), "created_at": ts, "updated_at": ts, } with sqlite3.connect(self.db_path) as conn: conn.execute( """ INSERT INTO projects ( id, portfolio_id, name, indication, modality, stage, owner, reviewer, status, comments, created_at, updated_at ) VALUES ( :id, :portfolio_id, :name, :indication, :modality, :stage, :owner, :reviewer, :status, :comments, :created_at, :updated_at ) """, row, ) conn.commit() self._log_activity("project", str(row["id"]), "created", row, owner) return row def list_projects(self, portfolio_id: str) -> List[Dict[str, Any]]: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row rows = conn.execute( "SELECT * FROM projects WHERE portfolio_id = ? ORDER BY updated_at DESC, name ASC", (portfolio_id,), ).fetchall() return [dict(r) for r in rows] def create_milestone( self, project_id: str, title: str, milestone_type: str, due_date: str, owner: str = "", status: str = "not started", criteria: Optional[Dict[str, Any]] = None, reviewer: str = "", comments: str = "", ) -> Dict[str, Any]: ts = self._now() row = { "id": self._id("ms"), "project_id": project_id, "title": title.strip(), "type": milestone_type.strip(), "due_date": due_date, "owner": owner.strip(), "reviewer": reviewer.strip(), "status": status.strip().lower(), "criteria_json": json.dumps(criteria or {}, sort_keys=True), "comments": comments.strip(), "created_at": ts, "updated_at": ts, } with sqlite3.connect(self.db_path) as conn: conn.execute( """ INSERT INTO milestones ( id, project_id, title, type, due_date, owner, reviewer, status, criteria_json, comments, created_at, updated_at ) VALUES ( :id, :project_id, :title, :type, :due_date, :owner, :reviewer, :status, :criteria_json, :comments, :created_at, :updated_at ) """, row, ) conn.commit() self._log_activity("milestone", str(row["id"]), "created", row, owner) return row def update_milestone_status( self, milestone_id: str, status: str, actor: str = "" ) -> Dict[str, Any]: ts = self._now() with sqlite3.connect(self.db_path) as conn: conn.execute( "UPDATE milestones SET status = ?, updated_at = ? WHERE id = ?", (status.strip().lower(), ts, milestone_id), ) conn.commit() conn.row_factory = sqlite3.Row row = conn.execute("SELECT * FROM milestones WHERE id = ?", (milestone_id,)).fetchone() payload = dict(row) if row else {} self._log_activity("milestone", milestone_id, "status_updated", {"status": status}, actor) return payload def get_project_dashboard_data(self, project_id: str) -> Dict[str, Any]: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row project_row = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone() milestone_rows = conn.execute( "SELECT * FROM milestones WHERE project_id = ? ORDER BY due_date ASC, created_at ASC", (project_id,), ).fetchall() project = dict(project_row) if project_row else {} milestones = [self._parse_milestone(dict(r)) for r in milestone_rows] return { "project": project, "milestones": milestones, "milestone_metrics": self._milestone_metrics(milestones), "disclaimer": "Research portfolio workflow support only. Not for clinical or patient-care decisions.", } def export_project_packet(self, project_id: str, format: str = "json") -> Dict[str, Any]: dashboard = self.get_project_dashboard_data(project_id) payload = { "project": dashboard["project"], "milestone_metrics": dashboard["milestone_metrics"], "milestones": dashboard["milestones"], "generated_at": self._now(), "disclaimer": dashboard["disclaimer"], } fmt = format.lower() if fmt == "json": return { "format": "json", "content": json.dumps(payload, indent=2, sort_keys=False), "schema_valid": True, } if fmt == "csv": return {"format": "csv", "content": self._build_csv_packet(payload), "schema_valid": True} if fmt == "md": return {"format": "md", "content": self._build_markdown_packet(payload), "schema_valid": True} raise ValueError("Unsupported format. Use one of: json, csv, md") def get_stage_distribution(self, portfolio_id: str) -> List[Dict[str, Any]]: projects = self.list_projects(portfolio_id) counter: Dict[str, int] = {} for project in projects: stage = project.get("stage") or "unknown" counter[stage] = counter.get(stage, 0) + 1 return [{"stage": stage, "count": count} for stage, count in sorted(counter.items())] def optimize_portfolio(self, portfolio_id: str, budget: float = 0.0) -> Dict[str, Any]: projects = self.list_projects(portfolio_id) rows: List[Dict[str, Any]] = [] for project in projects: dashboard = self.get_project_dashboard_data(project["id"]) metrics = dashboard["milestone_metrics"] blocked_penalty = float(metrics["blocker_count"]) * 15.0 overdue_penalty = float(metrics["overdue_count"]) * 10.0 score = max(0.0, float(metrics["completion_pct"]) - blocked_penalty - overdue_penalty) rows.append( { "project_id": project["id"], "project_name": project.get("name", ""), "stage": project.get("stage", ""), "status": project.get("status", ""), "workflow_score": round(score, 2), } ) rows.sort(key=lambda item: (-item["workflow_score"], item["project_name"].lower())) return { "portfolio_id": portfolio_id, "budget": round(float(budget), 4), "ranked_projects": rows, "selected_projects": rows, } def list_recent_activity(self, limit: int = 40) -> List[Dict[str, Any]]: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row rows = conn.execute( "SELECT * FROM activity_log ORDER BY timestamp DESC LIMIT ?", (int(limit),), ).fetchall() return [dict(r) for r in rows] def _build_csv_packet(self, payload: Dict[str, Any]) -> Dict[str, str]: return { "project": self._rows_to_csv([payload["project"]]), "milestones": self._rows_to_csv(payload["milestones"]), "milestone_metrics": self._rows_to_csv([payload["milestone_metrics"]]), } def _build_markdown_packet(self, payload: Dict[str, Any]) -> str: project = payload["project"] metrics = payload["milestone_metrics"] lines = [ f"# Project Packet: {project.get('name', 'Unknown')}", "", f"- Project ID: `{project.get('id', '')}`", f"- Stage: `{project.get('stage', '')}`", f"- Status: `{project.get('status', '')}`", f"- Owner: `{project.get('owner', '')}`", f"- Generated: `{payload.get('generated_at', '')}`", "", "## Milestone Metrics", f"- Completion: `{metrics['completion_pct']}%`", f"- Overdue: `{metrics['overdue_count']}`", f"- Blocked: `{metrics['blocker_count']}`", f"- Upcoming (30d): `{metrics['upcoming_30d']}`", "", "## Milestones", ] for idx, row in enumerate(payload["milestones"], start=1): lines.append( f"{idx}. `{row.get('title', '')}` status={row.get('status', '')} due={row.get('due_date', '')}" ) lines.extend(["", f"> {payload['disclaimer']}"]) return "\n".join(lines) def _rows_to_csv(self, rows: List[Dict[str, Any]]) -> str: if not rows: return "" fieldnames = sorted({key for row in rows for key in row.keys()}) stream = io.StringIO() writer = csv.DictWriter(stream, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow({key: self._csv_scalar(row.get(key)) for key in fieldnames}) return stream.getvalue() def _milestone_metrics(self, milestones: List[Dict[str, Any]]) -> Dict[str, Any]: total = len(milestones) complete = sum(1 for m in milestones if (m.get("status") or "").lower() == "complete") blocked = sum(1 for m in milestones if (m.get("status") or "").lower() == "blocked") today = datetime.now(timezone.utc).date() upcoming_limit = today + timedelta(days=30) overdue = 0 upcoming = 0 for milestone in milestones: due_date = self._try_date(milestone.get("due_date")) if not due_date: continue if due_date < today and (milestone.get("status") or "").lower() != "complete": overdue += 1 if today <= due_date <= upcoming_limit: upcoming += 1 completion_pct = (complete / total * 100.0) if total else 0.0 return { "total_count": total, "completion_pct": round(completion_pct, 2), "overdue_count": overdue, "blocker_count": blocked, "upcoming_30d": upcoming, } def _parse_milestone(self, row: Dict[str, Any]) -> Dict[str, Any]: row["criteria"] = json.loads(row.get("criteria_json", "{}")) return row def _log_activity( self, entity_type: str, entity_id: str, event_type: str, payload: Dict[str, Any], actor: str, ) -> None: row = { "id": self._id("log"), "entity_type": entity_type, "entity_id": entity_id, "event_type": event_type, "payload_json": json.dumps(payload, sort_keys=True, default=str), "timestamp": self._now(), "actor": actor, } with sqlite3.connect(self.db_path) as conn: conn.execute( """ INSERT INTO activity_log (id, entity_type, entity_id, event_type, payload_json, timestamp, actor) VALUES (:id, :entity_type, :entity_id, :event_type, :payload_json, :timestamp, :actor) """, row, ) conn.commit() def _csv_scalar(self, value: Any) -> str: if isinstance(value, (dict, list)): return json.dumps(value, sort_keys=True) return "" if value is None else str(value) def _try_date(self, value: Any) -> Optional[date]: if not value: return None for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"): try: return datetime.strptime(str(value)[:19], fmt).date() except ValueError: continue return None def _now(self) -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat() def _id(self, prefix: str) -> str: return f"{prefix}_{uuid.uuid4().hex[:12]}"