fix: prevent full message history from being sent to container agents
When lastAgentTimestamp was missing (new group, corrupted state, or startup recovery), the empty-string fallback caused getMessagesSince to return up to 200 messages — the entire group history. This sent a massive prompt to the container agent instead of just recent messages.

Fix: recover the cursor from the last bot reply timestamp in the DB (proof of what we already processed), and cap all prompt queries to a configurable MAX_MESSAGES_PER_PROMPT (default 10). Covers all three call sites: processGroupMessages, the piping path, and recoverPendingMessages.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -53,6 +53,10 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt(
|
|||||||
); // 10MB default
|
); // 10MB default
|
||||||
export const ONECLI_URL =
|
export const ONECLI_URL =
|
||||||
process.env.ONECLI_URL || envConfig.ONECLI_URL || 'http://localhost:10254';
|
process.env.ONECLI_URL || envConfig.ONECLI_URL || 'http://localhost:10254';
|
||||||
|
export const MAX_MESSAGES_PER_PROMPT = Math.max(
|
||||||
|
1,
|
||||||
|
parseInt(process.env.MAX_MESSAGES_PER_PROMPT || '10', 10) || 10,
|
||||||
|
);
|
||||||
export const IPC_POLL_INTERVAL = 1000;
|
export const IPC_POLL_INTERVAL = 1000;
|
||||||
export const IDLE_TIMEOUT = parseInt(process.env.IDLE_TIMEOUT || '1800000', 10); // 30min default — how long to keep container alive after last result
|
export const IDLE_TIMEOUT = parseInt(process.env.IDLE_TIMEOUT || '1800000', 10); // 30min default — how long to keep container alive after last result
|
||||||
export const MAX_CONCURRENT_CONTAINERS = Math.max(
|
export const MAX_CONCURRENT_CONTAINERS = Math.max(
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import {
|
|||||||
deleteTask,
|
deleteTask,
|
||||||
getAllChats,
|
getAllChats,
|
||||||
getAllRegisteredGroups,
|
getAllRegisteredGroups,
|
||||||
|
getLastBotMessageTimestamp,
|
||||||
getMessagesSince,
|
getMessagesSince,
|
||||||
getNewMessages,
|
getNewMessages,
|
||||||
getTaskById,
|
getTaskById,
|
||||||
@@ -14,6 +15,7 @@ import {
|
|||||||
storeMessage,
|
storeMessage,
|
||||||
updateTask,
|
updateTask,
|
||||||
} from './db.js';
|
} from './db.js';
|
||||||
|
import { formatMessages } from './router.js';
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
_initTestDatabase();
|
_initTestDatabase();
|
||||||
@@ -208,6 +210,92 @@ describe('getMessagesSince', () => {
|
|||||||
expect(msgs).toHaveLength(3);
|
expect(msgs).toHaveLength(3);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('recovers cursor from last bot reply when lastAgentTimestamp is missing', () => {
|
||||||
|
// beforeEach already inserts m3 (bot reply at 00:00:03) and m4 (user at 00:00:04)
|
||||||
|
// Add more old history before the bot reply
|
||||||
|
for (let i = 1; i <= 50; i++) {
|
||||||
|
store({
|
||||||
|
id: `history-${i}`,
|
||||||
|
chat_jid: 'group@g.us',
|
||||||
|
sender: 'user@s.whatsapp.net',
|
||||||
|
sender_name: 'User',
|
||||||
|
content: `old message ${i}`,
|
||||||
|
timestamp: `2023-06-${String(i).padStart(2, '0')}T12:00:00.000Z`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// New message after the bot reply (m3 at 00:00:03)
|
||||||
|
store({
|
||||||
|
id: 'new-1',
|
||||||
|
chat_jid: 'group@g.us',
|
||||||
|
sender: 'user@s.whatsapp.net',
|
||||||
|
sender_name: 'User',
|
||||||
|
content: 'new message after bot reply',
|
||||||
|
timestamp: '2024-01-02T00:00:00.000Z',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Recover cursor from the last bot message (m3 from beforeEach)
|
||||||
|
const recovered = getLastBotMessageTimestamp('group@g.us', 'Andy');
|
||||||
|
expect(recovered).toBe('2024-01-01T00:00:03.000Z');
|
||||||
|
|
||||||
|
// Using recovered cursor: only gets messages after the bot reply
|
||||||
|
const msgs = getMessagesSince('group@g.us', recovered!, 'Andy', 10);
|
||||||
|
// m4 (third, 00:00:04) + new-1 — skips all 50 old messages and m1/m2
|
||||||
|
expect(msgs).toHaveLength(2);
|
||||||
|
expect(msgs[0].content).toBe('third');
|
||||||
|
expect(msgs[1].content).toBe('new message after bot reply');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('caps messages to configured limit even with recovered cursor', () => {
|
||||||
|
// beforeEach inserts m3 (bot at 00:00:03). Add 30 messages after it.
|
||||||
|
for (let i = 1; i <= 30; i++) {
|
||||||
|
store({
|
||||||
|
id: `pending-${i}`,
|
||||||
|
chat_jid: 'group@g.us',
|
||||||
|
sender: 'user@s.whatsapp.net',
|
||||||
|
sender_name: 'User',
|
||||||
|
content: `pending message ${i}`,
|
||||||
|
timestamp: `2024-02-${String(i).padStart(2, '0')}T12:00:00.000Z`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const recovered = getLastBotMessageTimestamp('group@g.us', 'Andy');
|
||||||
|
expect(recovered).toBe('2024-01-01T00:00:03.000Z');
|
||||||
|
|
||||||
|
// With limit=10, only the 10 most recent are returned
|
||||||
|
const msgs = getMessagesSince('group@g.us', recovered!, 'Andy', 10);
|
||||||
|
expect(msgs).toHaveLength(10);
|
||||||
|
// Most recent 10: pending-21 through pending-30
|
||||||
|
expect(msgs[0].content).toBe('pending message 21');
|
||||||
|
expect(msgs[9].content).toBe('pending message 30');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns last N messages when no bot reply and no cursor exist', () => {
|
||||||
|
// Use a fresh group with no bot messages
|
||||||
|
storeChatMetadata('fresh@g.us', '2024-01-01T00:00:00.000Z');
|
||||||
|
for (let i = 1; i <= 20; i++) {
|
||||||
|
store({
|
||||||
|
id: `fresh-${i}`,
|
||||||
|
chat_jid: 'fresh@g.us',
|
||||||
|
sender: 'user@s.whatsapp.net',
|
||||||
|
sender_name: 'User',
|
||||||
|
content: `message ${i}`,
|
||||||
|
timestamp: `2024-02-${String(i).padStart(2, '0')}T12:00:00.000Z`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const recovered = getLastBotMessageTimestamp('fresh@g.us', 'Andy');
|
||||||
|
expect(recovered).toBeUndefined();
|
||||||
|
|
||||||
|
// No cursor → sinceTimestamp = '' but limit caps the result
|
||||||
|
const msgs = getMessagesSince('fresh@g.us', '', 'Andy', 10);
|
||||||
|
expect(msgs).toHaveLength(10);
|
||||||
|
|
||||||
|
const prompt = formatMessages(msgs, 'Asia/Jerusalem');
|
||||||
|
const messageTagCount = (prompt.match(/<message /g) || []).length;
|
||||||
|
expect(messageTagCount).toBe(10);
|
||||||
|
});
|
||||||
|
|
||||||
it('filters pre-migration bot messages via content prefix backstop', () => {
|
it('filters pre-migration bot messages via content prefix backstop', () => {
|
||||||
// Simulate a message written before migration: has prefix but is_bot_message = 0
|
// Simulate a message written before migration: has prefix but is_bot_message = 0
|
||||||
store({
|
store({
|
||||||
|
|||||||
13
src/db.ts
13
src/db.ts
@@ -375,6 +375,19 @@ export function getMessagesSince(
|
|||||||
.all(chatJid, sinceTimestamp, `${botPrefix}:%`, limit) as NewMessage[];
|
.all(chatJid, sinceTimestamp, `${botPrefix}:%`, limit) as NewMessage[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function getLastBotMessageTimestamp(
|
||||||
|
chatJid: string,
|
||||||
|
botPrefix: string,
|
||||||
|
): string | undefined {
|
||||||
|
const row = db
|
||||||
|
.prepare(
|
||||||
|
`SELECT MAX(timestamp) as ts FROM messages
|
||||||
|
WHERE chat_jid = ? AND (is_bot_message = 1 OR content LIKE ?)`,
|
||||||
|
)
|
||||||
|
.get(chatJid, `${botPrefix}:%`) as { ts: string | null } | undefined;
|
||||||
|
return row?.ts ?? undefined;
|
||||||
|
}
|
||||||
|
|
||||||
export function createTask(
|
export function createTask(
|
||||||
task: Omit<ScheduledTask, 'last_run' | 'last_result'>,
|
task: Omit<ScheduledTask, 'last_run' | 'last_result'>,
|
||||||
): void {
|
): void {
|
||||||
|
|||||||
38
src/index.ts
38
src/index.ts
@@ -9,6 +9,7 @@ import {
|
|||||||
getTriggerPattern,
|
getTriggerPattern,
|
||||||
GROUPS_DIR,
|
GROUPS_DIR,
|
||||||
IDLE_TIMEOUT,
|
IDLE_TIMEOUT,
|
||||||
|
MAX_MESSAGES_PER_PROMPT,
|
||||||
ONECLI_URL,
|
ONECLI_URL,
|
||||||
POLL_INTERVAL,
|
POLL_INTERVAL,
|
||||||
TIMEZONE,
|
TIMEZONE,
|
||||||
@@ -33,6 +34,7 @@ import {
|
|||||||
getAllRegisteredGroups,
|
getAllRegisteredGroups,
|
||||||
getAllSessions,
|
getAllSessions,
|
||||||
getAllTasks,
|
getAllTasks,
|
||||||
|
getLastBotMessageTimestamp,
|
||||||
getMessagesSince,
|
getMessagesSince,
|
||||||
getNewMessages,
|
getNewMessages,
|
||||||
getRouterState,
|
getRouterState,
|
||||||
@@ -112,6 +114,27 @@ function loadState(): void {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the message cursor for a group, recovering from the last bot reply
|
||||||
|
* if lastAgentTimestamp is missing (new group, corrupted state, restart).
|
||||||
|
*/
|
||||||
|
function getOrRecoverCursor(chatJid: string): string {
|
||||||
|
const existing = lastAgentTimestamp[chatJid];
|
||||||
|
if (existing) return existing;
|
||||||
|
|
||||||
|
const botTs = getLastBotMessageTimestamp(chatJid, ASSISTANT_NAME);
|
||||||
|
if (botTs) {
|
||||||
|
logger.info(
|
||||||
|
{ chatJid, recoveredFrom: botTs },
|
||||||
|
'Recovered message cursor from last bot reply',
|
||||||
|
);
|
||||||
|
lastAgentTimestamp[chatJid] = botTs;
|
||||||
|
saveState();
|
||||||
|
return botTs;
|
||||||
|
}
|
||||||
|
return '';
|
||||||
|
}
|
||||||
|
|
||||||
function saveState(): void {
|
function saveState(): void {
|
||||||
setRouterState('last_timestamp', lastTimestamp);
|
setRouterState('last_timestamp', lastTimestamp);
|
||||||
setRouterState('last_agent_timestamp', JSON.stringify(lastAgentTimestamp));
|
setRouterState('last_agent_timestamp', JSON.stringify(lastAgentTimestamp));
|
||||||
@@ -205,11 +228,11 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
|
|||||||
|
|
||||||
const isMainGroup = group.isMain === true;
|
const isMainGroup = group.isMain === true;
|
||||||
|
|
||||||
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
|
|
||||||
const missedMessages = getMessagesSince(
|
const missedMessages = getMessagesSince(
|
||||||
chatJid,
|
chatJid,
|
||||||
sinceTimestamp,
|
getOrRecoverCursor(chatJid),
|
||||||
ASSISTANT_NAME,
|
ASSISTANT_NAME,
|
||||||
|
MAX_MESSAGES_PER_PROMPT,
|
||||||
);
|
);
|
||||||
|
|
||||||
if (missedMessages.length === 0) return true;
|
if (missedMessages.length === 0) return true;
|
||||||
@@ -460,8 +483,9 @@ async function startMessageLoop(): Promise<void> {
|
|||||||
// context that accumulated between triggers is included.
|
// context that accumulated between triggers is included.
|
||||||
const allPending = getMessagesSince(
|
const allPending = getMessagesSince(
|
||||||
chatJid,
|
chatJid,
|
||||||
lastAgentTimestamp[chatJid] || '',
|
getOrRecoverCursor(chatJid),
|
||||||
ASSISTANT_NAME,
|
ASSISTANT_NAME,
|
||||||
|
MAX_MESSAGES_PER_PROMPT,
|
||||||
);
|
);
|
||||||
const messagesToSend =
|
const messagesToSend =
|
||||||
allPending.length > 0 ? allPending : groupMessages;
|
allPending.length > 0 ? allPending : groupMessages;
|
||||||
@@ -500,8 +524,12 @@ async function startMessageLoop(): Promise<void> {
|
|||||||
*/
|
*/
|
||||||
function recoverPendingMessages(): void {
|
function recoverPendingMessages(): void {
|
||||||
for (const [chatJid, group] of Object.entries(registeredGroups)) {
|
for (const [chatJid, group] of Object.entries(registeredGroups)) {
|
||||||
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
|
const pending = getMessagesSince(
|
||||||
const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME);
|
chatJid,
|
||||||
|
getOrRecoverCursor(chatJid),
|
||||||
|
ASSISTANT_NAME,
|
||||||
|
MAX_MESSAGES_PER_PROMPT,
|
||||||
|
);
|
||||||
if (pending.length > 0) {
|
if (pending.length > 0) {
|
||||||
logger.info(
|
logger.info(
|
||||||
{ group: group.name, pendingCount: pending.length },
|
{ group: group.name, pendingCount: pending.length },
|
||||||
|
|||||||
Reference in New Issue
Block a user