import makeWASocket, {
  useMultiFileAuthState,
  DisconnectReason,
  makeCacheableSignalKeyStore,
  WASocket
} from '@whiskeysockets/baileys';
import pino from 'pino';
import { exec, execSync } from 'child_process';
import fs from 'fs';
import path from 'path';

import {
  ASSISTANT_NAME,
  POLL_INTERVAL,
  STORE_DIR,
  DATA_DIR,
  TRIGGER_PATTERN,
  MAIN_GROUP_FOLDER,
  IPC_POLL_INTERVAL
} from './config.js';
import { RegisteredGroup, Session, NewMessage } from './types.js';
import {
  initDatabase,
  storeMessage,
  storeChatMetadata,
  getNewMessages,
  getMessagesSince,
  getAllTasks,
  getTaskById,
  updateChatName,
  getAllChats,
  getLastGroupSync,
  setLastGroupSync
} from './db.js';
import { startSchedulerLoop } from './task-scheduler.js';
import {
  runContainerAgent,
  writeTasksSnapshot,
  writeGroupsSnapshot,
  AvailableGroup
} from './container-runner.js';
import { loadJson, saveJson } from './utils.js';

const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  transport: { target: 'pino-pretty', options: { colorize: true } }
});

/** Schedule kinds accepted by the `schedule_task` IPC command. */
type ScheduleType = 'cron' | 'interval' | 'once';

/** Narrows an untrusted IPC string to one of the supported schedule kinds. */
function isScheduleType(value: string): value is ScheduleType {
  return value === 'cron' || value === 'interval' || value === 'once';
}

// Current WhatsApp socket; reassigned by connectWhatsApp() on every (re)connect.
let sock: WASocket;
// Timestamp of the last message handed to processMessage (persisted in router_state.json).
let lastTimestamp = '';
// Session ids keyed by group folder (persisted in sessions.json).
let sessions: Session = {};
// Registered groups keyed by chat JID (loaded from registered_groups.json).
let registeredGroups: Record<string, RegisteredGroup> = {};
// Per-chat timestamp of the last message the agent responded to.
let lastAgentTimestamp: Record<string, string> = {};
// Guards the one-time start of the scheduler, IPC watcher, message loop and
// daily sync timer so a reconnect does not spawn duplicate loops.
let backgroundServicesStarted = false;

/**
 * Send a "composing" or "paused" presence update to a chat.
 * Failures are logged at debug level and never thrown.
 */
async function setTyping(jid: string, isTyping: boolean): Promise<void> {
  try {
    await sock.sendPresenceUpdate(isTyping ? 'composing' : 'paused', jid);
  } catch (err) {
    logger.debug({ jid, err }, 'Failed to update typing status');
  }
}

/**
 * Load router state, sessions and registered groups from JSON files in DATA_DIR
 * into the module-level variables. Missing values fall back to empty defaults.
 */
function loadState(): void {
  const statePath = path.join(DATA_DIR, 'router_state.json');
  const state = loadJson<{
    last_timestamp?: string;
    last_agent_timestamp?: Record<string, string>;
  }>(statePath, {});
  lastTimestamp = state.last_timestamp || '';
  lastAgentTimestamp = state.last_agent_timestamp || {};
  sessions = loadJson(path.join(DATA_DIR, 'sessions.json'), {});
  registeredGroups = loadJson(path.join(DATA_DIR, 'registered_groups.json'), {});
  logger.info({ groupCount: Object.keys(registeredGroups).length }, 'State loaded');
}

/**
 * Persist the message cursor, per-chat agent timestamps and sessions to DATA_DIR.
 * Registered groups are not written here.
 */
function saveState(): void {
  saveJson(path.join(DATA_DIR, 'router_state.json'), {
    last_timestamp: lastTimestamp,
    last_agent_timestamp: lastAgentTimestamp
  });
  saveJson(path.join(DATA_DIR, 'sessions.json'), sessions);
}

/**
 * Sync group metadata from WhatsApp.
 * Fetches all participating groups and stores their names in the database.
 * Called on startup, daily, and on-demand via IPC.
 *
 * @param force - When true, skip the "synced within the last 24 hours" check.
 */
