`tea api` exits 0 even on 401 responses, so `_close_issue` and `_set_labels` appeared to succeed but did nothing. Issues were never actually closed, causing them to be picked up again every cron tick. Switch all write operations (close issue, set labels) and issue-list reads to `fgj`, which has proper authentication. Keep `tea api` only for CI run fetches where `fgj` times out (504). Add ~/go/bin to the cron PATH so fgj is found. Also add an error check in `_tea_get` for API-level error responses, and strip State/InProgress when closing an issue. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
412 lines
14 KiB
Python
Executable File
412 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
agent_loop.py — called from cron every 10 minutes.
|
|
|
|
Flow
|
|
----
|
|
1. Agent already running?
|
|
a. Age > 1 h → kill it, set its issue to State/Question, exit 1
|
|
b. Age ≤ 1 h → print status, exit 0 (let it keep working)
|
|
2. No agent running → extract pending_issue from state (if any), then check CI
|
|
a. CI is running → save pending-ci state, exit 0
|
|
b. Latest CI failed → start fix-CI agent (preserving pending_issue), exit 0
|
|
c. CI ok + pending_issue → close the issue (CI passed), exit 0
|
|
d. CI ok (or no run yet) → find oldest Ready issue, start issue agent,
|
|
save state, exit 0
|
|
e. No Ready issues → print "nothing to do", exit 0
|
|
|
|
Issue agents must NOT close the issue themselves; the loop closes it after CI passes.
|
|
|
|
State file: ~/.sharedinbox-agent-state.json
|
|
{ "pid": 12345, "issue": 91,
|
|
"started_at": "2026-05-15T12:00:00+00:00", "type": "issue" }
|
|
|
|
Output is written to ~/.sharedinbox-agent-logs/<session>-<timestamp>.log.
|
|
Resume the Claude conversation afterward with:
|
|
|
|
claude --resume issue-91
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# cron hands us a bare-bones PATH, so put the Nix profile (tea, claude) and
# ~/go/bin (fgj) in front of whatever PATH was inherited.
os.environ["PATH"] = ":".join(
    (
        f"{Path.home()}/.nix-profile/bin",
        f"{Path.home()}/go/bin",
        # Fallback used only when PATH is not set at all.
        os.environ.get("PATH", "/usr/bin:/bin"),
    )
)
|
|
|
|
# ── configuration ─────────────────────────────────────────────────────────────
|
|
|
|
# Repository slug (owner/name) and its web address on Codeberg.
REPO = "guettli/sharedinbox"
REPO_URL = "https://codeberg.org/" + REPO

# JSON file that remembers which agent (if any) was started last.
STATE_FILE = Path.home().joinpath(".sharedinbox-agent-state.json")

# Agents running longer than this are killed by the loop.
MAX_AGENT_AGE_SECONDS = 60 * 60  # 1 hour

# Directory scanned for *.jsonl session files by the "list" subcommand. The
# last component is the home directory path with its leading "/" dropped,
# every remaining "/" turned into "-", and a "-" prefixed.
# NOTE(review): presumably this mirrors how Claude Code names the project
# directory for sessions started in $HOME — confirm.
CLAUDE_PROJECTS_DIR = Path.home().joinpath(
    ".claude",
    "projects",
    "-" + str(Path.home())[1:].replace("/", "-"),
)

# Labels used by the workflow.
LABEL_READY = "State/Ready"  # issue may be picked up by the loop
LABEL_IN_PROGRESS = "State/InProgress"  # set right before an agent is started
LABEL_QUESTION = "State/Question"  # set when a stuck agent had to be killed
LABEL_PRIO_HIGH = "Prio/High"  # sorts an issue ahead of the other Ready ones
|
|
|
|
# ── helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _issue_url(number: int) -> str:
    """Return the web URL of issue *number* in the configured repository."""
    return "/".join((REPO_URL, "issues", str(number)))
|
|
|
|
|
|
def _ci_run_url(run_id: int) -> str:
    """Return the web URL of the Actions run *run_id* in the configured repository."""
    return "/".join((REPO_URL, "actions", "runs", str(run_id)))
