Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import time | |
| import uuid | |
| import datetime | |
| import html as _html | |
| from pathlib import Path | |
| # --- Preload CUDA runtime libs before importing llama_cpp --- | |
| # The cu124 llama-cpp-python wheel's libllama.so needs libcudart.so.12 / | |
| # libcublas at import time. On ZeroGPU those aren't on the default loader | |
| # path, so we dlopen the pip-provided nvidia libs (cudart first) globally. | |
| import ctypes | |
| import glob | |
| import site | |
def _preload_cuda():
    """Best-effort dlopen of the pip-installed NVIDIA shared libraries.

    Globs ``nvidia/*/lib/*.so*`` under every site-packages directory reported
    by ``site`` and loads each match with ``RTLD_GLOBAL``. Paths containing
    ``cuda_runtime`` are loaded first, then paths containing ``cublas``, then
    everything else. A library that fails to load is skipped silently.
    """

    def _load_rank(path):
        # Lower rank loads earlier: cudart, then cublas, then the rest.
        if "cuda_runtime" in path:
            return 0
        if "cublas" in path:
            return 1
        return 2

    roots = set(site.getsitepackages())
    try:
        roots.add(site.getusersitepackages())
    except Exception:
        # The user site directory is optional; carry on without it.
        pass

    candidates = set()
    for root in roots:
        pattern = os.path.join(root, "nvidia", "*", "lib", "*.so*")
        candidates.update(glob.glob(pattern))

    for lib_path in sorted(candidates, key=_load_rank):
        try:
            ctypes.CDLL(lib_path, mode=ctypes.RTLD_GLOBAL)
        except OSError:
            # Deliberately best-effort: an unloadable library is ignored.
            continue


_preload_cuda()
| import gradio as gr | |
| import spaces | |
| from huggingface_hub import hf_hub_download | |
| from llama_cpp import Llama | |
| # ---- feedback logging (JSONL, synced to a private HF dataset) ---- | |
| # NB: huggingface_hub.CommitScheduler's background thread breaks under ZeroGPU's | |
| # process forking ("Invalid file descriptor: -1"), so we append locally and push | |
| # the file synchronously from the main process instead. | |
| from huggingface_hub import HfApi | |
# Dataset repo that receives the feedback log (overridable via env var).
FEEDBACK_REPO = os.getenv("FEEDBACK_REPO", "AlexWortega/my-pi-agent-feedback")

# Each process writes its own uniquely named JSONL file under ./feedback.
_FB_DIR = Path("feedback")
_FB_DIR.mkdir(exist_ok=True)
_FB_FILE = _FB_DIR.joinpath(f"log_{uuid.uuid4().hex}.jsonl")
_FB_PATH_IN_REPO = "data/" + _FB_FILE.name

# Without HF_TOKEN there is no API client and records stay local only.
_HF_TOKEN = os.getenv("HF_TOKEN")
if _HF_TOKEN:
    _api = HfApi(token=_HF_TOKEN)
else:
    _api = None
print(
    "feedback ->",
    FEEDBACK_REPO if _api else "(local only, no HF_TOKEN)",
    flush=True,
)
def _log(record):
    """Append ``record`` to the local JSONL log, then push the log file to the Hub.

    A UTC ISO-8601 ``"ts"`` key is placed first; keys in ``record`` (including
    a ``"ts"`` of its own) take precedence. If the local append fails, the
    failure is printed and no upload is attempted. The upload only happens
    when an API client was configured, and an upload failure is printed
    rather than raised.
    """
    entry = {
        "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        **record,
    }

    try:
        with open(_FB_FILE, "a", encoding="utf-8") as sink:
            sink.write(json.dumps(entry, ensure_ascii=False) + "\n")
    except Exception as e:  # noqa: BLE001
        print("local log failed:", repr(e)[:120], flush=True)
        return

    if _api is None:
        return

    try:
        # Re-uploads the whole file each time, replacing the copy in the repo.
        _api.upload_file(
            path_or_fileobj=str(_FB_FILE),
            path_in_repo=_FB_PATH_IN_REPO,
            repo_id=FEEDBACK_REPO,
            repo_type="dataset",
            commit_message="feedback log",
        )
    except Exception as e:  # noqa: BLE001
        print("dataset upload failed:", repr(e)[:160], flush=True)
