From f794185c21de12cc89bd8fda35e0e2eb1c120fda Mon Sep 17 00:00:00 2001 From: Gabi Simons Date: Wed, 4 Mar 2026 16:23:29 +0200 Subject: [PATCH] fix: atomic claim prevents scheduled tasks from executing twice (#657) * fix: atomic claim prevents scheduled tasks from executing twice (#138) Replace the two-phase getDueTasks() + deferred updateTaskAfterRun() with an atomic SQLite transaction (claimDueTasks) that advances next_run BEFORE dispatching tasks to the queue. This eliminates the race window where subsequent scheduler polls re-discover in-progress tasks. Key changes: - claimDueTasks(): SELECT + UPDATE in a single db.transaction(), so no poll can read stale next_run values. Once-tasks get next_run=NULL; recurring tasks get next_run advanced to the future. - computeNextRun(): anchors interval tasks to the scheduled time (not Date.now()) to prevent cumulative drift. Includes a while-loop to skip missed intervals and a guard against invalid interval values. - updateTaskAfterRun(): simplified to only record last_run/last_result since next_run is already handled by the claim. Closes #138, #211, #300, #578 Co-authored-by: @taslim (PR #601) Co-authored-by: @baijunjie (Issue #138) Co-authored-by: @Michaelliv (Issue #300) Co-authored-by: Claude Opus 4.6 * style: apply prettier formatting Co-Authored-By: Claude Opus 4.6 * fix: track running task ID in GroupQueue to prevent duplicate execution (#138) Previous commits implemented an "atomic claim" approach (claimDueTasks) that advanced next_run before execution. Per Gavriel's review, this solved the symptom at the wrong layer and introduced crash-recovery risks for once-tasks. This commit reverts claimDueTasks and instead fixes the actual bug: GroupQueue.enqueueTask() only checked pendingTasks for duplicates, but running tasks had already been shifted out. Adding runningTaskId to GroupState closes that gap with a 3-line fix at the correct layer. The computeNextRun() drift fix is retained, applied post-execution where it belongs. Closes #138, #211, #300, #578 Co-authored-by: @taslim (PR #601) Co-Authored-By: Claude Opus 4.6 * docs: add changelog entry for scheduler duplicate fix Co-Authored-By: Claude Opus 4.6 * docs: add contributors for scheduler race condition fix Co-Authored-By: Taslim <9999802+taslim@users.noreply.github.com> Co-Authored-By: BaiJunjie <7956480+baijunjie@users.noreply.github.com> Co-Authored-By: Michael <13676242+Michaelliv@users.noreply.github.com> Co-Authored-By: Kyle Zhike Chen <3477852+kk17@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: gavrielc Co-authored-by: Taslim <9999802+taslim@users.noreply.github.com> Co-authored-by: BaiJunjie <7956480+baijunjie@users.noreply.github.com> Co-authored-by: Michael <13676242+Michaelliv@users.noreply.github.com> Co-authored-by: Kyle Zhike Chen <3477852+kk17@users.noreply.github.com> --- CHANGELOG.md | 1 + CONTRIBUTORS.md | 4 ++ src/group-queue.test.ts | 35 ++++++++++++++++++ src/group-queue.ts | 10 ++++- src/task-scheduler.test.ts | 76 ++++++++++++++++++++++++++++++++++++++ src/task-scheduler.ts | 54 +++++++++++++++++++++------ 6 files changed, 167 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1a0f85..bcb6496 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,3 +5,4 @@ All notable changes to NanoClaw will be documented in this file. ## [1.2.0](https://github.com/qwibitai/nanoclaw/compare/v1.1.6...v1.2.0) [BREAKING] WhatsApp removed from core, now a skill. Run `/add-whatsapp` to re-add (existing auth/groups preserved). +- **fix:** Prevent scheduled tasks from executing twice when container runtime exceeds poll interval (#138, #669) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 7b08414..1d4a5de 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -9,3 +9,7 @@ Thanks to everyone who has contributed to NanoClaw! - [AmaxGuan](https://github.com/AmaxGuan) — Lingfeng Guan - [happydog-intj](https://github.com/happydog-intj) — happy dog - [bindoon](https://github.com/bindoon) — 潕量 +- [taslim](https://github.com/taslim) — Taslim +- [baijunjie](https://github.com/baijunjie) — BaiJunjie +- [Michaelliv](https://github.com/Michaelliv) — Michael +- [kk17](https://github.com/kk17) — Kyle Zhike Chen diff --git a/src/group-queue.test.ts b/src/group-queue.test.ts index b1a4f9c..ca2702a 100644 --- a/src/group-queue.test.ts +++ b/src/group-queue.test.ts @@ -243,6 +243,41 @@ describe('GroupQueue', () => { expect(processed).toContain('group3@g.us'); }); + // --- Running task dedup (Issue #138) --- + + it('rejects duplicate enqueue of a currently-running task', async () => { + let resolveTask: () => void; + let taskCallCount = 0; + + const taskFn = vi.fn(async () => { + taskCallCount++; + await new Promise((resolve) => { + resolveTask = resolve; + }); + }); + + // Start the task (runs immediately — slot available) + queue.enqueueTask('group1@g.us', 'task-1', taskFn); + await vi.advanceTimersByTimeAsync(10); + expect(taskCallCount).toBe(1); + + // Scheduler poll re-discovers the same task while it's running — + // this must be silently dropped + const dupFn = vi.fn(async () => {}); + queue.enqueueTask('group1@g.us', 'task-1', dupFn); + await vi.advanceTimersByTimeAsync(10); + + // Duplicate was NOT queued + expect(dupFn).not.toHaveBeenCalled(); + + // Complete the original task + resolveTask!(); + await vi.advanceTimersByTimeAsync(10); + + // Only one execution total + expect(taskCallCount).toBe(1); + }); + // --- Idle preemption --- it('does NOT preempt active container when not idle', async () => { diff --git a/src/group-queue.ts b/src/group-queue.ts index 06a56cc..f2984ce 100644 --- a/src/group-queue.ts +++ b/src/group-queue.ts @@ -18,6 +18,7 @@ interface GroupState { active: boolean; idleWaiting: boolean; isTaskContainer: boolean; + runningTaskId: string | null; pendingMessages: boolean; pendingTasks: QueuedTask[]; process: ChildProcess | null; @@ -41,6 +42,7 @@ export class GroupQueue { active: false, idleWaiting: false, isTaskContainer: false, + runningTaskId: null, pendingMessages: false, pendingTasks: [], process: null, @@ -90,7 +92,11 @@ export class GroupQueue { const state = this.getGroup(groupJid); - // Prevent double-queuing of the same task + // Prevent double-queuing: check both pending and currently-running task + if (state.runningTaskId === taskId) { + logger.debug({ groupJid, taskId }, 'Task already running, skipping'); + return; + } if (state.pendingTasks.some((t) => t.id === taskId)) { logger.debug({ groupJid, taskId }, 'Task already queued, skipping'); return; @@ -230,6 +236,7 @@ export class GroupQueue { state.active = true; state.idleWaiting = false; state.isTaskContainer = true; + state.runningTaskId = task.id; this.activeCount++; logger.debug( @@ -244,6 +251,7 @@ export class GroupQueue { } finally { state.active = false; state.isTaskContainer = false; + state.runningTaskId = null; state.process = null; state.containerName = null; state.groupFolder = null; diff --git a/src/task-scheduler.test.ts b/src/task-scheduler.test.ts index 62129e8..2032b51 100644 --- a/src/task-scheduler.test.ts +++ b/src/task-scheduler.test.ts @@ -3,6 +3,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { _initTestDatabase, createTask, getTaskById } from './db.js'; import { _resetSchedulerLoopForTests, + computeNextRun, startSchedulerLoop, } from './task-scheduler.js'; @@ -50,4 +51,79 @@ describe('task scheduler', () => { const task = getTaskById('task-invalid-folder'); expect(task?.status).toBe('paused'); }); + + it('computeNextRun anchors interval tasks to scheduled time to prevent drift', () => { + const scheduledTime = new Date(Date.now() - 2000).toISOString(); // 2s ago + const task = { + id: 'drift-test', + group_folder: 'test', + chat_jid: 'test@g.us', + prompt: 'test', + schedule_type: 'interval' as const, + schedule_value: '60000', // 1 minute + context_mode: 'isolated' as const, + next_run: scheduledTime, + last_run: null, + last_result: null, + status: 'active' as const, + created_at: '2026-01-01T00:00:00.000Z', + }; + + const nextRun = computeNextRun(task); + expect(nextRun).not.toBeNull(); + + // Should be anchored to scheduledTime + 60s, NOT Date.now() + 60s + const expected = new Date(scheduledTime).getTime() + 60000; + expect(new Date(nextRun!).getTime()).toBe(expected); + }); + + it('computeNextRun returns null for once-tasks', () => { + const task = { + id: 'once-test', + group_folder: 'test', + chat_jid: 'test@g.us', + prompt: 'test', + schedule_type: 'once' as const, + schedule_value: '2026-01-01T00:00:00.000Z', + context_mode: 'isolated' as const, + next_run: new Date(Date.now() - 1000).toISOString(), + last_run: null, + last_result: null, + status: 'active' as const, + created_at: '2026-01-01T00:00:00.000Z', + }; + + expect(computeNextRun(task)).toBeNull(); + }); + + it('computeNextRun skips missed intervals without infinite loop', () => { + // Task was due 10 intervals ago (missed) + const ms = 60000; + const missedBy = ms * 10; + const scheduledTime = new Date(Date.now() - missedBy).toISOString(); + + const task = { + id: 'skip-test', + group_folder: 'test', + chat_jid: 'test@g.us', + prompt: 'test', + schedule_type: 'interval' as const, + schedule_value: String(ms), + context_mode: 'isolated' as const, + next_run: scheduledTime, + last_run: null, + last_result: null, + status: 'active' as const, + created_at: '2026-01-01T00:00:00.000Z', + }; + + const nextRun = computeNextRun(task); + expect(nextRun).not.toBeNull(); + // Must be in the future + expect(new Date(nextRun!).getTime()).toBeGreaterThan(Date.now()); + // Must be aligned to the original schedule grid + const offset = + (new Date(nextRun!).getTime() - new Date(scheduledTime).getTime()) % ms; + expect(offset).toBe(0); + }); }); diff --git a/src/task-scheduler.ts b/src/task-scheduler.ts index e4f606f..8c533c7 100644 --- a/src/task-scheduler.ts +++ b/src/task-scheduler.ts @@ -21,6 +21,47 @@ import { resolveGroupFolderPath } from './group-folder.js'; import { logger } from './logger.js'; import { RegisteredGroup, ScheduledTask } from './types.js'; +/** + * Compute the next run time for a recurring task, anchored to the + * task's scheduled time rather than Date.now() to prevent cumulative + * drift on interval-based tasks. + * + * Co-authored-by: @community-pr-601 + */ +export function computeNextRun(task: ScheduledTask): string | null { + if (task.schedule_type === 'once') return null; + + const now = Date.now(); + + if (task.schedule_type === 'cron') { + const interval = CronExpressionParser.parse(task.schedule_value, { + tz: TIMEZONE, + }); + return interval.next().toISOString(); + } + + if (task.schedule_type === 'interval') { + const ms = parseInt(task.schedule_value, 10); + if (!ms || ms <= 0) { + // Guard against malformed interval that would cause an infinite loop + logger.warn( + { taskId: task.id, value: task.schedule_value }, + 'Invalid interval value', + ); + return new Date(now + 60_000).toISOString(); + } + // Anchor to the scheduled time, not now, to prevent drift. + // Skip past any missed intervals so we always land in the future. + let next = new Date(task.next_run!).getTime() + ms; + while (next <= now) { + next += ms; + } + return new Date(next).toISOString(); + } + + return null; +} + export interface SchedulerDependencies { registeredGroups: () => Record; getSessions: () => Record; @@ -187,18 +228,7 @@ async function runTask( error, }); - let nextRun: string | null = null; - if (task.schedule_type === 'cron') { - const interval = CronExpressionParser.parse(task.schedule_value, { - tz: TIMEZONE, - }); - nextRun = interval.next().toISOString(); - } else if (task.schedule_type === 'interval') { - const ms = parseInt(task.schedule_value, 10); - nextRun = new Date(Date.now() + ms).toISOString(); - } - // 'once' tasks have no next run - + const nextRun = computeNextRun(task); const resultSummary = error ? `Error: ${error}` : result