async function syncGroupMetadata(force = false): Promise<void> {
  // Check if we need to sync (skip if synced recently, unless forced)
  if (!force) {
    const lastSync = getLastGroupSync();
    if (lastSync) {
      const lastSyncTime = new Date(lastSync).getTime();
      const now = Date.now();
      if (now - lastSyncTime < GROUP_SYNC_INTERVAL_MS) {
        logger.debug({ lastSync }, 'Skipping group sync - synced recently');
        return;
      }
    }
  }

  try {
    logger.info('Syncing group metadata from WhatsApp...');
    const groups = await sock.groupFetchAllParticipating();

    let count = 0;
    for (const [jid, metadata] of Object.entries(groups)) {
      if (metadata.subject) {
        updateChatName(jid, metadata.subject);
        count++;
      }
    }

    setLastGroupSync();
    logger.info({ count }, 'Group metadata synced');
  } catch (err) {
    logger.error({ err }, 'Failed to sync group metadata');
  }
}

/**
 * Get available groups list for the agent.
 * Keeps only group chats (JIDs ending in `@g.us`), drops the `__group_sync__`
 * marker row, and flags which ones are registered.
 * Order is whatever getAllChats() returns (presumably most recent activity
 * first - confirm in db.js).
 */
function getAvailableGroups(): AvailableGroup[] {
  const chats = getAllChats();
  const registeredJids = new Set(Object.keys(registeredGroups));

  return chats
    .filter(c => c.jid !== '__group_sync__' && c.jid.endsWith('@g.us'))
    .map(c => ({
      jid: c.jid,
      name: c.name,
      lastActivity: c.last_message_time,
      isRegistered: registeredJids.has(c.jid)
    }));
}

/**
 * Handle one incoming message: if it belongs to a registered group and passes
 * the trigger check, build a prompt from every message since the agent last
 * replied in that chat, run the agent, and send its response back.
 */
async function processMessage(msg: NewMessage): Promise<void> {
  const group = registeredGroups[msg.chat_jid];
  if (!group) return;

  const content = msg.content.trim();
  const isMainGroup = group.folder === MAIN_GROUP_FOLDER;

  // Main group responds to all messages; other groups require trigger prefix
  if (!isMainGroup && !TRIGGER_PATTERN.test(content)) return;

  // Get all messages since last agent interaction so the session has full context
  const sinceTimestamp = lastAgentTimestamp[msg.chat_jid] || '';
  const missedMessages = getMessagesSince(msg.chat_jid, sinceTimestamp);

  const lines = missedMessages.map(m => {
    const d = new Date(m.timestamp);
    const date = d.toLocaleDateString('en-US', { month: 'short', day: 'numeric' });
    const time = d.toLocaleTimeString('en-US', { hour: '2-digit', minute: '2-digit' });
    return `[${date} ${time}] ${m.sender_name}: ${m.content}`;
  });
  const prompt = lines.join('\n');

  if (!prompt) return;

  logger.info({ group: group.name, messageCount: missedMessages.length }, 'Processing message');

  await setTyping(msg.chat_jid, true);
  let response: string | null;
  try {
    response = await runAgent(group, prompt, msg.chat_jid);
  } finally {
    // runAgent can throw before its own try block (snapshot writes); never
    // leave the chat stuck on "composing".
    await setTyping(msg.chat_jid, false);
  }

  if (response) {
    lastAgentTimestamp[msg.chat_jid] = msg.timestamp;
    await sendMessage(msg.chat_jid, `${ASSISTANT_NAME}: ${response}`);
  }
}

/**
 * Run the container agent for a group.
 * Writes fresh task and group snapshots first, then invokes the container and
 * stores any new session id it reports.
 *
 * @returns The agent's result, or null when the container reports an error or throws.
 */