# ---- model: GGUF file is pulled from the Hub at import time ----
# Repo, file name and context length can each be overridden via env vars.
GGUF_REPO = os.getenv("GGUF_REPO", "AlexWortega/qwen35-4b-soyuz-merged-gguf")
GGUF_FILE = os.getenv("GGUF_FILE", "qwen35-4b-soyuz-merged.nomtp.Q4_K_M.gguf")
N_CTX = int(os.getenv("N_CTX", "16384"))

print("Downloading GGUF from the Hub ...", flush=True)
MODEL_PATH = hf_hub_download(repo_id=GGUF_REPO, filename=GGUF_FILE)
print("GGUF ready at", MODEL_PATH, flush=True)
# Lazily created Llama instance, shared by every call in this process.
_LLM = None


def _get_llm():
    """Return the shared ``Llama`` instance, constructing it on first use.

    The model is loaded from ``MODEL_PATH`` with all layers offloaded
    (``n_gpu_layers=-1``) and a context window of ``N_CTX`` tokens.
    """
    global _LLM
    if _LLM is not None:
        return _LLM
    _LLM = Llama(
        model_path=MODEL_PATH,
        n_gpu_layers=-1,
        n_ctx=N_CTX,
        verbose=False,
    )
    return _LLM
# A completed reasoning span: <think> ... </think> (may cover several lines).
_THINK = re.compile(r"<think>(.*?)</think>", re.DOTALL)
# A fenced code block: optional language tag, newline, body, closing fence.
_CODE_BLOCK = re.compile(r"```([\w+-]*)\s*\n(.*?)```", re.DOTALL)


def _split(text):
    """Separate model output into ``(clean_answer, thinking)``.

    Every completed ``<think>...</think>`` span is removed from the answer and
    collected as reasoning. If an opening ``<think>`` is still left once the
    completed spans are removed (the model is mid-thought, or was cut off),
    everything after it is treated as reasoning too. This also covers an
    unterminated block that follows earlier completed ones, which would
    otherwise leak the raw tag and partial reasoning into the answer.

    Args:
        text: Raw model output, possibly partial while streaming.

    Returns:
        A ``(answer, thinking)`` tuple of stripped strings; reasoning spans
        are joined with a blank line between them.
    """
    think_parts = _THINK.findall(text)
    answer = _THINK.sub("", text)

    # After removing completed spans, a remaining "<think>" has no closing tag.
    open_at = answer.find("<think>")
    if open_at != -1:
        think_parts.append(answer[open_at + len("<think>"):])
        answer = answer[:open_at]

    thinking = "\n\n".join(p.strip() for p in think_parts).strip()
    return answer.strip(), thinking
def _extract_doc(answer):
    """Build one self-contained HTML document from the answer's code blocks.

    Fenced blocks are sorted into HTML, CSS and JS. The first HTML block is
    the base document; with no HTML block but some CSS/JS, an empty page
    shell is used. A fragment without ``<html``/``<!doctype`` is wrapped in a
    minimal page. CSS blocks are injected as one ``<style>`` (only when the
    document has none), and JS blocks as one ``<script>`` (same rule).
    Returns ``""`` when there is nothing to render.
    """
    buckets = {"html": [], "css": [], "js": []}
    for fence_lang, fence_body in _CODE_BLOCK.findall(answer):
        tag = (fence_lang or "").lower().strip()
        snippet = fence_body.strip()
        if not snippet:
            continue
        lowered = snippet.lower()
        looks_like_page = any(
            marker in lowered for marker in ("<!doctype", "<html", "<body")
        )
        if tag in ("html", "htm") or looks_like_page:
            buckets["html"].append(snippet)
        elif tag == "css":
            buckets["css"].append(snippet)
        elif tag in ("js", "javascript"):
            buckets["js"].append(snippet)
        elif tag == "" and "<" in snippet and ">" in snippet:
            # Untagged block that still looks like markup.
            buckets["html"].append(snippet)

    pages, styles, scripts = buckets["html"], buckets["css"], buckets["js"]

    if pages:
        doc = pages[0]
    elif styles or scripts:
        # Only CSS/JS was given: host it in an empty page.
        doc = "<!DOCTYPE html><html><head><meta charset='utf-8'></head><body></body></html>"
    else:
        return ""

    doc_lower = doc.lower()
    if "<html" not in doc_lower and "<!doctype" not in doc_lower:
        doc = "".join(
            (
                "<!DOCTYPE html><html><head><meta charset='utf-8'></head><body>\n",
                doc,
                "\n</body></html>",
            )
        )

    if styles and "<style" not in doc.lower():
        style_tag = "<style>\n" + "\n".join(styles) + "\n</style>"
        if "</head>" in doc:
            doc = doc.replace("</head>", style_tag + "</head>", 1)
        else:
            doc = style_tag + doc

    if scripts and "<script" not in doc.lower():
        script_tag = "<script>\n" + "\n".join(scripts) + "\n</script>"
        if "</body>" in doc:
            doc = doc.replace("</body>", script_tag + "</body>", 1)
        else:
            doc = doc + script_tag

    return doc
