Files
nanoclaw/.claude/skills/claw/scripts/claw
Ken Bolton 724fe7250d fix(claw): mount group folder and sessions into container
claw was running containers with no volume mounts, so the agent
always saw an empty /workspace/group. Add build_mounts() to
replicate the same bind-mounts that container-runner.ts sets up
(group folder, .claude sessions, IPC dir, agent-runner source,
and project root for main).

Also includes upstream fix from qwibitai/nanoclaw#1368:
graceful terminate() before kill() on output sentinel, and early
return after a successful structured response so exit code stays 0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 20:36:05 -04:00

375 lines
13 KiB
Python

#!/usr/bin/env python3
"""
claw — NanoClaw CLI
Run a NanoClaw agent container from the command line.
Usage:
claw "What is 2+2?"
claw -g <channel_name> "Review this code"
claw -g "<channel name with spaces>" "What's the latest issue?"
claw -j "<chatJid>" "Hello"
claw -g <channel_name> -s <session-id> "Continue"
claw --list-groups
echo "prompt text" | claw --pipe -g <channel_name>
cat prompt.txt | claw --pipe
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sqlite3
import subprocess
import sys
import threading
from pathlib import Path
# ── Globals ─────────────────────────────────────────────────────────────────
VERBOSE = False


def dbg(*args):
    """Write a debug line to stderr when verbose mode is enabled.

    The positional arguments are handed to ``print`` after a leading "»"
    marker. Nothing is written while ``VERBOSE`` is false.
    """
    if not VERBOSE:
        return
    print("»", *args, file=sys.stderr)
# ── Config ──────────────────────────────────────────────────────────────────
def _find_nanoclaw_dir() -> Path:
"""Locate the NanoClaw installation directory.
Resolution order:
1. NANOCLAW_DIR env var
2. The directory containing this script (if it looks like a NanoClaw install)
3. ~/src/nanoclaw (legacy default)
"""
if env := os.environ.get("NANOCLAW_DIR"):
return Path(env).expanduser()
# If this script lives inside the NanoClaw tree (e.g. scripts/claw), walk up
here = Path(__file__).resolve()
for parent in [here.parent, here.parent.parent]:
if (parent / "store" / "messages.db").exists() or (parent / ".env").exists():
return parent
return Path.home() / "src" / "nanoclaw"
# Installation root, resolved once at import time.
NANOCLAW_DIR = _find_nanoclaw_dir()
# SQLite database queried for the registered groups.
DB_PATH = NANOCLAW_DIR.joinpath("store", "messages.db")
# dotenv-style file the secrets are read from.
ENV_FILE = NANOCLAW_DIR.joinpath(".env")
# Default value of the --image option.
IMAGE = "nanoclaw-agent:latest"
# Only these keys are picked out of ENV_FILE; every other key is ignored.
SECRET_KEYS = [
    "CLAUDE_CODE_OAUTH_TOKEN",
    "ANTHROPIC_API_KEY",
    "ANTHROPIC_BASE_URL",
    "ANTHROPIC_AUTH_TOKEN",
    "OLLAMA_HOST",
]
# ── Helpers ──────────────────────────────────────────────────────────────────
def detect_runtime(preference: str | None) -> str:
    """Choose the container runtime CLI to invoke.

    Args:
        preference: Runtime name forced by the caller, or ``None``/empty to
            auto-detect.

    Returns:
        ``preference`` unchanged when given; otherwise the first of
        ``container`` / ``docker`` that is found on ``PATH``.

    Exits the process with an error message when auto-detection finds
    neither executable.
    """
    import shutil

    if preference:
        dbg(f"runtime: forced to {preference}")
        return preference
    for rt in ("container", "docker"):
        # shutil.which performs the PATH lookup in-process, so detection does
        # not depend on an external `which` binary being installed.
        location = shutil.which(rt)
        if location:
            dbg(f"runtime: auto-detected {rt} at {location}")
            return rt
    sys.exit("error: neither 'container' nor 'docker' found. Install one or pass --runtime.")
def read_secrets(env_file: Path) -> dict:
    """Extract the whitelisted secrets from a dotenv-style file.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Only keys listed in ``SECRET_KEYS`` are kept. One pair of matching
    surrounding quotes (``"..."`` or ``'...'``) is removed from a value, and
    keys whose value is empty are skipped so that a placeholder line such as
    ``ANTHROPIC_API_KEY=`` does not count as a configured secret.

    Args:
        env_file: Path of the file to read; it may not exist.

    Returns:
        Mapping of secret name to value; empty when the file is missing or
        contains none of the known keys.
    """
    secrets: dict = {}
    if not env_file.exists():
        return secrets
    for line in env_file.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, sep, val = line.partition("=")
        if not sep:
            continue
        key = key.strip()
        if key not in SECRET_KEYS:
            continue
        val = val.strip()
        if len(val) >= 2 and val[0] == val[-1] and val[0] in ("'", '"'):
            val = val[1:-1]
        if val:
            secrets[key] = val
    return secrets
def get_groups(db: Path) -> list[dict]:
    """Load every registered group from the SQLite database, sorted by name.

    Args:
        db: Path of the database containing a ``registered_groups`` table.

    Returns:
        One dict per row with the keys ``jid``, ``name``, ``folder`` and
        ``is_main`` (the latter coerced to ``bool``).

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    conn = sqlite3.connect(db)
    try:
        rows = conn.execute(
            "SELECT jid, name, folder, is_main FROM registered_groups ORDER BY name"
        ).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return [
        {"jid": jid, "name": name, "folder": folder, "is_main": bool(is_main)}
        for jid, name, folder, is_main in rows
    ]