async function runAgent(
  group: RegisteredGroup,
  prompt: string,
  chatJid: string
): Promise<string | null> {
  const isMain = group.folder === MAIN_GROUP_FOLDER;
  const sessionId = sessions[group.folder];

  // Update tasks snapshot for container to read (filtered by group)
  const tasks = getAllTasks();
  writeTasksSnapshot(
    group.folder,
    isMain,
    tasks.map(t => ({
      id: t.id,
      groupFolder: t.group_folder,
      prompt: t.prompt,
      schedule_type: t.schedule_type,
      schedule_value: t.schedule_value,
      status: t.status,
      next_run: t.next_run
    }))
  );

  // Update available groups snapshot (main group only can see all groups)
  const availableGroups = getAvailableGroups();
  writeGroupsSnapshot(group.folder, isMain, availableGroups, new Set(Object.keys(registeredGroups)));

  try {
    const output = await runContainerAgent(group, {
      prompt,
      sessionId,
      groupFolder: group.folder,
      chatJid,
      isMain
    });

    if (output.newSessionId) {
      sessions[group.folder] = output.newSessionId;
      saveJson(path.join(DATA_DIR, 'sessions.json'), sessions);
    }

    if (output.status === 'error') {
      logger.error({ group: group.name, error: output.error }, 'Container agent error');
      return null;
    }

    return output.result;
  } catch (err) {
    logger.error({ group: group.name, err }, 'Agent error');
    return null;
  }
}

/**
 * Send a text message through the current socket.
 * Failures are logged and never thrown.
 */
async function sendMessage(jid: string, text: string): Promise<void> {
  try {
    await sock.sendMessage(jid, { text });
    logger.info({ jid, length: text.length }, 'Message sent');
  } catch (err) {
    logger.error({ jid, err }, 'Failed to send message');
  }
}

/**
 * Start polling `<DATA_DIR>/ipc/<group>/{messages,tasks}` for JSON command files.
 * The sub-directory name is treated as the sender's identity. Processed files
 * are deleted; files that fail are moved to `<DATA_DIR>/ipc/errors`.
 * The poll re-arms itself with setTimeout every IPC_POLL_INTERVAL ms.
 */
function startIpcWatcher(): void {
  const ipcBaseDir = path.join(DATA_DIR, 'ipc');
  fs.mkdirSync(ipcBaseDir, { recursive: true });

  // Move a file that failed processing into the shared errors directory so it
  // is not picked up again on the next poll.
  const quarantine = (filePath: string, sourceGroup: string, file: string): void => {
    const errorDir = path.join(ipcBaseDir, 'errors');
    fs.mkdirSync(errorDir, { recursive: true });
    fs.renameSync(filePath, path.join(errorDir, `${sourceGroup}-${file}`));
  };

  const processIpcFiles = async (): Promise<void> => {
    // Scan all group IPC directories (identity determined by directory)
    let groupFolders: string[];
    try {
      groupFolders = fs.readdirSync(ipcBaseDir).filter(f => {
        const stat = fs.statSync(path.join(ipcBaseDir, f));
        return stat.isDirectory() && f !== 'errors';
      });
    } catch (err) {
      logger.error({ err }, 'Error reading IPC base directory');
      setTimeout(processIpcFiles, IPC_POLL_INTERVAL);
      return;
    }

    for (const sourceGroup of groupFolders) {
      const isMain = sourceGroup === MAIN_GROUP_FOLDER;
      const messagesDir = path.join(ipcBaseDir, sourceGroup, 'messages');
      const tasksDir = path.join(ipcBaseDir, sourceGroup, 'tasks');

      // Process messages from this group's IPC directory
      try {
        if (fs.existsSync(messagesDir)) {
          const messageFiles = fs.readdirSync(messagesDir).filter(f => f.endsWith('.json'));
          for (const file of messageFiles) {
            const filePath = path.join(messagesDir, file);
            try {
              const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
              if (data.type === 'message' && data.chatJid && data.text) {
                // Authorization: verify this group can send to this chatJid
                const targetGroup = registeredGroups[data.chatJid];
                if (isMain || (targetGroup && targetGroup.folder === sourceGroup)) {
                  await sendMessage(data.chatJid, `${ASSISTANT_NAME}: ${data.text}`);
                  logger.info({ chatJid: data.chatJid, sourceGroup }, 'IPC message sent');
                } else {
                  logger.warn({ chatJid: data.chatJid, sourceGroup }, 'Unauthorized IPC message attempt blocked');
                }
              }
              fs.unlinkSync(filePath);
            } catch (err) {
              logger.error({ file, sourceGroup, err }, 'Error processing IPC message');
              quarantine(filePath, sourceGroup, file);
            }
          }
        }
      } catch (err) {
        logger.error({ err, sourceGroup }, 'Error reading IPC messages directory');
      }

      // Process tasks from this group's IPC directory
      try {
        if (fs.existsSync(tasksDir)) {
          const taskFiles = fs.readdirSync(tasksDir).filter(f => f.endsWith('.json'));
          for (const file of taskFiles) {
            const filePath = path.join(tasksDir, file);
            try {
              const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
              // Pass source group identity to processTaskIpc for authorization
              await processTaskIpc(data, sourceGroup, isMain);
              fs.unlinkSync(filePath);
            } catch (err) {
              logger.error({ file, sourceGroup, err }, 'Error processing IPC task');
              quarantine(filePath, sourceGroup, file);
            }
          }
        }
      } catch (err) {
        logger.error({ err, sourceGroup }, 'Error reading IPC tasks directory');
      }
    }

    setTimeout(processIpcFiles, IPC_POLL_INTERVAL);
  };

  void processIpcFiles();
  logger.info('IPC watcher started (per-group namespaces)');
}

