diff --git a/src/config.ts b/src/config.ts index e1cbe11..12f04d9 100644 --- a/src/config.ts +++ b/src/config.ts @@ -53,6 +53,10 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt( ); // 10MB default export const ONECLI_URL = process.env.ONECLI_URL || envConfig.ONECLI_URL || 'http://localhost:10254'; +export const MAX_MESSAGES_PER_PROMPT = Math.max( + 1, + parseInt(process.env.MAX_MESSAGES_PER_PROMPT || '10', 10) || 10, +); export const IPC_POLL_INTERVAL = 1000; export const IDLE_TIMEOUT = parseInt(process.env.IDLE_TIMEOUT || '1800000', 10); // 30min default — how long to keep container alive after last result export const MAX_CONCURRENT_CONTAINERS = Math.max( diff --git a/src/db.test.ts b/src/db.test.ts index a40d376..ff4872a 100644 --- a/src/db.test.ts +++ b/src/db.test.ts @@ -6,6 +6,7 @@ import { deleteTask, getAllChats, getAllRegisteredGroups, + getLastBotMessageTimestamp, getMessagesSince, getNewMessages, getTaskById, @@ -14,6 +15,7 @@ import { storeMessage, updateTask, } from './db.js'; +import { formatMessages } from './router.js'; beforeEach(() => { _initTestDatabase(); @@ -208,6 +210,92 @@ describe('getMessagesSince', () => { expect(msgs).toHaveLength(3); }); + it('recovers cursor from last bot reply when lastAgentTimestamp is missing', () => { + // beforeEach already inserts m3 (bot reply at 00:00:03) and m4 (user at 00:00:04) + // Add more old history before the bot reply + for (let i = 1; i <= 50; i++) { + store({ + id: `history-${i}`, + chat_jid: 'group@g.us', + sender: 'user@s.whatsapp.net', + sender_name: 'User', + content: `old message ${i}`, + timestamp: `2023-06-${String(i).padStart(2, '0')}T12:00:00.000Z`, + }); + } + + // New message after the bot reply (m3 at 00:00:03) + store({ + id: 'new-1', + chat_jid: 'group@g.us', + sender: 'user@s.whatsapp.net', + sender_name: 'User', + content: 'new message after bot reply', + timestamp: '2024-01-02T00:00:00.000Z', + }); + + // Recover cursor from the last bot message (m3 from beforeEach) + const recovered = getLastBotMessageTimestamp('group@g.us', 'Andy'); + expect(recovered).toBe('2024-01-01T00:00:03.000Z'); + + // Using recovered cursor: only gets messages after the bot reply + const msgs = getMessagesSince('group@g.us', recovered!, 'Andy', 10); + // m4 (third, 00:00:04) + new-1 — skips all 50 old messages and m1/m2 + expect(msgs).toHaveLength(2); + expect(msgs[0].content).toBe('third'); + expect(msgs[1].content).toBe('new message after bot reply'); + }); + + it('caps messages to configured limit even with recovered cursor', () => { + // beforeEach inserts m3 (bot at 00:00:03). Add 30 messages after it. + for (let i = 1; i <= 30; i++) { + store({ + id: `pending-${i}`, + chat_jid: 'group@g.us', + sender: 'user@s.whatsapp.net', + sender_name: 'User', + content: `pending message ${i}`, + timestamp: `2024-02-${String(i).padStart(2, '0')}T12:00:00.000Z`, + }); + } + + const recovered = getLastBotMessageTimestamp('group@g.us', 'Andy'); + expect(recovered).toBe('2024-01-01T00:00:03.000Z'); + + // With limit=10, only the 10 most recent are returned + const msgs = getMessagesSince('group@g.us', recovered!, 'Andy', 10); + expect(msgs).toHaveLength(10); + // Most recent 10: pending-21 through pending-30 + expect(msgs[0].content).toBe('pending message 21'); + expect(msgs[9].content).toBe('pending message 30'); + }); + + it('returns last N messages when no bot reply and no cursor exist', () => { + // Use a fresh group with no bot messages + storeChatMetadata('fresh@g.us', '2024-01-01T00:00:00.000Z'); + for (let i = 1; i <= 20; i++) { + store({ + id: `fresh-${i}`, + chat_jid: 'fresh@g.us', + sender: 'user@s.whatsapp.net', + sender_name: 'User', + content: `message ${i}`, + timestamp: `2024-02-${String(i).padStart(2, '0')}T12:00:00.000Z`, + }); + } + + const recovered = getLastBotMessageTimestamp('fresh@g.us', 'Andy'); + expect(recovered).toBeUndefined(); + + // No cursor → sinceTimestamp = '' but limit caps the result + const msgs = getMessagesSince('fresh@g.us', '', 'Andy', 10); + expect(msgs).toHaveLength(10); + + const prompt = formatMessages(msgs, 'Asia/Jerusalem'); + const messageTagCount = (prompt.match(/ { // Simulate a message written before migration: has prefix but is_bot_message = 0 store({ diff --git a/src/db.ts b/src/db.ts index 718bc60..7fba354 100644 --- a/src/db.ts +++ b/src/db.ts @@ -375,6 +375,19 @@ export function getMessagesSince( .all(chatJid, sinceTimestamp, `${botPrefix}:%`, limit) as NewMessage[]; } +export function getLastBotMessageTimestamp( + chatJid: string, + botPrefix: string, +): string | undefined { + const row = db + .prepare( + `SELECT MAX(timestamp) as ts FROM messages + WHERE chat_jid = ? AND (is_bot_message = 1 OR content LIKE ?)`, + ) + .get(chatJid, `${botPrefix}:%`) as { ts: string | null } | undefined; + return row?.ts ?? undefined; +} + export function createTask( task: Omit, ): void { diff --git a/src/index.ts b/src/index.ts index bf57823..eaf9432 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,7 @@ import { getTriggerPattern, GROUPS_DIR, IDLE_TIMEOUT, + MAX_MESSAGES_PER_PROMPT, ONECLI_URL, POLL_INTERVAL, TIMEZONE, @@ -33,6 +34,7 @@ import { getAllRegisteredGroups, getAllSessions, getAllTasks, + getLastBotMessageTimestamp, getMessagesSince, getNewMessages, getRouterState, @@ -112,6 +114,27 @@ function loadState(): void { ); } +/** + * Return the message cursor for a group, recovering from the last bot reply + * if lastAgentTimestamp is missing (new group, corrupted state, restart). + */ +function getOrRecoverCursor(chatJid: string): string { + const existing = lastAgentTimestamp[chatJid]; + if (existing) return existing; + + const botTs = getLastBotMessageTimestamp(chatJid, ASSISTANT_NAME); + if (botTs) { + logger.info( + { chatJid, recoveredFrom: botTs }, + 'Recovered message cursor from last bot reply', + ); + lastAgentTimestamp[chatJid] = botTs; + saveState(); + return botTs; + } + return ''; +} + function saveState(): void { setRouterState('last_timestamp', lastTimestamp); setRouterState('last_agent_timestamp', JSON.stringify(lastAgentTimestamp)); @@ -205,11 +228,11 @@ async function processGroupMessages(chatJid: string): Promise { const isMainGroup = group.isMain === true; - const sinceTimestamp = lastAgentTimestamp[chatJid] || ''; const missedMessages = getMessagesSince( chatJid, - sinceTimestamp, + getOrRecoverCursor(chatJid), ASSISTANT_NAME, + MAX_MESSAGES_PER_PROMPT, ); if (missedMessages.length === 0) return true; @@ -461,8 +484,9 @@ async function startMessageLoop(): Promise { // context that accumulated between triggers is included. const allPending = getMessagesSince( chatJid, - lastAgentTimestamp[chatJid] || '', + getOrRecoverCursor(chatJid), ASSISTANT_NAME, + MAX_MESSAGES_PER_PROMPT, ); const messagesToSend = allPending.length > 0 ? allPending : groupMessages; @@ -501,8 +525,12 @@ async function startMessageLoop(): Promise { */ function recoverPendingMessages(): void { for (const [chatJid, group] of Object.entries(registeredGroups)) { - const sinceTimestamp = lastAgentTimestamp[chatJid] || ''; - const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME); + const pending = getMessagesSince( + chatJid, + getOrRecoverCursor(chatJid), + ASSISTANT_NAME, + MAX_MESSAGES_PER_PROMPT, + ); if (pending.length > 0) { logger.info( { group: group.name, pendingCount: pending.length },