# Placeholder shown in the preview pane while there is nothing to render.
_EMPTY_PREVIEW = (
    "<div style='padding:1rem;color:#888;font-family:sans-serif'>"
    "The preview appears here once the model returns HTML/CSS/JS in a code block.</div>"
)
# Text shown in the reasoning box when a turn produced no <think> content.
_NO_REASONING = "(no <think> reasoning in this turn)"


def _iframe(doc):
    """Wrap ``doc`` in a sandboxed ``<iframe srcdoc=...>`` for the preview pane.

    Returns the empty-preview placeholder when ``doc`` is falsy or only
    whitespace. Otherwise the document is HTML-escaped (quotes included) so
    it can sit inside the ``srcdoc`` attribute.
    """
    if not (doc and doc.strip()):
        return _EMPTY_PREVIEW
    srcdoc = _html.escape(doc, quote=True)
    pieces = (
        '<iframe srcdoc="',
        srcdoc,
        '" sandbox="allow-scripts allow-modals allow-forms allow-popups" ',
        'style="width:100%;height:540px;border:1px solid #ddd;border-radius:8px;background:white"></iframe>',
    )
    return "".join(pieces)
# Default system prompt, pre-filled in the Settings panel (one piece per sentence).
DEFAULT_SYS = "".join(
    [
        "You are Soyuz, a helpful coding assistant. ",
        "Reason briefly inside <think> ... </think>, then answer. ",
        "When the user asks for a web app, page, game or visual UI, output ONE "
        "complete, self-contained HTML file with inline CSS and JavaScript inside "
        "a single ```html code block (no external files or CDNs unless necessary). ",
        "Keep the thinking short so you have room for the code.",
    ]
)
def _stream(message, history, system_prompt, temperature, max_tokens):
    """Stream a chat completion, yielding ``(answer, raw, thinking)`` per chunk.

    The prompt is the optional system prompt, then every ``history`` entry
    whose role is ``user`` or ``assistant`` and whose content is non-empty,
    then ``message`` as the final user turn.

    Args:
        message: The new user message.
        history: Earlier turns as dicts with ``role`` and ``content`` keys.
        system_prompt: Optional system prompt; skipped when blank.
        temperature: Sampling temperature (coerced to ``float``).
        max_tokens: Generation cap (coerced to ``int``).

    Yields:
        ``(answer, raw, thinking)`` after each non-empty delta: the cleaned
        answer so far (an ellipsis while only reasoning has arrived), the raw
        accumulated text, and the accumulated reasoning.
    """
    # NOTE(review): `spaces` is imported by this file, but no `@spaces.GPU`
    # decorator is visible on this or any other function. ZeroGPU normally
    # expects the GPU-bound function to carry one - confirm against the
    # deployed file.
    llm = _get_llm()

    msgs = []
    if system_prompt and system_prompt.strip():
        msgs.append({"role": "system", "content": system_prompt.strip()})
    for m in history:
        if m.get("role") in ("user", "assistant") and m.get("content"):
            msgs.append({"role": m["role"], "content": m["content"]})
    msgs.append({"role": "user", "content": message})

    raw = ""
    for chunk in llm.create_chat_completion(
        messages=msgs,
        max_tokens=int(max_tokens),
        temperature=float(temperature),
        stream=True,
    ):
        delta = chunk["choices"][0]["delta"].get("content", "")
        if not delta:
            # Chunks without content carry no text to show.
            continue
        raw += delta
        answer, thinking = _split(raw)
        # "\u2026" is the horizontal ellipsis, written as an escape so the
        # placeholder survives a re-encoding of this file (it had been
        # garbled into mojibake).
        yield (answer if answer else "\u2026"), raw, thinking