|
|
|
|
|
|
def _fgj(*args: str) -> None:
    """Invoke ``fgj --hostname codeberg.org <args>`` and fail loudly.

    The command's output is captured, not shown, and nothing is returned.

    Args:
        *args: Arguments appended after the fixed ``--hostname`` option.

    Raises:
        RuntimeError: fgj exited with a non-zero status. The message carries
            its stderr, or its stdout when stderr is empty.
    """
    completed = subprocess.run(
        ["fgj", "--hostname", "codeberg.org", *args],
        capture_output=True,
        text=True,
    )
    if completed.returncode == 0:
        return
    details = completed.stderr or completed.stdout
    raise RuntimeError(f"fgj {' '.join(args)} failed:\n{details}")
|
|
|
|
|
|
def _tea_get(path: str) -> dict | list | None:
|
|
"""Run a tea api GET and return parsed JSON. Only use for reads — tea PATCH/PUT
|
|
silently fails (exits 0) when unauthenticated, so writes must go via fgj."""
|
|
cmd = ["tea", "api", path]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(
|
|
f"tea api {path} failed:\n{result.stderr or result.stdout}"
|
|
)
|
|
out = result.stdout.strip()
|
|
if not out:
|
|
return None
|
|
data = json.loads(out)
|
|
if isinstance(data, dict) and "message" in data and "url" in data:
|
|
raise RuntimeError(f"tea api {path} returned error: {data['message']}")
|
|
return data
|
|
|
|
|
|
def _set_labels(issue: int, add: list[str], remove: list[str]) -> None:
    """Attach and detach labels on one issue with a single ``fgj issue edit``.

    Args:
        issue: Issue number.
        add: Label names passed as ``--add-label``.
        remove: Label names passed as ``--remove-label``.

    Raises:
        RuntimeError: Propagated from ``_fgj`` when the command fails.
    """
    # All additions come first, then all removals.
    flags = [arg for name in add for arg in ("--add-label", name)]
    flags += [arg for name in remove for arg in ("--remove-label", name)]
    _fgj("issue", "edit", str(issue), "--repo", REPO, *flags)
|
|
|
|
|
|
def _close_issue(issue: int) -> None:
    """Close *issue* via fgj, then strip its State/InProgress label.

    Raises:
        RuntimeError: Propagated from ``_fgj`` when either command fails.
    """
    issue_ref = str(issue)
    _fgj("issue", "close", issue_ref, "--repo", REPO)
    # The label is removed only after the close succeeded.
    _set_labels(issue, remove=[LABEL_IN_PROGRESS], add=[])
|
|
|
|
|
|
def _ready_issues() -> list[dict]:
    """Return open issues with State/Ready, Prio/High first, then oldest.

    The issues come from ``fgj issue list --state open --json``. "Oldest" is
    approximated by the lowest issue number.

    Returns:
        The matching issue objects as decoded from fgj's JSON output; an
        empty list when fgj prints nothing or nothing is Ready.

    Raises:
        RuntimeError: fgj exited non-zero. The message carries its stderr
            (or stdout), matching the error style of ``_fgj``.
        json.JSONDecodeError: fgj printed something that is not JSON.
    """
    cmd = [
        "fgj", "--hostname", "codeberg.org", "issue", "list",
        "--repo", REPO, "--state", "open", "--json",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"fgj issue list failed:\n{result.stderr or result.stdout}"
        )

    # A literal JSON null is treated like "no issues".
    data = (json.loads(result.stdout) if result.stdout.strip() else []) or []

    def label_names(issue: dict) -> set[str]:
        # "labels" may be absent or null for an unlabelled issue.
        return {lbl["name"] for lbl in issue.get("labels") or []}

    ready = [i for i in data if LABEL_READY in label_names(i)]
    # False sorts before True, so Prio/High issues come first; ties are
    # broken by ascending issue number.
    ready.sort(key=lambda i: (LABEL_PRIO_HIGH not in label_names(i), i["number"]))
    return ready
