refactor: extract claw script from SKILL.md into separate file
Move the Python CLI script from inline markdown into scripts/claw,
aligning with the Claude Code skills standard (code in files, not md).
Remove non-standard `author` frontmatter field. SKILL.md now uses
${CLAUDE_SKILL_DIR} substitution to copy the script during install.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
318
.claude/skills/claw/scripts/claw
Normal file
318
.claude/skills/claw/scripts/claw
Normal file
@@ -0,0 +1,318 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
claw — NanoClaw CLI
|
||||
Run a NanoClaw agent container from the command line.
|
||||
|
||||
Usage:
|
||||
claw "What is 2+2?"
|
||||
claw -g <channel_name> "Review this code"
|
||||
claw -g "<channel name with spaces>" "What's the latest issue?"
|
||||
claw -j "<chatJid>" "Hello"
|
||||
claw -g <channel_name> -s <session-id> "Continue"
|
||||
claw --list-groups
|
||||
echo "prompt text" | claw --pipe -g <channel_name>
|
||||
cat prompt.txt | claw --pipe
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
# ── Globals ─────────────────────────────────────────────────────────────────
|
||||
|
||||
VERBOSE = False
|
||||
|
||||
def dbg(*args):
|
||||
if VERBOSE:
|
||||
print("»", *args, file=sys.stderr)
|
||||
|
||||
# ── Config ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def _find_nanoclaw_dir() -> Path:
|
||||
"""Locate the NanoClaw installation directory.
|
||||
|
||||
Resolution order:
|
||||
1. NANOCLAW_DIR env var
|
||||
2. The directory containing this script (if it looks like a NanoClaw install)
|
||||
3. ~/src/nanoclaw (legacy default)
|
||||
"""
|
||||
if env := os.environ.get("NANOCLAW_DIR"):
|
||||
return Path(env).expanduser()
|
||||
# If this script lives inside the NanoClaw tree (e.g. scripts/claw), walk up
|
||||
here = Path(__file__).resolve()
|
||||
for parent in [here.parent, here.parent.parent]:
|
||||
if (parent / "store" / "messages.db").exists() or (parent / ".env").exists():
|
||||
return parent
|
||||
return Path.home() / "src" / "nanoclaw"
|
||||
|
||||
NANOCLAW_DIR = _find_nanoclaw_dir()
|
||||
DB_PATH = NANOCLAW_DIR / "store" / "messages.db"
|
||||
ENV_FILE = NANOCLAW_DIR / ".env"
|
||||
IMAGE = "nanoclaw-agent:latest"
|
||||
|
||||
SECRET_KEYS = [
|
||||
"CLAUDE_CODE_OAUTH_TOKEN",
|
||||
"ANTHROPIC_API_KEY",
|
||||
"ANTHROPIC_BASE_URL",
|
||||
"ANTHROPIC_AUTH_TOKEN",
|
||||
"OLLAMA_HOST",
|
||||
]
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def detect_runtime(preference: str | None) -> str:
|
||||
if preference:
|
||||
dbg(f"runtime: forced to {preference}")
|
||||
return preference
|
||||
for rt in ("container", "docker"):
|
||||
result = subprocess.run(["which", rt], capture_output=True)
|
||||
if result.returncode == 0:
|
||||
dbg(f"runtime: auto-detected {rt} at {result.stdout.decode().strip()}")
|
||||
return rt
|
||||
sys.exit("error: neither 'container' nor 'docker' found. Install one or pass --runtime.")
|
||||
|
||||
|
||||
def read_secrets(env_file: Path) -> dict:
|
||||
secrets = {}
|
||||
if not env_file.exists():
|
||||
return secrets
|
||||
for line in env_file.read_text().splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
if "=" in line:
|
||||
key, _, val = line.partition("=")
|
||||
key = key.strip()
|
||||
if key in SECRET_KEYS:
|
||||
secrets[key] = val.strip()
|
||||
return secrets
|
||||
|
||||
|
||||
def get_groups(db: Path) -> list[dict]:
|
||||
conn = sqlite3.connect(db)
|
||||
rows = conn.execute(
|
||||
"SELECT jid, name, folder, is_main FROM registered_groups ORDER BY name"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
return [{"jid": r[0], "name": r[1], "folder": r[2], "is_main": bool(r[3])} for r in rows]
|
||||
|
||||
|
||||
def find_group(groups: list[dict], query: str) -> dict | None:
|
||||
q = query.lower()
|
||||
# Exact name match
|
||||
for g in groups:
|
||||
if g["name"].lower() == q or g["folder"].lower() == q:
|
||||
return g
|
||||
# Partial match
|
||||
matches = [g for g in groups if q in g["name"].lower() or q in g["folder"].lower()]
|
||||
if len(matches) == 1:
|
||||
return matches[0]
|
||||
if len(matches) > 1:
|
||||
names = ", ".join(f'"{g["name"]}"' for g in matches)
|
||||
sys.exit(f"error: ambiguous group '{query}'. Matches: {names}")
|
||||
return None
|
||||
|
||||
|
||||
def run_container(runtime: str, image: str, payload: dict, timeout: int = 300) -> None:
|
||||
cmd = [runtime, "run", "-i", "--rm", image]
|
||||
dbg(f"cmd: {' '.join(cmd)}")
|
||||
|
||||
# Show payload sans secrets
|
||||
if VERBOSE:
|
||||
safe = {k: v for k, v in payload.items() if k != "secrets"}
|
||||
safe["secrets"] = {k: "***" for k in payload.get("secrets", {})}
|
||||
dbg(f"payload: {json.dumps(safe, indent=2)}")
|
||||
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
dbg(f"container pid: {proc.pid}")
|
||||
|
||||
# Write JSON payload and close stdin
|
||||
proc.stdin.write(json.dumps(payload).encode())
|
||||
proc.stdin.close()
|
||||
dbg("stdin closed, waiting for response...")
|
||||
|
||||
stdout_lines: list[str] = []
|
||||
stderr_lines: list[str] = []
|
||||
done = threading.Event()
|
||||
|
||||
def stream_stderr():
|
||||
for raw in proc.stderr:
|
||||
line = raw.decode(errors="replace").rstrip()
|
||||
if line.startswith("npm notice"):
|
||||
continue
|
||||
stderr_lines.append(line)
|
||||
print(line, file=sys.stderr)
|
||||
|
||||
def stream_stdout():
|
||||
for raw in proc.stdout:
|
||||
line = raw.decode(errors="replace").rstrip()
|
||||
stdout_lines.append(line)
|
||||
dbg(f"stdout: {line}")
|
||||
# Kill the container as soon as we see the closing sentinel —
|
||||
# the Node.js event loop often keeps the process alive indefinitely.
|
||||
if line.strip() == "---NANOCLAW_OUTPUT_END---":
|
||||
dbg("output sentinel found, terminating container")
|
||||
done.set()
|
||||
try:
|
||||
proc.kill()
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
return
|
||||
|
||||
t_err = threading.Thread(target=stream_stderr, daemon=True)
|
||||
t_out = threading.Thread(target=stream_stdout, daemon=True)
|
||||
t_err.start()
|
||||
t_out.start()
|
||||
|
||||
# Wait for sentinel or timeout
|
||||
if not done.wait(timeout=timeout):
|
||||
# Also check if process exited naturally
|
||||
t_out.join(timeout=2)
|
||||
if not done.is_set():
|
||||
proc.kill()
|
||||
sys.exit(f"error: container timed out after {timeout}s (no output sentinel received)")
|
||||
|
||||
t_err.join(timeout=5)
|
||||
t_out.join(timeout=5)
|
||||
proc.wait()
|
||||
dbg(f"container done (rc={proc.returncode}), {len(stdout_lines)} stdout lines")
|
||||
stdout = "\n".join(stdout_lines)
|
||||
|
||||
# Parse output block
|
||||
match = re.search(
|
||||
r"---NANOCLAW_OUTPUT_START---\n(.*?)\n---NANOCLAW_OUTPUT_END---",
|
||||
stdout,
|
||||
re.DOTALL,
|
||||
)
|
||||
if match:
|
||||
try:
|
||||
data = json.loads(match.group(1))
|
||||
status = data.get("status", "unknown")
|
||||
if status == "success":
|
||||
print(data.get("result", ""))
|
||||
session_id = data.get("newSessionId") or data.get("sessionId")
|
||||
if session_id:
|
||||
print(f"\n[session: {session_id}]", file=sys.stderr)
|
||||
else:
|
||||
print(f"[{status}] {data.get('result', '')}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except json.JSONDecodeError:
|
||||
print(match.group(1))
|
||||
else:
|
||||
# No structured output — print raw stdout
|
||||
print(stdout)
|
||||
|
||||
if proc.returncode not in (0, None):
|
||||
sys.exit(proc.returncode)
|
||||
|
||||
|
||||
# ── Main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="claw",
|
||||
description="Run a NanoClaw agent from the command line.",
|
||||
)
|
||||
parser.add_argument("prompt", nargs="?", help="Prompt to send")
|
||||
parser.add_argument("-g", "--group", help="Group name or folder (fuzzy match)")
|
||||
parser.add_argument("-j", "--jid", help="Chat JID (exact)")
|
||||
parser.add_argument("-s", "--session", help="Session ID to resume")
|
||||
parser.add_argument("-p", "--pipe", action="store_true",
|
||||
help="Read prompt from stdin (can be combined with a prompt arg as prefix)")
|
||||
parser.add_argument("--runtime", choices=["docker", "container"],
|
||||
help="Container runtime (default: auto-detect)")
|
||||
parser.add_argument("--image", default=IMAGE, help=f"Container image (default: {IMAGE})")
|
||||
parser.add_argument("--list-groups", action="store_true", help="List registered groups and exit")
|
||||
parser.add_argument("--raw", action="store_true", help="Print raw JSON output")
|
||||
parser.add_argument("--timeout", type=int, default=300, metavar="SECS",
|
||||
help="Max seconds to wait for a response (default: 300)")
|
||||
parser.add_argument("-v", "--verbose", action="store_true",
|
||||
help="Show debug info: cmd, payload (secrets redacted), stdout lines, exit code")
|
||||
args = parser.parse_args()
|
||||
|
||||
global VERBOSE
|
||||
VERBOSE = args.verbose
|
||||
|
||||
groups = get_groups(DB_PATH) if DB_PATH.exists() else []
|
||||
|
||||
if args.list_groups:
|
||||
print(f"{'NAME':<35} {'FOLDER':<30} {'JID'}")
|
||||
print("-" * 100)
|
||||
for g in groups:
|
||||
main_tag = " [main]" if g["is_main"] else ""
|
||||
print(f"{g['name']:<35} {g['folder']:<30} {g['jid']}{main_tag}")
|
||||
return
|
||||
|
||||
# Resolve prompt: --pipe reads stdin, optionally prepended with positional arg
|
||||
if args.pipe or (not sys.stdin.isatty() and not args.prompt):
|
||||
stdin_text = sys.stdin.read().strip()
|
||||
if args.prompt:
|
||||
prompt = f"{args.prompt}\n\n{stdin_text}"
|
||||
else:
|
||||
prompt = stdin_text
|
||||
else:
|
||||
prompt = args.prompt
|
||||
|
||||
if not prompt:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
# Resolve group → jid
|
||||
jid = args.jid
|
||||
group_name = None
|
||||
is_main = False
|
||||
|
||||
if args.group:
|
||||
g = find_group(groups, args.group)
|
||||
if g is None:
|
||||
sys.exit(f"error: group '{args.group}' not found. Run --list-groups to see options.")
|
||||
jid = g["jid"]
|
||||
group_name = g["name"]
|
||||
is_main = g["is_main"]
|
||||
elif not jid:
|
||||
# Default: main group
|
||||
mains = [g for g in groups if g["is_main"]]
|
||||
if mains:
|
||||
jid = mains[0]["jid"]
|
||||
group_name = mains[0]["name"]
|
||||
is_main = True
|
||||
else:
|
||||
sys.exit("error: no group specified and no main group found. Use -g or -j.")
|
||||
|
||||
runtime = detect_runtime(args.runtime)
|
||||
secrets = read_secrets(ENV_FILE)
|
||||
|
||||
if not secrets:
|
||||
print("warning: no secrets found in .env — agent may not be authenticated", file=sys.stderr)
|
||||
|
||||
payload: dict = {
|
||||
"prompt": prompt,
|
||||
"chatJid": jid,
|
||||
"isMain": is_main,
|
||||
"secrets": secrets,
|
||||
}
|
||||
if group_name:
|
||||
payload["groupFolder"] = group_name
|
||||
if args.session:
|
||||
payload["sessionId"] = args.session
|
||||
payload["resumeAt"] = "latest"
|
||||
|
||||
print(f"[{group_name or jid}] running via {runtime}...", file=sys.stderr)
|
||||
run_container(runtime, args.image, payload, timeout=args.timeout)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user