/**
 * Execute one task-related IPC command (`schedule_task`, `pause_task`,
 * `resume_task`, `cancel_task`, `refresh_groups`).
 *
 * Authorization is based on `sourceGroup` / `isMain`, which come from the IPC
 * directory the file was found in, not from the payload. Non-main groups may
 * only act on their own tasks; only the main group may refresh group metadata.
 * Rejected or malformed commands are logged and ignored.
 *
 * @param data - Parsed, untrusted JSON payload.
 * @param sourceGroup - Folder name of the group that wrote the file.
 * @param isMain - Whether `sourceGroup` is the main group.
 */
async function processTaskIpc(
  data: {
    type: string;
    taskId?: string;
    prompt?: string;
    schedule_type?: string;
    schedule_value?: string;
    context_mode?: string;
    groupFolder?: string;
    chatJid?: string;
  },
  sourceGroup: string, // Verified identity from IPC directory
  isMain: boolean // Verified from directory path
): Promise<void> {
  // Import db functions dynamically to avoid circular deps
  const { createTask, updateTask, deleteTask, getTaskById: getTask } = await import('./db.js');
  const { CronExpressionParser } = await import('cron-parser');

  switch (data.type) {
    case 'schedule_task':
      if (data.prompt && data.schedule_type && data.schedule_value && data.groupFolder) {
        // Authorization: non-main groups can only schedule for themselves
        const targetGroup = data.groupFolder;
        if (!isMain && targetGroup !== sourceGroup) {
          logger.warn({ sourceGroup, targetGroup }, 'Unauthorized schedule_task attempt blocked');
          break;
        }

        // Resolve the correct JID for the target group (don't trust IPC payload)
        const targetJid = Object.entries(registeredGroups).find(
          ([, group]) => group.folder === targetGroup
        )?.[0];

        if (!targetJid) {
          logger.warn({ targetGroup }, 'Cannot schedule task: target group not registered');
          break;
        }

        // Reject unknown schedule kinds instead of storing an active task
        // that has no next_run.
        const scheduleType = data.schedule_type;
        if (!isScheduleType(scheduleType)) {
          logger.warn({ scheduleType }, 'Invalid schedule type');
          break;
        }

        let nextRun: string | null = null;
        if (scheduleType === 'cron') {
          try {
            const interval = CronExpressionParser.parse(data.schedule_value);
            nextRun = interval.next().toISOString();
          } catch {
            logger.warn({ scheduleValue: data.schedule_value }, 'Invalid cron expression');
            break;
          }
        } else if (scheduleType === 'interval') {
          // Interval is a positive number of milliseconds.
          const ms = parseInt(data.schedule_value, 10);
          if (isNaN(ms) || ms <= 0) {
            logger.warn({ scheduleValue: data.schedule_value }, 'Invalid interval');
            break;
          }
          nextRun = new Date(Date.now() + ms).toISOString();
        } else {
          // 'once': schedule_value must be a timestamp that Date can parse.
          const scheduled = new Date(data.schedule_value);
          if (isNaN(scheduled.getTime())) {
            logger.warn({ scheduleValue: data.schedule_value }, 'Invalid timestamp');
            break;
          }
          nextRun = scheduled.toISOString();
        }

        const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
        const contextMode =
          data.context_mode === 'group' || data.context_mode === 'isolated'
            ? data.context_mode
            : 'isolated';
        createTask({
          id: taskId,
          group_folder: targetGroup,
          chat_jid: targetJid,
          prompt: data.prompt,
          schedule_type: scheduleType,
          schedule_value: data.schedule_value,
          context_mode: contextMode,
          next_run: nextRun,
          status: 'active',
          created_at: new Date().toISOString()
        });
        logger.info({ taskId, sourceGroup, targetGroup, contextMode }, 'Task created via IPC');
      }
      break;

    case 'pause_task':
      if (data.taskId) {
        const task = getTask(data.taskId);
        if (task && (isMain || task.group_folder === sourceGroup)) {
          updateTask(data.taskId, { status: 'paused' });
          logger.info({ taskId: data.taskId, sourceGroup }, 'Task paused via IPC');
        } else {
          logger.warn({ taskId: data.taskId, sourceGroup }, 'Unauthorized task pause attempt');
        }
      }
      break;

    case 'resume_task':
      if (data.taskId) {
        const task = getTask(data.taskId);
        if (task && (isMain || task.group_folder === sourceGroup)) {
          updateTask(data.taskId, { status: 'active' });
          logger.info({ taskId: data.taskId, sourceGroup }, 'Task resumed via IPC');
        } else {
          logger.warn({ taskId: data.taskId, sourceGroup }, 'Unauthorized task resume attempt');
        }
      }
      break;

    case 'cancel_task':
      if (data.taskId) {
        const task = getTask(data.taskId);
        if (task && (isMain || task.group_folder === sourceGroup)) {
          deleteTask(data.taskId);
          logger.info({ taskId: data.taskId, sourceGroup }, 'Task cancelled via IPC');
        } else {
          logger.warn({ taskId: data.taskId, sourceGroup }, 'Unauthorized task cancel attempt');
        }
      }
      break;

    case 'refresh_groups':
      // Only main group can request a refresh
      if (isMain) {
        logger.info({ sourceGroup }, 'Group metadata refresh requested via IPC');
        await syncGroupMetadata(true);
        // Write updated snapshot immediately
        const availableGroups = getAvailableGroups();
        const { writeGroupsSnapshot: writeGroups } = await import('./container-runner.js');
        writeGroups(sourceGroup, true, availableGroups, new Set(Object.keys(registeredGroups)));
      } else {
        logger.warn({ sourceGroup }, 'Unauthorized refresh_groups attempt blocked');
      }
      break;

    default:
      logger.warn({ type: data.type }, 'Unknown IPC task type');
  }
}