|
|
|
|
|
|
def _latest_ci_run() -> dict | None:
    """Fetch the newest Actions run of the repository via ``tea api``.

    Returns:
        The first entry of the reply's "workflow_runs" list, or None when
        the reply is empty or lists no runs.

    Raises:
        RuntimeError: Propagated from ``_tea_get``.
    """
    payload = _tea_get(f"repos/{REPO}/actions/runs?limit=1")
    if not payload:
        return None
    runs = payload.get("workflow_runs", [])
    if not runs:
        return None
    return runs[0]
|
|
|
|
|
|
# ── state file ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _read_state() -> dict | None:
|
|
if STATE_FILE.exists():
|
|
try:
|
|
return json.loads(STATE_FILE.read_text())
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _write_state(pid: int | None, issue: int | None, kind: str, issue_title: str | None = None) -> None:
|
|
data: dict = {
|
|
"pid": pid,
|
|
"issue": issue,
|
|
"started_at": datetime.now(timezone.utc).isoformat(),
|
|
"type": kind,
|
|
}
|
|
if issue_title is not None:
|
|
data["issue_title"] = issue_title
|
|
STATE_FILE.write_text(json.dumps(data, indent=2))
|
|
|
|
|
|
def _clear_state() -> None:
    """Delete STATE_FILE; an already-missing file is not an error."""
    try:
        STATE_FILE.unlink()
    except FileNotFoundError:
        pass
|
|
|
|
|
|
# ── agent launcher ────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _start_agent(prompt: str, session_name: str) -> int:
    """Start Claude Code as a detached background process and return its PID.

    The agent's stdout and stderr go to
    ``~/.sharedinbox-agent-logs/<session_name>-<timestamp>.log``.

    Args:
        prompt: Text handed to ``claude -p``.
        session_name: Passed as ``--name`` and used as the log file prefix.

    Returns:
        PID of the started process.

    Raises:
        OSError: The log file could not be created or ``claude`` could not
            be started.
    """
    log_dir = Path.home() / ".sharedinbox-agent-logs"
    log_dir.mkdir(exist_ok=True)
    ts = datetime.now().strftime("%Y%m%dT%H%M%S")
    log_file = log_dir / f"{session_name}-{ts}.log"

    # The with-block closes the parent's copy of the log handle even when
    # Popen raises; the child keeps its own inherited descriptor.
    with open(log_file, "w") as log_fh:
        proc = subprocess.Popen(
            [
                "claude",
                "--dangerously-skip-permissions",
                "--name", session_name,
                "-p", prompt,
            ],
            stdin=subprocess.PIPE,
            stdout=log_fh,
            stderr=log_fh,
            # Own session, so the agent is not tied to the cron job's one.
            start_new_session=True,
        )

    # Answer the workspace-trust dialog; after this the pipe hits EOF.
    try:
        proc.stdin.write(b"\n")
        proc.stdin.close()
    except BrokenPipeError:
        # The child already closed its stdin (or exited). The PID must still
        # be returned so the caller can record it in the state file.
        pass

    print(f"Started agent pid={proc.pid}, log={log_file}")
    print(f" Resume: claude --resume {shlex.quote(session_name)}")
    return proc.pid
|
|
|
|
|
|
def _agent_alive(state: dict) -> bool:
    """Tell whether the process recorded under state["pid"] still exists.

    Returns:
        False when no PID is stored or no such process exists; True
        otherwise, including when probing the process is not permitted.
    """
    pid = state.get("pid")
    if pid is None:
        return False
    try:
        # Signal 0 only probes for the process; nothing is delivered.
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # Not allowed to signal it, which still means it exists.
        return True
    else:
        return True
|
|
|
|
|
|
def _agent_age_seconds(state: dict) -> float:
    """Return how many seconds have passed since state["started_at"].

    Any problem with the timestamp (missing, unparseable, or lacking a UTC
    offset so it cannot be subtracted from the current UTC time) yields 0.0.
    """
    try:
        launched = datetime.fromisoformat(state["started_at"])
        elapsed = datetime.now(timezone.utc) - launched
    except Exception:
        return 0.0
    return elapsed.total_seconds()
