'use strict';

const fetch = require('node-fetch');
const fs = require('fs');
const path = require('path');

// ── Environment helpers ───────────────────────────────────────────────────────

/**
 * Read an integer setting from the environment.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, empty,
 *   not an integer, or below `min`.
 * @param {number} [min=0] - Smallest accepted value.
 * @returns {number} The parsed integer or `fallback`.
 */
function envInt(name, fallback, min = 0) {
  const parsed = Number.parseInt(process.env[name] || '', 10);
  return Number.isInteger(parsed) && parsed >= min ? parsed : fallback;
}

/**
 * Read a floating-point setting from the environment.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, empty or
 *   not a finite number.
 * @returns {number} The parsed number or `fallback`.
 */
function envFloat(name, fallback) {
  const parsed = Number.parseFloat(process.env[name] || '');
  return Number.isFinite(parsed) ? parsed : fallback;
}

// ── Environment ───────────────────────────────────────────────────────────────
const BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN;
const GATEWAY_TOKEN = process.env.GATEWAY_TOKEN;
const LLAMA_URL = 'http://127.0.0.1:8080';
// Base URL for Telegram API calls, without a trailing slash.
const PROXY_BASE = (process.env.CLOUDFLARE_TELEGRAM_PROXY_URL || 'https://api.telegram.org').replace(/\/$/, '');

// ── Persistence ───────────────────────────────────────────────────────────────
const DATA_DIR = '/app/data';
const CONV_FILE = path.join(DATA_DIR, 'conversations.json');
const SAVE_INTERVAL = 60 * 1000;

// ── Deduplication tracking ────────────────────────────────────────────────────
// update_ids that have already been handled; pruned in handleTelegramUpdate.
const processedUpdates = new Set();

// ── Token Scrubber ────────────────────────────────────────────────────────────
const TOKEN_PATTERN = /bot[0-9]+:[A-Za-z0-9_-]+/g;

/**
 * Replace anything that looks like a `bot<id>:<secret>` token with a placeholder.
 *
 * @param {*} str - Value to sanitise; it is converted with `String()` first.
 * @returns {string} The text with every token-shaped match redacted.
 */
function scrub(str) {
  return String(str).replace(TOKEN_PATTERN, 'bot[REDACTED]');
}

// ── Startup Validation ────────────────────────────────────────────────────────
if (!BOT_TOKEN) {
  console.error('❌ CRITICAL: TELEGRAM_BOT_TOKEN secret is not set!');
}
if (!GATEWAY_TOKEN || GATEWAY_TOKEN === 'changeme') {
  console.warn('⚠️ WARNING: GATEWAY_TOKEN is not set or using default!');
}

// ── In-memory conversation history ───────────────────────────────────────────
// Both maps are keyed by chat id. They are `const` and filled in place so the
// `conversations` reference exported at the bottom of the file never goes stale.
const conversations = {};
const lastActive = {};
const CONV_TTL_MS = 60 * 60 * 1000;
// At least one turn is always kept: a value of 0 (or NaN from a bad env value)
// would break the history trimming in handleTelegramUpdate.
const MAX_HIST = envInt('MAX_HISTORY_TURNS', 8, 1);

// ── Shared state for dashboard (consumed by health-server.js) ─────────────────
// Attach to global so health-server can read without circular imports
global.botState = {
  lastMessageAt: null,
  totalMessages: 0,
  activeChats: 0,
};

/**
 * Tell whether a value is a non-null, non-array object.
 *
 * @param {*} value - Candidate value.
 * @returns {boolean} True for plain-object-like values.
 */
function isRecord(value) {
  return value !== null && typeof value === 'object' && !Array.isArray(value);
}

// ── Boot: restore conversations from disk ────────────────────────────────────
try {
  if (fs.existsSync(CONV_FILE)) {
    const saved = JSON.parse(fs.readFileSync(CONV_FILE, 'utf8'));
    if (isRecord(saved)) {
      if (isRecord(saved.conversations)) Object.assign(conversations, saved.conversations);
      if (isRecord(saved.lastActive)) Object.assign(lastActive, saved.lastActive);
    }
    console.log(`📂 Restored ${Object.keys(conversations).length} conversations from disk`);
  }
} catch (e) {
  console.warn('⚠️ Could not restore conversations from disk:', e.message);
}

