fix: correctly trigger idle preemption in streaming input mode

The original notifyIdle condition (!result.result) never fired in
streaming input mode because every result has non-null text content.
This caused due tasks to wait up to 30 minutes for the idle timer.

- Call notifyIdle for ALL successful results (not just null ones)
- Add isTaskContainer flag so user messages queue instead of being
  forwarded to task containers (which blocked notifyIdle from the
  message container's onOutput path)
- Reset idleWaiting in sendMessage so containers aren't preempted
  while actively working on a new incoming message
- Replace 30-min IDLE_TIMEOUT with 10s close timer for task containers
  since they are single-turn and should exit promptly after their result

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Gavriel Cohen
2026-02-21 22:11:42 +02:00
committed by gavrielc
parent 93bb94ff55
commit c6b69e87a9
3 changed files with 22 additions and 15 deletions

View File

@@ -17,6 +17,7 @@ const BASE_RETRY_MS = 5000;
interface GroupState { interface GroupState {
active: boolean; active: boolean;
idleWaiting: boolean; idleWaiting: boolean;
isTaskContainer: boolean;
pendingMessages: boolean; pendingMessages: boolean;
pendingTasks: QueuedTask[]; pendingTasks: QueuedTask[];
process: ChildProcess | null; process: ChildProcess | null;
@@ -39,6 +40,7 @@ export class GroupQueue {
state = { state = {
active: false, active: false,
idleWaiting: false, idleWaiting: false,
isTaskContainer: false,
pendingMessages: false, pendingMessages: false,
pendingTasks: [], pendingTasks: [],
process: null, process: null,
@@ -142,7 +144,8 @@ export class GroupQueue {
*/ */
sendMessage(groupJid: string, text: string): boolean { sendMessage(groupJid: string, text: string): boolean {
const state = this.getGroup(groupJid); const state = this.getGroup(groupJid);
if (!state.active || !state.groupFolder) return false; if (!state.active || !state.groupFolder || state.isTaskContainer) return false;
state.idleWaiting = false; // Agent is about to receive work, no longer idle
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input'); const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
try { try {
@@ -181,6 +184,7 @@ export class GroupQueue {
const state = this.getGroup(groupJid); const state = this.getGroup(groupJid);
state.active = true; state.active = true;
state.idleWaiting = false; state.idleWaiting = false;
state.isTaskContainer = false;
state.pendingMessages = false; state.pendingMessages = false;
this.activeCount++; this.activeCount++;
@@ -215,6 +219,7 @@ export class GroupQueue {
const state = this.getGroup(groupJid); const state = this.getGroup(groupJid);
state.active = true; state.active = true;
state.idleWaiting = false; state.idleWaiting = false;
state.isTaskContainer = true;
this.activeCount++; this.activeCount++;
logger.debug( logger.debug(
@@ -228,6 +233,7 @@ export class GroupQueue {
logger.error({ groupJid, taskId: task.id, err }, 'Error running task'); logger.error({ groupJid, taskId: task.id, err }, 'Error running task');
} finally { } finally {
state.active = false; state.active = false;
state.isTaskContainer = false;
state.process = null; state.process = null;
state.containerName = null; state.containerName = null;
state.groupFolder = null; state.groupFolder = null;

View File

@@ -187,7 +187,7 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
resetIdleTimer(); resetIdleTimer();
} }
if (!result.result && result.status === 'success') { if (result.status === 'success') {
queue.notifyIdle(chatJid); queue.notifyIdle(chatJid);
} }

View File

@@ -89,16 +89,18 @@ async function runTask(
const sessionId = const sessionId =
task.context_mode === 'group' ? sessions[task.group_folder] : undefined; task.context_mode === 'group' ? sessions[task.group_folder] : undefined;
// Idle timer: writes _close sentinel after IDLE_TIMEOUT of no output, // After the task produces a result, close the container promptly.
// so the container exits instead of hanging at waitForIpcMessage forever. // Tasks are single-turn — no need to wait IDLE_TIMEOUT (30 min) for the
let idleTimer: ReturnType<typeof setTimeout> | null = null; // query loop to time out. A short delay handles any final MCP calls.
const TASK_CLOSE_DELAY_MS = 10000;
let closeTimer: ReturnType<typeof setTimeout> | null = null;
const resetIdleTimer = () => { const scheduleClose = () => {
if (idleTimer) clearTimeout(idleTimer); if (closeTimer) return; // already scheduled
idleTimer = setTimeout(() => { closeTimer = setTimeout(() => {
logger.debug({ taskId: task.id }, 'Scheduled task idle timeout, closing container stdin'); logger.debug({ taskId: task.id }, 'Closing task container after result');
deps.queue.closeStdin(task.chat_jid); deps.queue.closeStdin(task.chat_jid);
}, IDLE_TIMEOUT); }, TASK_CLOSE_DELAY_MS);
}; };
try { try {
@@ -118,10 +120,9 @@ async function runTask(
result = streamedOutput.result; result = streamedOutput.result;
// Forward result to user (sendMessage handles formatting) // Forward result to user (sendMessage handles formatting)
await deps.sendMessage(task.chat_jid, streamedOutput.result); await deps.sendMessage(task.chat_jid, streamedOutput.result);
// Only reset idle timer on actual results, not session-update markers scheduleClose();
resetIdleTimer();
} }
if (!streamedOutput.result && streamedOutput.status === 'success') { if (streamedOutput.status === 'success') {
deps.queue.notifyIdle(task.chat_jid); deps.queue.notifyIdle(task.chat_jid);
} }
if (streamedOutput.status === 'error') { if (streamedOutput.status === 'error') {
@@ -130,7 +131,7 @@ async function runTask(
}, },
); );
if (idleTimer) clearTimeout(idleTimer); if (closeTimer) clearTimeout(closeTimer);
if (output.status === 'error') { if (output.status === 'error') {
error = output.error || 'Unknown error'; error = output.error || 'Unknown error';
@@ -144,7 +145,7 @@ async function runTask(
'Task completed', 'Task completed',
); );
} catch (err) { } catch (err) {
if (idleTimer) clearTimeout(idleTimer); if (closeTimer) clearTimeout(closeTimer);
error = err instanceof Error ? err.message : String(err); error = err instanceof Error ? err.message : String(err);
logger.error({ taskId: task.id, error }, 'Task failed'); logger.error({ taskId: task.id, error }, 'Task failed');
} }