|
|
|
|
|
|
def _kill_agent(state: dict) -> None:
    """Send signal 9 (SIGKILL) to the PID stored in *state*, if there is one.

    A process that is already gone is silently ignored.
    """
    pid = state.get("pid")
    if not pid:
        return
    try:
        os.kill(pid, 9)
    except ProcessLookupError:
        # Nothing left to kill.
        pass
|
|
|
|
|
|
# ── subcommands ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def cmd_list() -> int:
    """List recent agent-loop sessions, newest first.

    Scans CLAUDE_PROJECTS_DIR for ``*.jsonl`` files. A file counts as a
    session when one of its lines is a JSON object with
    ``"type": "agent-name"``; that line supplies the name ("agentName") and
    the session id ("sessionId"). Files that cannot be read or parsed are
    skipped. At most 20 sessions are printed, ordered by file modification
    time.

    Returns:
        Always 0.
    """
    if not CLAUDE_PROJECTS_DIR.exists():
        print(f"No sessions found (directory missing: {CLAUDE_PROJECTS_DIR})")
        return 0

    sessions = []
    for jsonl in CLAUDE_PROJECTS_DIR.glob("*.jsonl"):
        agent_name = None
        session_id = None
        try:
            with jsonl.open() as fh:
                for line in fh:
                    line = line.strip()
                    if not line:
                        continue
                    d = json.loads(line)
                    if d.get("type") == "agent-name":
                        agent_name = d.get("agentName")
                        session_id = d.get("sessionId")
                        break
            # Inside the try so a file that vanishes mid-scan is skipped too.
            mtime = jsonl.stat().st_mtime
        except (OSError, ValueError, AttributeError):
            # Unreadable file, bad JSON/encoding, or a non-object JSON line.
            continue
        if agent_name:
            sessions.append((mtime, agent_name, session_id))

    if not sessions:
        print("No agent sessions found.")
        return 0

    # Sort on the timestamp only: comparing whole tuples would fall through
    # to the session ids on ties and fail for None versus str.
    sessions.sort(key=lambda entry: entry[0], reverse=True)
    total = len(sessions)
    print(f" {'DATE':<16} {'NAME':<20} UUID (use with: claude --resume <uuid>)")
    print(f" {'-'*16} {'-'*20} {'-'*36}")
    for mtime, name, sid in sessions[:20]:
        ts = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M")
        print(f" {ts:<16} {str(name):<20} {sid}")
    if total > 20:
        print(f" ... ({total - 20} more)")
    return 0