/**
 * Create the WhatsApp socket and wire up its event handlers.
 *
 * - A QR request means stored credentials are missing or invalid: log, show a
 *   desktop notification via `osascript`, and exit with code 1.
 * - On close: reconnect unless the reason is `loggedOut`, in which case exit 0.
 * - On open: sync group metadata and, the first time only, start the daily
 *   sync timer, scheduler loop, IPC watcher and message loop.
 * - Incoming messages: always record chat metadata; store full content only
 *   for registered groups.
 */
async function connectWhatsApp(): Promise<void> {
  const authDir = path.join(STORE_DIR, 'auth');
  fs.mkdirSync(authDir, { recursive: true });

  const { state, saveCreds } = await useMultiFileAuthState(authDir);

  sock = makeWASocket({
    auth: {
      creds: state.creds,
      keys: makeCacheableSignalKeyStore(state.keys, logger)
    },
    printQRInTerminal: false,
    logger,
    browser: ['NanoClaw', 'Chrome', '1.0.0']
  });

  sock.ev.on('connection.update', (update) => {
    const { connection, lastDisconnect, qr } = update;

    if (qr) {
      const msg = 'WhatsApp authentication required. Run /setup in Claude Code.';
      logger.error(msg);
      exec(`osascript -e 'display notification "${msg}" with title "NanoClaw" sound name "Basso"'`);
      // Give the log line and notification a moment to flush before exiting.
      setTimeout(() => process.exit(1), 1000);
    }

    if (connection === 'close') {
      const reason = (lastDisconnect?.error as any)?.output?.statusCode;
      const shouldReconnect = reason !== DisconnectReason.loggedOut;
      logger.info({ reason, shouldReconnect }, 'Connection closed');

      if (shouldReconnect) {
        logger.info('Reconnecting...');
        // Mirror main(): a failed (re)connect is fatal, but log it explicitly
        // instead of surfacing as an unhandled rejection.
        connectWhatsApp().catch(err => {
          logger.error({ err }, 'Failed to reconnect to WhatsApp');
          process.exit(1);
        });
      } else {
        logger.info('Logged out. Run /setup to re-authenticate.');
        process.exit(0);
      }
    } else if (connection === 'open') {
      logger.info('Connected to WhatsApp');

      // Sync group metadata on startup (respects 24h cache)
      syncGroupMetadata().catch(err => logger.error({ err }, 'Initial group sync failed'));

      // The loops below run for the lifetime of the process and use the
      // module-level `sock`, which is reassigned on reconnect. Start them only
      // once, otherwise every reconnect adds another copy of each loop.
      if (!backgroundServicesStarted) {
        backgroundServicesStarted = true;

        // Set up daily sync timer
        setInterval(() => {
          syncGroupMetadata().catch(err => logger.error({ err }, 'Periodic group sync failed'));
        }, GROUP_SYNC_INTERVAL_MS);

        startSchedulerLoop({
          sendMessage,
          registeredGroups: () => registeredGroups,
          getSessions: () => sessions
        });
        startIpcWatcher();
        void startMessageLoop();
      }
    }
  });

  sock.ev.on('creds.update', saveCreds);

  sock.ev.on('messages.upsert', ({ messages }) => {
    for (const msg of messages) {
      if (!msg.message) continue;
      const chatJid = msg.key.remoteJid;
      if (!chatJid || chatJid === 'status@broadcast') continue;

      // messageTimestamp is multiplied by 1000 to get milliseconds for Date.
      const timestamp = new Date(Number(msg.messageTimestamp) * 1000).toISOString();

      // Always store chat metadata for group discovery
      storeChatMetadata(chatJid, timestamp);

      // Only store full message content for registered groups
      if (registeredGroups[chatJid]) {
        storeMessage(msg, chatJid, msg.key.fromMe || false, msg.pushName || undefined);
      }
    }
  });
}

