refactor: CI optimization, logging improvements, and codebase formatting (#456)

* fix(db): remove unique constraint on folder to support multi-channel agents

* ci: implement automated skill drift detection and self-healing PRs

* fix: align registration logic with Gavriel's feedback and fix build/test issues from Daniel Mi

* style: conform to prettier standards for CI validation

* test: fix branch naming inconsistency in CI (master vs main)

* fix(ci): robust module resolution by removing file extensions in scripts

* refactor(ci): simplify skill validation by removing redundant combination tests

* style: conform skills-engine to prettier, unify logging in index.ts and cleanup unused imports

* refactor: extract multi-channel DB changes to separate branch

Move channel column, folder suffix logic, and related migrations
to feat/multi-channel-db-v2 for independent review. This PR now
contains only CI/CD optimizations, Prettier formatting, and
logging improvements.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Gabi Simons
2026-02-25 23:13:36 +02:00
committed by GitHub
parent bd2e236f73
commit 11c201088b
76 changed files with 2333 additions and 1308 deletions

View File

@@ -84,7 +84,9 @@ vi.mock('@whiskeysockets/baileys', () => {
timedOut: 408,
restartRequired: 515,
},
fetchLatestWaWebVersion: vi.fn().mockResolvedValue({ version: [2, 3000, 0] }),
fetchLatestWaWebVersion: vi
.fn()
.mockResolvedValue({ version: [2, 3000, 0] }),
makeCacheableSignalKeyStore: vi.fn((keys: unknown) => keys),
useMultiFileAuthState: vi.fn().mockResolvedValue({
state: {
@@ -101,7 +103,9 @@ import { getLastGroupSync, updateChatName, setLastGroupSync } from '../db.js';
// --- Test helpers ---
function createTestOpts(overrides?: Partial<WhatsAppChannelOpts>): WhatsAppChannelOpts {
function createTestOpts(
overrides?: Partial<WhatsAppChannelOpts>,
): WhatsAppChannelOpts {
return {
onMessage: vi.fn(),
onChatMetadata: vi.fn(),
@@ -168,13 +172,17 @@ describe('WhatsAppChannel', () => {
const channel = new WhatsAppChannel(opts);
await connectChannel(channel);
const { fetchLatestWaWebVersion } = await import('@whiskeysockets/baileys');
const { fetchLatestWaWebVersion } =
await import('@whiskeysockets/baileys');
expect(fetchLatestWaWebVersion).toHaveBeenCalledWith({});
});
it('falls back gracefully when version fetch fails', async () => {
const { fetchLatestWaWebVersion } = await import('@whiskeysockets/baileys');
vi.mocked(fetchLatestWaWebVersion).mockRejectedValueOnce(new Error('network error'));
const { fetchLatestWaWebVersion } =
await import('@whiskeysockets/baileys');
vi.mocked(fetchLatestWaWebVersion).mockRejectedValueOnce(
new Error('network error'),
);
const opts = createTestOpts();
const channel = new WhatsAppChannel(opts);
@@ -226,10 +234,9 @@ describe('WhatsAppChannel', () => {
await (channel as any).flushOutgoingQueue();
// Group messages get prefixed when flushed
expect(fakeSocket.sendMessage).toHaveBeenCalledWith(
'test@g.us',
{ text: 'Andy: Queued message' },
);
expect(fakeSocket.sendMessage).toHaveBeenCalledWith('test@g.us', {
text: 'Andy: Queued message',
});
});
it('disconnects cleanly', async () => {
@@ -249,7 +256,9 @@ describe('WhatsAppChannel', () => {
describe('authentication', () => {
it('exits process when QR code is emitted (no auth state)', async () => {
vi.useFakeTimers();
const mockExit = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never);
const mockExit = vi
.spyOn(process, 'exit')
.mockImplementation(() => undefined as never);
const opts = createTestOpts();
const channel = new WhatsAppChannel(opts);
@@ -291,7 +300,9 @@ describe('WhatsAppChannel', () => {
});
it('exits on loggedOut disconnect', async () => {
const mockExit = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never);
const mockExit = vi
.spyOn(process, 'exit')
.mockImplementation(() => undefined as never);
const opts = createTestOpts();
const channel = new WhatsAppChannel(opts);
@@ -477,7 +488,10 @@ describe('WhatsAppChannel', () => {
fromMe: false,
},
message: {
imageMessage: { caption: 'Check this photo', mimetype: 'image/jpeg' },
imageMessage: {
caption: 'Check this photo',
mimetype: 'image/jpeg',
},
},
pushName: 'Diana',
messageTimestamp: Math.floor(Date.now() / 1000),
@@ -684,7 +698,9 @@ describe('WhatsAppChannel', () => {
await channel.sendMessage('test@g.us', 'Hello');
// Group messages get prefixed with assistant name
expect(fakeSocket.sendMessage).toHaveBeenCalledWith('test@g.us', { text: 'Andy: Hello' });
expect(fakeSocket.sendMessage).toHaveBeenCalledWith('test@g.us', {
text: 'Andy: Hello',
});
});
it('prefixes direct chat messages on shared number', async () => {
@@ -695,7 +711,10 @@ describe('WhatsAppChannel', () => {
await channel.sendMessage('123@s.whatsapp.net', 'Hello');
// Shared number: DMs also get prefixed (needed for self-chat distinction)
expect(fakeSocket.sendMessage).toHaveBeenCalledWith('123@s.whatsapp.net', { text: 'Andy: Hello' });
expect(fakeSocket.sendMessage).toHaveBeenCalledWith(
'123@s.whatsapp.net',
{ text: 'Andy: Hello' },
);
});
it('queues message when disconnected', async () => {
@@ -739,9 +758,15 @@ describe('WhatsAppChannel', () => {
expect(fakeSocket.sendMessage).toHaveBeenCalledTimes(3);
// Group messages get prefixed
expect(fakeSocket.sendMessage).toHaveBeenNthCalledWith(1, 'test@g.us', { text: 'Andy: First' });
expect(fakeSocket.sendMessage).toHaveBeenNthCalledWith(2, 'test@g.us', { text: 'Andy: Second' });
expect(fakeSocket.sendMessage).toHaveBeenNthCalledWith(3, 'test@g.us', { text: 'Andy: Third' });
expect(fakeSocket.sendMessage).toHaveBeenNthCalledWith(1, 'test@g.us', {
text: 'Andy: First',
});
expect(fakeSocket.sendMessage).toHaveBeenNthCalledWith(2, 'test@g.us', {
text: 'Andy: Second',
});
expect(fakeSocket.sendMessage).toHaveBeenNthCalledWith(3, 'test@g.us', {
text: 'Andy: Third',
});
});
});
@@ -874,7 +899,10 @@ describe('WhatsAppChannel', () => {
await connectChannel(channel);
await channel.setTyping('test@g.us', true);
expect(fakeSocket.sendPresenceUpdate).toHaveBeenCalledWith('composing', 'test@g.us');
expect(fakeSocket.sendPresenceUpdate).toHaveBeenCalledWith(
'composing',
'test@g.us',
);
});
it('sends paused presence when stopping', async () => {
@@ -884,7 +912,10 @@ describe('WhatsAppChannel', () => {
await connectChannel(channel);
await channel.setTyping('test@g.us', false);
expect(fakeSocket.sendPresenceUpdate).toHaveBeenCalledWith('paused', 'test@g.us');
expect(fakeSocket.sendPresenceUpdate).toHaveBeenCalledWith(
'paused',
'test@g.us',
);
});
it('handles typing indicator failure gracefully', async () => {
@@ -896,7 +927,9 @@ describe('WhatsAppChannel', () => {
fakeSocket.sendPresenceUpdate.mockRejectedValueOnce(new Error('Failed'));
// Should not throw
await expect(channel.setTyping('test@g.us', true)).resolves.toBeUndefined();
await expect(
channel.setTyping('test@g.us', true),
).resolves.toBeUndefined();
});
});

View File

@@ -11,14 +11,19 @@ import makeWASocket, {
useMultiFileAuthState,
} from '@whiskeysockets/baileys';
import { ASSISTANT_HAS_OWN_NUMBER, ASSISTANT_NAME, STORE_DIR } from '../config.js';
import {
getLastGroupSync,
setLastGroupSync,
updateChatName,
} from '../db.js';
ASSISTANT_HAS_OWN_NUMBER,
ASSISTANT_NAME,
STORE_DIR,
} from '../config.js';
import { getLastGroupSync, setLastGroupSync, updateChatName } from '../db.js';
import { logger } from '../logger.js';
import { Channel, OnInboundMessage, OnChatMetadata, RegisteredGroup } from '../types.js';
import {
Channel,
OnInboundMessage,
OnChatMetadata,
RegisteredGroup,
} from '../types.js';
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
@@ -57,7 +62,10 @@ export class WhatsAppChannel implements Channel {
const { state, saveCreds } = await useMultiFileAuthState(authDir);
const { version } = await fetchLatestWaWebVersion({}).catch((err) => {
logger.warn({ err }, 'Failed to fetch latest WA Web version, using default');
logger.warn(
{ err },
'Failed to fetch latest WA Web version, using default',
);
return { version: undefined };
});
this.sock = makeWASocket({
@@ -86,9 +94,18 @@ export class WhatsAppChannel implements Channel {
if (connection === 'close') {
this.connected = false;
const reason = (lastDisconnect?.error as { output?: { statusCode?: number } })?.output?.statusCode;
const reason = (
lastDisconnect?.error as { output?: { statusCode?: number } }
)?.output?.statusCode;
const shouldReconnect = reason !== DisconnectReason.loggedOut;
logger.info({ reason, shouldReconnect, queuedMessages: this.outgoingQueue.length }, 'Connection closed');
logger.info(
{
reason,
shouldReconnect,
queuedMessages: this.outgoingQueue.length,
},
'Connection closed',
);
if (shouldReconnect) {
logger.info('Reconnecting...');
@@ -167,7 +184,13 @@ export class WhatsAppChannel implements Channel {
// Always notify about chat metadata for group discovery
const isGroup = chatJid.endsWith('@g.us');
this.opts.onChatMetadata(chatJid, timestamp, undefined, 'whatsapp', isGroup);
this.opts.onChatMetadata(
chatJid,
timestamp,
undefined,
'whatsapp',
isGroup,
);
// Only deliver full message for registered groups
const groups = this.opts.registeredGroups();
@@ -220,7 +243,10 @@ export class WhatsAppChannel implements Channel {
if (!this.connected) {
this.outgoingQueue.push({ jid, text: prefixed });
logger.info({ jid, length: prefixed.length, queueSize: this.outgoingQueue.length }, 'WA disconnected, message queued');
logger.info(
{ jid, length: prefixed.length, queueSize: this.outgoingQueue.length },
'WA disconnected, message queued',
);
return;
}
try {
@@ -229,7 +255,10 @@ export class WhatsAppChannel implements Channel {
} catch (err) {
// If send fails, queue it for retry on reconnect
this.outgoingQueue.push({ jid, text: prefixed });
logger.warn({ jid, err, queueSize: this.outgoingQueue.length }, 'Failed to send, message queued');
logger.warn(
{ jid, err, queueSize: this.outgoingQueue.length },
'Failed to send, message queued',
);
}
}
@@ -299,7 +328,10 @@ export class WhatsAppChannel implements Channel {
// Check local cache first
const cached = this.lidToPhoneMap[lidUser];
if (cached) {
logger.debug({ lidJid: jid, phoneJid: cached }, 'Translated LID to phone JID (cached)');
logger.debug(
{ lidJid: jid, phoneJid: cached },
'Translated LID to phone JID (cached)',
);
return cached;
}
@@ -309,7 +341,10 @@ export class WhatsAppChannel implements Channel {
if (pn) {
const phoneJid = `${pn.split('@')[0].split(':')[0]}@s.whatsapp.net`;
this.lidToPhoneMap[lidUser] = phoneJid;
logger.info({ lidJid: jid, phoneJid }, 'Translated LID to phone JID (signalRepository)');
logger.info(
{ lidJid: jid, phoneJid },
'Translated LID to phone JID (signalRepository)',
);
return phoneJid;
}
} catch (err) {
@@ -323,12 +358,18 @@ export class WhatsAppChannel implements Channel {
if (this.flushing || this.outgoingQueue.length === 0) return;
this.flushing = true;
try {
logger.info({ count: this.outgoingQueue.length }, 'Flushing outgoing message queue');
logger.info(
{ count: this.outgoingQueue.length },
'Flushing outgoing message queue',
);
while (this.outgoingQueue.length > 0) {
const item = this.outgoingQueue.shift()!;
// Send directly — queued items are already prefixed by sendMessage
await this.sock.sendMessage(item.jid, { text: item.text });
logger.info({ jid: item.jid, length: item.text.length }, 'Queued message sent');
logger.info(
{ jid: item.jid, length: item.text.length },
'Queued message sent',
);
}
} finally {
this.flushing = false;

View File

@@ -6,15 +6,13 @@ import { readEnvFile } from './env.js';
// Read config values from .env (falls back to process.env).
// Secrets are NOT read here — they stay on disk and are loaded only
// where needed (container-runner.ts) to avoid leaking to child processes.
const envConfig = readEnvFile([
'ASSISTANT_NAME',
'ASSISTANT_HAS_OWN_NUMBER',
]);
const envConfig = readEnvFile(['ASSISTANT_NAME', 'ASSISTANT_HAS_OWN_NUMBER']);
export const ASSISTANT_NAME =
process.env.ASSISTANT_NAME || envConfig.ASSISTANT_NAME || 'Andy';
export const ASSISTANT_HAS_OWN_NUMBER =
(process.env.ASSISTANT_HAS_OWN_NUMBER || envConfig.ASSISTANT_HAS_OWN_NUMBER) === 'true';
(process.env.ASSISTANT_HAS_OWN_NUMBER ||
envConfig.ASSISTANT_HAS_OWN_NUMBER) === 'true';
export const POLL_INTERVAL = 2000;
export const SCHEDULER_POLL_INTERVAL = 60000;
@@ -45,10 +43,7 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt(
10,
); // 10MB default
export const IPC_POLL_INTERVAL = 1000;
export const IDLE_TIMEOUT = parseInt(
process.env.IDLE_TIMEOUT || '1800000',
10,
); // 30min default — how long to keep container alive after last result
export const IDLE_TIMEOUT = parseInt(process.env.IDLE_TIMEOUT || '1800000', 10); // 30min default — how long to keep container alive after last result
export const MAX_CONCURRENT_CONTAINERS = Math.max(
1,
parseInt(process.env.MAX_CONCURRENT_CONTAINERS || '5', 10) || 5,

View File

@@ -71,14 +71,17 @@ let fakeProc: ReturnType<typeof createFakeProcess>;
// Mock child_process.spawn
vi.mock('child_process', async () => {
const actual = await vi.importActual<typeof import('child_process')>('child_process');
const actual =
await vi.importActual<typeof import('child_process')>('child_process');
return {
...actual,
spawn: vi.fn(() => fakeProc),
exec: vi.fn((_cmd: string, _opts: unknown, cb?: (err: Error | null) => void) => {
if (cb) cb(null);
return new EventEmitter();
}),
exec: vi.fn(
(_cmd: string, _opts: unknown, cb?: (err: Error | null) => void) => {
if (cb) cb(null);
return new EventEmitter();
},
),
};
});
@@ -99,7 +102,10 @@ const testInput = {
isMain: false,
};
function emitOutputMarker(proc: ReturnType<typeof createFakeProcess>, output: ContainerOutput) {
function emitOutputMarker(
proc: ReturnType<typeof createFakeProcess>,
output: ContainerOutput,
) {
const json = JSON.stringify(output);
proc.stdout.push(`${OUTPUT_START_MARKER}\n${json}\n${OUTPUT_END_MARKER}\n`);
}

View File

@@ -18,7 +18,11 @@ import {
import { readEnvFile } from './env.js';
import { resolveGroupFolderPath, resolveGroupIpcPath } from './group-folder.js';
import { logger } from './logger.js';
import { CONTAINER_RUNTIME_BIN, readonlyMountArgs, stopContainer } from './container-runtime.js';
import {
CONTAINER_RUNTIME_BIN,
readonlyMountArgs,
stopContainer,
} from './container-runtime.js';
import { validateAdditionalMounts } from './mount-security.js';
import { RegisteredGroup } from './types.js';
@@ -107,19 +111,26 @@ function buildVolumeMounts(
fs.mkdirSync(groupSessionsDir, { recursive: true });
const settingsFile = path.join(groupSessionsDir, 'settings.json');
if (!fs.existsSync(settingsFile)) {
fs.writeFileSync(settingsFile, JSON.stringify({
env: {
// Enable agent swarms (subagent orchestration)
// https://code.claude.com/docs/en/agent-teams#orchestrate-teams-of-claude-code-sessions
CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS: '1',
// Load CLAUDE.md from additional mounted directories
// https://code.claude.com/docs/en/memory#load-memory-from-additional-directories
CLAUDE_CODE_ADDITIONAL_DIRECTORIES_CLAUDE_MD: '1',
// Enable Claude's memory feature (persists user preferences between sessions)
// https://code.claude.com/docs/en/memory#manage-auto-memory
CLAUDE_CODE_DISABLE_AUTO_MEMORY: '0',
},
}, null, 2) + '\n');
fs.writeFileSync(
settingsFile,
JSON.stringify(
{
env: {
// Enable agent swarms (subagent orchestration)
// https://code.claude.com/docs/en/agent-teams#orchestrate-teams-of-claude-code-sessions
CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS: '1',
// Load CLAUDE.md from additional mounted directories
// https://code.claude.com/docs/en/memory#load-memory-from-additional-directories
CLAUDE_CODE_ADDITIONAL_DIRECTORIES_CLAUDE_MD: '1',
// Enable Claude's memory feature (persists user preferences between sessions)
// https://code.claude.com/docs/en/memory#manage-auto-memory
CLAUDE_CODE_DISABLE_AUTO_MEMORY: '0',
},
},
null,
2,
) + '\n',
);
}
// Sync skills from container/skills/ into each group's .claude/skills/
@@ -154,8 +165,18 @@ function buildVolumeMounts(
// Copy agent-runner source into a per-group writable location so agents
// can customize it (add tools, change behavior) without affecting other
// groups. Recompiled on container startup via entrypoint.sh.
const agentRunnerSrc = path.join(projectRoot, 'container', 'agent-runner', 'src');
const groupAgentRunnerDir = path.join(DATA_DIR, 'sessions', group.folder, 'agent-runner-src');
const agentRunnerSrc = path.join(
projectRoot,
'container',
'agent-runner',
'src',
);
const groupAgentRunnerDir = path.join(
DATA_DIR,
'sessions',
group.folder,
'agent-runner-src',
);
if (!fs.existsSync(groupAgentRunnerDir) && fs.existsSync(agentRunnerSrc)) {
fs.cpSync(agentRunnerSrc, groupAgentRunnerDir, { recursive: true });
}
@@ -186,7 +207,10 @@ function readSecrets(): Record<string, string> {
return readEnvFile(['CLAUDE_CODE_OAUTH_TOKEN', 'ANTHROPIC_API_KEY']);
}
function buildContainerArgs(mounts: VolumeMount[], containerName: string): string[] {
function buildContainerArgs(
mounts: VolumeMount[],
containerName: string,
): string[] {
const args: string[] = ['run', '-i', '--rm', '--name', containerName];
// Pass host timezone so container's local time matches the user's
@@ -364,10 +388,16 @@ export async function runContainerAgent(
const killOnTimeout = () => {
timedOut = true;
logger.error({ group: group.name, containerName }, 'Container timeout, stopping gracefully');
logger.error(
{ group: group.name, containerName },
'Container timeout, stopping gracefully',
);
exec(stopContainer(containerName), { timeout: 15000 }, (err) => {
if (err) {
logger.warn({ group: group.name, containerName, err }, 'Graceful stop failed, force killing');
logger.warn(
{ group: group.name, containerName, err },
'Graceful stop failed, force killing',
);
container.kill('SIGKILL');
}
});
@@ -388,15 +418,18 @@ export async function runContainerAgent(
if (timedOut) {
const ts = new Date().toISOString().replace(/[:.]/g, '-');
const timeoutLog = path.join(logsDir, `container-${ts}.log`);
fs.writeFileSync(timeoutLog, [
`=== Container Run Log (TIMEOUT) ===`,
`Timestamp: ${new Date().toISOString()}`,
`Group: ${group.name}`,
`Container: ${containerName}`,
`Duration: ${duration}ms`,
`Exit Code: ${code}`,
`Had Streaming Output: ${hadStreamingOutput}`,
].join('\n'));
fs.writeFileSync(
timeoutLog,
[
`=== Container Run Log (TIMEOUT) ===`,
`Timestamp: ${new Date().toISOString()}`,
`Group: ${group.name}`,
`Container: ${containerName}`,
`Duration: ${duration}ms`,
`Exit Code: ${code}`,
`Had Streaming Output: ${hadStreamingOutput}`,
].join('\n'),
);
// Timeout after output = idle cleanup, not failure.
// The agent already sent its response; this is just the
@@ -431,7 +464,8 @@ export async function runContainerAgent(
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const logFile = path.join(logsDir, `container-${timestamp}.log`);
const isVerbose = process.env.LOG_LEVEL === 'debug' || process.env.LOG_LEVEL === 'trace';
const isVerbose =
process.env.LOG_LEVEL === 'debug' || process.env.LOG_LEVEL === 'trace';
const logLines = [
`=== Container Run Log ===`,
@@ -574,7 +608,10 @@ export async function runContainerAgent(
container.on('error', (err) => {
clearTimeout(timeout);
logger.error({ group: group.name, containerName, error: err }, 'Container spawn error');
logger.error(
{ group: group.name, containerName, error: err },
'Container spawn error',
);
resolve({
status: 'error',
result: null,

View File

@@ -55,11 +55,13 @@ describe('ensureContainerRuntimeRunning', () => {
ensureContainerRuntimeRunning();
expect(mockExecSync).toHaveBeenCalledTimes(1);
expect(mockExecSync).toHaveBeenCalledWith(
`${CONTAINER_RUNTIME_BIN} info`,
{ stdio: 'pipe', timeout: 10000 },
expect(mockExecSync).toHaveBeenCalledWith(`${CONTAINER_RUNTIME_BIN} info`, {
stdio: 'pipe',
timeout: 10000,
});
expect(logger.debug).toHaveBeenCalledWith(
'Container runtime already running',
);
expect(logger.debug).toHaveBeenCalledWith('Container runtime already running');
});
it('throws when docker info fails', () => {
@@ -79,7 +81,9 @@ describe('ensureContainerRuntimeRunning', () => {
describe('cleanupOrphans', () => {
it('stops orphaned nanoclaw containers', () => {
// docker ps returns container names, one per line
mockExecSync.mockReturnValueOnce('nanoclaw-group1-111\nnanoclaw-group2-222\n');
mockExecSync.mockReturnValueOnce(
'nanoclaw-group1-111\nnanoclaw-group2-222\n',
);
// stop calls succeed
mockExecSync.mockReturnValue('');

View File

@@ -10,7 +10,10 @@ import { logger } from './logger.js';
export const CONTAINER_RUNTIME_BIN = 'docker';
/** Returns CLI args for a readonly bind mount. */
export function readonlyMountArgs(hostPath: string, containerPath: string): string[] {
export function readonlyMountArgs(
hostPath: string,
containerPath: string,
): string[] {
return ['-v', `${hostPath}:${containerPath}:ro`];
}
@@ -22,7 +25,10 @@ export function stopContainer(name: string): string {
/** Ensure the container runtime is running, starting it if needed. */
export function ensureContainerRuntimeRunning(): void {
try {
execSync(`${CONTAINER_RUNTIME_BIN} info`, { stdio: 'pipe', timeout: 10000 });
execSync(`${CONTAINER_RUNTIME_BIN} info`, {
stdio: 'pipe',
timeout: 10000,
});
logger.debug('Container runtime already running');
} catch (err) {
logger.error({ err }, 'Failed to reach container runtime');
@@ -65,10 +71,15 @@ export function cleanupOrphans(): void {
for (const name of orphans) {
try {
execSync(stopContainer(name), { stdio: 'pipe' });
} catch { /* already stopped */ }
} catch {
/* already stopped */
}
}
if (orphans.length > 0) {
logger.info({ count: orphans.length, names: orphans }, 'Stopped orphaned containers');
logger.info(
{ count: orphans.length, names: orphans },
'Stopped orphaned containers',
);
}
} catch (err) {
logger.warn({ err }, 'Failed to clean up orphaned containers');

View File

@@ -53,7 +53,11 @@ describe('storeMessage', () => {
timestamp: '2024-01-01T00:00:01.000Z',
});
const messages = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'Andy');
const messages = getMessagesSince(
'group@g.us',
'2024-01-01T00:00:00.000Z',
'Andy',
);
expect(messages).toHaveLength(1);
expect(messages[0].id).toBe('msg-1');
expect(messages[0].sender).toBe('123@s.whatsapp.net');
@@ -73,7 +77,11 @@ describe('storeMessage', () => {
timestamp: '2024-01-01T00:00:04.000Z',
});
const messages = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'Andy');
const messages = getMessagesSince(
'group@g.us',
'2024-01-01T00:00:00.000Z',
'Andy',
);
expect(messages).toHaveLength(0);
});
@@ -91,7 +99,11 @@ describe('storeMessage', () => {
});
// Message is stored (we can retrieve it — is_from_me doesn't affect retrieval)
const messages = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'Andy');
const messages = getMessagesSince(
'group@g.us',
'2024-01-01T00:00:00.000Z',
'Andy',
);
expect(messages).toHaveLength(1);
});
@@ -116,7 +128,11 @@ describe('storeMessage', () => {
timestamp: '2024-01-01T00:00:01.000Z',
});
const messages = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'Andy');
const messages = getMessagesSince(
'group@g.us',
'2024-01-01T00:00:00.000Z',
'Andy',
);
expect(messages).toHaveLength(1);
expect(messages[0].content).toBe('updated');
});
@@ -129,33 +145,57 @@ describe('getMessagesSince', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
store({
id: 'm1', chat_jid: 'group@g.us', sender: 'Alice@s.whatsapp.net',
sender_name: 'Alice', content: 'first', timestamp: '2024-01-01T00:00:01.000Z',
id: 'm1',
chat_jid: 'group@g.us',
sender: 'Alice@s.whatsapp.net',
sender_name: 'Alice',
content: 'first',
timestamp: '2024-01-01T00:00:01.000Z',
});
store({
id: 'm2', chat_jid: 'group@g.us', sender: 'Bob@s.whatsapp.net',
sender_name: 'Bob', content: 'second', timestamp: '2024-01-01T00:00:02.000Z',
id: 'm2',
chat_jid: 'group@g.us',
sender: 'Bob@s.whatsapp.net',
sender_name: 'Bob',
content: 'second',
timestamp: '2024-01-01T00:00:02.000Z',
});
storeMessage({
id: 'm3', chat_jid: 'group@g.us', sender: 'Bot@s.whatsapp.net',
sender_name: 'Bot', content: 'bot reply', timestamp: '2024-01-01T00:00:03.000Z',
id: 'm3',
chat_jid: 'group@g.us',
sender: 'Bot@s.whatsapp.net',
sender_name: 'Bot',
content: 'bot reply',
timestamp: '2024-01-01T00:00:03.000Z',
is_bot_message: true,
});
store({
id: 'm4', chat_jid: 'group@g.us', sender: 'Carol@s.whatsapp.net',
sender_name: 'Carol', content: 'third', timestamp: '2024-01-01T00:00:04.000Z',
id: 'm4',
chat_jid: 'group@g.us',
sender: 'Carol@s.whatsapp.net',
sender_name: 'Carol',
content: 'third',
timestamp: '2024-01-01T00:00:04.000Z',
});
});
it('returns messages after the given timestamp', () => {
const msgs = getMessagesSince('group@g.us', '2024-01-01T00:00:02.000Z', 'Andy');
const msgs = getMessagesSince(
'group@g.us',
'2024-01-01T00:00:02.000Z',
'Andy',
);
// Should exclude m1, m2 (before/at timestamp), m3 (bot message)
expect(msgs).toHaveLength(1);
expect(msgs[0].content).toBe('third');
});
it('excludes bot messages via is_bot_message flag', () => {
const msgs = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'Andy');
const msgs = getMessagesSince(
'group@g.us',
'2024-01-01T00:00:00.000Z',
'Andy',
);
const botMsgs = msgs.filter((m) => m.content === 'bot reply');
expect(botMsgs).toHaveLength(0);
});
@@ -169,11 +209,18 @@ describe('getMessagesSince', () => {
it('filters pre-migration bot messages via content prefix backstop', () => {
// Simulate a message written before migration: has prefix but is_bot_message = 0
store({
id: 'm5', chat_jid: 'group@g.us', sender: 'Bot@s.whatsapp.net',
sender_name: 'Bot', content: 'Andy: old bot reply',
id: 'm5',
chat_jid: 'group@g.us',
sender: 'Bot@s.whatsapp.net',
sender_name: 'Bot',
content: 'Andy: old bot reply',
timestamp: '2024-01-01T00:00:05.000Z',
});
const msgs = getMessagesSince('group@g.us', '2024-01-01T00:00:04.000Z', 'Andy');
const msgs = getMessagesSince(
'group@g.us',
'2024-01-01T00:00:04.000Z',
'Andy',
);
expect(msgs).toHaveLength(0);
});
});
@@ -186,21 +233,37 @@ describe('getNewMessages', () => {
storeChatMetadata('group2@g.us', '2024-01-01T00:00:00.000Z');
store({
id: 'a1', chat_jid: 'group1@g.us', sender: 'user@s.whatsapp.net',
sender_name: 'User', content: 'g1 msg1', timestamp: '2024-01-01T00:00:01.000Z',
id: 'a1',
chat_jid: 'group1@g.us',
sender: 'user@s.whatsapp.net',
sender_name: 'User',
content: 'g1 msg1',
timestamp: '2024-01-01T00:00:01.000Z',
});
store({
id: 'a2', chat_jid: 'group2@g.us', sender: 'user@s.whatsapp.net',
sender_name: 'User', content: 'g2 msg1', timestamp: '2024-01-01T00:00:02.000Z',
id: 'a2',
chat_jid: 'group2@g.us',
sender: 'user@s.whatsapp.net',
sender_name: 'User',
content: 'g2 msg1',
timestamp: '2024-01-01T00:00:02.000Z',
});
storeMessage({
id: 'a3', chat_jid: 'group1@g.us', sender: 'user@s.whatsapp.net',
sender_name: 'User', content: 'bot reply', timestamp: '2024-01-01T00:00:03.000Z',
id: 'a3',
chat_jid: 'group1@g.us',
sender: 'user@s.whatsapp.net',
sender_name: 'User',
content: 'bot reply',
timestamp: '2024-01-01T00:00:03.000Z',
is_bot_message: true,
});
store({
id: 'a4', chat_jid: 'group1@g.us', sender: 'user@s.whatsapp.net',
sender_name: 'User', content: 'g1 msg2', timestamp: '2024-01-01T00:00:04.000Z',
id: 'a4',
chat_jid: 'group1@g.us',
sender: 'user@s.whatsapp.net',
sender_name: 'User',
content: 'g1 msg2',
timestamp: '2024-01-01T00:00:04.000Z',
});
});

View File

@@ -5,7 +5,12 @@ import path from 'path';
import { ASSISTANT_NAME, DATA_DIR, STORE_DIR } from './config.js';
import { isValidGroupFolder } from './group-folder.js';
import { logger } from './logger.js';
import { NewMessage, RegisteredGroup, ScheduledTask, TaskRunLog } from './types.js';
import {
NewMessage,
RegisteredGroup,
ScheduledTask,
TaskRunLog,
} from './types.js';
let db: Database.Database;
@@ -94,26 +99,30 @@ function createSchema(database: Database.Database): void {
`ALTER TABLE messages ADD COLUMN is_bot_message INTEGER DEFAULT 0`,
);
// Backfill: mark existing bot messages that used the content prefix pattern
database.prepare(
`UPDATE messages SET is_bot_message = 1 WHERE content LIKE ?`,
).run(`${ASSISTANT_NAME}:%`);
database
.prepare(`UPDATE messages SET is_bot_message = 1 WHERE content LIKE ?`)
.run(`${ASSISTANT_NAME}:%`);
} catch {
/* column already exists */
}
// Add channel and is_group columns if they don't exist (migration for existing DBs)
try {
database.exec(
`ALTER TABLE chats ADD COLUMN channel TEXT`,
);
database.exec(
`ALTER TABLE chats ADD COLUMN is_group INTEGER DEFAULT 0`,
);
database.exec(`ALTER TABLE chats ADD COLUMN channel TEXT`);
database.exec(`ALTER TABLE chats ADD COLUMN is_group INTEGER DEFAULT 0`);
// Backfill from JID patterns
database.exec(`UPDATE chats SET channel = 'whatsapp', is_group = 1 WHERE jid LIKE '%@g.us'`);
database.exec(`UPDATE chats SET channel = 'whatsapp', is_group = 0 WHERE jid LIKE '%@s.whatsapp.net'`);
database.exec(`UPDATE chats SET channel = 'discord', is_group = 1 WHERE jid LIKE 'dc:%'`);
database.exec(`UPDATE chats SET channel = 'telegram', is_group = 1 WHERE jid LIKE 'tg:%'`);
database.exec(
`UPDATE chats SET channel = 'whatsapp', is_group = 1 WHERE jid LIKE '%@g.us'`,
);
database.exec(
`UPDATE chats SET channel = 'whatsapp', is_group = 0 WHERE jid LIKE '%@s.whatsapp.net'`,
);
database.exec(
`UPDATE chats SET channel = 'discord', is_group = 1 WHERE jid LIKE 'dc:%'`,
);
database.exec(
`UPDATE chats SET channel = 'telegram', is_group = 1 WHERE jid LIKE 'tg:%'`,
);
} catch {
/* columns already exist */
}
@@ -540,14 +549,12 @@ export function getRegisteredGroup(
containerConfig: row.container_config
? JSON.parse(row.container_config)
: undefined,
requiresTrigger: row.requires_trigger === null ? undefined : row.requires_trigger === 1,
requiresTrigger:
row.requires_trigger === null ? undefined : row.requires_trigger === 1,
};
}
export function setRegisteredGroup(
jid: string,
group: RegisteredGroup,
): void {
export function setRegisteredGroup(jid: string, group: RegisteredGroup): void {
if (!isValidGroupFolder(group.folder)) {
throw new Error(`Invalid group folder "${group.folder}" for JID ${jid}`);
}
@@ -566,9 +573,7 @@ export function setRegisteredGroup(
}
export function getAllRegisteredGroups(): Record<string, RegisteredGroup> {
const rows = db
.prepare('SELECT * FROM registered_groups')
.all() as Array<{
const rows = db.prepare('SELECT * FROM registered_groups').all() as Array<{
jid: string;
name: string;
folder: string;
@@ -594,7 +599,8 @@ export function getAllRegisteredGroups(): Record<string, RegisteredGroup> {
containerConfig: row.container_config
? JSON.parse(row.container_config)
: undefined,
requiresTrigger: row.requires_trigger === null ? undefined : row.requires_trigger === 1,
requiresTrigger:
row.requires_trigger === null ? undefined : row.requires_trigger === 1,
};
}
return result;

View File

@@ -69,7 +69,12 @@ describe('formatMessages', () => {
it('formats multiple messages', () => {
const msgs = [
makeMsg({ id: '1', sender_name: 'Alice', content: 'hi', timestamp: 't1' }),
makeMsg({
id: '1',
sender_name: 'Alice',
content: 'hi',
timestamp: 't1',
}),
makeMsg({ id: '2', sender_name: 'Bob', content: 'hey', timestamp: 't2' }),
];
const result = formatMessages(msgs);
@@ -154,9 +159,7 @@ describe('stripInternalTags', () => {
it('strips multiple internal tag blocks', () => {
expect(
stripInternalTags(
'<internal>a</internal>hello<internal>b</internal>',
),
stripInternalTags('<internal>a</internal>hello<internal>b</internal>'),
).toBe('hello');
});

View File

@@ -2,7 +2,11 @@ import path from 'path';
import { describe, expect, it } from 'vitest';
import { isValidGroupFolder, resolveGroupFolderPath, resolveGroupIpcPath } from './group-folder.js';
import {
isValidGroupFolder,
resolveGroupFolderPath,
resolveGroupIpcPath,
} from './group-folder.js';
describe('group folder validation', () => {
it('accepts normal group folder names', () => {
@@ -20,9 +24,9 @@ describe('group folder validation', () => {
it('resolves safe paths under groups directory', () => {
const resolved = resolveGroupFolderPath('family-chat');
expect(
resolved.endsWith(`${path.sep}groups${path.sep}family-chat`),
).toBe(true);
expect(resolved.endsWith(`${path.sep}groups${path.sep}family-chat`)).toBe(
true,
);
});
it('resolves safe paths under data ipc directory', () => {

View File

@@ -263,7 +263,12 @@ describe('GroupQueue', () => {
await vi.advanceTimersByTimeAsync(10);
// Register a process so closeStdin has a groupFolder
queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group');
queue.registerProcess(
'group1@g.us',
{} as any,
'container-1',
'test-group',
);
// Enqueue a task while container is active but NOT idle
const taskFn = vi.fn(async () => {});
@@ -298,7 +303,12 @@ describe('GroupQueue', () => {
await vi.advanceTimersByTimeAsync(10);
// Register process and mark idle
queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group');
queue.registerProcess(
'group1@g.us',
{} as any,
'container-1',
'test-group',
);
queue.notifyIdle('group1@g.us');
// Clear previous writes, then enqueue a task
@@ -332,7 +342,12 @@ describe('GroupQueue', () => {
queue.setProcessMessagesFn(processMessages);
queue.enqueueMessageCheck('group1@g.us');
await vi.advanceTimersByTimeAsync(10);
queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group');
queue.registerProcess(
'group1@g.us',
{} as any,
'container-1',
'test-group',
);
// Container becomes idle
queue.notifyIdle('group1@g.us');
@@ -368,7 +383,12 @@ describe('GroupQueue', () => {
// Start a task (sets isTaskContainer = true)
queue.enqueueTask('group1@g.us', 'task-1', taskFn);
await vi.advanceTimersByTimeAsync(10);
queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group');
queue.registerProcess(
'group1@g.us',
{} as any,
'container-1',
'test-group',
);
// sendMessage should return false — user messages must not go to task containers
const result = queue.sendMessage('group1@g.us', 'hello');
@@ -396,7 +416,12 @@ describe('GroupQueue', () => {
await vi.advanceTimersByTimeAsync(10);
// Register process and enqueue a task (no idle yet — no preemption)
queue.registerProcess('group1@g.us', {} as any, 'container-1', 'test-group');
queue.registerProcess(
'group1@g.us',
{} as any,
'container-1',
'test-group',
);
const writeFileSync = vi.mocked(fs.default.writeFileSync);
writeFileSync.mockClear();

View File

@@ -123,7 +123,12 @@ export class GroupQueue {
);
}
registerProcess(groupJid: string, proc: ChildProcess, containerName: string, groupFolder?: string): void {
registerProcess(
groupJid: string,
proc: ChildProcess,
containerName: string,
groupFolder?: string,
): void {
const state = this.getGroup(groupJid);
state.process = proc;
state.containerName = containerName;
@@ -148,7 +153,8 @@ export class GroupQueue {
*/
sendMessage(groupJid: string, text: string): boolean {
const state = this.getGroup(groupJid);
if (!state.active || !state.groupFolder || state.isTaskContainer) return false;
if (!state.active || !state.groupFolder || state.isTaskContainer)
return false;
state.idleWaiting = false; // Agent is about to receive work, no longer idle
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
@@ -278,7 +284,10 @@ export class GroupQueue {
if (state.pendingTasks.length > 0) {
const task = state.pendingTasks.shift()!;
this.runTask(groupJid, task).catch((err) =>
logger.error({ groupJid, taskId: task.id, err }, 'Unhandled error in runTask (drain)'),
logger.error(
{ groupJid, taskId: task.id, err },
'Unhandled error in runTask (drain)',
),
);
return;
}
@@ -286,7 +295,10 @@ export class GroupQueue {
// Then pending messages
if (state.pendingMessages) {
this.runForGroup(groupJid, 'drain').catch((err) =>
logger.error({ groupJid, err }, 'Unhandled error in runForGroup (drain)'),
logger.error(
{ groupJid, err },
'Unhandled error in runForGroup (drain)',
),
);
return;
}
@@ -307,11 +319,17 @@ export class GroupQueue {
if (state.pendingTasks.length > 0) {
const task = state.pendingTasks.shift()!;
this.runTask(nextJid, task).catch((err) =>
logger.error({ groupJid: nextJid, taskId: task.id, err }, 'Unhandled error in runTask (waiting)'),
logger.error(
{ groupJid: nextJid, taskId: task.id, err },
'Unhandled error in runTask (waiting)',
),
);
} else if (state.pendingMessages) {
this.runForGroup(nextJid, 'drain').catch((err) =>
logger.error({ groupJid: nextJid, err }, 'Unhandled error in runForGroup (waiting)'),
logger.error(
{ groupJid: nextJid, err },
'Unhandled error in runForGroup (waiting)',
),
);
}
// If neither pending, skip this group

View File

@@ -15,7 +15,10 @@ import {
writeGroupsSnapshot,
writeTasksSnapshot,
} from './container-runner.js';
import { cleanupOrphans, ensureContainerRuntimeRunning } from './container-runtime.js';
import {
cleanupOrphans,
ensureContainerRuntimeRunning,
} from './container-runtime.js';
import {
getAllChats,
getAllRegisteredGroups,
@@ -71,10 +74,7 @@ function loadState(): void {
function saveState(): void {
setRouterState('last_timestamp', lastTimestamp);
setRouterState(
'last_agent_timestamp',
JSON.stringify(lastAgentTimestamp),
);
setRouterState('last_agent_timestamp', JSON.stringify(lastAgentTimestamp));
}
function registerGroup(jid: string, group: RegisteredGroup): void {
@@ -120,7 +120,9 @@ export function getAvailableGroups(): import('./container-runner.js').AvailableG
}
/** @internal - exported for testing */
export function _setRegisteredGroups(groups: Record<string, RegisteredGroup>): void {
export function _setRegisteredGroups(
groups: Record<string, RegisteredGroup>,
): void {
registeredGroups = groups;
}
@@ -134,14 +136,18 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
const channel = findChannel(channels, chatJid);
if (!channel) {
console.log(`Warning: no channel owns JID ${chatJid}, skipping messages`);
logger.warn({ chatJid }, 'No channel owns JID, skipping messages');
return true;
}
const isMainGroup = group.folder === MAIN_GROUP_FOLDER;
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
const missedMessages = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME);
const missedMessages = getMessagesSince(
chatJid,
sinceTimestamp,
ASSISTANT_NAME,
);
if (missedMessages.length === 0) return true;
@@ -173,7 +179,10 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
const resetIdleTimer = () => {
if (idleTimer) clearTimeout(idleTimer);
idleTimer = setTimeout(() => {
logger.debug({ group: group.name }, 'Idle timeout, closing container stdin');
logger.debug(
{ group: group.name },
'Idle timeout, closing container stdin',
);
queue.closeStdin(chatJid);
}, IDLE_TIMEOUT);
};
@@ -185,7 +194,10 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
const output = await runAgent(group, prompt, chatJid, async (result) => {
// Streaming output callback — called for each agent result
if (result.result) {
const raw = typeof result.result === 'string' ? result.result : JSON.stringify(result.result);
const raw =
typeof result.result === 'string'
? result.result
: JSON.stringify(result.result);
// Strip <internal>...</internal> blocks — agent uses these for internal reasoning
const text = raw.replace(/<internal>[\s\S]*?<\/internal>/g, '').trim();
logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`);
@@ -213,13 +225,19 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
// If we already sent output to the user, don't roll back the cursor —
// the user got their response and re-processing would send duplicates.
if (outputSentToUser) {
logger.warn({ group: group.name }, 'Agent error after output was sent, skipping cursor rollback to prevent duplicates');
logger.warn(
{ group: group.name },
'Agent error after output was sent, skipping cursor rollback to prevent duplicates',
);
return true;
}
// Roll back cursor so retries can re-process these messages
lastAgentTimestamp[chatJid] = previousCursor;
saveState();
logger.warn({ group: group.name }, 'Agent error, rolled back message cursor for retry');
logger.warn(
{ group: group.name },
'Agent error, rolled back message cursor for retry',
);
return false;
}
@@ -282,7 +300,8 @@ async function runAgent(
isMain,
assistantName: ASSISTANT_NAME,
},
(proc, containerName) => queue.registerProcess(chatJid, proc, containerName, group.folder),
(proc, containerName) =>
queue.registerProcess(chatJid, proc, containerName, group.folder),
wrappedOnOutput,
);
@@ -318,7 +337,11 @@ async function startMessageLoop(): Promise<void> {
while (true) {
try {
const jids = Object.keys(registeredGroups);
const { messages, newTimestamp } = getNewMessages(jids, lastTimestamp, ASSISTANT_NAME);
const { messages, newTimestamp } = getNewMessages(
jids,
lastTimestamp,
ASSISTANT_NAME,
);
if (messages.length > 0) {
logger.info({ count: messages.length }, 'New messages');
@@ -344,7 +367,7 @@ async function startMessageLoop(): Promise<void> {
const channel = findChannel(channels, chatJid);
if (!channel) {
console.log(`Warning: no channel owns JID ${chatJid}, skipping messages`);
logger.warn({ chatJid }, 'No channel owns JID, skipping messages');
continue;
}
@@ -381,9 +404,11 @@ async function startMessageLoop(): Promise<void> {
messagesToSend[messagesToSend.length - 1].timestamp;
saveState();
// Show typing indicator while the container processes the piped message
channel.setTyping?.(chatJid, true)?.catch((err) =>
logger.warn({ chatJid, err }, 'Failed to set typing indicator'),
);
channel
.setTyping?.(chatJid, true)
?.catch((err) =>
logger.warn({ chatJid, err }, 'Failed to set typing indicator'),
);
} else {
// No active container — enqueue for a new one
queue.enqueueMessageCheck(chatJid);
@@ -439,8 +464,13 @@ async function main(): Promise<void> {
// Channel callbacks (shared by all channels)
const channelOpts = {
onMessage: (_chatJid: string, msg: NewMessage) => storeMessage(msg),
onChatMetadata: (chatJid: string, timestamp: string, name?: string, channel?: string, isGroup?: boolean) =>
storeChatMetadata(chatJid, timestamp, name, channel, isGroup),
onChatMetadata: (
chatJid: string,
timestamp: string,
name?: string,
channel?: string,
isGroup?: boolean,
) => storeChatMetadata(chatJid, timestamp, name, channel, isGroup),
registeredGroups: () => registeredGroups,
};
@@ -454,11 +484,12 @@ async function main(): Promise<void> {
registeredGroups: () => registeredGroups,
getSessions: () => sessions,
queue,
onProcess: (groupJid, proc, containerName, groupFolder) => queue.registerProcess(groupJid, proc, containerName, groupFolder),
onProcess: (groupJid, proc, containerName, groupFolder) =>
queue.registerProcess(groupJid, proc, containerName, groupFolder),
sendMessage: async (jid, rawText) => {
const channel = findChannel(channels, jid);
if (!channel) {
console.log(`Warning: no channel owns JID ${jid}, cannot send message`);
logger.warn({ jid }, 'No channel owns JID, cannot send message');
return;
}
const text = formatOutbound(rawText);
@@ -473,9 +504,11 @@ async function main(): Promise<void> {
},
registeredGroups: () => registeredGroups,
registerGroup,
syncGroupMetadata: (force) => whatsapp?.syncGroupMetadata(force) ?? Promise.resolve(),
syncGroupMetadata: (force) =>
whatsapp?.syncGroupMetadata(force) ?? Promise.resolve(),
getAvailableGroups,
writeGroupsSnapshot: (gf, im, ag, rj) => writeGroupsSnapshot(gf, im, ag, rj),
writeGroupsSnapshot: (gf, im, ag, rj) =>
writeGroupsSnapshot(gf, im, ag, rj),
});
queue.setProcessMessagesFn(processGroupMessages);
recoverPendingMessages();
@@ -488,7 +521,8 @@ async function main(): Promise<void> {
// Guard: only run when executed directly, not when imported by tests
const isDirectRun =
process.argv[1] &&
new URL(import.meta.url).pathname === new URL(`file://${process.argv[1]}`).pathname;
new URL(import.meta.url).pathname ===
new URL(`file://${process.argv[1]}`).pathname;
if (isDirectRun) {
main().catch((err) => {

View File

@@ -174,17 +174,32 @@ describe('pause_task authorization', () => {
});
it('main group can pause any task', async () => {
await processTaskIpc({ type: 'pause_task', taskId: 'task-other' }, 'main', true, deps);
await processTaskIpc(
{ type: 'pause_task', taskId: 'task-other' },
'main',
true,
deps,
);
expect(getTaskById('task-other')!.status).toBe('paused');
});
it('non-main group can pause its own task', async () => {
await processTaskIpc({ type: 'pause_task', taskId: 'task-other' }, 'other-group', false, deps);
await processTaskIpc(
{ type: 'pause_task', taskId: 'task-other' },
'other-group',
false,
deps,
);
expect(getTaskById('task-other')!.status).toBe('paused');
});
it('non-main group cannot pause another groups task', async () => {
await processTaskIpc({ type: 'pause_task', taskId: 'task-main' }, 'other-group', false, deps);
await processTaskIpc(
{ type: 'pause_task', taskId: 'task-main' },
'other-group',
false,
deps,
);
expect(getTaskById('task-main')!.status).toBe('active');
});
});
@@ -208,17 +223,32 @@ describe('resume_task authorization', () => {
});
it('main group can resume any task', async () => {
await processTaskIpc({ type: 'resume_task', taskId: 'task-paused' }, 'main', true, deps);
await processTaskIpc(
{ type: 'resume_task', taskId: 'task-paused' },
'main',
true,
deps,
);
expect(getTaskById('task-paused')!.status).toBe('active');
});
it('non-main group can resume its own task', async () => {
await processTaskIpc({ type: 'resume_task', taskId: 'task-paused' }, 'other-group', false, deps);
await processTaskIpc(
{ type: 'resume_task', taskId: 'task-paused' },
'other-group',
false,
deps,
);
expect(getTaskById('task-paused')!.status).toBe('active');
});
it('non-main group cannot resume another groups task', async () => {
await processTaskIpc({ type: 'resume_task', taskId: 'task-paused' }, 'third-group', false, deps);
await processTaskIpc(
{ type: 'resume_task', taskId: 'task-paused' },
'third-group',
false,
deps,
);
expect(getTaskById('task-paused')!.status).toBe('paused');
});
});
@@ -240,7 +270,12 @@ describe('cancel_task authorization', () => {
created_at: '2024-01-01T00:00:00.000Z',
});
await processTaskIpc({ type: 'cancel_task', taskId: 'task-to-cancel' }, 'main', true, deps);
await processTaskIpc(
{ type: 'cancel_task', taskId: 'task-to-cancel' },
'main',
true,
deps,
);
expect(getTaskById('task-to-cancel')).toBeUndefined();
});
@@ -258,7 +293,12 @@ describe('cancel_task authorization', () => {
created_at: '2024-01-01T00:00:00.000Z',
});
await processTaskIpc({ type: 'cancel_task', taskId: 'task-own' }, 'other-group', false, deps);
await processTaskIpc(
{ type: 'cancel_task', taskId: 'task-own' },
'other-group',
false,
deps,
);
expect(getTaskById('task-own')).toBeUndefined();
});
@@ -276,7 +316,12 @@ describe('cancel_task authorization', () => {
created_at: '2024-01-01T00:00:00.000Z',
});
await processTaskIpc({ type: 'cancel_task', taskId: 'task-foreign' }, 'other-group', false, deps);
await processTaskIpc(
{ type: 'cancel_task', taskId: 'task-foreign' },
'other-group',
false,
deps,
);
expect(getTaskById('task-foreign')).toBeDefined();
});
});
@@ -325,7 +370,12 @@ describe('register_group authorization', () => {
describe('refresh_groups authorization', () => {
it('non-main group cannot trigger refresh', async () => {
// This should be silently blocked (no crash, no effect)
await processTaskIpc({ type: 'refresh_groups' }, 'other-group', false, deps);
await processTaskIpc(
{ type: 'refresh_groups' },
'other-group',
false,
deps,
);
// If we got here without error, the auth gate worked
});
});
@@ -352,21 +402,31 @@ describe('IPC message authorization', () => {
});
it('non-main group can send to its own chat', () => {
expect(isMessageAuthorized('other-group', false, 'other@g.us', groups)).toBe(true);
expect(
isMessageAuthorized('other-group', false, 'other@g.us', groups),
).toBe(true);
});
it('non-main group cannot send to another groups chat', () => {
expect(isMessageAuthorized('other-group', false, 'main@g.us', groups)).toBe(false);
expect(isMessageAuthorized('other-group', false, 'third@g.us', groups)).toBe(false);
expect(isMessageAuthorized('other-group', false, 'main@g.us', groups)).toBe(
false,
);
expect(
isMessageAuthorized('other-group', false, 'third@g.us', groups),
).toBe(false);
});
it('non-main group cannot send to unregistered JID', () => {
expect(isMessageAuthorized('other-group', false, 'unknown@g.us', groups)).toBe(false);
expect(
isMessageAuthorized('other-group', false, 'unknown@g.us', groups),
).toBe(false);
});
it('main group can send to unregistered JID', () => {
// Main is always authorized regardless of target
expect(isMessageAuthorized('main', true, 'unknown@g.us', groups)).toBe(true);
expect(isMessageAuthorized('main', true, 'unknown@g.us', groups)).toBe(
true,
);
});
});
@@ -392,7 +452,9 @@ describe('schedule_task schedule types', () => {
expect(tasks[0].schedule_type).toBe('cron');
expect(tasks[0].next_run).toBeTruthy();
// next_run should be a valid ISO date in the future
expect(new Date(tasks[0].next_run!).getTime()).toBeGreaterThan(Date.now() - 60000);
expect(new Date(tasks[0].next_run!).getTime()).toBeGreaterThan(
Date.now() - 60000,
);
});
it('rejects invalid cron expression', async () => {

View File

@@ -10,8 +10,9 @@ export function escapeXml(s: string): string {
}
export function formatMessages(messages: NewMessage[]): string {
const lines = messages.map((m) =>
`<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`,
const lines = messages.map(
(m) =>
`<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`,
);
return `<messages>\n${lines.join('\n')}\n</messages>`;
}

View File

@@ -28,9 +28,27 @@ describe('JID ownership patterns', () => {
describe('getAvailableGroups', () => {
it('returns only groups, excludes DMs', () => {
storeChatMetadata('group1@g.us', '2024-01-01T00:00:01.000Z', 'Group 1', 'whatsapp', true);
storeChatMetadata('user@s.whatsapp.net', '2024-01-01T00:00:02.000Z', 'User DM', 'whatsapp', false);
storeChatMetadata('group2@g.us', '2024-01-01T00:00:03.000Z', 'Group 2', 'whatsapp', true);
storeChatMetadata(
'group1@g.us',
'2024-01-01T00:00:01.000Z',
'Group 1',
'whatsapp',
true,
);
storeChatMetadata(
'user@s.whatsapp.net',
'2024-01-01T00:00:02.000Z',
'User DM',
'whatsapp',
false,
);
storeChatMetadata(
'group2@g.us',
'2024-01-01T00:00:03.000Z',
'Group 2',
'whatsapp',
true,
);
const groups = getAvailableGroups();
expect(groups).toHaveLength(2);
@@ -41,7 +59,13 @@ describe('getAvailableGroups', () => {
it('excludes __group_sync__ sentinel', () => {
storeChatMetadata('__group_sync__', '2024-01-01T00:00:00.000Z');
storeChatMetadata('group@g.us', '2024-01-01T00:00:01.000Z', 'Group', 'whatsapp', true);
storeChatMetadata(
'group@g.us',
'2024-01-01T00:00:01.000Z',
'Group',
'whatsapp',
true,
);
const groups = getAvailableGroups();
expect(groups).toHaveLength(1);
@@ -49,8 +73,20 @@ describe('getAvailableGroups', () => {
});
it('marks registered groups correctly', () => {
storeChatMetadata('reg@g.us', '2024-01-01T00:00:01.000Z', 'Registered', 'whatsapp', true);
storeChatMetadata('unreg@g.us', '2024-01-01T00:00:02.000Z', 'Unregistered', 'whatsapp', true);
storeChatMetadata(
'reg@g.us',
'2024-01-01T00:00:01.000Z',
'Registered',
'whatsapp',
true,
);
storeChatMetadata(
'unreg@g.us',
'2024-01-01T00:00:02.000Z',
'Unregistered',
'whatsapp',
true,
);
_setRegisteredGroups({
'reg@g.us': {
@@ -70,9 +106,27 @@ describe('getAvailableGroups', () => {
});
it('returns groups ordered by most recent activity', () => {
storeChatMetadata('old@g.us', '2024-01-01T00:00:01.000Z', 'Old', 'whatsapp', true);
storeChatMetadata('new@g.us', '2024-01-01T00:00:05.000Z', 'New', 'whatsapp', true);
storeChatMetadata('mid@g.us', '2024-01-01T00:00:03.000Z', 'Mid', 'whatsapp', true);
storeChatMetadata(
'old@g.us',
'2024-01-01T00:00:01.000Z',
'Old',
'whatsapp',
true,
);
storeChatMetadata(
'new@g.us',
'2024-01-01T00:00:05.000Z',
'New',
'whatsapp',
true,
);
storeChatMetadata(
'mid@g.us',
'2024-01-01T00:00:03.000Z',
'Mid',
'whatsapp',
true,
);
const groups = getAvailableGroups();
expect(groups[0].jid).toBe('new@g.us');
@@ -82,11 +136,27 @@ describe('getAvailableGroups', () => {
it('excludes non-group chats regardless of JID format', () => {
// Unknown JID format stored without is_group should not appear
storeChatMetadata('unknown-format-123', '2024-01-01T00:00:01.000Z', 'Unknown');
storeChatMetadata(
'unknown-format-123',
'2024-01-01T00:00:01.000Z',
'Unknown',
);
// Explicitly non-group with unusual JID
storeChatMetadata('custom:abc', '2024-01-01T00:00:02.000Z', 'Custom DM', 'custom', false);
storeChatMetadata(
'custom:abc',
'2024-01-01T00:00:02.000Z',
'Custom DM',
'custom',
false,
);
// A real group for contrast
storeChatMetadata('group@g.us', '2024-01-01T00:00:03.000Z', 'Group', 'whatsapp', true);
storeChatMetadata(
'group@g.us',
'2024-01-01T00:00:03.000Z',
'Group',
'whatsapp',
true,
);
const groups = getAvailableGroups();
expect(groups).toHaveLength(1);

View File

@@ -4,12 +4,15 @@ import fs from 'fs';
import {
ASSISTANT_NAME,
IDLE_TIMEOUT,
MAIN_GROUP_FOLDER,
SCHEDULER_POLL_INTERVAL,
TIMEZONE,
} from './config.js';
import { ContainerOutput, runContainerAgent, writeTasksSnapshot } from './container-runner.js';
import {
ContainerOutput,
runContainerAgent,
writeTasksSnapshot,
} from './container-runner.js';
import {
getAllTasks,
getDueTasks,
@@ -27,7 +30,12 @@ export interface SchedulerDependencies {
registeredGroups: () => Record<string, RegisteredGroup>;
getSessions: () => Record<string, string>;
queue: GroupQueue;
onProcess: (groupJid: string, proc: ChildProcess, containerName: string, groupFolder: string) => void;
onProcess: (
groupJid: string,
proc: ChildProcess,
containerName: string,
groupFolder: string,
) => void;
sendMessage: (jid: string, text: string) => Promise<void>;
}
@@ -136,7 +144,8 @@ async function runTask(
isScheduledTask: true,
assistantName: ASSISTANT_NAME,
},
(proc, containerName) => deps.onProcess(task.chat_jid, proc, containerName, task.group_folder),
(proc, containerName) =>
deps.onProcess(task.chat_jid, proc, containerName, task.group_folder),
async (streamedOutput: ContainerOutput) => {
if (streamedOutput.result) {
result = streamedOutput.result;
@@ -227,10 +236,8 @@ export function startSchedulerLoop(deps: SchedulerDependencies): void {
continue;
}
deps.queue.enqueueTask(
currentTask.chat_jid,
currentTask.id,
() => runTask(currentTask, deps),
deps.queue.enqueueTask(currentTask.chat_jid, currentTask.id, () =>
runTask(currentTask, deps),
);
}
} catch (err) {

View File

@@ -33,7 +33,10 @@ const usePairingCode = process.argv.includes('--pairing-code');
const phoneArg = process.argv.find((_, i, arr) => arr[i - 1] === '--phone');
function askQuestion(prompt: string): Promise<string> {
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
return new Promise((resolve) => {
rl.question(prompt, (answer) => {
rl.close();
@@ -42,7 +45,10 @@ function askQuestion(prompt: string): Promise<string> {
});
}
async function connectSocket(phoneNumber?: string, isReconnect = false): Promise<void> {
async function connectSocket(
phoneNumber?: string,
isReconnect = false,
): Promise<void> {
const { state, saveCreds } = await useMultiFileAuthState(AUTH_DIR);
if (state.creds.registered && !isReconnect) {
@@ -55,7 +61,10 @@ async function connectSocket(phoneNumber?: string, isReconnect = false): Promise
}
const { version } = await fetchLatestWaWebVersion({}).catch((err) => {
logger.warn({ err }, 'Failed to fetch latest WA Web version, using default');
logger.warn(
{ err },
'Failed to fetch latest WA Web version, using default',
);
return { version: undefined };
});
const sock = makeWASocket({
@@ -127,7 +136,9 @@ async function connectSocket(phoneNumber?: string, isReconnect = false): Promise
if (connection === 'open') {
fs.writeFileSync(STATUS_FILE, 'authenticated');
// Clean up QR file now that we're connected
try { fs.unlinkSync(QR_FILE); } catch {}
try {
fs.unlinkSync(QR_FILE);
} catch {}
console.log('\n✓ Successfully authenticated with WhatsApp!');
console.log(' Credentials saved to store/auth/');
console.log(' You can now start the NanoClaw service.\n');
@@ -144,12 +155,18 @@ async function authenticate(): Promise<void> {
fs.mkdirSync(AUTH_DIR, { recursive: true });
// Clean up any stale QR/status files from previous runs
try { fs.unlinkSync(QR_FILE); } catch {}
try { fs.unlinkSync(STATUS_FILE); } catch {}
try {
fs.unlinkSync(QR_FILE);
} catch {}
try {
fs.unlinkSync(STATUS_FILE);
} catch {}
let phoneNumber = phoneArg;
if (usePairingCode && !phoneNumber) {
phoneNumber = await askQuestion('Enter your phone number (with country code, no + or spaces, e.g. 14155551234): ');
phoneNumber = await askQuestion(
'Enter your phone number (with country code, no + or spaces, e.g. 14155551234): ',
);
}
console.log('Starting WhatsApp authentication...\n');