// ── Persist conversations to disk ────────────────────────────────────────────

/**
 * Write the conversation and last-activity maps to CONV_FILE.
 *
 * The JSON is written to a temporary sibling file and then renamed over the
 * target, so an interrupted write cannot leave a truncated conversations.json.
 * Failures are logged and never thrown.
 */
function saveConversations() {
  try {
    fs.mkdirSync(DATA_DIR, { recursive: true });
    const tmpFile = `${CONV_FILE}.tmp`;
    fs.writeFileSync(tmpFile, JSON.stringify({ conversations, lastActive }, null, 2));
    fs.renameSync(tmpFile, CONV_FILE);
  } catch (e) {
    console.error('❌ Failed to save conversations:', e.message);
  }
}

setInterval(saveConversations, SAVE_INTERVAL);

// ── Stale conversation cleanup ────────────────────────────────────────────────
// Every 30 minutes, drop chats whose last activity is older than CONV_TTL_MS.
setInterval(() => {
  const now = Date.now();
  let cleaned = 0;
  for (const chatId of Object.keys(lastActive)) {
    if (now - lastActive[chatId] > CONV_TTL_MS) {
      delete conversations[chatId];
      delete lastActive[chatId];
      cleaned++;
    }
  }
  if (cleaned > 0) {
    console.log(`🧹 Cleaned ${cleaned} stale conversations`);
    saveConversations();
  }
  global.botState.activeChats = Object.keys(conversations).length;
}, 30 * 60 * 1000);

// ── Core Telegram API helper ──────────────────────────────────────────────────

/**
 * POST a JSON body to a Telegram Bot API method.
 *
 * @param {string} method - Bot API method name, e.g. `sendMessage`.
 * @param {object} body - Request payload, serialised as JSON.
 * @returns {Promise<object>} The parsed JSON response.
 * @throws {Error} When the token is missing, the request fails, or the
 *   response status is not OK. Messages are passed through `scrub()`.
 */
async function telegramCall(method, body) {
  if (!BOT_TOKEN) throw new Error('TELEGRAM_BOT_TOKEN is not configured');
  const url = `${PROXY_BASE}/bot${BOT_TOKEN}/${method}`;
  try {
    const res = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
      timeout: 15000,
    });
    if (!res.ok) {
      const errText = scrub(await res.text());
      throw new Error(`Telegram ${method} failed (${res.status}): ${errText}`);
    }
    return await res.json();
  } catch (e) {
    // Deliberately no `{ cause: e }`: the request URL embeds the bot token, so
    // the underlying error may carry it; only the scrubbed message is re-thrown.
    throw new Error(scrub(e.message));
  }
}

/**
 * Send a message, first as Telegram HTML and, if that fails, as plain text.
 *
 * @param {number|string} chatId - Target chat.
 * @param {string} text - Message text (Markdown-ish; converted for the HTML attempt).
 * @returns {Promise<object>} The Telegram API response.
 * @throws {Error} When the plain-text fallback fails as well.
 */
async function sendTelegram(chatId, text) {
  try {
    const htmlText = markdownToTelegramHtml(text);
    return await telegramCall('sendMessage', { chat_id: chatId, text: htmlText, parse_mode: 'HTML' });
  } catch (err) {
    console.warn('⚠️ HTML send failed, retrying as plain text:', scrub(err.message));
    return await telegramCall('sendMessage', { chat_id: chatId, text: text.substring(0, 4096) });
  }
}

/**
 * Convert a small Markdown subset (fenced code, inline code, bold, italic)
 * to Telegram-flavoured HTML, truncated to 4096 characters.
 *
 * HTML special characters are escaped first so user/model text cannot inject
 * markup. Truncation happens after conversion, so a very long message can be
 * cut inside a tag; sendTelegram's plain-text fallback covers that case.
 *
 * @param {string} text - Source text.
 * @returns {string} HTML for use with `parse_mode: 'HTML'`.
 */
