Files
nanoclaw/src/group-queue.ts
gavrielc ae177156ec feat: per-group queue, SQLite state, graceful shutdown (#111)
* fix: wire up queue processMessagesFn before recovery to prevent silent message loss

recoverPendingMessages() was called after startMessageLoop(), which meant:
1. Recovery could race with the message loop's first iteration
2. processMessagesFn was set inside startMessageLoop, so recovery
   enqueues would fire runForGroup with processMessagesFn still null,
   silently skipping message processing

Move setProcessMessagesFn and recoverPendingMessages before startMessageLoop
so the queue is fully wired before any messages are enqueued.

https://claude.ai/code/session_01PCY8zNjDa2N29jvBAV5vfL

* feat: structured agent output to fix infinite retry on silent responses (#113)

Use Agent SDK's outputFormat with json_schema to get typed responses
from the agent. The agent now returns { status: 'responded' | 'silent',
userMessage?, internalLog? } instead of a plain string. This fixes a
critical bug where a null/empty agent response caused infinite 5-second
retry loops by conflating "nothing to say" with "error".

- Agent runner: add AGENT_RESPONSE_SCHEMA and parse structured_output
- Host: advance lastAgentTimestamp on both responded AND silent status
- GroupQueue: add exponential backoff (5s-80s) with max 5 retries for
  actual errors, replacing unbounded fixed-interval retries

https://claude.ai/code/session_014SLc8MxP9BYhEhDCLox9U8

Co-authored-by: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2026-02-06 18:54:26 +02:00

293 lines
8.0 KiB
TypeScript

import { ChildProcess, exec } from 'child_process';
import { MAX_CONCURRENT_CONTAINERS } from './config.js';
import { logger } from './logger.js';
// A discrete unit of work queued for a specific group (distinct from the
// implicit "check pending messages" work). `id` is used to de-duplicate:
// a task with an id already queued for the group is dropped on enqueue.
interface QueuedTask {
  id: string;
  groupJid: string;
  // The work itself; awaited by the queue while the group holds a slot.
  fn: () => Promise<void>;
}
// Retry budget for failed message-processing runs. Backoff is exponential:
// BASE_RETRY_MS * 2^(attempt-1), i.e. 5s, 10s, 20s, 40s, 80s, then give up
// until the next incoming message re-triggers processing.
const MAX_RETRIES = 5;
const BASE_RETRY_MS = 5000;
// Per-group bookkeeping. One GroupState exists per group JID, created lazily.
interface GroupState {
  // True while a container/task for this group is running; enforces
  // at-most-one concurrent run per group.
  active: boolean;
  // Set when a message check arrived while the group was busy or the
  // concurrency limit was reached; drained after the current run ends.
  pendingMessages: boolean;
  // Tasks waiting for this group's slot; drained before pendingMessages.
  pendingTasks: QueuedTask[];
  // OS process backing the current run, if registered (null when idle).
  process: ChildProcess | null;
  // Container name for graceful `container stop` at shutdown, if any.
  containerName: string | null;
  // Consecutive failed message-processing attempts; reset on success.
  retryCount: number;
}
/**
 * Per-group work queue with a global concurrency cap.
 *
 * Guarantees:
 * - At most one run (message processing or task) per group at a time.
 * - At most MAX_CONCURRENT_CONTAINERS runs across all groups; groups over
 *   the limit wait FIFO in `waitingGroups`.
 * - Failed message runs are retried with exponential backoff (see
 *   MAX_RETRIES / BASE_RETRY_MS); tasks are not retried.
 * - `shutdown()` stops accepting work, signals active processes, and
 *   escalates to SIGKILL after a grace period.
 */
export class GroupQueue {
  private groups = new Map<string, GroupState>();
  // Number of groups currently holding a concurrency slot.
  private activeCount = 0;
  // FIFO of group JIDs waiting for a free slot.
  private waitingGroups: string[] = [];
  // Wired by the host via setProcessMessagesFn() before any enqueue;
  // if still null when a run starts, the run is a no-op (messages stay
  // in SQLite and are re-discovered on the next enqueue).
  private processMessagesFn: ((groupJid: string) => Promise<boolean>) | null =
    null;
  private shuttingDown = false;

  /**
   * True while the OS process has neither exited nor been terminated by a
   * signal. NOTE: `proc.killed` must NOT be used for this — it flips to true
   * as soon as kill() is *sent*, long before the process actually exits.
   */
  private static isAlive(proc: ChildProcess): boolean {
    return proc.exitCode === null && proc.signalCode === null;
  }

  /** Lazily creates and returns the state record for a group. */
  private getGroup(groupJid: string): GroupState {
    let state = this.groups.get(groupJid);
    if (!state) {
      state = {
        active: false,
        pendingMessages: false,
        pendingTasks: [],
        process: null,
        containerName: null,
        retryCount: 0,
      };
      this.groups.set(groupJid, state);
    }
    return state;
  }

  /**
   * Wires the message-processing callback. Must be called before messages
   * are enqueued, otherwise runs silently skip processing.
   * The callback returns true on success, false to trigger backoff retry.
   */
  setProcessMessagesFn(fn: (groupJid: string) => Promise<boolean>): void {
    this.processMessagesFn = fn;
  }

  /**
   * Requests a message-processing run for a group. If the group is busy or
   * the global limit is reached, the request is coalesced into a single
   * pendingMessages flag and served later.
   */
  enqueueMessageCheck(groupJid: string): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);
    if (state.active) {
      state.pendingMessages = true;
      logger.debug({ groupJid }, 'Container active, message queued');
      return;
    }
    if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) {
      state.pendingMessages = true;
      if (!this.waitingGroups.includes(groupJid)) {
        this.waitingGroups.push(groupJid);
      }
      logger.debug(
        { groupJid, activeCount: this.activeCount },
        'At concurrency limit, message queued',
      );
      return;
    }
    void this.runForGroup(groupJid, 'messages');
  }

  /**
   * Enqueues a named task for a group, de-duplicated by taskId. Runs
   * immediately when the group is idle and a slot is free, otherwise queues.
   */
  enqueueTask(groupJid: string, taskId: string, fn: () => Promise<void>): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);
    // Prevent double-queuing of the same task
    if (state.pendingTasks.some((t) => t.id === taskId)) {
      logger.debug({ groupJid, taskId }, 'Task already queued, skipping');
      return;
    }
    if (state.active) {
      state.pendingTasks.push({ id: taskId, groupJid, fn });
      logger.debug({ groupJid, taskId }, 'Container active, task queued');
      return;
    }
    if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) {
      state.pendingTasks.push({ id: taskId, groupJid, fn });
      if (!this.waitingGroups.includes(groupJid)) {
        this.waitingGroups.push(groupJid);
      }
      logger.debug(
        { groupJid, taskId, activeCount: this.activeCount },
        'At concurrency limit, task queued',
      );
      return;
    }
    // Run immediately
    void this.runTask(groupJid, { id: taskId, groupJid, fn });
  }

  /**
   * Associates the spawned container process with a group so shutdown()
   * can stop it gracefully. Cleared automatically when the run finishes.
   */
  registerProcess(
    groupJid: string,
    proc: ChildProcess,
    containerName: string,
  ): void {
    const state = this.getGroup(groupJid);
    state.process = proc;
    state.containerName = containerName;
  }

  /**
   * Executes one message-processing run for a group, holding the group's
   * slot for the duration. Schedules a backoff retry on failure or thrown
   * error, then drains any work that arrived while running.
   */
  private async runForGroup(
    groupJid: string,
    reason: 'messages' | 'drain',
  ): Promise<void> {
    const state = this.getGroup(groupJid);
    state.active = true;
    state.pendingMessages = false;
    this.activeCount++;
    logger.debug(
      { groupJid, reason, activeCount: this.activeCount },
      'Starting container for group',
    );
    try {
      if (this.processMessagesFn) {
        const success = await this.processMessagesFn(groupJid);
        if (success) {
          state.retryCount = 0;
        } else {
          this.scheduleRetry(groupJid, state);
        }
      }
    } catch (err) {
      logger.error({ groupJid, err }, 'Error processing messages for group');
      this.scheduleRetry(groupJid, state);
    } finally {
      state.active = false;
      state.process = null;
      state.containerName = null;
      this.activeCount--;
      this.drainGroup(groupJid);
    }
  }

  /**
   * Executes one queued task for a group, holding the group's slot.
   * Task errors are logged but never retried (unlike message runs).
   */
  private async runTask(groupJid: string, task: QueuedTask): Promise<void> {
    const state = this.getGroup(groupJid);
    state.active = true;
    this.activeCount++;
    logger.debug(
      { groupJid, taskId: task.id, activeCount: this.activeCount },
      'Running queued task',
    );
    try {
      await task.fn();
    } catch (err) {
      logger.error({ groupJid, taskId: task.id, err }, 'Error running task');
    } finally {
      state.active = false;
      state.process = null;
      state.containerName = null;
      this.activeCount--;
      this.drainGroup(groupJid);
    }
  }

  /**
   * Schedules an exponential-backoff retry for a failed message run
   * (5s, 10s, 20s, ... capped at MAX_RETRIES attempts). After the budget
   * is exhausted the counter resets and we wait for the next incoming
   * message to re-trigger processing.
   */
  private scheduleRetry(groupJid: string, state: GroupState): void {
    state.retryCount++;
    if (state.retryCount > MAX_RETRIES) {
      logger.error(
        { groupJid, retryCount: state.retryCount },
        'Max retries exceeded, dropping messages (will retry on next incoming message)',
      );
      state.retryCount = 0;
      return;
    }
    const delayMs = BASE_RETRY_MS * Math.pow(2, state.retryCount - 1);
    logger.info(
      { groupJid, retryCount: state.retryCount, delayMs },
      'Scheduling retry with backoff',
    );
    setTimeout(() => {
      if (!this.shuttingDown) {
        this.enqueueMessageCheck(groupJid);
      }
    }, delayMs);
  }

  /**
   * Called when a group releases its slot: runs that group's next pending
   * task, then pending messages, and finally offers the slot to waiting
   * groups. Tasks go first because they are held only in memory, while
   * messages are re-discoverable from SQLite.
   */
  private drainGroup(groupJid: string): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);
    // Tasks first (they won't be re-discovered from SQLite like messages)
    if (state.pendingTasks.length > 0) {
      const task = state.pendingTasks.shift()!;
      void this.runTask(groupJid, task);
      return;
    }
    // Then pending messages
    if (state.pendingMessages) {
      void this.runForGroup(groupJid, 'drain');
      return;
    }
    // Nothing pending for this group; check if other groups are waiting for a slot
    this.drainWaiting();
  }

  /** Hands freed slots to waiting groups in FIFO order until slots run out. */
  private drainWaiting(): void {
    while (
      this.waitingGroups.length > 0 &&
      this.activeCount < MAX_CONCURRENT_CONTAINERS
    ) {
      const nextJid = this.waitingGroups.shift()!;
      const state = this.getGroup(nextJid);
      // Prioritize tasks over messages
      if (state.pendingTasks.length > 0) {
        const task = state.pendingTasks.shift()!;
        void this.runTask(nextJid, task);
      } else if (state.pendingMessages) {
        void this.runForGroup(nextJid, 'drain');
      }
      // If neither pending, skip this group
    }
  }

  /**
   * Graceful shutdown: stop accepting work, ask every active process to
   * stop (via `container stop` when a container name is registered,
   * otherwise SIGTERM), poll for exits, and SIGKILL survivors after
   * gracePeriodMs.
   *
   * BUG FIX: liveness was previously tested with `!proc.killed`, but
   * ChildProcess.killed becomes true the moment a signal is *sent* — so the
   * grace-period wait resolved immediately after SIGTERM and the SIGKILL
   * escalation skipped exactly the processes that ignored SIGTERM. We now
   * test actual exit via exitCode/signalCode (see isAlive()).
   */
  async shutdown(gracePeriodMs: number): Promise<void> {
    this.shuttingDown = true;
    logger.info(
      { activeCount: this.activeCount, gracePeriodMs },
      'GroupQueue shutting down',
    );
    // Collect all still-running processes
    const activeProcs: Array<{
      jid: string;
      proc: ChildProcess;
      containerName: string | null;
    }> = [];
    for (const [jid, state] of this.groups) {
      if (state.process && GroupQueue.isAlive(state.process)) {
        activeProcs.push({
          jid,
          proc: state.process,
          containerName: state.containerName,
        });
      }
    }
    if (activeProcs.length === 0) return;
    // Stop all active containers gracefully
    for (const { jid, proc, containerName } of activeProcs) {
      if (containerName) {
        logger.info({ jid, containerName }, 'Stopping container');
        // NOTE(review): containerName is interpolated into a shell command —
        // safe only if names are generated internally, never user-derived.
        // Confirm, or switch to execFile('container', ['stop', name]).
        exec(`container stop ${containerName}`);
      } else {
        logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to process');
        proc.kill('SIGTERM');
      }
    }
    // Poll for exits; escalate to SIGKILL when the grace period elapses.
    await new Promise<void>((resolve) => {
      const checkInterval = setInterval(() => {
        if (activeProcs.every(({ proc }) => !GroupQueue.isAlive(proc))) {
          clearInterval(checkInterval);
          clearTimeout(killTimer);
          resolve();
        }
      }, 500);
      const killTimer = setTimeout(() => {
        clearInterval(checkInterval);
        // SIGKILL survivors
        for (const { jid, proc } of activeProcs) {
          if (GroupQueue.isAlive(proc)) {
            logger.warn({ jid, pid: proc.pid }, 'Sending SIGKILL to container');
            proc.kill('SIGKILL');
          }
        }
        resolve();
      }, gracePeriodMs);
    });
  }
}