def respond(message, history, system_prompt, temperature, max_tokens, meta):
    """Gradio handler: stream one assistant turn and record it.

    Yields 7-tuples matching the UI outputs: chat history, cleared message
    box, raw output, reasoning text, extracted code, preview HTML, and the
    per-turn metadata list. A blank message yields the current state once and
    stops. While streaming, the preview stays on the placeholder; the final
    yield renders the iframe. The turn is logged after that last yield.
    """
    meta = meta or []

    if not (message and message.strip()):
        yield history or [], "", "", _NO_REASONING, "", _EMPTY_PREVIEW, meta
        return

    # Append the new user turn plus an empty assistant slot to fill in.
    turns = (history or []) + [
        {"role": "user", "content": message},
        {"role": "assistant", "content": ""},
    ]
    earlier = turns[:-2]

    answer = raw = thinking = ""
    chunks = _stream(message, earlier, system_prompt, temperature, max_tokens)
    for answer, raw, thinking in chunks:
        turns[-1]["content"] = answer
        live_code = _extract_doc(answer)
        # Chat, raw, reasoning and code update live; preview waits for the end.
        yield (
            turns,
            "",
            raw,
            (thinking or _NO_REASONING),
            live_code,
            _EMPTY_PREVIEW,
            meta,
        )

    doc = _extract_doc(answer)
    turns[-1]["content"] = answer

    record = {
        "turn_id": uuid.uuid4().hex,
        "event": "generation",
        "user": message,
        "answer": answer,
        "reasoning": thinking,
        "code": doc,
        "reaction": None,
    }
    meta = [*meta, record]

    yield turns, "", raw, (thinking or _NO_REASONING), doc, _iframe(doc), meta
    # Runs after the final yield, so the upload does not delay the response.
    _log(record)