function markdownToTelegramHtml(text) {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/```[\w]*\n?([\s\S]*?)```/g, (_, code) => `<pre><code>${code.trim()}</code></pre>`)
    .replace(/`([^`\n]+)`/g, '<code>$1</code>')
    .replace(/\*\*(.+?)\*\*/g, '<b>$1</b>')
    // NOTE(review): this italic pattern was reconstructed from a damaged
    // source line (single-asterisk emphasis) — confirm against the original.
    .replace(/(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)/g, '<i>$1</i>')
    .substring(0, 4096);
}

/**
 * Show the "typing…" indicator in a chat.
 *
 * @param {number|string} chatId - Target chat.
 * @returns {Promise<object>} The Telegram API response.
 */
async function sendTyping(chatId) {
  return telegramCall('sendChatAction', { chat_id: chatId, action: 'typing' });
}

// ── LLM Call ──────────────────────────────────────────────────────────────────

const THINK_CLOSE = '</think>';

/**
 * Request a chat completion from the local LLM server and return the answer
 * text with any chain-of-thought removed.
 *
 * @param {Array<{role: string, content: string}>} messages - Chat history.
 * @returns {Promise<string>} The reply text; '…' when the model returned nothing.
 * @throws {Error} On a non-OK HTTP status or a response without `choices[0]`.
 */
async function callLLM(messages) {
  const res = await fetch(`${LLAMA_URL}/v1/chat/completions`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${GATEWAY_TOKEN || 'changeme'}`,
    },
    body: JSON.stringify({
      model: 'local-model',
      messages,
      // Qwen3 thinking models need enough budget for the <think> block AND
      // the final answer. 256 was too small — all tokens went to thinking
      // and no answer was ever written. 1024 gives ~400 tokens for reasoning
      // and ~600 for the reply. Override with MAX_TOKENS env var.
      max_tokens: envInt('MAX_TOKENS', 1024, 1),
      temperature: envFloat('TEMPERATURE', 0.7),
      stream: false,
    }),
  });

  if (!res.ok) throw new Error(`LLM error: ${res.status} ${await res.text()}`);

  const data = await res.json();
  if (!data.choices || !data.choices[0]) throw new Error('Invalid LLM response');

  let content = data.choices[0].message.content || '';

  // Qwen3 produces <think>chain-of-thought</think> THEN the answer.
  // We want only the part AFTER </think>.
  const closeAt = content.indexOf(THINK_CLOSE);
  if (closeAt !== -1) {
    // Case 1 — thinking finished → keep everything after the first </think>.
    content = content.slice(closeAt + THINK_CLOSE.length).trim();
  } else if (/<think>/i.test(content)) {
    // Case 2 — thinking was truncated (token limit hit mid-thought).
    console.warn('⚠️ Qwen3 thinking truncated — increase MAX_TOKENS or switch model');
    content = '⏱ Hit token limit during reasoning. Try /clear then ask again, or add MAX_TOKENS=2048 in Space secrets.';
  }

  // Safety: strip any stray <think> / </think> tags.
  content = content.replace(/<\/?think>/gi, '').trim();

  return content || '…';
}

// ── Update Handler ────────────────────────────────────────────────────────────

/**
 * Handle a slash command, if `text` is one.
 *
 * @param {number|string} chatId - Chat the command came from; its conversation
 *   must already exist in `conversations`.
 * @param {string} text - Trimmed message text.
 * @returns {Promise<boolean>} True when the text was a known command and has
 *   been answered; false when it should be treated as a normal message.
 */
async function handleCommand(chatId, text) {
  switch (text) {
    case '/start':
      await sendTelegram(chatId, `👋 Hello! I'm your AI assistant.\n\nJust send me a message to get started. Use /help to see available commands.`);
      return true;

    case '/help':
      await sendTelegram(chatId,
        `Available commands\n\n` +
        `/start — Welcome message\n` +
        `/clear — Clear conversation history\n` +
        `/status — Show active turns\n` +
        `/help — This message`
      );
      return true;

    case '/clear':
      // Keep only the system prompt (index 0).
      conversations[chatId] = conversations[chatId].slice(0, 1);
      saveConversations();
      await sendTelegram(chatId, '🧹 Conversation history cleared!');
      return true;

    case '/status': {
      const turns = Math.floor((conversations[chatId].length - 1) / 2);
      await sendTelegram(chatId, `📊 Active turns: ${turns}/${MAX_HIST}`);
      return true;
    }

    default:
      return false;
  }
}

