import { ChildProcess, exec } from 'child_process';
import { MAX_CONCURRENT_CONTAINERS } from './config.js';
import { logger } from './logger.js';

/** A unit of scheduled work bound to a single group. */
interface QueuedTask {
  id: string;
  groupJid: string;
  fn: () => Promise<void>;
}

/** Consecutive failed message runs tolerated before retrying stops. */
const MAX_RETRIES = 5;
/** Delay before the first retry; doubled for each further consecutive failure. */
const BASE_RETRY_MS = 5000;
/** How often shutdown() polls for process exit during the grace period. */
const SHUTDOWN_POLL_MS = 500;

/** Per-group scheduling state. */
interface GroupState {
  /** True while a message run or a task is executing for this group. */
  active: boolean;
  /** Set when a message check was requested but could not start yet. */
  pendingMessages: boolean;
  /** Tasks waiting for this group's slot, in FIFO order. */
  pendingTasks: QueuedTask[];
  /** Process registered (via registerProcess) for the current run, if any. */
  process: ChildProcess | null;
  /** Container name registered for the current run, if any. */
  containerName: string | null;
  /** Consecutive failed message runs; reset on success or after MAX_RETRIES. */
  retryCount: number;
}

/** A registered process that was still running when shutdown began. */
interface ActiveProcess {
  jid: string;
  proc: ChildProcess;
  containerName: string | null;
}

/**
 * True until the child process has exited (normally or by signal).
 *
 * `ChildProcess.killed` is deliberately NOT used: it only records that
 * `kill()` delivered a signal, not that the process actually terminated.
 */
function isProcessRunning(proc: ChildProcess): boolean {
  return proc.exitCode === null && proc.signalCode === null;
}

/**
 * Serializes work per group and caps how many groups run at once.
 *
 * Each group runs at most one thing at a time: either a message-processing
 * pass (the function given to setProcessMessagesFn) or a queued task. Across
 * all groups, at most MAX_CONCURRENT_CONTAINERS run concurrently; groups that
 * hit the limit wait in FIFO order for a slot to free up.
 */
export class GroupQueue {
  private groups = new Map<string, GroupState>();
  private activeCount = 0;
  private waitingGroups: string[] = [];
  private processMessagesFn: ((groupJid: string) => Promise<boolean>) | null = null;
  private shuttingDown = false;

  /** Returns the state for a group, creating an idle entry on first use. */
  private getGroup(groupJid: string): GroupState {
    let state = this.groups.get(groupJid);
    if (!state) {
      state = {
        active: false,
        pendingMessages: false,
        pendingTasks: [],
        process: null,
        containerName: null,
        retryCount: 0,
      };
      this.groups.set(groupJid, state);
    }
    return state;
  }

  /**
   * Registers the function that processes a group's pending messages.
   * It should resolve to `true` on success; `false` or a rejection
   * schedules a retry with exponential backoff.
   */
  setProcessMessagesFn(fn: (groupJid: string) => Promise<boolean>): void {
    this.processMessagesFn = fn;
  }