def find_group(groups: list[dict], query: str) -> dict | None:
    """Resolve *query* to a single group, ignoring case.

    A group whose name or folder equals the query wins outright. Otherwise
    the query is treated as a substring of name/folder: a single hit is
    returned, several hits abort the program with the candidate names, and
    no hit yields ``None``.
    """
    needle = query.lower()

    # Pass 1: exact name/folder match, first one in list order.
    exact = next(
        (
            grp
            for grp in groups
            if grp["name"].lower() == needle or grp["folder"].lower() == needle
        ),
        None,
    )
    if exact is not None:
        return exact

    # Pass 2: substring match.
    candidates = [
        grp
        for grp in groups
        if needle in grp["name"].lower() or needle in grp["folder"].lower()
    ]
    if not candidates:
        return None
    if len(candidates) == 1:
        return candidates[0]
    listing = ", ".join(f'"{grp["name"]}"' for grp in candidates)
    sys.exit(f"error: ambiguous group '{query}'. Matches: {listing}")
def build_mounts(folder: str, is_main: bool) -> list[tuple[str, str, bool]]:
    """Prepare the host directories for a group and describe its bind mounts.

    Side effects: creates the group folder, the per-group ``.claude``
    session directory and the ``messages``/``tasks``/``input`` IPC
    directories under ``NANOCLAW_DIR``. The agent-runner sources are copied
    into the session directory when that copy does not exist yet and the
    project ships them.

    Returns:
        ``(host_path, container_path, readonly)`` tuples in mount order. The
        project root is included (read-only) only for the main group, and
        ``/app/src`` only when the agent-runner copy exists.
    """
    data_root = NANOCLAW_DIR / "data"
    group_dir = NANOCLAW_DIR / "groups" / folder
    session_root = data_root / "sessions" / folder
    claude_dir = session_root / ".claude"
    ipc_dir = data_root / "ipc" / folder
    runner_copy = session_root / "agent-runner-src"
    runner_origin = NANOCLAW_DIR / "container" / "agent-runner" / "src"

    # Every directory that gets mounted must exist on the host first.
    ipc_subdirs = [ipc_dir / sub for sub in ("messages", "tasks", "input")]
    for needed in [group_dir, claude_dir, *ipc_subdirs]:
        needed.mkdir(parents=True, exist_ok=True)

    # An existing per-group copy is left untouched.
    if runner_origin.exists() and not runner_copy.exists():
        import shutil
        shutil.copytree(runner_origin, runner_copy)

    project_mount = [(str(NANOCLAW_DIR), "/workspace/project", True)] if is_main else []
    runner_mount = [(str(runner_copy), "/app/src", False)] if runner_copy.exists() else []
    return [
        *project_mount,
        (str(group_dir), "/workspace/group", False),
        (str(claude_dir), "/home/node/.claude", False),
        (str(ipc_dir), "/workspace/ipc", False),
        *runner_mount,
    ]