/**
 * Process one Telegram update: deduplicate it, run commands, or forward the
 * text to the LLM and send the reply back to the chat.
 *
 * Errors are caught, logged (scrubbed) and reported to the user with a generic
 * message; the returned promise does not reject for handler failures.
 *
 * @param {object} update - Telegram update object.
 * @returns {Promise<void>}
 */
async function handleTelegramUpdate(update) {
  // `== null` covers both a missing update and a missing update_id.
  if (!update || update.update_id == null || processedUpdates.has(update.update_id)) return;
  processedUpdates.add(update.update_id);

  // Bound the dedup set: a Set iterates in insertion order, so this drops the
  // 300 oldest ids once more than 500 are tracked.
  if (processedUpdates.size > 500) {
    const oldest = processedUpdates.values();
    for (let i = 0; i < 300; i++) processedUpdates.delete(oldest.next().value);
  }

  try {
    const msg = update.message || update.edited_message;
    if (!msg || !msg.text) return;

    const chatId = msg.chat.id;
    const userId = msg.from ? msg.from.id : 'unknown';
    const text = msg.text.trim();
    const username = (msg.from && msg.from.username) || String(userId);

    console.log(`📨 [${username}] ${scrub(text)}`);

    lastActive[chatId] = Date.now();

    // Update dashboard state
    global.botState.lastMessageAt = Date.now();
    global.botState.totalMessages += 1;
    global.botState.activeChats = Object.keys(conversations).length;

    if (!conversations[chatId]) {
      // The default system prompt starts with the /no_think directive, the
      // primary signal to Qwen3 to skip chain-of-thought reasoning, and asks
      // for concise replies to reduce output token count.
      conversations[chatId] = [{
        role: 'system',
        content: process.env.SYSTEM_PROMPT ||
          '/no_think\nYou are a helpful AI assistant. Keep replies concise and direct.',
      }];
    }

    // ── Commands ─────────────────────────────────────────────────────────────
    if (await handleCommand(chatId, text)) return;

    // ── Normal message ────────────────────────────────────────────────────────
    const userTurn = { role: 'user', content: text };
    conversations[chatId].push(userTurn);

    // Keep the system prompt plus the most recent MAX_HIST * 2 messages.
    if (conversations[chatId].length > MAX_HIST * 2 + 1) {
      conversations[chatId] = [
        conversations[chatId][0],
        ...conversations[chatId].slice(-(MAX_HIST * 2)),
      ];
    }

    let reply;
    try {
      await sendTyping(chatId);
      reply = await callLLM(conversations[chatId]);
    } catch (err) {
      // Roll back the unanswered user turn so a failed request does not leave
      // two consecutive user messages in the history sent on the next attempt.
      const history = conversations[chatId];
      const at = history ? history.lastIndexOf(userTurn) : -1;
      if (at !== -1) history.splice(at, 1);
      throw err;
    }

    conversations[chatId].push({ role: 'assistant', content: reply });
    await sendTelegram(chatId, reply);
  } catch (e) {
    const safeMsg = scrub(e.message);
    console.error('❌ Update Handler Error:', safeMsg);
    try {
      const chatId = (update.message || update.edited_message)?.chat?.id;
      if (chatId) await sendTelegram(chatId, `⚠️ Something went wrong. Please try again.`);
    } catch (notifyErr) {
      // Best effort only: the user could not be told, but leave a trace.
      console.error('❌ Could not notify user about the error:', scrub(notifyErr.message));
    }
  }
}

// ── Graceful shutdown ─────────────────────────────────────────────────────────
// Flush conversations to disk before exiting on SIGTERM / SIGINT.
process.on('SIGTERM', () => {
  saveConversations();
  process.exit(0);
});
process.on('SIGINT', () => {
  saveConversations();
  process.exit(0);
});

module.exports = { handleTelegramUpdate, conversations };