/**
 * Poll the database forever for new messages in registered groups and process
 * them one at a time. The cursor is advanced and persisted after every
 * message, including ones whose processing threw, so a failing message is
 * logged once and not retried.
 */
async function startMessageLoop(): Promise<void> {
  logger.info(`NanoClaw running (trigger: @${ASSISTANT_NAME})`);

  while (true) {
    try {
      const jids = Object.keys(registeredGroups);
      const { messages } = getNewMessages(jids, lastTimestamp);

      if (messages.length > 0) logger.info({ count: messages.length }, 'New messages');
      for (const msg of messages) {
        try {
          await processMessage(msg);
        } catch (err) {
          logger.error({ err, msg: msg.id }, 'Error processing message');
        }
        // Advance timestamp after each message to avoid reprocessing on retry
        lastTimestamp = msg.timestamp;
        saveState();
      }
    } catch (err) {
      logger.error({ err }, 'Error in message loop');
    }
    await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL));
  }
}

/**
 * Make sure the Apple Container system is running by checking
 * `container system status` and, if that fails, running `container system start`
 * (30 s timeout). A failed start is logged but does not abort startup.
 */
function ensureContainerSystemRunning(): void {
  try {
    execSync('container system status', { stdio: 'pipe' });
    logger.debug('Apple Container system already running');
  } catch {
    logger.info('Starting Apple Container system...');
    try {
      execSync('container system start', { stdio: 'pipe', timeout: 30000 });
      logger.info('Apple Container system started');
    } catch (err) {
      logger.error({ err }, 'Failed to start Apple Container system - agents will not work');
    }
  }
}

/** Process entry point: start the container system, database, state and WhatsApp connection. */
async function main(): Promise<void> {
  ensureContainerSystemRunning();
  initDatabase();
  logger.info('Database initialized');
  loadState();
  await connectWhatsApp();
}

main().catch(err => {
  logger.error({ err }, 'Failed to start NanoClaw');
  process.exit(1);
});