"""Ollama Cloud LLM client factory. Uses the official Ollama Python SDK pointed at Ollama Cloud. """ from __future__ import annotations import logging import time from typing import Any from ollama import Client from backend.app.core.config import settings logger = logging.getLogger(__name__) _client: Client | None = None def _extract_response_text(response: Any) -> str: message = getattr(response, "message", None) candidates = [ getattr(message, "content", None), getattr(response, "response", None), ] if hasattr(response, "model_dump"): dumped = response.model_dump() dumped_message = dumped.get("message") if isinstance(dumped_message, dict): candidates.append(dumped_message.get("content")) candidates.append(dumped.get("response")) for candidate in candidates: if isinstance(candidate, str) and candidate.strip(): return candidate # Thinking-model fallback: Qwen3 and similar models occasionally place their # answer in the ``thinking`` field when ``think=False`` conflicts with # ``format=json`` on the Ollama Cloud backend. Extracting it here avoids # unnecessary retries in that edge case. thinking = _extract_response_thinking(response) if thinking.strip(): return thinking return "" def _extract_response_thinking(response: Any) -> str: message = getattr(response, "message", None) thinking = getattr(message, "thinking", None) if isinstance(thinking, str) and thinking.strip(): return thinking if hasattr(response, "model_dump"): dumped = response.model_dump() dumped_message = dumped.get("message") if isinstance(dumped_message, dict): dumped_thinking = dumped_message.get("thinking") if isinstance(dumped_thinking, str) and dumped_thinking.strip(): return dumped_thinking return "" def _chat_with_empty_retry(client: Client, request_kwargs: dict[str, Any]) -> Any: last_response: Any | None = None for attempt in range(3): last_response = client.chat(**request_kwargs) if _extract_response_text(last_response).strip(): return last_response if attempt < 2: wait = 2 ** attempt # 1 s, then 2 s logger.info( "LLM returned an empty chat payload on attempt %s for model %s; retrying in %ss.", attempt + 1, request_kwargs["model"], wait, ) time.sleep(wait) return last_response def get_llm_client() -> Client: global _client if _client is None: headers = {} if settings.ollama_api_key: headers["Authorization"] = f"Bearer {settings.ollama_api_key}" _client = Client(host=settings.resolved_ollama_host, headers=headers, timeout=None) return _client def llm_create( messages: list[dict[str, str]], *, max_tokens: int = 1024, temperature: float = 0.2, model: str | None = None, response_format: str | dict[str, Any] | None = None, ) -> dict[str, Any]: """Call the configured LLM via Ollama Cloud. All outgoing message content is scrubbed of PII via pii_scrubber before hitting external APIs (HIPAA compliance). Returns dict with 'text', 'finish_reason', 'usage'. """ from backend.app.core.pii_scrubber import scrub_pii client = get_llm_client() model = model or settings.ollama_model scrubbed_messages = [ {**msg, "content": scrub_pii(msg.get("content", ""))} for msg in messages ] options: dict[str, Any] = {} if max_tokens: options["num_predict"] = max_tokens if temperature is not None: options["temperature"] = temperature request_kwargs = { "model": model, "messages": scrubbed_messages, "options": options, "stream": False, "think": False, } if response_format is not None: request_kwargs["format"] = response_format try: response = _chat_with_empty_retry(client, request_kwargs) except Exception as exc: error_text = str(exc).lower() if "think" not in error_text or "unknown field" not in error_text: raise request_kwargs.pop("think", None) response = _chat_with_empty_retry(client, request_kwargs) text = _extract_response_text(response) finish_reason = getattr(response, "done_reason", "stop") or "stop" thinking = _extract_response_thinking(response) usage = { "prompt_tokens": getattr(response, "prompt_eval_count", 0) or 0, "completion_tokens": getattr(response, "eval_count", 0) or 0, "total_tokens": (getattr(response, "prompt_eval_count", 0) or 0) + (getattr(response, "eval_count", 0) or 0), } if not text.strip(): logger.warning( "LLM returned empty response. Model %s, finish_reason=%s, thinking=%s - usage: %s", model, finish_reason, bool(thinking.strip()), usage, ) return { "text": text, "finish_reason": finish_reason, "usage": usage, }