feat(skills): add image vision skill for WhatsApp (#770)
* chore: prepare image-vision skill for template regeneration - Delete stale modify/*.ts templates (built against 1.1.2) - Update core_version to 1.2.6 - Strip fork-specific details from intent docs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat(skills): regenerate image-vision modify/ templates against upstream Templates regenerated against upstream 1.2.6: - src/container-runner.ts: imageAttachments field in ContainerInput - src/index.ts: parseImageReferences + threading to runAgent - src/channels/whatsapp.ts: downloadMediaMessage + image handling block - src/channels/whatsapp.test.ts: image mocks + 4 test cases - container/agent-runner/src/index.ts: ContentBlock types, pushMultimodal, image loading Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * test: update image-vision tests for upstream templates - Relax downloadMediaMessage import pattern check (multi-line import) - Remove check for [Image - processing failed] (not in upstream template) - Add vitest.skills.config.ts for skill package test runs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: update image-vision core_version to 1.2.8 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,626 @@
|
||||
/**
|
||||
* NanoClaw Agent Runner
|
||||
* Runs inside a container, receives config via stdin, outputs result to stdout
|
||||
*
|
||||
* Input protocol:
|
||||
* Stdin: Full ContainerInput JSON (read until EOF, like before)
|
||||
* IPC: Follow-up messages written as JSON files to /workspace/ipc/input/
|
||||
* Files: {type:"message", text:"..."}.json — polled and consumed
|
||||
* Sentinel: /workspace/ipc/input/_close — signals session end
|
||||
*
|
||||
* Stdout protocol:
|
||||
* Each result is wrapped in OUTPUT_START_MARKER / OUTPUT_END_MARKER pairs.
|
||||
* Multiple results may be emitted (one per agent teams result).
|
||||
* Final marker after loop ends signals completion.
|
||||
*/
|
||||
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import { query, HookCallback, PreCompactHookInput, PreToolUseHookInput } from '@anthropic-ai/claude-agent-sdk';
|
||||
import { fileURLToPath } from 'url';
|
||||
|
||||
interface ContainerInput {
|
||||
prompt: string;
|
||||
sessionId?: string;
|
||||
groupFolder: string;
|
||||
chatJid: string;
|
||||
isMain: boolean;
|
||||
isScheduledTask?: boolean;
|
||||
assistantName?: string;
|
||||
secrets?: Record<string, string>;
|
||||
imageAttachments?: Array<{ relativePath: string; mediaType: string }>;
|
||||
}
|
||||
|
||||
interface ImageContentBlock {
|
||||
type: 'image';
|
||||
source: { type: 'base64'; media_type: string; data: string };
|
||||
}
|
||||
interface TextContentBlock {
|
||||
type: 'text';
|
||||
text: string;
|
||||
}
|
||||
type ContentBlock = ImageContentBlock | TextContentBlock;
|
||||
|
||||
interface ContainerOutput {
|
||||
status: 'success' | 'error';
|
||||
result: string | null;
|
||||
newSessionId?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
interface SessionEntry {
|
||||
sessionId: string;
|
||||
fullPath: string;
|
||||
summary: string;
|
||||
firstPrompt: string;
|
||||
}
|
||||
|
||||
interface SessionsIndex {
|
||||
entries: SessionEntry[];
|
||||
}
|
||||
|
||||
interface SDKUserMessage {
|
||||
type: 'user';
|
||||
message: { role: 'user'; content: string | ContentBlock[] };
|
||||
parent_tool_use_id: null;
|
||||
session_id: string;
|
||||
}
|
||||
|
||||
const IPC_INPUT_DIR = '/workspace/ipc/input';
|
||||
const IPC_INPUT_CLOSE_SENTINEL = path.join(IPC_INPUT_DIR, '_close');
|
||||
const IPC_POLL_MS = 500;
|
||||
|
||||
/**
|
||||
* Push-based async iterable for streaming user messages to the SDK.
|
||||
* Keeps the iterable alive until end() is called, preventing isSingleUserTurn.
|
||||
*/
|
||||
class MessageStream {
|
||||
private queue: SDKUserMessage[] = [];
|
||||
private waiting: (() => void) | null = null;
|
||||
private done = false;
|
||||
|
||||
push(text: string): void {
|
||||
this.queue.push({
|
||||
type: 'user',
|
||||
message: { role: 'user', content: text },
|
||||
parent_tool_use_id: null,
|
||||
session_id: '',
|
||||
});
|
||||
this.waiting?.();
|
||||
}
|
||||
|
||||
pushMultimodal(content: ContentBlock[]): void {
|
||||
this.queue.push({
|
||||
type: 'user',
|
||||
message: { role: 'user', content },
|
||||
parent_tool_use_id: null,
|
||||
session_id: '',
|
||||
});
|
||||
this.waiting?.();
|
||||
}
|
||||
|
||||
end(): void {
|
||||
this.done = true;
|
||||
this.waiting?.();
|
||||
}
|
||||
|
||||
async *[Symbol.asyncIterator](): AsyncGenerator<SDKUserMessage> {
|
||||
while (true) {
|
||||
while (this.queue.length > 0) {
|
||||
yield this.queue.shift()!;
|
||||
}
|
||||
if (this.done) return;
|
||||
await new Promise<void>(r => { this.waiting = r; });
|
||||
this.waiting = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function readStdin(): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let data = '';
|
||||
process.stdin.setEncoding('utf8');
|
||||
process.stdin.on('data', chunk => { data += chunk; });
|
||||
process.stdin.on('end', () => resolve(data));
|
||||
process.stdin.on('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
const OUTPUT_START_MARKER = '---NANOCLAW_OUTPUT_START---';
|
||||
const OUTPUT_END_MARKER = '---NANOCLAW_OUTPUT_END---';
|
||||
|
||||
function writeOutput(output: ContainerOutput): void {
|
||||
console.log(OUTPUT_START_MARKER);
|
||||
console.log(JSON.stringify(output));
|
||||
console.log(OUTPUT_END_MARKER);
|
||||
}
|
||||
|
||||
function log(message: string): void {
|
||||
console.error(`[agent-runner] ${message}`);
|
||||
}
|
||||
|
||||
function getSessionSummary(sessionId: string, transcriptPath: string): string | null {
|
||||
const projectDir = path.dirname(transcriptPath);
|
||||
const indexPath = path.join(projectDir, 'sessions-index.json');
|
||||
|
||||
if (!fs.existsSync(indexPath)) {
|
||||
log(`Sessions index not found at ${indexPath}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const index: SessionsIndex = JSON.parse(fs.readFileSync(indexPath, 'utf-8'));
|
||||
const entry = index.entries.find(e => e.sessionId === sessionId);
|
||||
if (entry?.summary) {
|
||||
return entry.summary;
|
||||
}
|
||||
} catch (err) {
|
||||
log(`Failed to read sessions index: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Archive the full transcript to conversations/ before compaction.
|
||||
*/
|
||||
function createPreCompactHook(assistantName?: string): HookCallback {
|
||||
return async (input, _toolUseId, _context) => {
|
||||
const preCompact = input as PreCompactHookInput;
|
||||
const transcriptPath = preCompact.transcript_path;
|
||||
const sessionId = preCompact.session_id;
|
||||
|
||||
if (!transcriptPath || !fs.existsSync(transcriptPath)) {
|
||||
log('No transcript found for archiving');
|
||||
return {};
|
||||
}
|
||||
|
||||
try {
|
||||
const content = fs.readFileSync(transcriptPath, 'utf-8');
|
||||
const messages = parseTranscript(content);
|
||||
|
||||
if (messages.length === 0) {
|
||||
log('No messages to archive');
|
||||
return {};
|
||||
}
|
||||
|
||||
const summary = getSessionSummary(sessionId, transcriptPath);
|
||||
const name = summary ? sanitizeFilename(summary) : generateFallbackName();
|
||||
|
||||
const conversationsDir = '/workspace/group/conversations';
|
||||
fs.mkdirSync(conversationsDir, { recursive: true });
|
||||
|
||||
const date = new Date().toISOString().split('T')[0];
|
||||
const filename = `${date}-${name}.md`;
|
||||
const filePath = path.join(conversationsDir, filename);
|
||||
|
||||
const markdown = formatTranscriptMarkdown(messages, summary, assistantName);
|
||||
fs.writeFileSync(filePath, markdown);
|
||||
|
||||
log(`Archived conversation to ${filePath}`);
|
||||
} catch (err) {
|
||||
log(`Failed to archive transcript: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
|
||||
return {};
|
||||
};
|
||||
}
|
||||
|
||||
// Secrets to strip from Bash tool subprocess environments.
|
||||
// These are needed by claude-code for API auth but should never
|
||||
// be visible to commands Kit runs.
|
||||
const SECRET_ENV_VARS = ['ANTHROPIC_API_KEY', 'CLAUDE_CODE_OAUTH_TOKEN'];
|
||||
|
||||
function createSanitizeBashHook(): HookCallback {
|
||||
return async (input, _toolUseId, _context) => {
|
||||
const preInput = input as PreToolUseHookInput;
|
||||
const command = (preInput.tool_input as { command?: string })?.command;
|
||||
if (!command) return {};
|
||||
|
||||
const unsetPrefix = `unset ${SECRET_ENV_VARS.join(' ')} 2>/dev/null; `;
|
||||
return {
|
||||
hookSpecificOutput: {
|
||||
hookEventName: 'PreToolUse',
|
||||
updatedInput: {
|
||||
...(preInput.tool_input as Record<string, unknown>),
|
||||
command: unsetPrefix + command,
|
||||
},
|
||||
},
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
function sanitizeFilename(summary: string): string {
|
||||
return summary
|
||||
.toLowerCase()
|
||||
.replace(/[^a-z0-9]+/g, '-')
|
||||
.replace(/^-+|-+$/g, '')
|
||||
.slice(0, 50);
|
||||
}
|
||||
|
||||
function generateFallbackName(): string {
|
||||
const time = new Date();
|
||||
return `conversation-${time.getHours().toString().padStart(2, '0')}${time.getMinutes().toString().padStart(2, '0')}`;
|
||||
}
|
||||
|
||||
interface ParsedMessage {
|
||||
role: 'user' | 'assistant';
|
||||
content: string;
|
||||
}
|
||||
|
||||
function parseTranscript(content: string): ParsedMessage[] {
|
||||
const messages: ParsedMessage[] = [];
|
||||
|
||||
for (const line of content.split('\n')) {
|
||||
if (!line.trim()) continue;
|
||||
try {
|
||||
const entry = JSON.parse(line);
|
||||
if (entry.type === 'user' && entry.message?.content) {
|
||||
const text = typeof entry.message.content === 'string'
|
||||
? entry.message.content
|
||||
: entry.message.content.map((c: { text?: string }) => c.text || '').join('');
|
||||
if (text) messages.push({ role: 'user', content: text });
|
||||
} else if (entry.type === 'assistant' && entry.message?.content) {
|
||||
const textParts = entry.message.content
|
||||
.filter((c: { type: string }) => c.type === 'text')
|
||||
.map((c: { text: string }) => c.text);
|
||||
const text = textParts.join('');
|
||||
if (text) messages.push({ role: 'assistant', content: text });
|
||||
}
|
||||
} catch {
|
||||
}
|
||||
}
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
function formatTranscriptMarkdown(messages: ParsedMessage[], title?: string | null, assistantName?: string): string {
|
||||
const now = new Date();
|
||||
const formatDateTime = (d: Date) => d.toLocaleString('en-US', {
|
||||
month: 'short',
|
||||
day: 'numeric',
|
||||
hour: 'numeric',
|
||||
minute: '2-digit',
|
||||
hour12: true
|
||||
});
|
||||
|
||||
const lines: string[] = [];
|
||||
lines.push(`# ${title || 'Conversation'}`);
|
||||
lines.push('');
|
||||
lines.push(`Archived: ${formatDateTime(now)}`);
|
||||
lines.push('');
|
||||
lines.push('---');
|
||||
lines.push('');
|
||||
|
||||
for (const msg of messages) {
|
||||
const sender = msg.role === 'user' ? 'User' : (assistantName || 'Assistant');
|
||||
const content = msg.content.length > 2000
|
||||
? msg.content.slice(0, 2000) + '...'
|
||||
: msg.content;
|
||||
lines.push(`**${sender}**: ${content}`);
|
||||
lines.push('');
|
||||
}
|
||||
|
||||
return lines.join('\n');
|
||||
}
|
||||
|
||||
/**
|
||||
* Check for _close sentinel.
|
||||
*/
|
||||
function shouldClose(): boolean {
|
||||
if (fs.existsSync(IPC_INPUT_CLOSE_SENTINEL)) {
|
||||
try { fs.unlinkSync(IPC_INPUT_CLOSE_SENTINEL); } catch { /* ignore */ }
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Drain all pending IPC input messages.
|
||||
* Returns messages found, or empty array.
|
||||
*/
|
||||
function drainIpcInput(): string[] {
|
||||
try {
|
||||
fs.mkdirSync(IPC_INPUT_DIR, { recursive: true });
|
||||
const files = fs.readdirSync(IPC_INPUT_DIR)
|
||||
.filter(f => f.endsWith('.json'))
|
||||
.sort();
|
||||
|
||||
const messages: string[] = [];
|
||||
for (const file of files) {
|
||||
const filePath = path.join(IPC_INPUT_DIR, file);
|
||||
try {
|
||||
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
|
||||
fs.unlinkSync(filePath);
|
||||
if (data.type === 'message' && data.text) {
|
||||
messages.push(data.text);
|
||||
}
|
||||
} catch (err) {
|
||||
log(`Failed to process input file ${file}: ${err instanceof Error ? err.message : String(err)}`);
|
||||
try { fs.unlinkSync(filePath); } catch { /* ignore */ }
|
||||
}
|
||||
}
|
||||
return messages;
|
||||
} catch (err) {
|
||||
log(`IPC drain error: ${err instanceof Error ? err.message : String(err)}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a new IPC message or _close sentinel.
|
||||
* Returns the messages as a single string, or null if _close.
|
||||
*/
|
||||
function waitForIpcMessage(): Promise<string | null> {
|
||||
return new Promise((resolve) => {
|
||||
const poll = () => {
|
||||
if (shouldClose()) {
|
||||
resolve(null);
|
||||
return;
|
||||
}
|
||||
const messages = drainIpcInput();
|
||||
if (messages.length > 0) {
|
||||
resolve(messages.join('\n'));
|
||||
return;
|
||||
}
|
||||
setTimeout(poll, IPC_POLL_MS);
|
||||
};
|
||||
poll();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a single query and stream results via writeOutput.
|
||||
* Uses MessageStream (AsyncIterable) to keep isSingleUserTurn=false,
|
||||
* allowing agent teams subagents to run to completion.
|
||||
* Also pipes IPC messages into the stream during the query.
|
||||
*/
|
||||
async function runQuery(
|
||||
prompt: string,
|
||||
sessionId: string | undefined,
|
||||
mcpServerPath: string,
|
||||
containerInput: ContainerInput,
|
||||
sdkEnv: Record<string, string | undefined>,
|
||||
resumeAt?: string,
|
||||
): Promise<{ newSessionId?: string; lastAssistantUuid?: string; closedDuringQuery: boolean }> {
|
||||
const stream = new MessageStream();
|
||||
stream.push(prompt);
|
||||
|
||||
// Load image attachments and send as multimodal content blocks
|
||||
if (containerInput.imageAttachments?.length) {
|
||||
const blocks: ContentBlock[] = [];
|
||||
for (const img of containerInput.imageAttachments) {
|
||||
const imgPath = path.join('/workspace/group', img.relativePath);
|
||||
try {
|
||||
const data = fs.readFileSync(imgPath).toString('base64');
|
||||
blocks.push({ type: 'image', source: { type: 'base64', media_type: img.mediaType, data } });
|
||||
} catch (err) {
|
||||
log(`Failed to load image: ${imgPath}`);
|
||||
}
|
||||
}
|
||||
if (blocks.length > 0) {
|
||||
stream.pushMultimodal(blocks);
|
||||
}
|
||||
}
|
||||
|
||||
// Poll IPC for follow-up messages and _close sentinel during the query
|
||||
let ipcPolling = true;
|
||||
let closedDuringQuery = false;
|
||||
const pollIpcDuringQuery = () => {
|
||||
if (!ipcPolling) return;
|
||||
if (shouldClose()) {
|
||||
log('Close sentinel detected during query, ending stream');
|
||||
closedDuringQuery = true;
|
||||
stream.end();
|
||||
ipcPolling = false;
|
||||
return;
|
||||
}
|
||||
const messages = drainIpcInput();
|
||||
for (const text of messages) {
|
||||
log(`Piping IPC message into active query (${text.length} chars)`);
|
||||
stream.push(text);
|
||||
}
|
||||
setTimeout(pollIpcDuringQuery, IPC_POLL_MS);
|
||||
};
|
||||
setTimeout(pollIpcDuringQuery, IPC_POLL_MS);
|
||||
|
||||
let newSessionId: string | undefined;
|
||||
let lastAssistantUuid: string | undefined;
|
||||
let messageCount = 0;
|
||||
let resultCount = 0;
|
||||
|
||||
// Load global CLAUDE.md as additional system context (shared across all groups)
|
||||
const globalClaudeMdPath = '/workspace/global/CLAUDE.md';
|
||||
let globalClaudeMd: string | undefined;
|
||||
if (!containerInput.isMain && fs.existsSync(globalClaudeMdPath)) {
|
||||
globalClaudeMd = fs.readFileSync(globalClaudeMdPath, 'utf-8');
|
||||
}
|
||||
|
||||
// Discover additional directories mounted at /workspace/extra/*
|
||||
// These are passed to the SDK so their CLAUDE.md files are loaded automatically
|
||||
const extraDirs: string[] = [];
|
||||
const extraBase = '/workspace/extra';
|
||||
if (fs.existsSync(extraBase)) {
|
||||
for (const entry of fs.readdirSync(extraBase)) {
|
||||
const fullPath = path.join(extraBase, entry);
|
||||
if (fs.statSync(fullPath).isDirectory()) {
|
||||
extraDirs.push(fullPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (extraDirs.length > 0) {
|
||||
log(`Additional directories: ${extraDirs.join(', ')}`);
|
||||
}
|
||||
|
||||
for await (const message of query({
|
||||
prompt: stream,
|
||||
options: {
|
||||
cwd: '/workspace/group',
|
||||
additionalDirectories: extraDirs.length > 0 ? extraDirs : undefined,
|
||||
resume: sessionId,
|
||||
resumeSessionAt: resumeAt,
|
||||
systemPrompt: globalClaudeMd
|
||||
? { type: 'preset' as const, preset: 'claude_code' as const, append: globalClaudeMd }
|
||||
: undefined,
|
||||
allowedTools: [
|
||||
'Bash',
|
||||
'Read', 'Write', 'Edit', 'Glob', 'Grep',
|
||||
'WebSearch', 'WebFetch',
|
||||
'Task', 'TaskOutput', 'TaskStop',
|
||||
'TeamCreate', 'TeamDelete', 'SendMessage',
|
||||
'TodoWrite', 'ToolSearch', 'Skill',
|
||||
'NotebookEdit',
|
||||
'mcp__nanoclaw__*'
|
||||
],
|
||||
env: sdkEnv,
|
||||
permissionMode: 'bypassPermissions',
|
||||
allowDangerouslySkipPermissions: true,
|
||||
settingSources: ['project', 'user'],
|
||||
mcpServers: {
|
||||
nanoclaw: {
|
||||
command: 'node',
|
||||
args: [mcpServerPath],
|
||||
env: {
|
||||
NANOCLAW_CHAT_JID: containerInput.chatJid,
|
||||
NANOCLAW_GROUP_FOLDER: containerInput.groupFolder,
|
||||
NANOCLAW_IS_MAIN: containerInput.isMain ? '1' : '0',
|
||||
},
|
||||
},
|
||||
},
|
||||
hooks: {
|
||||
PreCompact: [{ hooks: [createPreCompactHook(containerInput.assistantName)] }],
|
||||
PreToolUse: [{ matcher: 'Bash', hooks: [createSanitizeBashHook()] }],
|
||||
},
|
||||
}
|
||||
})) {
|
||||
messageCount++;
|
||||
const msgType = message.type === 'system' ? `system/${(message as { subtype?: string }).subtype}` : message.type;
|
||||
log(`[msg #${messageCount}] type=${msgType}`);
|
||||
|
||||
if (message.type === 'assistant' && 'uuid' in message) {
|
||||
lastAssistantUuid = (message as { uuid: string }).uuid;
|
||||
}
|
||||
|
||||
if (message.type === 'system' && message.subtype === 'init') {
|
||||
newSessionId = message.session_id;
|
||||
log(`Session initialized: ${newSessionId}`);
|
||||
}
|
||||
|
||||
if (message.type === 'system' && (message as { subtype?: string }).subtype === 'task_notification') {
|
||||
const tn = message as { task_id: string; status: string; summary: string };
|
||||
log(`Task notification: task=${tn.task_id} status=${tn.status} summary=${tn.summary}`);
|
||||
}
|
||||
|
||||
if (message.type === 'result') {
|
||||
resultCount++;
|
||||
const textResult = 'result' in message ? (message as { result?: string }).result : null;
|
||||
log(`Result #${resultCount}: subtype=${message.subtype}${textResult ? ` text=${textResult.slice(0, 200)}` : ''}`);
|
||||
writeOutput({
|
||||
status: 'success',
|
||||
result: textResult || null,
|
||||
newSessionId
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
ipcPolling = false;
|
||||
log(`Query done. Messages: ${messageCount}, results: ${resultCount}, lastAssistantUuid: ${lastAssistantUuid || 'none'}, closedDuringQuery: ${closedDuringQuery}`);
|
||||
return { newSessionId, lastAssistantUuid, closedDuringQuery };
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
let containerInput: ContainerInput;
|
||||
|
||||
try {
|
||||
const stdinData = await readStdin();
|
||||
containerInput = JSON.parse(stdinData);
|
||||
// Delete the temp file the entrypoint wrote — it contains secrets
|
||||
try { fs.unlinkSync('/tmp/input.json'); } catch { /* may not exist */ }
|
||||
log(`Received input for group: ${containerInput.groupFolder}`);
|
||||
} catch (err) {
|
||||
writeOutput({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Failed to parse input: ${err instanceof Error ? err.message : String(err)}`
|
||||
});
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
// Build SDK env: merge secrets into process.env for the SDK only.
|
||||
// Secrets never touch process.env itself, so Bash subprocesses can't see them.
|
||||
const sdkEnv: Record<string, string | undefined> = { ...process.env };
|
||||
for (const [key, value] of Object.entries(containerInput.secrets || {})) {
|
||||
sdkEnv[key] = value;
|
||||
}
|
||||
|
||||
const __dirname = path.dirname(fileURLToPath(import.meta.url));
|
||||
const mcpServerPath = path.join(__dirname, 'ipc-mcp-stdio.js');
|
||||
|
||||
let sessionId = containerInput.sessionId;
|
||||
fs.mkdirSync(IPC_INPUT_DIR, { recursive: true });
|
||||
|
||||
// Clean up stale _close sentinel from previous container runs
|
||||
try { fs.unlinkSync(IPC_INPUT_CLOSE_SENTINEL); } catch { /* ignore */ }
|
||||
|
||||
// Build initial prompt (drain any pending IPC messages too)
|
||||
let prompt = containerInput.prompt;
|
||||
if (containerInput.isScheduledTask) {
|
||||
prompt = `[SCHEDULED TASK - The following message was sent automatically and is not coming directly from the user or group.]\n\n${prompt}`;
|
||||
}
|
||||
const pending = drainIpcInput();
|
||||
if (pending.length > 0) {
|
||||
log(`Draining ${pending.length} pending IPC messages into initial prompt`);
|
||||
prompt += '\n' + pending.join('\n');
|
||||
}
|
||||
|
||||
// Query loop: run query → wait for IPC message → run new query → repeat
|
||||
let resumeAt: string | undefined;
|
||||
try {
|
||||
while (true) {
|
||||
log(`Starting query (session: ${sessionId || 'new'}, resumeAt: ${resumeAt || 'latest'})...`);
|
||||
|
||||
const queryResult = await runQuery(prompt, sessionId, mcpServerPath, containerInput, sdkEnv, resumeAt);
|
||||
if (queryResult.newSessionId) {
|
||||
sessionId = queryResult.newSessionId;
|
||||
}
|
||||
if (queryResult.lastAssistantUuid) {
|
||||
resumeAt = queryResult.lastAssistantUuid;
|
||||
}
|
||||
|
||||
// If _close was consumed during the query, exit immediately.
|
||||
// Don't emit a session-update marker (it would reset the host's
|
||||
// idle timer and cause a 30-min delay before the next _close).
|
||||
if (queryResult.closedDuringQuery) {
|
||||
log('Close sentinel consumed during query, exiting');
|
||||
break;
|
||||
}
|
||||
|
||||
// Emit session update so host can track it
|
||||
writeOutput({ status: 'success', result: null, newSessionId: sessionId });
|
||||
|
||||
log('Query ended, waiting for next IPC message...');
|
||||
|
||||
// Wait for the next message or _close sentinel
|
||||
const nextMessage = await waitForIpcMessage();
|
||||
if (nextMessage === null) {
|
||||
log('Close sentinel received, exiting');
|
||||
break;
|
||||
}
|
||||
|
||||
log(`Got new message (${nextMessage.length} chars), starting new query`);
|
||||
prompt = nextMessage;
|
||||
}
|
||||
} catch (err) {
|
||||
const errorMessage = err instanceof Error ? err.message : String(err);
|
||||
log(`Agent error: ${errorMessage}`);
|
||||
writeOutput({
|
||||
status: 'error',
|
||||
result: null,
|
||||
newSessionId: sessionId,
|
||||
error: errorMessage
|
||||
});
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
main();
|
||||
@@ -0,0 +1,23 @@
|
||||
# Intent: container/agent-runner/src/index.ts
|
||||
|
||||
## What Changed
|
||||
- Added `imageAttachments?` field to ContainerInput interface
|
||||
- Added `ImageContentBlock`, `TextContentBlock`, `ContentBlock` type definitions
|
||||
- Changed `SDKUserMessage.message.content` type from `string` to `string | ContentBlock[]`
|
||||
- Added `pushMultimodal(content: ContentBlock[])` method to MessageStream class
|
||||
- In `runQuery`: image loading logic reads attachments from disk, base64-encodes, sends as multimodal content blocks
|
||||
|
||||
## Key Sections
|
||||
- **Types** (top of file): New content block interfaces, updated SDKUserMessage
|
||||
- **MessageStream class**: New pushMultimodal method
|
||||
- **runQuery function**: Image loading block
|
||||
|
||||
## Invariants (must-keep)
|
||||
- All IPC protocol logic (input polling, close sentinel, message stream)
|
||||
- MessageStream push/end/asyncIterator (text messages still work)
|
||||
- readStdin, writeOutput, log functions
|
||||
- Session management (getSessionSummary, sessions index)
|
||||
- PreCompact hook (transcript archiving)
|
||||
- Bash sanitization hook
|
||||
- SDK query options structure (mcpServers, hooks, permissions)
|
||||
- Query loop in main() (query -> wait for IPC -> repeat)
|
||||
1117
.claude/skills/add-image-vision/modify/src/channels/whatsapp.test.ts
Normal file
1117
.claude/skills/add-image-vision/modify/src/channels/whatsapp.test.ts
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,21 @@
|
||||
# Intent: src/channels/whatsapp.test.ts
|
||||
|
||||
## What Changed
|
||||
- Added `GROUPS_DIR` to config mock
|
||||
- Added `../image.js` mock (isImageMessage defaults false, processImage returns stub)
|
||||
- Added `updateMediaMessage` to fake socket (needed by downloadMediaMessage)
|
||||
- Added `normalizeMessageContent` to Baileys mock (pass-through)
|
||||
- Added `downloadMediaMessage` to Baileys mock (returns Buffer)
|
||||
- Added imports for `downloadMediaMessage`, `isImageMessage`, `processImage`
|
||||
- Added image test cases: downloads/processes, no caption, download failure, processImage null fallback
|
||||
|
||||
## Key Sections
|
||||
- **Mock setup** (top of file): New image mock, extended Baileys mock, extended fakeSocket
|
||||
- **Message handling tests**: Image test cases
|
||||
|
||||
## Invariants (must-keep)
|
||||
- All existing test sections and describe blocks
|
||||
- Existing mock structure (config, logger, db, fs, child_process, Baileys)
|
||||
- Test helpers (createTestOpts, triggerConnection, triggerDisconnect, triggerMessages, connectChannel)
|
||||
- Connection lifecycle, authentication, reconnection, LID translation tests
|
||||
- Outgoing queue, group metadata sync, JID ownership, typing indicator tests
|
||||
419
.claude/skills/add-image-vision/modify/src/channels/whatsapp.ts
Normal file
419
.claude/skills/add-image-vision/modify/src/channels/whatsapp.ts
Normal file
@@ -0,0 +1,419 @@
|
||||
import { exec } from 'child_process';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import makeWASocket, {
|
||||
Browsers,
|
||||
DisconnectReason,
|
||||
downloadMediaMessage,
|
||||
WASocket,
|
||||
fetchLatestWaWebVersion,
|
||||
makeCacheableSignalKeyStore,
|
||||
normalizeMessageContent,
|
||||
useMultiFileAuthState,
|
||||
} from '@whiskeysockets/baileys';
|
||||
|
||||
import {
|
||||
ASSISTANT_HAS_OWN_NUMBER,
|
||||
ASSISTANT_NAME,
|
||||
GROUPS_DIR,
|
||||
STORE_DIR,
|
||||
} from '../config.js';
|
||||
import { getLastGroupSync, setLastGroupSync, updateChatName } from '../db.js';
|
||||
import { isImageMessage, processImage } from '../image.js';
|
||||
import { logger } from '../logger.js';
|
||||
import {
|
||||
Channel,
|
||||
OnInboundMessage,
|
||||
OnChatMetadata,
|
||||
RegisteredGroup,
|
||||
} from '../types.js';
|
||||
import { registerChannel, ChannelOpts } from './registry.js';
|
||||
|
||||
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
|
||||
|
||||
export interface WhatsAppChannelOpts {
|
||||
onMessage: OnInboundMessage;
|
||||
onChatMetadata: OnChatMetadata;
|
||||
registeredGroups: () => Record<string, RegisteredGroup>;
|
||||
}
|
||||
|
||||
export class WhatsAppChannel implements Channel {
|
||||
name = 'whatsapp';
|
||||
|
||||
private sock!: WASocket;
|
||||
private connected = false;
|
||||
private lidToPhoneMap: Record<string, string> = {};
|
||||
private outgoingQueue: Array<{ jid: string; text: string }> = [];
|
||||
private flushing = false;
|
||||
private groupSyncTimerStarted = false;
|
||||
|
||||
private opts: WhatsAppChannelOpts;
|
||||
|
||||
constructor(opts: WhatsAppChannelOpts) {
|
||||
this.opts = opts;
|
||||
}
|
||||
|
||||
async connect(): Promise<void> {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
this.connectInternal(resolve).catch(reject);
|
||||
});
|
||||
}
|
||||
|
||||
private async connectInternal(onFirstOpen?: () => void): Promise<void> {
|
||||
const authDir = path.join(STORE_DIR, 'auth');
|
||||
fs.mkdirSync(authDir, { recursive: true });
|
||||
|
||||
const { state, saveCreds } = await useMultiFileAuthState(authDir);
|
||||
|
||||
const { version } = await fetchLatestWaWebVersion({}).catch((err) => {
|
||||
logger.warn(
|
||||
{ err },
|
||||
'Failed to fetch latest WA Web version, using default',
|
||||
);
|
||||
return { version: undefined };
|
||||
});
|
||||
this.sock = makeWASocket({
|
||||
version,
|
||||
auth: {
|
||||
creds: state.creds,
|
||||
keys: makeCacheableSignalKeyStore(state.keys, logger),
|
||||
},
|
||||
printQRInTerminal: false,
|
||||
logger,
|
||||
browser: Browsers.macOS('Chrome'),
|
||||
});
|
||||
|
||||
this.sock.ev.on('connection.update', (update) => {
|
||||
const { connection, lastDisconnect, qr } = update;
|
||||
|
||||
if (qr) {
|
||||
const msg =
|
||||
'WhatsApp authentication required. Run /setup in Claude Code.';
|
||||
logger.error(msg);
|
||||
exec(
|
||||
`osascript -e 'display notification "${msg}" with title "NanoClaw" sound name "Basso"'`,
|
||||
);
|
||||
setTimeout(() => process.exit(1), 1000);
|
||||
}
|
||||
|
||||
if (connection === 'close') {
|
||||
this.connected = false;
|
||||
const reason = (
|
||||
lastDisconnect?.error as { output?: { statusCode?: number } }
|
||||
)?.output?.statusCode;
|
||||
const shouldReconnect = reason !== DisconnectReason.loggedOut;
|
||||
logger.info(
|
||||
{
|
||||
reason,
|
||||
shouldReconnect,
|
||||
queuedMessages: this.outgoingQueue.length,
|
||||
},
|
||||
'Connection closed',
|
||||
);
|
||||
|
||||
if (shouldReconnect) {
|
||||
this.scheduleReconnect(1);
|
||||
} else {
|
||||
logger.info('Logged out. Run /setup to re-authenticate.');
|
||||
process.exit(0);
|
||||
}
|
||||
} else if (connection === 'open') {
|
||||
this.connected = true;
|
||||
logger.info('Connected to WhatsApp');
|
||||
|
||||
// Announce availability so WhatsApp relays subsequent presence updates (typing indicators)
|
||||
this.sock.sendPresenceUpdate('available').catch((err) => {
|
||||
logger.warn({ err }, 'Failed to send presence update');
|
||||
});
|
||||
|
||||
// Build LID to phone mapping from auth state for self-chat translation
|
||||
if (this.sock.user) {
|
||||
const phoneUser = this.sock.user.id.split(':')[0];
|
||||
const lidUser = this.sock.user.lid?.split(':')[0];
|
||||
if (lidUser && phoneUser) {
|
||||
this.lidToPhoneMap[lidUser] = `${phoneUser}@s.whatsapp.net`;
|
||||
logger.debug({ lidUser, phoneUser }, 'LID to phone mapping set');
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any messages queued while disconnected
|
||||
this.flushOutgoingQueue().catch((err) =>
|
||||
logger.error({ err }, 'Failed to flush outgoing queue'),
|
||||
);
|
||||
|
||||
// Sync group metadata on startup (respects 24h cache)
|
||||
this.syncGroupMetadata().catch((err) =>
|
||||
logger.error({ err }, 'Initial group sync failed'),
|
||||
);
|
||||
// Set up daily sync timer (only once)
|
||||
if (!this.groupSyncTimerStarted) {
|
||||
this.groupSyncTimerStarted = true;
|
||||
setInterval(() => {
|
||||
this.syncGroupMetadata().catch((err) =>
|
||||
logger.error({ err }, 'Periodic group sync failed'),
|
||||
);
|
||||
}, GROUP_SYNC_INTERVAL_MS);
|
||||
}
|
||||
|
||||
// Signal first connection to caller
|
||||
if (onFirstOpen) {
|
||||
onFirstOpen();
|
||||
onFirstOpen = undefined;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.sock.ev.on('creds.update', saveCreds);
|
||||
|
||||
this.sock.ev.on('messages.upsert', async ({ messages }) => {
|
||||
for (const msg of messages) {
|
||||
try {
|
||||
if (!msg.message) continue;
|
||||
// Unwrap container types (viewOnceMessageV2, ephemeralMessage,
|
||||
// editedMessage, etc.) so that conversation, extendedTextMessage,
|
||||
// imageMessage, etc. are accessible at the top level.
|
||||
const normalized = normalizeMessageContent(msg.message);
|
||||
if (!normalized) continue;
|
||||
const rawJid = msg.key.remoteJid;
|
||||
if (!rawJid || rawJid === 'status@broadcast') continue;
|
||||
|
||||
// Translate LID JID to phone JID if applicable
|
||||
const chatJid = await this.translateJid(rawJid);
|
||||
|
||||
const timestamp = new Date(
|
||||
Number(msg.messageTimestamp) * 1000,
|
||||
).toISOString();
|
||||
|
||||
// Always notify about chat metadata for group discovery
|
||||
const isGroup = chatJid.endsWith('@g.us');
|
||||
this.opts.onChatMetadata(
|
||||
chatJid,
|
||||
timestamp,
|
||||
undefined,
|
||||
'whatsapp',
|
||||
isGroup,
|
||||
);
|
||||
|
||||
// Only deliver full message for registered groups
|
||||
const groups = this.opts.registeredGroups();
|
||||
if (groups[chatJid]) {
|
||||
let content =
|
||||
normalized.conversation ||
|
||||
normalized.extendedTextMessage?.text ||
|
||||
normalized.imageMessage?.caption ||
|
||||
normalized.videoMessage?.caption ||
|
||||
'';
|
||||
|
||||
// Image attachment handling
|
||||
if (isImageMessage(msg)) {
|
||||
try {
|
||||
const buffer = await downloadMediaMessage(msg, 'buffer', {});
|
||||
const groupDir = path.join(GROUPS_DIR, groups[chatJid].folder);
|
||||
const caption = normalized?.imageMessage?.caption ?? '';
|
||||
const result = await processImage(buffer as Buffer, groupDir, caption);
|
||||
if (result) {
|
||||
content = result.content;
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn({ err, jid: chatJid }, 'Image - download failed');
|
||||
}
|
||||
}
|
||||
|
||||
// Skip protocol messages with no text content (encryption keys, read receipts, etc.)
|
||||
if (!content) continue;
|
||||
|
||||
const sender = msg.key.participant || msg.key.remoteJid || '';
|
||||
const senderName = msg.pushName || sender.split('@')[0];
|
||||
|
||||
const fromMe = msg.key.fromMe || false;
|
||||
// Detect bot messages: with own number, fromMe is reliable
|
||||
// since only the bot sends from that number.
|
||||
// With shared number, bot messages carry the assistant name prefix
|
||||
// (even in DMs/self-chat) so we check for that.
|
||||
const isBotMessage = ASSISTANT_HAS_OWN_NUMBER
|
||||
? fromMe
|
||||
: content.startsWith(`${ASSISTANT_NAME}:`);
|
||||
|
||||
this.opts.onMessage(chatJid, {
|
||||
id: msg.key.id || '',
|
||||
chat_jid: chatJid,
|
||||
sender,
|
||||
sender_name: senderName,
|
||||
content,
|
||||
timestamp,
|
||||
is_from_me: fromMe,
|
||||
is_bot_message: isBotMessage,
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
{ err, remoteJid: msg.key?.remoteJid },
|
||||
'Error processing incoming message',
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async sendMessage(jid: string, text: string): Promise<void> {
|
||||
// Prefix bot messages with assistant name so users know who's speaking.
|
||||
// On a shared number, prefix is also needed in DMs (including self-chat)
|
||||
// to distinguish bot output from user messages.
|
||||
// Skip only when the assistant has its own dedicated phone number.
|
||||
const prefixed = ASSISTANT_HAS_OWN_NUMBER
|
||||
? text
|
||||
: `${ASSISTANT_NAME}: ${text}`;
|
||||
|
||||
if (!this.connected) {
|
||||
this.outgoingQueue.push({ jid, text: prefixed });
|
||||
logger.info(
|
||||
{ jid, length: prefixed.length, queueSize: this.outgoingQueue.length },
|
||||
'WA disconnected, message queued',
|
||||
);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await this.sock.sendMessage(jid, { text: prefixed });
|
||||
logger.info({ jid, length: prefixed.length }, 'Message sent');
|
||||
} catch (err) {
|
||||
// If send fails, queue it for retry on reconnect
|
||||
this.outgoingQueue.push({ jid, text: prefixed });
|
||||
logger.warn(
|
||||
{ jid, err, queueSize: this.outgoingQueue.length },
|
||||
'Failed to send, message queued',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
isConnected(): boolean {
|
||||
return this.connected;
|
||||
}
|
||||
|
||||
ownsJid(jid: string): boolean {
|
||||
return jid.endsWith('@g.us') || jid.endsWith('@s.whatsapp.net');
|
||||
}
|
||||
|
||||
async disconnect(): Promise<void> {
|
||||
this.connected = false;
|
||||
this.sock?.end(undefined);
|
||||
}
|
||||
|
||||
async setTyping(jid: string, isTyping: boolean): Promise<void> {
|
||||
try {
|
||||
const status = isTyping ? 'composing' : 'paused';
|
||||
logger.debug({ jid, status }, 'Sending presence update');
|
||||
await this.sock.sendPresenceUpdate(status, jid);
|
||||
} catch (err) {
|
||||
logger.debug({ jid, err }, 'Failed to update typing status');
|
||||
}
|
||||
}
|
||||
|
||||
async syncGroups(force: boolean): Promise<void> {
|
||||
return this.syncGroupMetadata(force);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync group metadata from WhatsApp.
|
||||
* Fetches all participating groups and stores their names in the database.
|
||||
* Called on startup, daily, and on-demand via IPC.
|
||||
*/
|
||||
async syncGroupMetadata(force = false): Promise<void> {
|
||||
if (!force) {
|
||||
const lastSync = getLastGroupSync();
|
||||
if (lastSync) {
|
||||
const lastSyncTime = new Date(lastSync).getTime();
|
||||
if (Date.now() - lastSyncTime < GROUP_SYNC_INTERVAL_MS) {
|
||||
logger.debug({ lastSync }, 'Skipping group sync - synced recently');
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
logger.info('Syncing group metadata from WhatsApp...');
|
||||
const groups = await this.sock.groupFetchAllParticipating();
|
||||
|
||||
let count = 0;
|
||||
for (const [jid, metadata] of Object.entries(groups)) {
|
||||
if (metadata.subject) {
|
||||
updateChatName(jid, metadata.subject);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
setLastGroupSync();
|
||||
logger.info({ count }, 'Group metadata synced');
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'Failed to sync group metadata');
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleReconnect(attempt: number): void {
|
||||
const delayMs = Math.min(5000 * Math.pow(2, attempt - 1), 300000);
|
||||
logger.info({ attempt, delayMs }, 'Reconnecting...');
|
||||
setTimeout(() => {
|
||||
this.connectInternal().catch((err) => {
|
||||
logger.error({ err, attempt }, 'Reconnection attempt failed');
|
||||
this.scheduleReconnect(attempt + 1);
|
||||
});
|
||||
}, delayMs);
|
||||
}
|
||||
|
||||
private async translateJid(jid: string): Promise<string> {
|
||||
if (!jid.endsWith('@lid')) return jid;
|
||||
const lidUser = jid.split('@')[0].split(':')[0];
|
||||
|
||||
// Check local cache first
|
||||
const cached = this.lidToPhoneMap[lidUser];
|
||||
if (cached) {
|
||||
logger.debug(
|
||||
{ lidJid: jid, phoneJid: cached },
|
||||
'Translated LID to phone JID (cached)',
|
||||
);
|
||||
return cached;
|
||||
}
|
||||
|
||||
// Query Baileys' signal repository for the mapping
|
||||
try {
|
||||
const pn = await this.sock.signalRepository?.lidMapping?.getPNForLID(jid);
|
||||
if (pn) {
|
||||
const phoneJid = `${pn.split('@')[0].split(':')[0]}@s.whatsapp.net`;
|
||||
this.lidToPhoneMap[lidUser] = phoneJid;
|
||||
logger.info(
|
||||
{ lidJid: jid, phoneJid },
|
||||
'Translated LID to phone JID (signalRepository)',
|
||||
);
|
||||
return phoneJid;
|
||||
}
|
||||
} catch (err) {
|
||||
logger.debug({ err, jid }, 'Failed to resolve LID via signalRepository');
|
||||
}
|
||||
|
||||
return jid;
|
||||
}
|
||||
|
||||
private async flushOutgoingQueue(): Promise<void> {
|
||||
if (this.flushing || this.outgoingQueue.length === 0) return;
|
||||
this.flushing = true;
|
||||
try {
|
||||
logger.info(
|
||||
{ count: this.outgoingQueue.length },
|
||||
'Flushing outgoing message queue',
|
||||
);
|
||||
while (this.outgoingQueue.length > 0) {
|
||||
const item = this.outgoingQueue.shift()!;
|
||||
// Send directly — queued items are already prefixed by sendMessage
|
||||
await this.sock.sendMessage(item.jid, { text: item.text });
|
||||
logger.info(
|
||||
{ jid: item.jid, length: item.text.length },
|
||||
'Queued message sent',
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
this.flushing = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
registerChannel('whatsapp', (opts: ChannelOpts) => new WhatsAppChannel(opts));
|
||||
@@ -0,0 +1,23 @@
|
||||
# Intent: src/channels/whatsapp.ts
|
||||
|
||||
## What Changed
|
||||
- Added `downloadMediaMessage` import from Baileys
|
||||
- Added `normalizeMessageContent` import from Baileys for unwrapping container types
|
||||
- Added `GROUPS_DIR` to config import
|
||||
- Added `isImageMessage`, `processImage` imports from `../image.js`
|
||||
- Uses `normalizeMessageContent(msg.message)` to unwrap viewOnce, ephemeral, edited messages
|
||||
- Changed `const content =` to `let content =` (allows mutation by image handler)
|
||||
- Added image download/process block between content extraction and `!content` guard
|
||||
|
||||
## Key Sections
|
||||
- **Imports** (top of file): New imports for downloadMediaMessage, normalizeMessageContent, isImageMessage, processImage, GROUPS_DIR
|
||||
- **messages.upsert handler** (inside `connectInternal`): normalizeMessageContent call, image block inserted after text extraction, before the `!content` skip guard
|
||||
|
||||
## Invariants (must-keep)
|
||||
- WhatsAppChannel class structure and all existing methods
|
||||
- Connection lifecycle (connect, reconnect with exponential backoff, disconnect)
|
||||
- LID-to-phone translation logic
|
||||
- Outgoing message queue and flush logic
|
||||
- Group metadata sync with 24h cache
|
||||
- The `!content` guard must remain AFTER media blocks (they provide content for otherwise-empty messages)
|
||||
- Local timestamp format (no Z suffix) for cursor compatibility
|
||||
703
.claude/skills/add-image-vision/modify/src/container-runner.ts
Normal file
703
.claude/skills/add-image-vision/modify/src/container-runner.ts
Normal file
@@ -0,0 +1,703 @@
|
||||
/**
|
||||
* Container Runner for NanoClaw
|
||||
* Spawns agent execution in containers and handles IPC
|
||||
*/
|
||||
import { ChildProcess, exec, spawn } from 'child_process';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import {
|
||||
CONTAINER_IMAGE,
|
||||
CONTAINER_MAX_OUTPUT_SIZE,
|
||||
CONTAINER_TIMEOUT,
|
||||
DATA_DIR,
|
||||
GROUPS_DIR,
|
||||
IDLE_TIMEOUT,
|
||||
TIMEZONE,
|
||||
} from './config.js';
|
||||
import { readEnvFile } from './env.js';
|
||||
import { resolveGroupFolderPath, resolveGroupIpcPath } from './group-folder.js';
|
||||
import { logger } from './logger.js';
|
||||
import {
|
||||
CONTAINER_RUNTIME_BIN,
|
||||
readonlyMountArgs,
|
||||
stopContainer,
|
||||
} from './container-runtime.js';
|
||||
import { validateAdditionalMounts } from './mount-security.js';
|
||||
import { RegisteredGroup } from './types.js';
|
||||
|
||||
// Sentinel markers for robust output parsing (must match agent-runner)
|
||||
const OUTPUT_START_MARKER = '---NANOCLAW_OUTPUT_START---';
|
||||
const OUTPUT_END_MARKER = '---NANOCLAW_OUTPUT_END---';
|
||||
|
||||
export interface ContainerInput {
|
||||
prompt: string;
|
||||
sessionId?: string;
|
||||
groupFolder: string;
|
||||
chatJid: string;
|
||||
isMain: boolean;
|
||||
isScheduledTask?: boolean;
|
||||
assistantName?: string;
|
||||
secrets?: Record<string, string>;
|
||||
imageAttachments?: Array<{ relativePath: string; mediaType: string }>;
|
||||
}
|
||||
|
||||
export interface ContainerOutput {
|
||||
status: 'success' | 'error';
|
||||
result: string | null;
|
||||
newSessionId?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
interface VolumeMount {
|
||||
hostPath: string;
|
||||
containerPath: string;
|
||||
readonly: boolean;
|
||||
}
|
||||
|
||||
function buildVolumeMounts(
|
||||
group: RegisteredGroup,
|
||||
isMain: boolean,
|
||||
): VolumeMount[] {
|
||||
const mounts: VolumeMount[] = [];
|
||||
const projectRoot = process.cwd();
|
||||
const groupDir = resolveGroupFolderPath(group.folder);
|
||||
|
||||
if (isMain) {
|
||||
// Main gets the project root read-only. Writable paths the agent needs
|
||||
// (group folder, IPC, .claude/) are mounted separately below.
|
||||
// Read-only prevents the agent from modifying host application code
|
||||
// (src/, dist/, package.json, etc.) which would bypass the sandbox
|
||||
// entirely on next restart.
|
||||
mounts.push({
|
||||
hostPath: projectRoot,
|
||||
containerPath: '/workspace/project',
|
||||
readonly: true,
|
||||
});
|
||||
|
||||
// Shadow .env so the agent cannot read secrets from the mounted project root.
|
||||
// Secrets are passed via stdin instead (see readSecrets()).
|
||||
const envFile = path.join(projectRoot, '.env');
|
||||
if (fs.existsSync(envFile)) {
|
||||
mounts.push({
|
||||
hostPath: '/dev/null',
|
||||
containerPath: '/workspace/project/.env',
|
||||
readonly: true,
|
||||
});
|
||||
}
|
||||
|
||||
// Main also gets its group folder as the working directory
|
||||
mounts.push({
|
||||
hostPath: groupDir,
|
||||
containerPath: '/workspace/group',
|
||||
readonly: false,
|
||||
});
|
||||
} else {
|
||||
// Other groups only get their own folder
|
||||
mounts.push({
|
||||
hostPath: groupDir,
|
||||
containerPath: '/workspace/group',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Global memory directory (read-only for non-main)
|
||||
// Only directory mounts are supported, not file mounts
|
||||
const globalDir = path.join(GROUPS_DIR, 'global');
|
||||
if (fs.existsSync(globalDir)) {
|
||||
mounts.push({
|
||||
hostPath: globalDir,
|
||||
containerPath: '/workspace/global',
|
||||
readonly: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Per-group Claude sessions directory (isolated from other groups)
|
||||
// Each group gets their own .claude/ to prevent cross-group session access
|
||||
const groupSessionsDir = path.join(
|
||||
DATA_DIR,
|
||||
'sessions',
|
||||
group.folder,
|
||||
'.claude',
|
||||
);
|
||||
fs.mkdirSync(groupSessionsDir, { recursive: true });
|
||||
const settingsFile = path.join(groupSessionsDir, 'settings.json');
|
||||
if (!fs.existsSync(settingsFile)) {
|
||||
fs.writeFileSync(
|
||||
settingsFile,
|
||||
JSON.stringify(
|
||||
{
|
||||
env: {
|
||||
// Enable agent swarms (subagent orchestration)
|
||||
// https://code.claude.com/docs/en/agent-teams#orchestrate-teams-of-claude-code-sessions
|
||||
CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS: '1',
|
||||
// Load CLAUDE.md from additional mounted directories
|
||||
// https://code.claude.com/docs/en/memory#load-memory-from-additional-directories
|
||||
CLAUDE_CODE_ADDITIONAL_DIRECTORIES_CLAUDE_MD: '1',
|
||||
// Enable Claude's memory feature (persists user preferences between sessions)
|
||||
// https://code.claude.com/docs/en/memory#manage-auto-memory
|
||||
CLAUDE_CODE_DISABLE_AUTO_MEMORY: '0',
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
) + '\n',
|
||||
);
|
||||
}
|
||||
|
||||
// Sync skills from container/skills/ into each group's .claude/skills/
|
||||
const skillsSrc = path.join(process.cwd(), 'container', 'skills');
|
||||
const skillsDst = path.join(groupSessionsDir, 'skills');
|
||||
if (fs.existsSync(skillsSrc)) {
|
||||
for (const skillDir of fs.readdirSync(skillsSrc)) {
|
||||
const srcDir = path.join(skillsSrc, skillDir);
|
||||
if (!fs.statSync(srcDir).isDirectory()) continue;
|
||||
const dstDir = path.join(skillsDst, skillDir);
|
||||
fs.cpSync(srcDir, dstDir, { recursive: true });
|
||||
}
|
||||
}
|
||||
mounts.push({
|
||||
hostPath: groupSessionsDir,
|
||||
containerPath: '/home/node/.claude',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Per-group IPC namespace: each group gets its own IPC directory
|
||||
// This prevents cross-group privilege escalation via IPC
|
||||
const groupIpcDir = resolveGroupIpcPath(group.folder);
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'messages'), { recursive: true });
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'tasks'), { recursive: true });
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'input'), { recursive: true });
|
||||
mounts.push({
|
||||
hostPath: groupIpcDir,
|
||||
containerPath: '/workspace/ipc',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Copy agent-runner source into a per-group writable location so agents
|
||||
// can customize it (add tools, change behavior) without affecting other
|
||||
// groups. Recompiled on container startup via entrypoint.sh.
|
||||
const agentRunnerSrc = path.join(
|
||||
projectRoot,
|
||||
'container',
|
||||
'agent-runner',
|
||||
'src',
|
||||
);
|
||||
const groupAgentRunnerDir = path.join(
|
||||
DATA_DIR,
|
||||
'sessions',
|
||||
group.folder,
|
||||
'agent-runner-src',
|
||||
);
|
||||
if (!fs.existsSync(groupAgentRunnerDir) && fs.existsSync(agentRunnerSrc)) {
|
||||
fs.cpSync(agentRunnerSrc, groupAgentRunnerDir, { recursive: true });
|
||||
}
|
||||
mounts.push({
|
||||
hostPath: groupAgentRunnerDir,
|
||||
containerPath: '/app/src',
|
||||
readonly: false,
|
||||
});
|
||||
|
||||
// Additional mounts validated against external allowlist (tamper-proof from containers)
|
||||
if (group.containerConfig?.additionalMounts) {
|
||||
const validatedMounts = validateAdditionalMounts(
|
||||
group.containerConfig.additionalMounts,
|
||||
group.name,
|
||||
isMain,
|
||||
);
|
||||
mounts.push(...validatedMounts);
|
||||
}
|
||||
|
||||
return mounts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read allowed secrets from .env for passing to the container via stdin.
|
||||
* Secrets are never written to disk or mounted as files.
|
||||
*/
|
||||
function readSecrets(): Record<string, string> {
|
||||
return readEnvFile([
|
||||
'CLAUDE_CODE_OAUTH_TOKEN',
|
||||
'ANTHROPIC_API_KEY',
|
||||
'ANTHROPIC_BASE_URL',
|
||||
'ANTHROPIC_AUTH_TOKEN',
|
||||
]);
|
||||
}
|
||||
|
||||
function buildContainerArgs(
|
||||
mounts: VolumeMount[],
|
||||
containerName: string,
|
||||
): string[] {
|
||||
const args: string[] = ['run', '-i', '--rm', '--name', containerName];
|
||||
|
||||
// Pass host timezone so container's local time matches the user's
|
||||
args.push('-e', `TZ=${TIMEZONE}`);
|
||||
|
||||
// Run as host user so bind-mounted files are accessible.
|
||||
// Skip when running as root (uid 0), as the container's node user (uid 1000),
|
||||
// or when getuid is unavailable (native Windows without WSL).
|
||||
const hostUid = process.getuid?.();
|
||||
const hostGid = process.getgid?.();
|
||||
if (hostUid != null && hostUid !== 0 && hostUid !== 1000) {
|
||||
args.push('--user', `${hostUid}:${hostGid}`);
|
||||
args.push('-e', 'HOME=/home/node');
|
||||
}
|
||||
|
||||
for (const mount of mounts) {
|
||||
if (mount.readonly) {
|
||||
args.push(...readonlyMountArgs(mount.hostPath, mount.containerPath));
|
||||
} else {
|
||||
args.push('-v', `${mount.hostPath}:${mount.containerPath}`);
|
||||
}
|
||||
}
|
||||
|
||||
args.push(CONTAINER_IMAGE);
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
export async function runContainerAgent(
|
||||
group: RegisteredGroup,
|
||||
input: ContainerInput,
|
||||
onProcess: (proc: ChildProcess, containerName: string) => void,
|
||||
onOutput?: (output: ContainerOutput) => Promise<void>,
|
||||
): Promise<ContainerOutput> {
|
||||
const startTime = Date.now();
|
||||
|
||||
const groupDir = resolveGroupFolderPath(group.folder);
|
||||
fs.mkdirSync(groupDir, { recursive: true });
|
||||
|
||||
const mounts = buildVolumeMounts(group, input.isMain);
|
||||
const safeName = group.folder.replace(/[^a-zA-Z0-9-]/g, '-');
|
||||
const containerName = `nanoclaw-${safeName}-${Date.now()}`;
|
||||
const containerArgs = buildContainerArgs(mounts, containerName);
|
||||
|
||||
logger.debug(
|
||||
{
|
||||
group: group.name,
|
||||
containerName,
|
||||
mounts: mounts.map(
|
||||
(m) =>
|
||||
`${m.hostPath} -> ${m.containerPath}${m.readonly ? ' (ro)' : ''}`,
|
||||
),
|
||||
containerArgs: containerArgs.join(' '),
|
||||
},
|
||||
'Container mount configuration',
|
||||
);
|
||||
|
||||
logger.info(
|
||||
{
|
||||
group: group.name,
|
||||
containerName,
|
||||
mountCount: mounts.length,
|
||||
isMain: input.isMain,
|
||||
},
|
||||
'Spawning container agent',
|
||||
);
|
||||
|
||||
const logsDir = path.join(groupDir, 'logs');
|
||||
fs.mkdirSync(logsDir, { recursive: true });
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const container = spawn(CONTAINER_RUNTIME_BIN, containerArgs, {
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
});
|
||||
|
||||
onProcess(container, containerName);
|
||||
|
||||
let stdout = '';
|
||||
let stderr = '';
|
||||
let stdoutTruncated = false;
|
||||
let stderrTruncated = false;
|
||||
|
||||
// Pass secrets via stdin (never written to disk or mounted as files)
|
||||
input.secrets = readSecrets();
|
||||
container.stdin.write(JSON.stringify(input));
|
||||
container.stdin.end();
|
||||
// Remove secrets from input so they don't appear in logs
|
||||
delete input.secrets;
|
||||
|
||||
// Streaming output: parse OUTPUT_START/END marker pairs as they arrive
|
||||
let parseBuffer = '';
|
||||
let newSessionId: string | undefined;
|
||||
let outputChain = Promise.resolve();
|
||||
|
||||
container.stdout.on('data', (data) => {
|
||||
const chunk = data.toString();
|
||||
|
||||
// Always accumulate for logging
|
||||
if (!stdoutTruncated) {
|
||||
const remaining = CONTAINER_MAX_OUTPUT_SIZE - stdout.length;
|
||||
if (chunk.length > remaining) {
|
||||
stdout += chunk.slice(0, remaining);
|
||||
stdoutTruncated = true;
|
||||
logger.warn(
|
||||
{ group: group.name, size: stdout.length },
|
||||
'Container stdout truncated due to size limit',
|
||||
);
|
||||
} else {
|
||||
stdout += chunk;
|
||||
}
|
||||
}
|
||||
|
||||
// Stream-parse for output markers
|
||||
if (onOutput) {
|
||||
parseBuffer += chunk;
|
||||
let startIdx: number;
|
||||
while ((startIdx = parseBuffer.indexOf(OUTPUT_START_MARKER)) !== -1) {
|
||||
const endIdx = parseBuffer.indexOf(OUTPUT_END_MARKER, startIdx);
|
||||
if (endIdx === -1) break; // Incomplete pair, wait for more data
|
||||
|
||||
const jsonStr = parseBuffer
|
||||
.slice(startIdx + OUTPUT_START_MARKER.length, endIdx)
|
||||
.trim();
|
||||
parseBuffer = parseBuffer.slice(endIdx + OUTPUT_END_MARKER.length);
|
||||
|
||||
try {
|
||||
const parsed: ContainerOutput = JSON.parse(jsonStr);
|
||||
if (parsed.newSessionId) {
|
||||
newSessionId = parsed.newSessionId;
|
||||
}
|
||||
hadStreamingOutput = true;
|
||||
// Activity detected — reset the hard timeout
|
||||
resetTimeout();
|
||||
// Call onOutput for all markers (including null results)
|
||||
// so idle timers start even for "silent" query completions.
|
||||
outputChain = outputChain.then(() => onOutput(parsed));
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{ group: group.name, error: err },
|
||||
'Failed to parse streamed output chunk',
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
container.stderr.on('data', (data) => {
|
||||
const chunk = data.toString();
|
||||
const lines = chunk.trim().split('\n');
|
||||
for (const line of lines) {
|
||||
if (line) logger.debug({ container: group.folder }, line);
|
||||
}
|
||||
// Don't reset timeout on stderr — SDK writes debug logs continuously.
|
||||
// Timeout only resets on actual output (OUTPUT_MARKER in stdout).
|
||||
if (stderrTruncated) return;
|
||||
const remaining = CONTAINER_MAX_OUTPUT_SIZE - stderr.length;
|
||||
if (chunk.length > remaining) {
|
||||
stderr += chunk.slice(0, remaining);
|
||||
stderrTruncated = true;
|
||||
logger.warn(
|
||||
{ group: group.name, size: stderr.length },
|
||||
'Container stderr truncated due to size limit',
|
||||
);
|
||||
} else {
|
||||
stderr += chunk;
|
||||
}
|
||||
});
|
||||
|
||||
let timedOut = false;
|
||||
let hadStreamingOutput = false;
|
||||
const configTimeout = group.containerConfig?.timeout || CONTAINER_TIMEOUT;
|
||||
// Grace period: hard timeout must be at least IDLE_TIMEOUT + 30s so the
|
||||
// graceful _close sentinel has time to trigger before the hard kill fires.
|
||||
const timeoutMs = Math.max(configTimeout, IDLE_TIMEOUT + 30_000);
|
||||
|
||||
const killOnTimeout = () => {
|
||||
timedOut = true;
|
||||
logger.error(
|
||||
{ group: group.name, containerName },
|
||||
'Container timeout, stopping gracefully',
|
||||
);
|
||||
exec(stopContainer(containerName), { timeout: 15000 }, (err) => {
|
||||
if (err) {
|
||||
logger.warn(
|
||||
{ group: group.name, containerName, err },
|
||||
'Graceful stop failed, force killing',
|
||||
);
|
||||
container.kill('SIGKILL');
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
let timeout = setTimeout(killOnTimeout, timeoutMs);
|
||||
|
||||
// Reset the timeout whenever there's activity (streaming output)
|
||||
const resetTimeout = () => {
|
||||
clearTimeout(timeout);
|
||||
timeout = setTimeout(killOnTimeout, timeoutMs);
|
||||
};
|
||||
|
||||
container.on('close', (code) => {
|
||||
clearTimeout(timeout);
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
if (timedOut) {
|
||||
const ts = new Date().toISOString().replace(/[:.]/g, '-');
|
||||
const timeoutLog = path.join(logsDir, `container-${ts}.log`);
|
||||
fs.writeFileSync(
|
||||
timeoutLog,
|
||||
[
|
||||
`=== Container Run Log (TIMEOUT) ===`,
|
||||
`Timestamp: ${new Date().toISOString()}`,
|
||||
`Group: ${group.name}`,
|
||||
`Container: ${containerName}`,
|
||||
`Duration: ${duration}ms`,
|
||||
`Exit Code: ${code}`,
|
||||
`Had Streaming Output: ${hadStreamingOutput}`,
|
||||
].join('\n'),
|
||||
);
|
||||
|
||||
// Timeout after output = idle cleanup, not failure.
|
||||
// The agent already sent its response; this is just the
|
||||
// container being reaped after the idle period expired.
|
||||
if (hadStreamingOutput) {
|
||||
logger.info(
|
||||
{ group: group.name, containerName, duration, code },
|
||||
'Container timed out after output (idle cleanup)',
|
||||
);
|
||||
outputChain.then(() => {
|
||||
resolve({
|
||||
status: 'success',
|
||||
result: null,
|
||||
newSessionId,
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
logger.error(
|
||||
{ group: group.name, containerName, duration, code },
|
||||
'Container timed out with no output',
|
||||
);
|
||||
|
||||
resolve({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Container timed out after ${configTimeout}ms`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
|
||||
const logFile = path.join(logsDir, `container-${timestamp}.log`);
|
||||
const isVerbose =
|
||||
process.env.LOG_LEVEL === 'debug' || process.env.LOG_LEVEL === 'trace';
|
||||
|
||||
const logLines = [
|
||||
`=== Container Run Log ===`,
|
||||
`Timestamp: ${new Date().toISOString()}`,
|
||||
`Group: ${group.name}`,
|
||||
`IsMain: ${input.isMain}`,
|
||||
`Duration: ${duration}ms`,
|
||||
`Exit Code: ${code}`,
|
||||
`Stdout Truncated: ${stdoutTruncated}`,
|
||||
`Stderr Truncated: ${stderrTruncated}`,
|
||||
``,
|
||||
];
|
||||
|
||||
const isError = code !== 0;
|
||||
|
||||
if (isVerbose || isError) {
|
||||
logLines.push(
|
||||
`=== Input ===`,
|
||||
JSON.stringify(input, null, 2),
|
||||
``,
|
||||
`=== Container Args ===`,
|
||||
containerArgs.join(' '),
|
||||
``,
|
||||
`=== Mounts ===`,
|
||||
mounts
|
||||
.map(
|
||||
(m) =>
|
||||
`${m.hostPath} -> ${m.containerPath}${m.readonly ? ' (ro)' : ''}`,
|
||||
)
|
||||
.join('\n'),
|
||||
``,
|
||||
`=== Stderr${stderrTruncated ? ' (TRUNCATED)' : ''} ===`,
|
||||
stderr,
|
||||
``,
|
||||
`=== Stdout${stdoutTruncated ? ' (TRUNCATED)' : ''} ===`,
|
||||
stdout,
|
||||
);
|
||||
} else {
|
||||
logLines.push(
|
||||
`=== Input Summary ===`,
|
||||
`Prompt length: ${input.prompt.length} chars`,
|
||||
`Session ID: ${input.sessionId || 'new'}`,
|
||||
``,
|
||||
`=== Mounts ===`,
|
||||
mounts
|
||||
.map((m) => `${m.containerPath}${m.readonly ? ' (ro)' : ''}`)
|
||||
.join('\n'),
|
||||
``,
|
||||
);
|
||||
}
|
||||
|
||||
fs.writeFileSync(logFile, logLines.join('\n'));
|
||||
logger.debug({ logFile, verbose: isVerbose }, 'Container log written');
|
||||
|
||||
if (code !== 0) {
|
||||
logger.error(
|
||||
{
|
||||
group: group.name,
|
||||
code,
|
||||
duration,
|
||||
stderr,
|
||||
stdout,
|
||||
logFile,
|
||||
},
|
||||
'Container exited with error',
|
||||
);
|
||||
|
||||
resolve({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Container exited with code ${code}: ${stderr.slice(-200)}`,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Streaming mode: wait for output chain to settle, return completion marker
|
||||
if (onOutput) {
|
||||
outputChain.then(() => {
|
||||
logger.info(
|
||||
{ group: group.name, duration, newSessionId },
|
||||
'Container completed (streaming mode)',
|
||||
);
|
||||
resolve({
|
||||
status: 'success',
|
||||
result: null,
|
||||
newSessionId,
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Legacy mode: parse the last output marker pair from accumulated stdout
|
||||
try {
|
||||
// Extract JSON between sentinel markers for robust parsing
|
||||
const startIdx = stdout.indexOf(OUTPUT_START_MARKER);
|
||||
const endIdx = stdout.indexOf(OUTPUT_END_MARKER);
|
||||
|
||||
let jsonLine: string;
|
||||
if (startIdx !== -1 && endIdx !== -1 && endIdx > startIdx) {
|
||||
jsonLine = stdout
|
||||
.slice(startIdx + OUTPUT_START_MARKER.length, endIdx)
|
||||
.trim();
|
||||
} else {
|
||||
// Fallback: last non-empty line (backwards compatibility)
|
||||
const lines = stdout.trim().split('\n');
|
||||
jsonLine = lines[lines.length - 1];
|
||||
}
|
||||
|
||||
const output: ContainerOutput = JSON.parse(jsonLine);
|
||||
|
||||
logger.info(
|
||||
{
|
||||
group: group.name,
|
||||
duration,
|
||||
status: output.status,
|
||||
hasResult: !!output.result,
|
||||
},
|
||||
'Container completed',
|
||||
);
|
||||
|
||||
resolve(output);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
{
|
||||
group: group.name,
|
||||
stdout,
|
||||
stderr,
|
||||
error: err,
|
||||
},
|
||||
'Failed to parse container output',
|
||||
);
|
||||
|
||||
resolve({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Failed to parse container output: ${err instanceof Error ? err.message : String(err)}`,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
container.on('error', (err) => {
|
||||
clearTimeout(timeout);
|
||||
logger.error(
|
||||
{ group: group.name, containerName, error: err },
|
||||
'Container spawn error',
|
||||
);
|
||||
resolve({
|
||||
status: 'error',
|
||||
result: null,
|
||||
error: `Container spawn error: ${err.message}`,
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function writeTasksSnapshot(
|
||||
groupFolder: string,
|
||||
isMain: boolean,
|
||||
tasks: Array<{
|
||||
id: string;
|
||||
groupFolder: string;
|
||||
prompt: string;
|
||||
schedule_type: string;
|
||||
schedule_value: string;
|
||||
status: string;
|
||||
next_run: string | null;
|
||||
}>,
|
||||
): void {
|
||||
// Write filtered tasks to the group's IPC directory
|
||||
const groupIpcDir = resolveGroupIpcPath(groupFolder);
|
||||
fs.mkdirSync(groupIpcDir, { recursive: true });
|
||||
|
||||
// Main sees all tasks, others only see their own
|
||||
const filteredTasks = isMain
|
||||
? tasks
|
||||
: tasks.filter((t) => t.groupFolder === groupFolder);
|
||||
|
||||
const tasksFile = path.join(groupIpcDir, 'current_tasks.json');
|
||||
fs.writeFileSync(tasksFile, JSON.stringify(filteredTasks, null, 2));
|
||||
}
|
||||
|
||||
export interface AvailableGroup {
|
||||
jid: string;
|
||||
name: string;
|
||||
lastActivity: string;
|
||||
isRegistered: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write available groups snapshot for the container to read.
|
||||
* Only main group can see all available groups (for activation).
|
||||
* Non-main groups only see their own registration status.
|
||||
*/
|
||||
export function writeGroupsSnapshot(
|
||||
groupFolder: string,
|
||||
isMain: boolean,
|
||||
groups: AvailableGroup[],
|
||||
registeredJids: Set<string>,
|
||||
): void {
|
||||
const groupIpcDir = resolveGroupIpcPath(groupFolder);
|
||||
fs.mkdirSync(groupIpcDir, { recursive: true });
|
||||
|
||||
// Main sees all groups; others see nothing (they can't activate groups)
|
||||
const visibleGroups = isMain ? groups : [];
|
||||
|
||||
const groupsFile = path.join(groupIpcDir, 'available_groups.json');
|
||||
fs.writeFileSync(
|
||||
groupsFile,
|
||||
JSON.stringify(
|
||||
{
|
||||
groups: visibleGroups,
|
||||
lastSync: new Date().toISOString(),
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
# Intent: src/container-runner.ts
|
||||
|
||||
## What Changed
|
||||
- Added `imageAttachments?` optional field to `ContainerInput` interface
|
||||
|
||||
## Key Sections
|
||||
- **ContainerInput interface**: imageAttachments optional field (`Array<{ relativePath: string; mediaType: string }>`)
|
||||
|
||||
## Invariants (must-keep)
|
||||
- ContainerOutput interface unchanged
|
||||
- buildContainerArgs structure (run, -i, --rm, --name, mounts, image)
|
||||
- runContainerAgent with streaming output parsing (OUTPUT_START/END markers)
|
||||
- writeTasksSnapshot, writeGroupsSnapshot functions
|
||||
- Additional mounts via validateAdditionalMounts
|
||||
- Mount security validation against external allowlist
|
||||
590
.claude/skills/add-image-vision/modify/src/index.ts
Normal file
590
.claude/skills/add-image-vision/modify/src/index.ts
Normal file
@@ -0,0 +1,590 @@
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import {
|
||||
ASSISTANT_NAME,
|
||||
IDLE_TIMEOUT,
|
||||
POLL_INTERVAL,
|
||||
TRIGGER_PATTERN,
|
||||
} from './config.js';
|
||||
import './channels/index.js';
|
||||
import {
|
||||
getChannelFactory,
|
||||
getRegisteredChannelNames,
|
||||
} from './channels/registry.js';
|
||||
import {
|
||||
ContainerOutput,
|
||||
runContainerAgent,
|
||||
writeGroupsSnapshot,
|
||||
writeTasksSnapshot,
|
||||
} from './container-runner.js';
|
||||
import {
|
||||
cleanupOrphans,
|
||||
ensureContainerRuntimeRunning,
|
||||
} from './container-runtime.js';
|
||||
import {
|
||||
getAllChats,
|
||||
getAllRegisteredGroups,
|
||||
getAllSessions,
|
||||
getAllTasks,
|
||||
getMessagesSince,
|
||||
getNewMessages,
|
||||
getRouterState,
|
||||
initDatabase,
|
||||
setRegisteredGroup,
|
||||
setRouterState,
|
||||
setSession,
|
||||
storeChatMetadata,
|
||||
storeMessage,
|
||||
} from './db.js';
|
||||
import { GroupQueue } from './group-queue.js';
|
||||
import { resolveGroupFolderPath } from './group-folder.js';
|
||||
import { startIpcWatcher } from './ipc.js';
|
||||
import { findChannel, formatMessages, formatOutbound } from './router.js';
|
||||
import {
|
||||
isSenderAllowed,
|
||||
isTriggerAllowed,
|
||||
loadSenderAllowlist,
|
||||
shouldDropMessage,
|
||||
} from './sender-allowlist.js';
|
||||
import { startSchedulerLoop } from './task-scheduler.js';
|
||||
import { Channel, NewMessage, RegisteredGroup } from './types.js';
|
||||
import { parseImageReferences } from './image.js';
|
||||
import { logger } from './logger.js';
|
||||
|
||||
// Re-export for backwards compatibility during refactor
|
||||
export { escapeXml, formatMessages } from './router.js';
|
||||
|
||||
let lastTimestamp = '';
|
||||
let sessions: Record<string, string> = {};
|
||||
let registeredGroups: Record<string, RegisteredGroup> = {};
|
||||
let lastAgentTimestamp: Record<string, string> = {};
|
||||
let messageLoopRunning = false;
|
||||
|
||||
const channels: Channel[] = [];
|
||||
const queue = new GroupQueue();
|
||||
|
||||
function loadState(): void {
|
||||
lastTimestamp = getRouterState('last_timestamp') || '';
|
||||
const agentTs = getRouterState('last_agent_timestamp');
|
||||
try {
|
||||
lastAgentTimestamp = agentTs ? JSON.parse(agentTs) : {};
|
||||
} catch {
|
||||
logger.warn('Corrupted last_agent_timestamp in DB, resetting');
|
||||
lastAgentTimestamp = {};
|
||||
}
|
||||
sessions = getAllSessions();
|
||||
registeredGroups = getAllRegisteredGroups();
|
||||
logger.info(
|
||||
{ groupCount: Object.keys(registeredGroups).length },
|
||||
'State loaded',
|
||||
);
|
||||
}
|
||||
|
||||
function saveState(): void {
|
||||
setRouterState('last_timestamp', lastTimestamp);
|
||||
setRouterState('last_agent_timestamp', JSON.stringify(lastAgentTimestamp));
|
||||
}
|
||||
|
||||
function registerGroup(jid: string, group: RegisteredGroup): void {
|
||||
let groupDir: string;
|
||||
try {
|
||||
groupDir = resolveGroupFolderPath(group.folder);
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{ jid, folder: group.folder, err },
|
||||
'Rejecting group registration with invalid folder',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
registeredGroups[jid] = group;
|
||||
setRegisteredGroup(jid, group);
|
||||
|
||||
// Create group folder
|
||||
fs.mkdirSync(path.join(groupDir, 'logs'), { recursive: true });
|
||||
|
||||
logger.info(
|
||||
{ jid, name: group.name, folder: group.folder },
|
||||
'Group registered',
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get available groups list for the agent.
|
||||
* Returns groups ordered by most recent activity.
|
||||
*/
|
||||
export function getAvailableGroups(): import('./container-runner.js').AvailableGroup[] {
|
||||
const chats = getAllChats();
|
||||
const registeredJids = new Set(Object.keys(registeredGroups));
|
||||
|
||||
return chats
|
||||
.filter((c) => c.jid !== '__group_sync__' && c.is_group)
|
||||
.map((c) => ({
|
||||
jid: c.jid,
|
||||
name: c.name,
|
||||
lastActivity: c.last_message_time,
|
||||
isRegistered: registeredJids.has(c.jid),
|
||||
}));
|
||||
}
|
||||
|
||||
/** @internal - exported for testing */
|
||||
export function _setRegisteredGroups(
|
||||
groups: Record<string, RegisteredGroup>,
|
||||
): void {
|
||||
registeredGroups = groups;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process all pending messages for a group.
|
||||
* Called by the GroupQueue when it's this group's turn.
|
||||
*/
|
||||
async function processGroupMessages(chatJid: string): Promise<boolean> {
|
||||
const group = registeredGroups[chatJid];
|
||||
if (!group) return true;
|
||||
|
||||
const channel = findChannel(channels, chatJid);
|
||||
if (!channel) {
|
||||
logger.warn({ chatJid }, 'No channel owns JID, skipping messages');
|
||||
return true;
|
||||
}
|
||||
|
||||
const isMainGroup = group.isMain === true;
|
||||
|
||||
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
|
||||
const missedMessages = getMessagesSince(
|
||||
chatJid,
|
||||
sinceTimestamp,
|
||||
ASSISTANT_NAME,
|
||||
);
|
||||
|
||||
if (missedMessages.length === 0) return true;
|
||||
|
||||
// For non-main groups, check if trigger is required and present
|
||||
if (!isMainGroup && group.requiresTrigger !== false) {
|
||||
const allowlistCfg = loadSenderAllowlist();
|
||||
const hasTrigger = missedMessages.some(
|
||||
(m) =>
|
||||
TRIGGER_PATTERN.test(m.content.trim()) &&
|
||||
(m.is_from_me || isTriggerAllowed(chatJid, m.sender, allowlistCfg)),
|
||||
);
|
||||
if (!hasTrigger) return true;
|
||||
}
|
||||
|
||||
const prompt = formatMessages(missedMessages);
|
||||
const imageAttachments = parseImageReferences(missedMessages);
|
||||
|
||||
// Advance cursor so the piping path in startMessageLoop won't re-fetch
|
||||
// these messages. Save the old cursor so we can roll back on error.
|
||||
const previousCursor = lastAgentTimestamp[chatJid] || '';
|
||||
lastAgentTimestamp[chatJid] =
|
||||
missedMessages[missedMessages.length - 1].timestamp;
|
||||
saveState();
|
||||
|
||||
logger.info(
|
||||
{ group: group.name, messageCount: missedMessages.length },
|
||||
'Processing messages',
|
||||
);
|
||||
|
||||
// Track idle timer for closing stdin when agent is idle
|
||||
let idleTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
const resetIdleTimer = () => {
|
||||
if (idleTimer) clearTimeout(idleTimer);
|
||||
idleTimer = setTimeout(() => {
|
||||
logger.debug(
|
||||
{ group: group.name },
|
||||
'Idle timeout, closing container stdin',
|
||||
);
|
||||
queue.closeStdin(chatJid);
|
||||
}, IDLE_TIMEOUT);
|
||||
};
|
||||
|
||||
await channel.setTyping?.(chatJid, true);
|
||||
let hadError = false;
|
||||
let outputSentToUser = false;
|
||||
|
||||
const output = await runAgent(group, prompt, chatJid, imageAttachments, async (result) => {
|
||||
// Streaming output callback — called for each agent result
|
||||
if (result.result) {
|
||||
const raw =
|
||||
typeof result.result === 'string'
|
||||
? result.result
|
||||
: JSON.stringify(result.result);
|
||||
// Strip <internal>...</internal> blocks — agent uses these for internal reasoning
|
||||
const text = raw.replace(/<internal>[\s\S]*?<\/internal>/g, '').trim();
|
||||
logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`);
|
||||
if (text) {
|
||||
await channel.sendMessage(chatJid, text);
|
||||
outputSentToUser = true;
|
||||
}
|
||||
// Only reset idle timer on actual results, not session-update markers (result: null)
|
||||
resetIdleTimer();
|
||||
}
|
||||
|
||||
if (result.status === 'success') {
|
||||
queue.notifyIdle(chatJid);
|
||||
}
|
||||
|
||||
if (result.status === 'error') {
|
||||
hadError = true;
|
||||
}
|
||||
});
|
||||
|
||||
await channel.setTyping?.(chatJid, false);
|
||||
if (idleTimer) clearTimeout(idleTimer);
|
||||
|
||||
if (output === 'error' || hadError) {
|
||||
// If we already sent output to the user, don't roll back the cursor —
|
||||
// the user got their response and re-processing would send duplicates.
|
||||
if (outputSentToUser) {
|
||||
logger.warn(
|
||||
{ group: group.name },
|
||||
'Agent error after output was sent, skipping cursor rollback to prevent duplicates',
|
||||
);
|
||||
return true;
|
||||
}
|
||||
// Roll back cursor so retries can re-process these messages
|
||||
lastAgentTimestamp[chatJid] = previousCursor;
|
||||
saveState();
|
||||
logger.warn(
|
||||
{ group: group.name },
|
||||
'Agent error, rolled back message cursor for retry',
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
async function runAgent(
|
||||
group: RegisteredGroup,
|
||||
prompt: string,
|
||||
chatJid: string,
|
||||
imageAttachments: Array<{ relativePath: string; mediaType: string }>,
|
||||
onOutput?: (output: ContainerOutput) => Promise<void>,
|
||||
): Promise<'success' | 'error'> {
|
||||
const isMain = group.isMain === true;
|
||||
const sessionId = sessions[group.folder];
|
||||
|
||||
// Update tasks snapshot for container to read (filtered by group)
|
||||
const tasks = getAllTasks();
|
||||
writeTasksSnapshot(
|
||||
group.folder,
|
||||
isMain,
|
||||
tasks.map((t) => ({
|
||||
id: t.id,
|
||||
groupFolder: t.group_folder,
|
||||
prompt: t.prompt,
|
||||
schedule_type: t.schedule_type,
|
||||
schedule_value: t.schedule_value,
|
||||
status: t.status,
|
||||
next_run: t.next_run,
|
||||
})),
|
||||
);
|
||||
|
||||
// Update available groups snapshot (main group only can see all groups)
|
||||
const availableGroups = getAvailableGroups();
|
||||
writeGroupsSnapshot(
|
||||
group.folder,
|
||||
isMain,
|
||||
availableGroups,
|
||||
new Set(Object.keys(registeredGroups)),
|
||||
);
|
||||
|
||||
// Wrap onOutput to track session ID from streamed results
|
||||
const wrappedOnOutput = onOutput
|
||||
? async (output: ContainerOutput) => {
|
||||
if (output.newSessionId) {
|
||||
sessions[group.folder] = output.newSessionId;
|
||||
setSession(group.folder, output.newSessionId);
|
||||
}
|
||||
await onOutput(output);
|
||||
}
|
||||
: undefined;
|
||||
|
||||
try {
|
||||
const output = await runContainerAgent(
|
||||
group,
|
||||
{
|
||||
prompt,
|
||||
sessionId,
|
||||
groupFolder: group.folder,
|
||||
chatJid,
|
||||
isMain,
|
||||
assistantName: ASSISTANT_NAME,
|
||||
...(imageAttachments.length > 0 && { imageAttachments }),
|
||||
},
|
||||
(proc, containerName) =>
|
||||
queue.registerProcess(chatJid, proc, containerName, group.folder),
|
||||
wrappedOnOutput,
|
||||
);
|
||||
|
||||
if (output.newSessionId) {
|
||||
sessions[group.folder] = output.newSessionId;
|
||||
setSession(group.folder, output.newSessionId);
|
||||
}
|
||||
|
||||
if (output.status === 'error') {
|
||||
logger.error(
|
||||
{ group: group.name, error: output.error },
|
||||
'Container agent error',
|
||||
);
|
||||
return 'error';
|
||||
}
|
||||
|
||||
return 'success';
|
||||
} catch (err) {
|
||||
logger.error({ group: group.name, err }, 'Agent error');
|
||||
return 'error';
|
||||
}
|
||||
}
|
||||
|
||||
async function startMessageLoop(): Promise<void> {
|
||||
if (messageLoopRunning) {
|
||||
logger.debug('Message loop already running, skipping duplicate start');
|
||||
return;
|
||||
}
|
||||
messageLoopRunning = true;
|
||||
|
||||
logger.info(`NanoClaw running (trigger: @${ASSISTANT_NAME})`);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const jids = Object.keys(registeredGroups);
|
||||
const { messages, newTimestamp } = getNewMessages(
|
||||
jids,
|
||||
lastTimestamp,
|
||||
ASSISTANT_NAME,
|
||||
);
|
||||
|
||||
if (messages.length > 0) {
|
||||
logger.info({ count: messages.length }, 'New messages');
|
||||
|
||||
// Advance the "seen" cursor for all messages immediately
|
||||
lastTimestamp = newTimestamp;
|
||||
saveState();
|
||||
|
||||
// Deduplicate by group
|
||||
const messagesByGroup = new Map<string, NewMessage[]>();
|
||||
for (const msg of messages) {
|
||||
const existing = messagesByGroup.get(msg.chat_jid);
|
||||
if (existing) {
|
||||
existing.push(msg);
|
||||
} else {
|
||||
messagesByGroup.set(msg.chat_jid, [msg]);
|
||||
}
|
||||
}
|
||||
|
||||
for (const [chatJid, groupMessages] of messagesByGroup) {
|
||||
const group = registeredGroups[chatJid];
|
||||
if (!group) continue;
|
||||
|
||||
const channel = findChannel(channels, chatJid);
|
||||
if (!channel) {
|
||||
logger.warn({ chatJid }, 'No channel owns JID, skipping messages');
|
||||
continue;
|
||||
}
|
||||
|
||||
const isMainGroup = group.isMain === true;
|
||||
const needsTrigger = !isMainGroup && group.requiresTrigger !== false;
|
||||
|
||||
// For non-main groups, only act on trigger messages.
|
||||
// Non-trigger messages accumulate in DB and get pulled as
|
||||
// context when a trigger eventually arrives.
|
||||
if (needsTrigger) {
|
||||
const allowlistCfg = loadSenderAllowlist();
|
||||
const hasTrigger = groupMessages.some(
|
||||
(m) =>
|
||||
TRIGGER_PATTERN.test(m.content.trim()) &&
|
||||
(m.is_from_me ||
|
||||
isTriggerAllowed(chatJid, m.sender, allowlistCfg)),
|
||||
);
|
||||
if (!hasTrigger) continue;
|
||||
}
|
||||
|
||||
// Pull all messages since lastAgentTimestamp so non-trigger
|
||||
// context that accumulated between triggers is included.
|
||||
const allPending = getMessagesSince(
|
||||
chatJid,
|
||||
lastAgentTimestamp[chatJid] || '',
|
||||
ASSISTANT_NAME,
|
||||
);
|
||||
const messagesToSend =
|
||||
allPending.length > 0 ? allPending : groupMessages;
|
||||
const formatted = formatMessages(messagesToSend);
|
||||
|
||||
if (queue.sendMessage(chatJid, formatted)) {
|
||||
logger.debug(
|
||||
{ chatJid, count: messagesToSend.length },
|
||||
'Piped messages to active container',
|
||||
);
|
||||
lastAgentTimestamp[chatJid] =
|
||||
messagesToSend[messagesToSend.length - 1].timestamp;
|
||||
saveState();
|
||||
// Show typing indicator while the container processes the piped message
|
||||
channel
|
||||
.setTyping?.(chatJid, true)
|
||||
?.catch((err) =>
|
||||
logger.warn({ chatJid, err }, 'Failed to set typing indicator'),
|
||||
);
|
||||
} else {
|
||||
// No active container — enqueue for a new one
|
||||
queue.enqueueMessageCheck(chatJid);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'Error in message loop');
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Startup recovery: check for unprocessed messages in registered groups.
|
||||
* Handles crash between advancing lastTimestamp and processing messages.
|
||||
*/
|
||||
function recoverPendingMessages(): void {
|
||||
for (const [chatJid, group] of Object.entries(registeredGroups)) {
|
||||
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
|
||||
const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME);
|
||||
if (pending.length > 0) {
|
||||
logger.info(
|
||||
{ group: group.name, pendingCount: pending.length },
|
||||
'Recovery: found unprocessed messages',
|
||||
);
|
||||
queue.enqueueMessageCheck(chatJid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function ensureContainerSystemRunning(): void {
|
||||
ensureContainerRuntimeRunning();
|
||||
cleanupOrphans();
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
ensureContainerSystemRunning();
|
||||
initDatabase();
|
||||
logger.info('Database initialized');
|
||||
loadState();
|
||||
|
||||
// Graceful shutdown handlers
|
||||
const shutdown = async (signal: string) => {
|
||||
logger.info({ signal }, 'Shutdown signal received');
|
||||
await queue.shutdown(10000);
|
||||
for (const ch of channels) await ch.disconnect();
|
||||
process.exit(0);
|
||||
};
|
||||
process.on('SIGTERM', () => shutdown('SIGTERM'));
|
||||
process.on('SIGINT', () => shutdown('SIGINT'));
|
||||
|
||||
// Channel callbacks (shared by all channels)
|
||||
const channelOpts = {
|
||||
onMessage: (chatJid: string, msg: NewMessage) => {
|
||||
// Sender allowlist drop mode: discard messages from denied senders before storing
|
||||
if (!msg.is_from_me && !msg.is_bot_message && registeredGroups[chatJid]) {
|
||||
const cfg = loadSenderAllowlist();
|
||||
if (
|
||||
shouldDropMessage(chatJid, cfg) &&
|
||||
!isSenderAllowed(chatJid, msg.sender, cfg)
|
||||
) {
|
||||
if (cfg.logDenied) {
|
||||
logger.debug(
|
||||
{ chatJid, sender: msg.sender },
|
||||
'sender-allowlist: dropping message (drop mode)',
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
storeMessage(msg);
|
||||
},
|
||||
onChatMetadata: (
|
||||
chatJid: string,
|
||||
timestamp: string,
|
||||
name?: string,
|
||||
channel?: string,
|
||||
isGroup?: boolean,
|
||||
) => storeChatMetadata(chatJid, timestamp, name, channel, isGroup),
|
||||
registeredGroups: () => registeredGroups,
|
||||
};
|
||||
|
||||
// Create and connect all registered channels.
|
||||
// Each channel self-registers via the barrel import above.
|
||||
// Factories return null when credentials are missing, so unconfigured channels are skipped.
|
||||
for (const channelName of getRegisteredChannelNames()) {
|
||||
const factory = getChannelFactory(channelName)!;
|
||||
const channel = factory(channelOpts);
|
||||
if (!channel) {
|
||||
logger.warn(
|
||||
{ channel: channelName },
|
||||
'Channel installed but credentials missing — skipping. Check .env or re-run the channel skill.',
|
||||
);
|
||||
continue;
|
||||
}
|
||||
channels.push(channel);
|
||||
await channel.connect();
|
||||
}
|
||||
if (channels.length === 0) {
|
||||
logger.fatal('No channels connected');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
// Start subsystems (independently of connection handler)
|
||||
startSchedulerLoop({
|
||||
registeredGroups: () => registeredGroups,
|
||||
getSessions: () => sessions,
|
||||
queue,
|
||||
onProcess: (groupJid, proc, containerName, groupFolder) =>
|
||||
queue.registerProcess(groupJid, proc, containerName, groupFolder),
|
||||
sendMessage: async (jid, rawText) => {
|
||||
const channel = findChannel(channels, jid);
|
||||
if (!channel) {
|
||||
logger.warn({ jid }, 'No channel owns JID, cannot send message');
|
||||
return;
|
||||
}
|
||||
const text = formatOutbound(rawText);
|
||||
if (text) await channel.sendMessage(jid, text);
|
||||
},
|
||||
});
|
||||
startIpcWatcher({
|
||||
sendMessage: (jid, text) => {
|
||||
const channel = findChannel(channels, jid);
|
||||
if (!channel) throw new Error(`No channel for JID: ${jid}`);
|
||||
return channel.sendMessage(jid, text);
|
||||
},
|
||||
registeredGroups: () => registeredGroups,
|
||||
registerGroup,
|
||||
syncGroups: async (force: boolean) => {
|
||||
await Promise.all(
|
||||
channels
|
||||
.filter((ch) => ch.syncGroups)
|
||||
.map((ch) => ch.syncGroups!(force)),
|
||||
);
|
||||
},
|
||||
getAvailableGroups,
|
||||
writeGroupsSnapshot: (gf, im, ag, rj) =>
|
||||
writeGroupsSnapshot(gf, im, ag, rj),
|
||||
});
|
||||
queue.setProcessMessagesFn(processGroupMessages);
|
||||
recoverPendingMessages();
|
||||
startMessageLoop().catch((err) => {
|
||||
logger.fatal({ err }, 'Message loop crashed unexpectedly');
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
|
||||
// Guard: only run when executed directly, not when imported by tests
|
||||
const isDirectRun =
|
||||
process.argv[1] &&
|
||||
new URL(import.meta.url).pathname ===
|
||||
new URL(`file://${process.argv[1]}`).pathname;
|
||||
|
||||
if (isDirectRun) {
|
||||
main().catch((err) => {
|
||||
logger.error({ err }, 'Failed to start NanoClaw');
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
# Intent: src/index.ts
|
||||
|
||||
## What Changed
|
||||
- Added `import { parseImageReferences } from './image.js'`
|
||||
- In `processGroupMessages`: extract image references after formatting, pass `imageAttachments` to `runAgent`
|
||||
- In `runAgent`: added `imageAttachments` parameter, conditionally spread into `runContainerAgent` input
|
||||
|
||||
## Key Sections
|
||||
- **Imports** (top of file): parseImageReferences
|
||||
- **processGroupMessages**: Image extraction, threading to runAgent
|
||||
- **runAgent**: Signature change + imageAttachments in input
|
||||
|
||||
## Invariants (must-keep)
|
||||
- State management (lastTimestamp, sessions, registeredGroups, lastAgentTimestamp)
|
||||
- loadState/saveState functions
|
||||
- registerGroup function with folder validation
|
||||
- getAvailableGroups function
|
||||
- processGroupMessages trigger logic, cursor management, idle timer, error rollback with duplicate prevention
|
||||
- runAgent task/group snapshot writes, session tracking, wrappedOnOutput
|
||||
- startMessageLoop with dedup-by-group and piping logic
|
||||
- recoverPendingMessages startup recovery
|
||||
- main() with channel setup, scheduler, IPC watcher, queue
|
||||
- ensureContainerSystemRunning using container-runtime abstraction
|
||||
- Graceful shutdown with queue.shutdown
|
||||
Reference in New Issue
Block a user