From 93bb94ff5583036efbfc9877a20ab163ab6bfd52 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Sat, 21 Feb 2026 21:16:13 +0200 Subject: [PATCH] fix: only preempt idle containers when scheduled tasks enqueue Containers that finish work but stay alive in waitForIpcMessage() block queued scheduled tasks. Previous approaches killed active containers mid-work. This fix tracks idle state via the session-update marker (status: success, result: null) and only preempts when the container is idle-waiting, not actively working. Closes #293 Co-Authored-By: Claude Opus 4.6 --- src/group-queue.test.ts | 119 ++++++++++++++++++++++++++++++++++++++++ src/group-queue.ts | 19 +++++++ src/index.ts | 4 ++ src/task-scheduler.ts | 3 + 4 files changed, 145 insertions(+) diff --git a/src/group-queue.test.ts b/src/group-queue.test.ts index 6a914a0..f236b22 100644 --- a/src/group-queue.test.ts +++ b/src/group-queue.test.ts @@ -242,4 +242,123 @@ describe('GroupQueue', () => { expect(processed).toContain('group3@g.us'); }); + + // --- Idle preemption --- + + it('does NOT preempt active container when not idle', async () => { + const fs = await import('fs'); + let resolveProcess: () => void; + + const processMessages = vi.fn(async () => { + await new Promise((resolve) => { + resolveProcess = resolve; + }); + return true; + }); + + queue.setProcessMessagesFn(processMessages); + + // Start processing (takes the active slot) + queue.enqueueMessageCheck('group1@g.us'); + await vi.advanceTimersByTimeAsync(10); + + // Register a process so closeStdin has a groupFolder + queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group'); + + // Enqueue a task while container is active but NOT idle + const taskFn = vi.fn(async () => {}); + queue.enqueueTask('group1@g.us', 'task-1', taskFn); + + // _close should NOT have been written (container is working, not idle) + const writeFileSync = vi.mocked(fs.default.writeFileSync); + const closeWrites = writeFileSync.mock.calls.filter( + (call) => typeof call[0] === 'string' && call[0].endsWith('_close'), + ); + expect(closeWrites).toHaveLength(0); + + resolveProcess!(); + await vi.advanceTimersByTimeAsync(10); + }); + + it('preempts idle container when task is enqueued', async () => { + const fs = await import('fs'); + let resolveProcess: () => void; + + const processMessages = vi.fn(async () => { + await new Promise((resolve) => { + resolveProcess = resolve; + }); + return true; + }); + + queue.setProcessMessagesFn(processMessages); + + // Start processing + queue.enqueueMessageCheck('group1@g.us'); + await vi.advanceTimersByTimeAsync(10); + + // Register process and mark idle + queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group'); + queue.notifyIdle('group1@g.us'); + + // Clear previous writes, then enqueue a task + const writeFileSync = vi.mocked(fs.default.writeFileSync); + writeFileSync.mockClear(); + + const taskFn = vi.fn(async () => {}); + queue.enqueueTask('group1@g.us', 'task-1', taskFn); + + // _close SHOULD have been written (container is idle) + const closeWrites = writeFileSync.mock.calls.filter( + (call) => typeof call[0] === 'string' && call[0].endsWith('_close'), + ); + expect(closeWrites).toHaveLength(1); + + resolveProcess!(); + await vi.advanceTimersByTimeAsync(10); + }); + + it('preempts when idle arrives with pending tasks', async () => { + const fs = await import('fs'); + let resolveProcess: () => void; + + const processMessages = vi.fn(async () => { + await new Promise((resolve) => { + resolveProcess = resolve; + }); + return true; + }); + + queue.setProcessMessagesFn(processMessages); + + // Start processing + queue.enqueueMessageCheck('group1@g.us'); + await vi.advanceTimersByTimeAsync(10); + + // Register process and enqueue a task (no idle yet — no preemption) + queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group'); + + const writeFileSync = vi.mocked(fs.default.writeFileSync); + writeFileSync.mockClear(); + + const taskFn = vi.fn(async () => {}); + queue.enqueueTask('group1@g.us', 'task-1', taskFn); + + let closeWrites = writeFileSync.mock.calls.filter( + (call) => typeof call[0] === 'string' && call[0].endsWith('_close'), + ); + expect(closeWrites).toHaveLength(0); + + // Now container becomes idle — should preempt because task is pending + writeFileSync.mockClear(); + queue.notifyIdle('group1@g.us'); + + closeWrites = writeFileSync.mock.calls.filter( + (call) => typeof call[0] === 'string' && call[0].endsWith('_close'), + ); + expect(closeWrites).toHaveLength(1); + + resolveProcess!(); + await vi.advanceTimersByTimeAsync(10); + }); }); diff --git a/src/group-queue.ts b/src/group-queue.ts index cc86dab..d1f980c 100644 --- a/src/group-queue.ts +++ b/src/group-queue.ts @@ -16,6 +16,7 @@ const BASE_RETRY_MS = 5000; interface GroupState { active: boolean; + idleWaiting: boolean; pendingMessages: boolean; pendingTasks: QueuedTask[]; process: ChildProcess | null; @@ -37,6 +38,7 @@ export class GroupQueue { if (!state) { state = { active: false, + idleWaiting: false, pendingMessages: false, pendingTasks: [], process: null, @@ -92,6 +94,9 @@ export class GroupQueue { if (state.active) { state.pendingTasks.push({ id: taskId, groupJid, fn }); + if (state.idleWaiting) { + this.closeStdin(groupJid); + } logger.debug({ groupJid, taskId }, 'Container active, task queued'); return; } @@ -119,6 +124,18 @@ export class GroupQueue { if (groupFolder) state.groupFolder = groupFolder; } + /** + * Mark the container as idle-waiting (finished work, waiting for IPC input). + * If tasks are pending, preempt the idle container immediately. + */ + notifyIdle(groupJid: string): void { + const state = this.getGroup(groupJid); + state.idleWaiting = true; + if (state.pendingTasks.length > 0) { + this.closeStdin(groupJid); + } + } + /** * Send a follow-up message to the active container via IPC file. * Returns true if the message was written, false if no active container. @@ -163,6 +180,7 @@ export class GroupQueue { ): Promise { const state = this.getGroup(groupJid); state.active = true; + state.idleWaiting = false; state.pendingMessages = false; this.activeCount++; @@ -196,6 +214,7 @@ export class GroupQueue { private async runTask(groupJid: string, task: QueuedTask): Promise { const state = this.getGroup(groupJid); state.active = true; + state.idleWaiting = false; this.activeCount++; logger.debug( diff --git a/src/index.ts b/src/index.ts index 34e4e34..d3815db 100644 --- a/src/index.ts +++ b/src/index.ts @@ -187,6 +187,10 @@ async function processGroupMessages(chatJid: string): Promise { resetIdleTimer(); } + if (!result.result && result.status === 'success') { + queue.notifyIdle(chatJid); + } + if (result.status === 'error') { hadError = true; } diff --git a/src/task-scheduler.ts b/src/task-scheduler.ts index 6efe7d5..9910bdd 100644 --- a/src/task-scheduler.ts +++ b/src/task-scheduler.ts @@ -121,6 +121,9 @@ async function runTask( // Only reset idle timer on actual results, not session-update markers resetIdleTimer(); } + if (!streamedOutput.result && streamedOutput.status === 'success') { + deps.queue.notifyIdle(task.chat_jid); + } if (streamedOutput.status === 'error') { error = streamedOutput.error || 'Unknown error'; }