def on_like(meta, evt: gr.LikeData):
    """Log a like/dislike on a chat reply and return a status line for the UI.

    The liked message index is mapped to a turn by integer-dividing by two
    (each turn is a user message plus an assistant message); a non-integer
    index falls back to turn 0. The matching generation record from ``meta``
    is copied into the feedback entry when that turn exists; otherwise its
    fields are logged as ``None``.

    Args:
        meta: List of per-turn generation records.
        evt: Gradio like event; ``liked`` and ``index`` are read.

    Returns:
        A short confirmation string naming the reaction and 1-based turn.
    """
    reaction = "like" if evt.liked else "dislike"
    turn = (evt.index // 2) if isinstance(evt.index, int) else 0
    base = dict(meta[turn]) if (meta and 0 <= turn < len(meta)) else {}
    _log(
        {
            "turn_id": base.get("turn_id"),
            "event": "feedback",
            "reaction": reaction,
            "user": base.get("user"),
            "answer": base.get("answer"),
            "reasoning": base.get("reasoning"),
            "code": base.get("code"),
        }
    )
    # Thumbs-up / thumbs-down as escapes: both had been garbled into the same
    # mojibake character, so the status line could not tell them apart.
    emoji = "\U0001F44D" if evt.liked else "\U0001F44E"
    return f"Saved feedback {emoji} (turn {turn + 1})"
# Header card rendered at the top of the page. Non-ASCII glyphs are written
# as HTML character references so they survive any re-encoding of this file
# (they had been garbled into mojibake).
# NOTE(review): the logo (&#x1F999;), the preview icon (&#x1F5A5;) and the
# text-only icon (&#x1F4DD;) could not be recovered with certainty from the
# garbled text - confirm against the original design.
CARD = """
<div style="display:flex;gap:14px;align-items:center;flex-wrap:wrap;
            border:1px solid var(--border-color-primary,#444);border-radius:12px;
            padding:14px 16px;background:var(--block-background-fill,transparent);
            color:var(--body-text-color,inherit)">
  <div style="font-size:34px">&#x1F999;</div>
  <div style="flex:1;min-width:260px">
    <div style="font-size:19px;font-weight:700;color:var(--body-text-color,inherit)">My Pi Agent &mdash; Soyuz</div>
    <div style="color:var(--body-text-color-subdued,#9aa);font-size:13px;margin-top:2px">
      <b>qwen35-4b-soyuz-merged</b> &middot; Qwen3.5-4B (hybrid linear-attention) &middot;
      GGUF <code>Q4_K_M</code> &middot; runs on <b>ZeroGPU</b> via <code>llama-cpp-python</code>
    </div>
    <div style="color:var(--body-text-color-subdued,#9aa);font-size:13px;margin-top:6px">
      &#x1F4AC; chat &middot; &#x1F9E0; shows the model's reasoning &middot;
      &#x1F5A5; live HTML/JS preview (Artifacts-style) &middot;
      &#x1F4DD; text-only (<b>no image input</b>)
    </div>
    <div style="margin-top:8px;font-size:13px">
      <a href="https://huggingface.co/AlexWortega/qwen35-4b-soyuz-merged" target="_blank"
         style="color:var(--link-text-color,#3b82f6)">base model</a> &middot;
      <a href="https://huggingface.co/AlexWortega/qwen35-4b-soyuz-merged-gguf" target="_blank"
         style="color:var(--link-text-color,#3b82f6)">GGUF</a>
    </div>
  </div>
</div>
"""
# ---- UI layout and event wiring ----
# Non-ASCII glyphs in titles and labels are written as escapes so they
# survive any re-encoding of this file (they had been garbled into mojibake).
# NOTE(review): the preview-tab icon (U+1F5A5) and the raw-tab icon (U+1F4C4)
# could not be recovered with certainty from the garbled text - confirm.
with gr.Blocks(title="My Pi Agent \u2014 Soyuz", fill_height=True) as demo:
    gr.HTML(CARD)
    gr.Markdown(
        "Ask for a web app, page or game and the model writes a complete HTML file \u2014 "
        "the **\U0001F5A5 Preview** tab runs it live. Tips: bump *Max tokens* for bigger apps."
    )
    # One generation record per turn; read back when a reply is liked/disliked.
    meta_state = gr.State([])

    with gr.Row():
        # Left column: conversation and input.
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                height=480,
                label="Chat (use \U0001F44D / \U0001F44E on replies)",
            )
            fb_status = gr.Markdown("", elem_id="fb_status")
            msg = gr.Textbox(
                placeholder="e.g. \"build a pomodoro timer in HTML/JS\" (Enter to send)",
                label="Message",
                autofocus=True,
            )
            with gr.Row():
                send = gr.Button("Send", variant="primary")
                clear = gr.Button("Clear")
        # Right column: artifact preview, code, reasoning and raw output.
        with gr.Column(scale=3):
            with gr.Tab("\U0001F5A5 Preview"):
                preview = gr.HTML(_EMPTY_PREVIEW)
            with gr.Tab("\U0001F9E9 Code"):
                code_box = gr.Code(label="Artifact (HTML)", language="html")
            with gr.Tab("\U0001F9E0 Reasoning"):
                think_box = gr.Textbox(label="What the model was thinking", lines=18)
            with gr.Tab("\U0001F4C4 Raw"):
                raw_box = gr.Textbox(label="Full raw output (with tags)", lines=18)

    # NOTE(review): the original nesting level of this accordion is not
    # recoverable from the flattened source; it is placed below the main row.
    with gr.Accordion("\u2699\ufe0f Settings", open=False):
        system_prompt = gr.Textbox(
            value=DEFAULT_SYS, label="System prompt", lines=4
        )
        temperature = gr.Slider(
            0.0, 1.5, value=0.6, step=0.05, label="Temperature"
        )
        max_tokens = gr.Slider(
            256, 8192, value=4096, step=128, label="Max tokens"
        )

    # Order must match respond()'s parameters and the tuples it yields.
    inputs = [msg, chatbot, system_prompt, temperature, max_tokens, meta_state]
    outputs = [chatbot, msg, raw_box, think_box, code_box, preview, meta_state]

    send.click(respond, inputs, outputs)
    msg.submit(respond, inputs, outputs)
    # Clear resets every output, including the per-turn metadata.
    clear.click(
        lambda: ([], "", "", "", "", _EMPTY_PREVIEW, []), None, outputs
    )
    chatbot.like(on_like, [meta_state], [fb_status])

demo.queue().launch()