|
|
|
|
|
|
# ── main flow ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _run_loop() -> int:
    """Run one cron tick of the agent loop.

    Steps, in order:

    1. If the state file names a live agent: kill it and flag its issue
       State/Question when it is older than MAX_AGENT_AGE_SECONDS, otherwise
       just report and wait.
    2. Otherwise remember the issue recorded in the state (the "pending"
       issue), query the latest CI run, and only then discard the state:
       a. CI still running → re-save the pending issue and wait.
       b. CI failed → start a fix-CI agent, carrying the pending issue along.
       c. CI ok and a pending issue exists → close that issue.
       d. Otherwise pick the first Ready issue, mark it InProgress and start
          an agent for it; do nothing when there is no Ready issue.

    Returns:
        1 when a stuck agent was killed, 0 in every other case.

    Raises:
        RuntimeError: Propagated from the fgj/tea helpers.
    """
    now = datetime.now(timezone.utc)
    print(f"---------------------- Starting {now.strftime('%Y-%m-%d %H:%MZ')}")

    state = _read_state()

    # ── 1. Agent already running? ─────────────────────────────────────────────
    if state and _agent_alive(state):
        age = _agent_age_seconds(state)
        issue = state.get("issue")
        kind = state.get("type", "issue")
        pid = state.get("pid", "?")

        issue_title = state.get("issue_title", "")
        issue_ref = (
            f"{_issue_url(issue)} {issue_title}".strip() if issue else str(issue)
        )

        if age > MAX_AGENT_AGE_SECONDS:
            print(
                f"Agent pid={pid!r} ({issue_ref}) "
                f"has been running for {age/60:.0f} min — aborting."
            )
            _kill_agent(state)
            _clear_state()
            if issue:
                _set_labels(issue, add=[LABEL_QUESTION], remove=[LABEL_IN_PROGRESS])
                print(f"Set {_issue_url(issue)} to State/Question.")
            return 1

        print(
            f"Agent pid={pid!r} ({kind}, {issue_ref}) "
            f"still running ({age/60:.0f} min). Waiting."
        )
        return 0

    # Agent not running (or no state) — extract any pending issue.
    pending_issue: int | None = state.get("issue") if state else None

    # ── 2. Check CI ───────────────────────────────────────────────────────────
    # The state file is still on disk while CI is queried: if the query
    # raises, the next tick finds the pending issue again instead of
    # forgetting it for good.
    run = _latest_ci_run()
    if state:
        _clear_state()

    # NOTE(review): only "running" counts as in-flight. If the API can report
    # a queued run under another status (e.g. "waiting"), it would fall
    # through to the "CI ok" path below — confirm against the API.
    if run and run.get("status") == "running":
        print(f"CI run {_ci_run_url(run['id'])} is still running. Waiting.")
        if pending_issue:
            _write_state(None, pending_issue, "pending-ci")
        return 0

    if run and run.get("status") in ("failure", "error"):
        print(f"CI run {_ci_run_url(run['id'])} failed — starting fix agent.")
        prompt = (
            "The Codeberg CI for guettli/sharedinbox just failed. "
            f"The CI run ID is {run['id']}. "
            "Fetch the CI logs using the task ci-logs command or the Codeberg API. "
            "Identify the failure, fix it, commit, and push. "
            "Verify locally with 'task check' before pushing. "
            "When done, stop."
        )
        pid = _start_agent(prompt, "ci-fix")
        # Keep the pending issue so it is closed once CI is green again.
        _write_state(pid, pending_issue, "ci-fix")
        return 0

    # CI is ok (or no run).
    if pending_issue:
        _close_issue(pending_issue)
        print(f"CI passed — closed {_issue_url(pending_issue)}.")
        return 0

    # Find a Ready issue.
    issues = _ready_issues()
    if not issues:
        print("No issues with State/Ready. Nothing to do.")
        return 0

    issue = issues[0]
    issue_number = issue["number"]
    issue_title = issue["title"]
    # "body" may be null for an issue without description; avoid putting
    # the text "None" into the prompt.
    issue_body = issue.get("body") or ""

    print(f"Starting agent for {_issue_url(issue_number)} {issue_title}")

    # Mark InProgress before starting so the next cron tick sees it even if
    # the agent hasn't had time to do so yet.
    _set_labels(
        issue_number,
        add=[LABEL_IN_PROGRESS],
        remove=[LABEL_READY],
    )

    prompt = f"""Work on Codeberg issue #{issue_number} in the guettli/sharedinbox repository.

Issue title: {issue_title}

Issue body:
{issue_body}

Instructions:
- Understand the issue thoroughly before writing any code.
- Implement the required change, following the existing code style.
- Write or update tests as appropriate.
- Run 'task check' locally and fix any failures before committing.
- Commit with a descriptive message referencing the issue number (e.g. "feat: ... (#{issue_number})").
- Push to origin/main.
- If you hit a blocker you cannot resolve, set the issue label to State/Question
and stop (do NOT close the issue).
- When the work is done and pushed, stop. The loop will close the issue after CI passes.
"""

    pid = _start_agent(prompt, f"issue-{issue_number}")
    _write_state(pid, issue_number, "issue", issue_title)
    return 0
|
|
|
|
|
|
def main() -> int:
    """Parse the command line and dispatch to the requested action.

    ``list`` prints recent agent sessions; with no subcommand one tick of
    the agent loop is run.

    Returns:
        The exit code of the action that was run.
    """
    parser = argparse.ArgumentParser(prog="agent_loop")
    subcommands = parser.add_subparsers(dest="cmd")
    subcommands.add_parser("list", help="List recent agent sessions")
    chosen = parser.parse_args().cmd

    handler = cmd_list if chosen == "list" else _run_loop
    return handler()
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|