Files
nanoclaw/src/group-queue.ts
Gabi Simons f794185c21 fix: atomic claim prevents scheduled tasks from executing twice (#657)
* fix: atomic claim prevents scheduled tasks from executing twice (#138)

Replace the two-phase getDueTasks() + deferred updateTaskAfterRun() with
an atomic SQLite transaction (claimDueTasks) that advances next_run
BEFORE dispatching tasks to the queue. This eliminates the race window
where subsequent scheduler polls re-discover in-progress tasks.

Key changes:
- claimDueTasks(): SELECT + UPDATE in a single db.transaction(), so no
  poll can read stale next_run values. Once-tasks get next_run=NULL;
  recurring tasks get next_run advanced to the future.
- computeNextRun(): anchors interval tasks to the scheduled time (not
  Date.now()) to prevent cumulative drift. Includes a while-loop to
  skip missed intervals and a guard against invalid interval values.
- updateTaskAfterRun(): simplified to only record last_run/last_result
  since next_run is already handled by the claim.

Closes #138, #211, #300, #578

Co-authored-by: @taslim (PR #601)
Co-authored-by: @baijunjie (Issue #138)
Co-authored-by: @Michaelliv (Issue #300)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

* style: apply prettier formatting

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: track running task ID in GroupQueue to prevent duplicate execution (#138)

Previous commits implemented an "atomic claim" approach (claimDueTasks)
that advanced next_run before execution. Per Gavriel's review, this
solved the symptom at the wrong layer and introduced crash-recovery
risks for once-tasks.

This commit reverts claimDueTasks and instead fixes the actual bug:
GroupQueue.enqueueTask() only checked pendingTasks for duplicates, but
running tasks had already been shifted out. Adding runningTaskId to
GroupState closes that gap with a 3-line fix at the correct layer.

The computeNextRun() drift fix is retained, applied post-execution
where it belongs.

Closes #138, #211, #300, #578

Co-authored-by: @taslim (PR #601)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add changelog entry for scheduler duplicate fix

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add contributors for scheduler race condition fix

Co-Authored-By: Taslim <9999802+taslim@users.noreply.github.com>
Co-Authored-By: BaiJunjie <7956480+baijunjie@users.noreply.github.com>
Co-Authored-By: Michael <13676242+Michaelliv@users.noreply.github.com>
Co-Authored-By: Kyle Zhike Chen <3477852+kk17@users.noreply.github.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: gavrielc <gabicohen22@yahoo.com>
Co-authored-by: Taslim <9999802+taslim@users.noreply.github.com>
Co-authored-by: BaiJunjie <7956480+baijunjie@users.noreply.github.com>
Co-authored-by: Michael <13676242+Michaelliv@users.noreply.github.com>
Co-authored-by: Kyle Zhike Chen <3477852+kk17@users.noreply.github.com>
2026-03-04 16:23:29 +02:00

366 lines
10 KiB
TypeScript

import { ChildProcess } from 'child_process';
import fs from 'fs';
import path from 'path';
import { DATA_DIR, MAX_CONCURRENT_CONTAINERS } from './config.js';
import { logger } from './logger.js';
/** A task handed to GroupQueue.enqueueTask, either waiting in a group's queue or being run. */
interface QueuedTask {
// Task identifier; enqueueTask compares it against queued and running tasks to skip duplicates.
id: string;
// JID of the group the task was enqueued for.
groupJid: string;
// The work itself; runTask awaits the returned promise and logs (does not rethrow) a rejection.
fn: () => Promise<void>;
}
// Number of backoff retries scheduleRetry allows for a group before it gives up and resets the counter.
const MAX_RETRIES = 5;
// Delay before the first retry, in milliseconds; scheduleRetry doubles it for each further retry.
const BASE_RETRY_MS = 5000;
/** Mutable bookkeeping GroupQueue keeps for one group, keyed by group JID. */
interface GroupState {
// True while runForGroup or runTask is executing for this group.
active: boolean;
// Set by notifyIdle; cleared when a new run starts and by sendMessage.
idleWaiting: boolean;
// True while the current run was started by runTask; sendMessage refuses to write in that case.
isTaskContainer: boolean;
// Id of the task currently inside runTask, or null when no task is running.
runningTaskId: string | null;
// A message check was requested while the group was busy or no slot was free.
pendingMessages: boolean;
// Tasks waiting to run, in arrival order (pushed at the end, shifted from the front).
pendingTasks: QueuedTask[];
// Child process recorded by registerProcess; reset to null when a run finishes.
process: ChildProcess | null;
// Container name recorded by registerProcess; reset to null when a run finishes.
containerName: string | null;
// Folder name used to build the IPC input directory; reset to null when a run finishes.
groupFolder: string | null;
// Count of failed message-processing attempts since the last success (or since retries were exhausted).
retryCount: number;
}
/**
 * Schedules container work per group.
 *
 * Each group runs at most one job at a time (either a message-processing run
 * or a queued task), and at most MAX_CONCURRENT_CONTAINERS groups are active
 * simultaneously. Work that cannot start immediately is remembered on the
 * group's state and started later by drainGroup / drainWaiting.
 */
export class GroupQueue {
  /** Per-group state, created lazily by getGroup. */
  private groups = new Map<string, GroupState>();
  /** Number of groups currently inside runForGroup or runTask. */
  private activeCount = 0;
  /** Groups that have pending work but were refused a slot, in arrival order. */
  private waitingGroups: string[] = [];
  /** Callback that processes a group's messages; resolves to true on success. */
  private processMessagesFn: ((groupJid: string) => Promise<boolean>) | null =
    null;
  /** Once set by shutdown, no new work is accepted or drained. */
  private shuttingDown = false;

  /** Returns the state record for a group, creating a fresh idle one on first use. */
  private getGroup(groupJid: string): GroupState {
    let state = this.groups.get(groupJid);
    if (!state) {
      state = {
        active: false,
        idleWaiting: false,
        isTaskContainer: false,
        runningTaskId: null,
        pendingMessages: false,
        pendingTasks: [],
        process: null,
        containerName: null,
        groupFolder: null,
        retryCount: 0,
      };
      this.groups.set(groupJid, state);
    }
    return state;
  }

  /** Installs the callback used by message-processing runs. */
  setProcessMessagesFn(fn: (groupJid: string) => Promise<boolean>): void {
    this.processMessagesFn = fn;
  }

  /**
   * Requests a message-processing run for a group.
   *
   * Starts the run immediately when the group is idle and a slot is free.
   * Otherwise the request is recorded in `pendingMessages`, and when the
   * concurrency limit is the reason, the group joins the waiting list.
   * Does nothing after shutdown.
   */
  enqueueMessageCheck(groupJid: string): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);

    if (state.active) {
      state.pendingMessages = true;
      logger.debug({ groupJid }, 'Container active, message queued');
      return;
    }

    if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) {
      state.pendingMessages = true;
      if (!this.waitingGroups.includes(groupJid)) {
        this.waitingGroups.push(groupJid);
      }
      logger.debug(
        { groupJid, activeCount: this.activeCount },
        'At concurrency limit, message queued',
      );
      return;
    }

    this.runForGroup(groupJid, 'messages').catch((err) =>
      logger.error({ groupJid, err }, 'Unhandled error in runForGroup'),
    );
  }

  /**
   * Queues a task for a group, or runs it immediately when possible.
   *
   * A task whose id matches the running task or an already queued task is
   * skipped. If the group's container is active and idle-waiting, it is asked
   * to close so the task can run next. Does nothing after shutdown.
   */
  enqueueTask(groupJid: string, taskId: string, fn: () => Promise<void>): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);

    // Prevent double-queuing: check both pending and currently-running task.
    // The running task has already been shifted out of pendingTasks, so it
    // needs its own check.
    if (state.runningTaskId === taskId) {
      logger.debug({ groupJid, taskId }, 'Task already running, skipping');
      return;
    }
    if (state.pendingTasks.some((t) => t.id === taskId)) {
      logger.debug({ groupJid, taskId }, 'Task already queued, skipping');
      return;
    }

    if (state.active) {
      state.pendingTasks.push({ id: taskId, groupJid, fn });
      if (state.idleWaiting) {
        this.closeStdin(groupJid);
      }
      logger.debug({ groupJid, taskId }, 'Container active, task queued');
      return;
    }

    if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) {
      state.pendingTasks.push({ id: taskId, groupJid, fn });
      if (!this.waitingGroups.includes(groupJid)) {
        this.waitingGroups.push(groupJid);
      }
      logger.debug(
        { groupJid, taskId, activeCount: this.activeCount },
        'At concurrency limit, task queued',
      );
      return;
    }

    // Run immediately
    this.runTask(groupJid, { id: taskId, groupJid, fn }).catch((err) =>
      logger.error({ groupJid, taskId, err }, 'Unhandled error in runTask'),
    );
  }

  /**
   * Records the child process and container name for a group's current run.
   * `groupFolder` is only overwritten when a non-empty value is supplied.
   */
  registerProcess(
    groupJid: string,
    proc: ChildProcess,
    containerName: string,
    groupFolder?: string,
  ): void {
    const state = this.getGroup(groupJid);
    state.process = proc;
    state.containerName = containerName;
    if (groupFolder) state.groupFolder = groupFolder;
  }

  /**
   * Mark the container as idle-waiting (finished work, waiting for IPC input).
   * If tasks are pending, preempt the idle container immediately.
   */
  notifyIdle(groupJid: string): void {
    const state = this.getGroup(groupJid);
    state.idleWaiting = true;
    if (state.pendingTasks.length > 0) {
      this.closeStdin(groupJid);
    }
  }

  /**
   * Send a follow-up message to the active container via IPC file.
   *
   * The payload is written to a temporary file and then renamed into place.
   *
   * @returns true if the message was written; false if there is no active
   *   message container for the group or the write failed.
   */
  sendMessage(groupJid: string, text: string): boolean {
    const state = this.getGroup(groupJid);
    if (!state.active || !state.groupFolder || state.isTaskContainer)
      return false;

    const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
    try {
      fs.mkdirSync(inputDir, { recursive: true });
      const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 6)}.json`;
      const filepath = path.join(inputDir, filename);
      const tempPath = `${filepath}.tmp`;
      fs.writeFileSync(tempPath, JSON.stringify({ type: 'message', text }));
      fs.renameSync(tempPath, filepath);
      // Only now is the agent about to receive work. Clearing the flag before
      // the write would leave a still-idle container marked busy on failure,
      // and enqueueTask would then not preempt it.
      state.idleWaiting = false;
      return true;
    } catch (err) {
      logger.error({ groupJid, err }, 'Failed to write IPC message file');
      return false;
    }
  }

  /**
   * Signal the active container to wind down by writing a close sentinel.
   * Best effort: a filesystem failure is logged and otherwise ignored.
   */
  closeStdin(groupJid: string): void {
    const state = this.getGroup(groupJid);
    if (!state.active || !state.groupFolder) return;

    const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
    try {
      fs.mkdirSync(inputDir, { recursive: true });
      fs.writeFileSync(path.join(inputDir, '_close'), '');
    } catch (err) {
      logger.error({ groupJid, err }, 'Failed to write IPC close sentinel');
    }
  }

  /**
   * Runs one message-processing pass for a group.
   *
   * The state flags and activeCount are updated synchronously, before the
   * first await, so callers that loop on activeCount see the slot as taken.
   * A false result or a thrown error schedules a backoff retry. When the run
   * ends the process fields are cleared and the next pending work is drained.
   */
  private async runForGroup(
    groupJid: string,
    reason: 'messages' | 'drain',
  ): Promise<void> {
    const state = this.getGroup(groupJid);
    state.active = true;
    state.idleWaiting = false;
    state.isTaskContainer = false;
    state.pendingMessages = false;
    this.activeCount++;

    logger.debug(
      { groupJid, reason, activeCount: this.activeCount },
      'Starting container for group',
    );

    try {
      if (this.processMessagesFn) {
        const success = await this.processMessagesFn(groupJid);
        if (success) {
          state.retryCount = 0;
        } else {
          this.scheduleRetry(groupJid, state);
        }
      }
    } catch (err) {
      logger.error({ groupJid, err }, 'Error processing messages for group');
      this.scheduleRetry(groupJid, state);
    } finally {
      state.active = false;
      state.process = null;
      state.containerName = null;
      state.groupFolder = null;
      this.activeCount--;
      this.drainGroup(groupJid);
    }
  }

  /**
   * Runs one task for a group.
   *
   * Like runForGroup, the state is claimed synchronously before the first
   * await. The task id is kept in runningTaskId for the duration so that
   * enqueueTask can reject a duplicate. Task errors are logged, not retried.
   */
  private async runTask(groupJid: string, task: QueuedTask): Promise<void> {
    const state = this.getGroup(groupJid);
    state.active = true;
    state.idleWaiting = false;
    state.isTaskContainer = true;
    state.runningTaskId = task.id;
    this.activeCount++;

    logger.debug(
      { groupJid, taskId: task.id, activeCount: this.activeCount },
      'Running queued task',
    );

    try {
      await task.fn();
    } catch (err) {
      logger.error({ groupJid, taskId: task.id, err }, 'Error running task');
    } finally {
      state.active = false;
      state.isTaskContainer = false;
      state.runningTaskId = null;
      state.process = null;
      state.containerName = null;
      state.groupFolder = null;
      this.activeCount--;
      this.drainGroup(groupJid);
    }
  }

  /**
   * Schedules another message check after an exponentially growing delay
   * (BASE_RETRY_MS doubled per attempt). After MAX_RETRIES attempts the
   * counter is reset and no further retry is scheduled.
   */
  private scheduleRetry(groupJid: string, state: GroupState): void {
    state.retryCount++;
    if (state.retryCount > MAX_RETRIES) {
      logger.error(
        { groupJid, retryCount: state.retryCount },
        'Max retries exceeded, dropping messages (will retry on next incoming message)',
      );
      state.retryCount = 0;
      return;
    }

    const delayMs = BASE_RETRY_MS * Math.pow(2, state.retryCount - 1);
    logger.info(
      { groupJid, retryCount: state.retryCount, delayMs },
      'Scheduling retry with backoff',
    );
    setTimeout(() => {
      if (!this.shuttingDown) {
        this.enqueueMessageCheck(groupJid);
      }
    }, delayMs);
  }

  /**
   * Called when a group's run finishes: starts that group's next pending
   * task, else its pending message check, else hands the freed slot to the
   * waiting groups. Does nothing after shutdown.
   */
  private drainGroup(groupJid: string): void {
    if (this.shuttingDown) return;
    const state = this.getGroup(groupJid);

    // Tasks first (they won't be re-discovered from SQLite like messages)
    const nextTask = state.pendingTasks.shift();
    if (nextTask) {
      this.runTask(groupJid, nextTask).catch((err) =>
        logger.error(
          { groupJid, taskId: nextTask.id, err },
          'Unhandled error in runTask (drain)',
        ),
      );
      return;
    }

    // Then pending messages
    if (state.pendingMessages) {
      this.runForGroup(groupJid, 'drain').catch((err) =>
        logger.error(
          { groupJid, err },
          'Unhandled error in runForGroup (drain)',
        ),
      );
      return;
    }

    // Nothing pending for this group; check if other groups are waiting for a slot
    this.drainWaiting();
  }

  /**
   * Starts work for waiting groups, oldest first, while slots are free.
   * A waiting group with nothing pending is simply dropped from the list.
   */
  private drainWaiting(): void {
    while (
      this.waitingGroups.length > 0 &&
      this.activeCount < MAX_CONCURRENT_CONTAINERS
    ) {
      const nextJid = this.waitingGroups.shift();
      if (nextJid === undefined) break;
      const state = this.getGroup(nextJid);

      // Prioritize tasks over messages
      const task = state.pendingTasks.shift();
      if (task) {
        this.runTask(nextJid, task).catch((err) =>
          logger.error(
            { groupJid: nextJid, taskId: task.id, err },
            'Unhandled error in runTask (waiting)',
          ),
        );
      } else if (state.pendingMessages) {
        this.runForGroup(nextJid, 'drain').catch((err) =>
          logger.error(
            { groupJid: nextJid, err },
            'Unhandled error in runForGroup (waiting)',
          ),
        );
      }
      // If neither pending, skip this group
    }
  }

  /**
   * Stops accepting and draining work, and logs the containers left running.
   *
   * @param _gracePeriodMs Accepted for interface compatibility; unused here.
   */
  async shutdown(_gracePeriodMs: number): Promise<void> {
    this.shuttingDown = true;

    // Count active containers but don't kill them — they'll finish on their own
    // via idle timeout or container timeout. The --rm flag cleans them up on exit.
    // This prevents WhatsApp reconnection restarts from killing working agents.
    const activeContainers: string[] = [];
    for (const state of this.groups.values()) {
      if (state.process && !state.process.killed && state.containerName) {
        activeContainers.push(state.containerName);
      }
    }

    logger.info(
      { activeCount: this.activeCount, detachedContainers: activeContainers },
      'GroupQueue shutting down (containers detached, not killed)',
    );
  }
}