def run_container(runtime: str, image: str, payload: dict,
                  folder: str | None = None, is_main: bool = False,
                  timeout: int = 300) -> None:
    """Run one agent container, send it *payload* and print its reply.

    The payload is written to the container's stdin as JSON. Container
    stderr is echoed to our stderr as it arrives (``npm notice`` lines are
    dropped). stdout is collected until the ``---NANOCLAW_OUTPUT_END---``
    sentinel appears, at which point the container is stopped (terminate,
    then kill after 5 s), or until the container closes stdout on its own.
    The text between the START/END sentinels is parsed as JSON: on
    ``status == "success"`` the result is printed to stdout and the session
    id, if any, to stderr. Without a sentinel block the raw stdout is
    printed instead.

    Args:
        runtime: Container CLI to invoke (first element of the command line).
        image: Image name appended to ``<runtime> run -i --rm``.
        payload: JSON-serialisable request; its ``secrets`` entry is masked
            in verbose output.
        folder: Group folder whose bind mounts (from ``build_mounts``) are
            attached; no mounts are added when falsy.
        is_main: Forwarded to ``build_mounts``.
        timeout: Seconds to wait for the sentinel or for the container to
            finish writing stdout.

    Exits the process when the runtime executable cannot be started, on
    timeout, with code 1 on a non-success status, or with the container's
    return code when no successful structured response was produced and
    that code is non-zero.
    """
    cmd = [runtime, "run", "-i", "--rm"]
    if folder:
        for host, container, readonly in build_mounts(folder, is_main):
            if readonly:
                cmd += ["--mount", f"type=bind,source={host},target={container},readonly"]
            else:
                cmd += ["-v", f"{host}:{container}"]
    cmd.append(image)
    dbg(f"cmd: {' '.join(cmd)}")

    # Show payload sans secrets
    if VERBOSE:
        safe = {k: v for k, v in payload.items() if k != "secrets"}
        safe["secrets"] = {k: "***" for k in payload.get("secrets", {})}
        dbg(f"payload: {json.dumps(safe, indent=2)}")

    try:
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except FileNotFoundError:
        sys.exit(f"error: container runtime '{runtime}' not found")
    dbg(f"container pid: {proc.pid}")

    stdout_lines: list[str] = []
    # Set when the output sentinel is seen OR stdout reaches EOF, so that a
    # container which dies early is noticed immediately instead of after the
    # full timeout.
    done = threading.Event()

    def stream_stderr():
        for raw in proc.stderr:
            line = raw.decode(errors="replace").rstrip()
            if line.startswith("npm notice"):
                continue
            print(line, file=sys.stderr)

    def stream_stdout():
        try:
            for raw in proc.stdout:
                line = raw.decode(errors="replace").rstrip()
                stdout_lines.append(line)
                dbg(f"stdout: {line}")
                # Stop the container as soon as we see the closing sentinel —
                # the Node.js event loop often keeps the process alive
                # indefinitely.
                if line.strip() == "---NANOCLAW_OUTPUT_END---":
                    dbg("output sentinel found, terminating container")
                    done.set()
                    try:
                        proc.terminate()
                        try:
                            proc.wait(timeout=5)
                        except subprocess.TimeoutExpired:
                            dbg("graceful stop timed out, force killing container")
                            proc.kill()
                    except ProcessLookupError:
                        pass
                    return
        finally:
            done.set()

    # Start the readers before feeding stdin so the container can never
    # block on a full output pipe while we are still writing the payload.
    t_err = threading.Thread(target=stream_stderr, daemon=True)
    t_out = threading.Thread(target=stream_stdout, daemon=True)
    t_err.start()
    t_out.start()

    # Write JSON payload and close stdin
    try:
        proc.stdin.write(json.dumps(payload).encode())
    except BrokenPipeError:
        # The runtime went away before reading its input; whatever it wrote
        # to stderr is echoed by stream_stderr and the exit code is
        # propagated below.
        dbg("container closed stdin before the payload was written")
    finally:
        try:
            proc.stdin.close()
        except BrokenPipeError:
            pass
    dbg("stdin closed, waiting for response...")

    # Wait for sentinel, natural exit, or timeout
    if not done.wait(timeout=timeout):
        proc.kill()
        proc.wait()
        sys.exit(f"error: container timed out after {timeout}s (no output sentinel received)")

    t_err.join(timeout=5)
    t_out.join(timeout=5)
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # stdout is closed but the process lingers; do not hang forever.
        proc.kill()
        proc.wait()
    dbg(f"container done (rc={proc.returncode}), {len(stdout_lines)} stdout lines")

    stdout = "\n".join(stdout_lines)

    # Parse output block
    match = re.search(
        r"---NANOCLAW_OUTPUT_START---\n(.*?)\n---NANOCLAW_OUTPUT_END---",
        stdout,
        re.DOTALL,
    )
    if match:
        block = match.group(1)
        try:
            data = json.loads(block)
        except json.JSONDecodeError:
            data = None
        if isinstance(data, dict):
            status = data.get("status", "unknown")
            if status != "success":
                print(f"[{status}] {data.get('result', '')}", file=sys.stderr)
                sys.exit(1)
            result = data.get("result", "")
            # A JSON null result must not be shown as the literal "None".
            print("" if result is None else result)
            session_id = data.get("newSessionId") or data.get("sessionId")
            if session_id:
                print(f"\n[session: {session_id}]", file=sys.stderr)
            # A successful structured reply wins over the exit code, which
            # is typically non-zero because we terminated the container.
            return
        # Not a JSON object — show the block verbatim.
        print(block)
    else:
        # No structured output — print raw stdout
        print(stdout)

    if proc.returncode not in (0, None):
        sys.exit(proc.returncode)
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point.

    Parses the arguments, loads the registered groups, resolves the prompt
    (positional argument and/or stdin) and the target group/JID, assembles
    the JSON payload and hands it to ``run_container``. Exits with an error
    message when no prompt is given or the target cannot be resolved.
    """
    parser = argparse.ArgumentParser(
        prog="claw",
        description="Run a NanoClaw agent from the command line.",
    )
    parser.add_argument("prompt", nargs="?", help="Prompt to send")
    parser.add_argument("-g", "--group", help="Group name or folder (fuzzy match)")
    parser.add_argument("-j", "--jid", help="Chat JID (exact)")
    parser.add_argument("-s", "--session", help="Session ID to resume")
    parser.add_argument("-p", "--pipe", action="store_true",
                        help="Read prompt from stdin (can be combined with a prompt arg as prefix)")
    parser.add_argument("--runtime", choices=["docker", "container"],
                        help="Container runtime (default: auto-detect)")
    parser.add_argument("--image", default=IMAGE, help=f"Container image (default: {IMAGE})")
    parser.add_argument("--list-groups", action="store_true", help="List registered groups and exit")
    # NOTE(review): --raw is accepted but nothing below reads args.raw yet.
    parser.add_argument("--raw", action="store_true", help="Print raw JSON output")
    parser.add_argument("--timeout", type=int, default=300, metavar="SECS",
                        help="Max seconds to wait for a response (default: 300)")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Show debug info: cmd, payload (secrets redacted), stdout lines, exit code")
    args = parser.parse_args()

    global VERBOSE
    VERBOSE = args.verbose

    groups = get_groups(DB_PATH) if DB_PATH.exists() else []

    if args.list_groups:
        print(f"{'NAME':<35} {'FOLDER':<30} {'JID'}")
        print("-" * 100)
        for g in groups:
            main_tag = " [main]" if g["is_main"] else ""
            print(f"{g['name']:<35} {g['folder']:<30} {g['jid']}{main_tag}")
        return

    # Resolve prompt: --pipe reads stdin, optionally prepended with positional arg
    if args.pipe or (not sys.stdin.isatty() and not args.prompt):
        stdin_text = sys.stdin.read().strip()
        if args.prompt:
            prompt = f"{args.prompt}\n\n{stdin_text}"
        else:
            prompt = stdin_text
    else:
        prompt = args.prompt

    if not prompt:
        parser.print_help()
        sys.exit(1)

    # Resolve group → jid
    jid = args.jid
    group_name = None
    group_folder = None
    is_main = False

    if args.group:
        g = find_group(groups, args.group)
        if g is None:
            sys.exit(f"error: group '{args.group}' not found. Run --list-groups to see options.")
        jid = g["jid"]
        group_name = g["name"]
        group_folder = g["folder"]
        is_main = g["is_main"]
    elif not jid:
        # Default: main group
        mains = [g for g in groups if g["is_main"]]
        if mains:
            jid = mains[0]["jid"]
            group_name = mains[0]["name"]
            group_folder = mains[0]["folder"]
            is_main = True
        else:
            sys.exit("error: no group specified and no main group found. Use -g or -j.")

    runtime = detect_runtime(args.runtime)
    secrets = read_secrets(ENV_FILE)

    if not secrets:
        print("warning: no secrets found in .env — agent may not be authenticated", file=sys.stderr)

    payload: dict = {
        "prompt": prompt,
        "chatJid": jid,
        "isMain": is_main,
        "secrets": secrets,
    }
    # groupFolder carries the on-disk folder (the same value used for the
    # bind mounts), not the human-readable group name.
    if group_folder:
        payload["groupFolder"] = group_folder
    if args.session:
        payload["sessionId"] = args.session
        payload["resumeAt"] = "latest"

    print(f"[{group_name or jid}] running via {runtime}...", file=sys.stderr)
    run_container(runtime, args.image, payload,
                  folder=group_folder, is_main=is_main,
                  timeout=args.timeout)


if __name__ == "__main__":
    main()