fix: only preempt idle containers when scheduled tasks enqueue

Containers that finish work but stay alive in waitForIpcMessage() block
queued scheduled tasks. Previous approaches killed active containers
mid-work. This fix tracks idle state via the session-update marker
(status: success, result: null) and only preempts when the container
is idle-waiting, not actively working.

Closes #293

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-02-21 21:16:13 +02:00
parent 6f177adafe
commit 93bb94ff55
4 changed files with 145 additions and 0 deletions

View File

@@ -242,4 +242,123 @@ describe('GroupQueue', () => {
expect(processed).toContain('group3@g.us'); expect(processed).toContain('group3@g.us');
}); });
// --- Idle preemption ---
it('does NOT preempt active container when not idle', async () => {
const fs = await import('fs');
let resolveProcess: () => void;
const processMessages = vi.fn(async () => {
await new Promise<void>((resolve) => {
resolveProcess = resolve;
});
return true;
});
queue.setProcessMessagesFn(processMessages);
// Start processing (takes the active slot)
queue.enqueueMessageCheck('group1@g.us');
await vi.advanceTimersByTimeAsync(10);
// Register a process so closeStdin has a groupFolder
queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group');
// Enqueue a task while container is active but NOT idle
const taskFn = vi.fn(async () => {});
queue.enqueueTask('group1@g.us', 'task-1', taskFn);
// _close should NOT have been written (container is working, not idle)
const writeFileSync = vi.mocked(fs.default.writeFileSync);
const closeWrites = writeFileSync.mock.calls.filter(
(call) => typeof call[0] === 'string' && call[0].endsWith('_close'),
);
expect(closeWrites).toHaveLength(0);
resolveProcess!();
await vi.advanceTimersByTimeAsync(10);
});
it('preempts idle container when task is enqueued', async () => {
const fs = await import('fs');
let resolveProcess: () => void;
const processMessages = vi.fn(async () => {
await new Promise<void>((resolve) => {
resolveProcess = resolve;
});
return true;
});
queue.setProcessMessagesFn(processMessages);
// Start processing
queue.enqueueMessageCheck('group1@g.us');
await vi.advanceTimersByTimeAsync(10);
// Register process and mark idle
queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group');
queue.notifyIdle('group1@g.us');
// Clear previous writes, then enqueue a task
const writeFileSync = vi.mocked(fs.default.writeFileSync);
writeFileSync.mockClear();
const taskFn = vi.fn(async () => {});
queue.enqueueTask('group1@g.us', 'task-1', taskFn);
// _close SHOULD have been written (container is idle)
const closeWrites = writeFileSync.mock.calls.filter(
(call) => typeof call[0] === 'string' && call[0].endsWith('_close'),
);
expect(closeWrites).toHaveLength(1);
resolveProcess!();
await vi.advanceTimersByTimeAsync(10);
});
it('preempts when idle arrives with pending tasks', async () => {
const fs = await import('fs');
let resolveProcess: () => void;
const processMessages = vi.fn(async () => {
await new Promise<void>((resolve) => {
resolveProcess = resolve;
});
return true;
});
queue.setProcessMessagesFn(processMessages);
// Start processing
queue.enqueueMessageCheck('group1@g.us');
await vi.advanceTimersByTimeAsync(10);
// Register process and enqueue a task (no idle yet — no preemption)
queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group');
const writeFileSync = vi.mocked(fs.default.writeFileSync);
writeFileSync.mockClear();
const taskFn = vi.fn(async () => {});
queue.enqueueTask('group1@g.us', 'task-1', taskFn);
let closeWrites = writeFileSync.mock.calls.filter(
(call) => typeof call[0] === 'string' && call[0].endsWith('_close'),
);
expect(closeWrites).toHaveLength(0);
// Now container becomes idle — should preempt because task is pending
writeFileSync.mockClear();
queue.notifyIdle('group1@g.us');
closeWrites = writeFileSync.mock.calls.filter(
(call) => typeof call[0] === 'string' && call[0].endsWith('_close'),
);
expect(closeWrites).toHaveLength(1);
resolveProcess!();
await vi.advanceTimersByTimeAsync(10);
});
}); });

View File

@@ -16,6 +16,7 @@ const BASE_RETRY_MS = 5000;
interface GroupState { interface GroupState {
active: boolean; active: boolean;
idleWaiting: boolean;
pendingMessages: boolean; pendingMessages: boolean;
pendingTasks: QueuedTask[]; pendingTasks: QueuedTask[];
process: ChildProcess | null; process: ChildProcess | null;
@@ -37,6 +38,7 @@ export class GroupQueue {
if (!state) { if (!state) {
state = { state = {
active: false, active: false,
idleWaiting: false,
pendingMessages: false, pendingMessages: false,
pendingTasks: [], pendingTasks: [],
process: null, process: null,
@@ -92,6 +94,9 @@ export class GroupQueue {
if (state.active) { if (state.active) {
state.pendingTasks.push({ id: taskId, groupJid, fn }); state.pendingTasks.push({ id: taskId, groupJid, fn });
if (state.idleWaiting) {
this.closeStdin(groupJid);
}
logger.debug({ groupJid, taskId }, 'Container active, task queued'); logger.debug({ groupJid, taskId }, 'Container active, task queued');
return; return;
} }
@@ -119,6 +124,18 @@ export class GroupQueue {
if (groupFolder) state.groupFolder = groupFolder; if (groupFolder) state.groupFolder = groupFolder;
} }
/**
* Mark the container as idle-waiting (finished work, waiting for IPC input).
* If tasks are pending, preempt the idle container immediately.
*/
notifyIdle(groupJid: string): void {
const state = this.getGroup(groupJid);
state.idleWaiting = true;
if (state.pendingTasks.length > 0) {
this.closeStdin(groupJid);
}
}
/** /**
* Send a follow-up message to the active container via IPC file. * Send a follow-up message to the active container via IPC file.
* Returns true if the message was written, false if no active container. * Returns true if the message was written, false if no active container.
@@ -163,6 +180,7 @@ export class GroupQueue {
): Promise<void> { ): Promise<void> {
const state = this.getGroup(groupJid); const state = this.getGroup(groupJid);
state.active = true; state.active = true;
state.idleWaiting = false;
state.pendingMessages = false; state.pendingMessages = false;
this.activeCount++; this.activeCount++;
@@ -196,6 +214,7 @@ export class GroupQueue {
private async runTask(groupJid: string, task: QueuedTask): Promise<void> { private async runTask(groupJid: string, task: QueuedTask): Promise<void> {
const state = this.getGroup(groupJid); const state = this.getGroup(groupJid);
state.active = true; state.active = true;
state.idleWaiting = false;
this.activeCount++; this.activeCount++;
logger.debug( logger.debug(

View File

@@ -187,6 +187,10 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
resetIdleTimer(); resetIdleTimer();
} }
if (!result.result && result.status === 'success') {
queue.notifyIdle(chatJid);
}
if (result.status === 'error') { if (result.status === 'error') {
hadError = true; hadError = true;
} }

View File

@@ -121,6 +121,9 @@ async function runTask(
// Only reset idle timer on actual results, not session-update markers // Only reset idle timer on actual results, not session-update markers
resetIdleTimer(); resetIdleTimer();
} }
if (!streamedOutput.result && streamedOutput.status === 'success') {
deps.queue.notifyIdle(task.chat_jid);
}
if (streamedOutput.status === 'error') { if (streamedOutput.status === 'error') {
error = streamedOutput.error || 'Unknown error'; error = streamedOutput.error || 'Unknown error';
} }