  /**
   * Requests a message-processing pass for a group.
   *
   * Starts immediately when the group is idle and a slot is free. Otherwise
   * the request is remembered and served after the group's current run or
   * once a slot frees up. Ignored after shutdown() has been called.
   */
  enqueueMessageCheck(groupJid: string): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);

    if (state.active) {
      state.pendingMessages = true;
      logger.debug({ groupJid }, 'Container active, message queued');
      return;
    }

    if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) {
      state.pendingMessages = true;
      this.markWaiting(groupJid);
      logger.debug(
        { groupJid, activeCount: this.activeCount },
        'At concurrency limit, message queued',
      );
      return;
    }

    void this.runForGroup(groupJid, 'messages');
  }

  /**
   * Queues a task for a group. A task whose `taskId` is already queued for
   * the group is ignored (a task that is currently running is not checked).
   * Runs immediately when the group is idle and a slot is free. Ignored after
   * shutdown() has been called.
   */
  enqueueTask(groupJid: string, taskId: string, fn: () => Promise<void>): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);

    // Prevent double-queuing of the same task
    if (state.pendingTasks.some((t) => t.id === taskId)) {
      logger.debug({ groupJid, taskId }, 'Task already queued, skipping');
      return;
    }

    const task: QueuedTask = { id: taskId, groupJid, fn };

    if (state.active) {
      state.pendingTasks.push(task);
      logger.debug({ groupJid, taskId }, 'Container active, task queued');
      return;
    }

    if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) {
      state.pendingTasks.push(task);
      this.markWaiting(groupJid);
      logger.debug(
        { groupJid, taskId, activeCount: this.activeCount },
        'At concurrency limit, task queued',
      );
      return;
    }

    // Run immediately
    void this.runTask(groupJid, task);
  }

  /**
   * Associates the process (and container name) backing a group's current
   * run, so shutdown() can stop it. Cleared automatically when the run ends.
   */
  registerProcess(groupJid: string, proc: ChildProcess, containerName: string): void {
    const state = this.getGroup(groupJid);
    state.process = proc;
    state.containerName = containerName;
  }

  /** Adds a group to the FIFO of groups waiting for a free slot (at most once). */
  private markWaiting(groupJid: string): void {
    if (!this.waitingGroups.includes(groupJid)) {
      this.waitingGroups.push(groupJid);
    }
  }

  /**
   * Runs one message-processing pass for a group while holding a slot.
   * Success resets the retry counter; `false` or a thrown error schedules a
   * retry. The slot is then handed to the next queued work.
   */
  private async runForGroup(groupJid: string, reason: 'messages' | 'drain'): Promise<void> {
    const state = this.getGroup(groupJid);
    state.active = true;
    state.pendingMessages = false;
    this.activeCount++;
    logger.debug(
      { groupJid, reason, activeCount: this.activeCount },
      'Starting container for group',
    );

    try {
      if (this.processMessagesFn) {
        const success = await this.processMessagesFn(groupJid);
        if (success) {
          state.retryCount = 0;
        } else {
          this.scheduleRetry(groupJid, state);
        }
      }
    } catch (err) {
      logger.error({ groupJid, err }, 'Error processing messages for group');
      this.scheduleRetry(groupJid, state);
    } finally {
      this.finishRun(groupJid, state);
    }
  }

  /** Runs one queued task while holding a slot; task errors are logged, not retried. */
  private async runTask(groupJid: string, task: QueuedTask): Promise<void> {
    const state = this.getGroup(groupJid);
    state.active = true;
    this.activeCount++;
    logger.debug(
      { groupJid, taskId: task.id, activeCount: this.activeCount },
      'Running queued task',
    );

    try {
      await task.fn();
    } catch (err) {
      logger.error({ groupJid, taskId: task.id, err }, 'Error running task');
    } finally {
      this.finishRun(groupJid, state);
    }
  }

  /** Clears the group's per-run state, frees its slot, and starts the next queued work. */
  private finishRun(groupJid: string, state: GroupState): void {
    state.active = false;
    state.process = null;
    state.containerName = null;
    this.activeCount--;
    this.drainGroup(groupJid);
  }

  /**
   * Schedules another message check with exponential backoff
   * (BASE_RETRY_MS, 2x, 4x, ...). After MAX_RETRIES consecutive failures it
   * gives up and resets the counter; the next enqueueMessageCheck starts fresh.
   */
  private scheduleRetry(groupJid: string, state: GroupState): void {
    state.retryCount++;
    if (state.retryCount > MAX_RETRIES) {
      logger.error(
        { groupJid, retryCount: state.retryCount },
        'Max retries exceeded, dropping messages (will retry on next incoming message)',
      );
      state.retryCount = 0;
      return;
    }

    const delayMs = BASE_RETRY_MS * 2 ** (state.retryCount - 1);
    logger.info(
      { groupJid, retryCount: state.retryCount, delayMs },
      'Scheduling retry with backoff',
    );
    setTimeout(() => {
      if (!this.shuttingDown) {
        this.enqueueMessageCheck(groupJid);
      }
    }, delayMs);
  }

  /**
   * Called when a group's run ends: the freed slot goes first to that group's
   * own queued tasks, then its pending messages, and only then to other
   * waiting groups. Does nothing once shutting down.
   */
  private drainGroup(groupJid: string): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);

    // Tasks first (they won't be re-discovered from SQLite like messages)
    const task = state.pendingTasks.shift();
    if (task) {
      void this.runTask(groupJid, task);
      return;
    }

    // Then pending messages
    if (state.pendingMessages) {
      void this.runForGroup(groupJid, 'drain');
      return;
    }

    // Nothing pending for this group; check if other groups are waiting for a slot
    this.drainWaiting();
  }

  /** Starts waiting groups in FIFO order until the queue is empty or slots run out. */
  private drainWaiting(): void {
    while (this.activeCount < MAX_CONCURRENT_CONTAINERS) {
      const nextJid = this.waitingGroups.shift();
      if (nextJid === undefined) break;
      const state = this.getGroup(nextJid);

      // Prioritize tasks over messages; a group with neither pending is skipped.
      const task = state.pendingTasks.shift();
      if (task) {
        void this.runTask(nextJid, task);
      } else if (state.pendingMessages) {
        void this.runForGroup(nextJid, 'drain');
      }
    }
  }

  /**
   * Stops accepting new work and stops every registered, still-running
   * process. Processes with a container name get `container stop`; the rest
   * get SIGTERM. After `gracePeriodMs`, survivors get SIGKILL. Resolves as
   * soon as all processes have exited or the grace period ends. Does not wait
   * for in-flight run promises themselves.
   */
  async shutdown(gracePeriodMs: number): Promise<void> {
    this.shuttingDown = true;
    logger.info(
      { activeCount: this.activeCount, gracePeriodMs },
      'GroupQueue shutting down',
    );

    // Collect all active processes (including ones already signalled but not yet exited)
    const activeProcs: ActiveProcess[] = [];
    for (const [jid, state] of this.groups) {
      if (state.process && isProcessRunning(state.process)) {
        activeProcs.push({ jid, proc: state.process, containerName: state.containerName });
      }
    }

    if (activeProcs.length === 0) return;

    // Stop all active containers gracefully
    for (const { jid, proc, containerName } of activeProcs) {
      this.requestStop(jid, proc, containerName);
    }

    await this.waitForExit(activeProcs, gracePeriodMs);
  }

  /**
   * Asks one process to stop: `container stop <name>` when a usable container
   * name is known, SIGTERM to the process otherwise.
   */
  private requestStop(jid: string, proc: ChildProcess, containerName: string | null): void {
    // Defense-in-depth: re-sanitize before shell interpolation.
    // Primary sanitization is in container-runner.ts when building the name,
    // but we sanitize again here since exec() runs through a shell.
    const safeName = containerName ? containerName.replace(/[^a-zA-Z0-9-]/g, '') : '';

    if (safeName) {
      logger.info({ jid, containerName: safeName }, 'Stopping container');
      exec(`container stop ${safeName}`, (err) => {
        if (err) {
          logger.warn({ jid, containerName: safeName, err: err.message }, 'container stop failed');
        }
      });
      return;
    }

    // No container name (or nothing left after sanitizing): signal the process directly
    // rather than running `container stop` with no target.
    logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to process');
    proc.kill('SIGTERM');
  }

  /**
   * Polls until every process has exited, or SIGKILLs survivors once
   * `gracePeriodMs` elapses. Both timers are cleared on completion so an early
   * exit neither keeps the event loop alive nor triggers a late SIGKILL sweep.
   */
  private waitForExit(procs: readonly ActiveProcess[], gracePeriodMs: number): Promise<void> {
    return new Promise<void>((resolve) => {
      // Only ever invoked from the timer callbacks below, i.e. after both timers exist.
      const finish = (): void => {
        clearInterval(pollTimer);
        clearTimeout(graceTimer);
        resolve();
      };

      const pollTimer = setInterval(() => {
        if (procs.every(({ proc }) => !isProcessRunning(proc))) {
          finish();
        }
      }, SHUTDOWN_POLL_MS);

      const graceTimer = setTimeout(() => {
        // SIGKILL survivors
        for (const { jid, proc } of procs) {
          if (isProcessRunning(proc)) {
            logger.warn({ jid, pid: proc.pid }, 'Sending SIGKILL to container');
            proc.kill('SIGKILL');
          }
        }
        finish();
      }, gracePeriodMs);
    });
  }
}