"""Aegis402 — autonomous monitor.

Runs every 5 min via systemd timer on the VPS. Each run:

  1. Hits /health and parses JSON
  2. Hits /mcp/rpc with an initialize, records status + Mcp-Session-Id and
     checks the reported server name
  3. Hits /scan without payment header (x402 ON in prod) → expects 402
  4. Reads on-chain USDC balance of the receive wallet (Base mainnet)
  5. Appends one JSON line to data/monitor.jsonl with all metrics + alerts

Alerts (raised by appending to "alerts": [...]):

  - service_down               /health is not 200, unreachable, or reports ok=false
  - ingest_stale (N min)       last ingest > 90 min ago
  - mcp_transport_broken       /mcp/rpc initialize fails
  - x402_misconfig             /scan without payment header is not a valid 402
  - FIRST_PAYMENT_RECEIVED     USDC balance > 0 for the first time
  - balance_changed            USDC balance differs from previous run

Exit status is 0 when there are no alerts or only payment/balance alerts,
and 1 otherwise.

This script is fully self-contained — only stdlib + httpx (already installed).
Output is human-readable to stdout AND machine-readable to monitor.jsonl.
"""
from __future__ import annotations

import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import httpx

ROOT = Path(__file__).resolve().parent.parent
DATA = ROOT / "data"
DATA.mkdir(exist_ok=True)
LOG = DATA / "monitor.jsonl"
STATE = DATA / "monitor_state.json"

BASE_URL = os.environ.get("AEGIS402_PUBLIC_URL", "https://aegis402.vmaxbadge.ch").rstrip("/")
WALLET = os.environ.get("AEGIS402_WALLET", "0x3D1F0F7E51392f85877dB107696d5b3f591E4ff6")
USDC_CONTRACT = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"
BASE_RPC = os.environ.get("BASE_RPC", "https://mainnet.base.org")
HTTP_TIMEOUT = 12.0


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _load_state() -> dict:
    """Load the persisted monitor state.

    Returns an empty dict when the state file is missing, unreadable, not
    valid JSON, or does not contain a JSON object, so a damaged file never
    prevents a monitoring run.
    """
    try:
        loaded = json.loads(STATE.read_text())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: bad JSON or bad encoding.
        return {}
    # main() calls .get() on the state, so anything but an object is unusable.
    return loaded if isinstance(loaded, dict) else {}


def _save_state(s: dict) -> None:
    """Persist the monitor state as indented JSON.

    The data is written to a sibling temp file and then swapped in with
    os.replace, so an interrupted run cannot leave a truncated state file
    (which would be read back as empty state and re-fire the first-payment
    alert).
    """
    tmp = STATE.with_name(STATE.name + ".tmp")
    tmp.write_text(json.dumps(s, indent=2))
    os.replace(tmp, STATE)


def check_health(cli: httpx.Client) -> dict:
    """GET /health and copy the interesting fields of its JSON body.

    The result always has "ok"; "status" is set once a response arrives, the
    body fields are set on a 200, and "error" is set if anything raised.
    """
    out = {"ok": False}
    try:
        r = cli.get(f"{BASE_URL}/health", timeout=HTTP_TIMEOUT)
        out["status"] = r.status_code
        if r.status_code == 200:
            data = r.json()
            out["ok"] = bool(data.get("ok"))
            out["cves"] = data.get("cves")
            out["affected_packages"] = data.get("affected_packages")
            out["ingest_age_seconds"] = data.get("ingest_age_seconds")
            out["ingest_status"] = data.get("ingest_status")
    except Exception as e:
        # Best-effort probe: record the failure instead of aborting the run.
        out["error"] = f"{type(e).__name__}: {e}"
    return out


def check_mcp_rpc(cli: httpx.Client) -> dict:
    """POST a JSON-RPC ``initialize`` to /mcp/rpc.

    "ok" is True only when the response is a 200 whose result reports the
    server name "aegis402". The status code and Mcp-Session-Id header are
    recorded for every response.
    """
    out = {"ok": False}
    body = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "aegis-monitor", "version": "0.1"},
        },
    }
    try:
        r = cli.post(
            f"{BASE_URL}/mcp/rpc",
            json=body,
            headers={"Accept": "application/json, text/event-stream"},
            timeout=HTTP_TIMEOUT,
        )
        out["status"] = r.status_code
        out["session_id"] = r.headers.get("Mcp-Session-Id")
        if r.status_code == 200:
            data = r.json()
            res = data.get("result") or {}
            out["protocol_version"] = res.get("protocolVersion")
            out["server_name"] = (res.get("serverInfo") or {}).get("name")
            out["ok"] = out["server_name"] == "aegis402"
    except Exception as e:
        # Best-effort probe: record the failure instead of aborting the run.
        out["error"] = f"{type(e).__name__}: {e}"
    return out


def check_x402_challenge(cli: httpx.Client) -> dict:
    """Hit /scan without X-PAYMENT. Should return 402 with a challenge body.

    "ok" is True when the 402 body has a non-empty "accepts" list whose first
    entry is on network "base". A 200 is reported as an error because it
    means scans are being served without payment.
    """
    out = {"ok": False}
    body = {"deps": [{"ecosystem": "npm", "package": "mathjs", "version": "15.1.0"}]}
    try:
        r = cli.post(f"{BASE_URL}/scan", json=body, timeout=HTTP_TIMEOUT)
        out["status"] = r.status_code
        if r.status_code == 402:
            data = r.json()
            accepts = data.get("accepts") or []
            out["ok"] = bool(accepts) and accepts[0].get("network") == "base"
            if accepts:
                out["price_atomic"] = accepts[0].get("maxAmountRequired")
                out["pay_to"] = accepts[0].get("payTo")
        elif r.status_code == 200:
            # x402 disabled — service is giving away free scans
            out["error"] = "x402 disabled (scan returned 200 without payment)"
    except Exception as e:
        # Best-effort probe: record the failure instead of aborting the run.
        out["error"] = f"{type(e).__name__}: {e}"
    return out


def check_usdc_balance(cli: httpx.Client) -> dict:
    """Read USDC.balanceOf(WALLET) on Base via public RPC.

    On success the result has "ok" True plus "balance_atomic" (integer, as
    returned by the contract) and "balance_usdc" (atomic / 1_000_000).

    A JSON-RPC failure is delivered as HTTP 200 with an "error" member and no
    "result"; that case is reported as an error with "ok" False rather than
    being read as a zero balance, which would trigger a spurious
    balance_changed alert and overwrite the stored baseline.
    """
    out = {"ok": False, "wallet": WALLET}
    # ERC20 balanceOf(address) selector + 32-byte padded address
    selector = "0x70a08231"
    addr = WALLET.lower().replace("0x", "").rjust(64, "0")
    data = selector + addr
    body = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_call",
        "params": [{"to": USDC_CONTRACT, "data": data}, "latest"],
    }
    try:
        r = cli.post(BASE_RPC, json=body, timeout=HTTP_TIMEOUT)
        out["status"] = r.status_code
        if r.status_code == 200:
            payload = r.json()
            rpc_error = payload.get("error")
            hex_val = payload.get("result")
            if rpc_error is not None:
                out["error"] = f"rpc error: {rpc_error}"
            elif not isinstance(hex_val, str):
                out["error"] = "rpc response has no result"
            else:
                # An empty return ("" or "0x") is treated as a zero balance.
                atomic = int(hex_val, 16) if hex_val and hex_val != "0x" else 0
                out["balance_atomic"] = atomic
                out["balance_usdc"] = atomic / 1_000_000
                out["ok"] = True
    except Exception as e:
        # Best-effort probe: record the failure instead of aborting the run.
        out["error"] = f"{type(e).__name__}: {e}"
    return out


def main() -> int:
    """Run every check once, derive alerts, and persist the results.

    Appends one compact JSON record to the monitor log, updates the state
    file, and prints a one-line summary. Returns 0 when there are no alerts
    or only payment/balance alerts, 1 otherwise.
    """
    state = _load_state()
    prev_balance = state.get("last_balance_atomic")
    prev_first_payment = state.get("first_payment_received", False)

    record: dict = {"ts": _now(), "alerts": []}

    with httpx.Client() as cli:
        record["health"] = check_health(cli)
        record["mcp_rpc"] = check_mcp_rpc(cli)
        record["x402_challenge"] = check_x402_challenge(cli)
        record["wallet"] = check_usdc_balance(cli)

    h = record["health"]
    if not h.get("ok"):
        record["alerts"].append("service_down")
    else:
        # A missing ingest age is treated as 0, i.e. not stale.
        age = h.get("ingest_age_seconds") or 0
        if age > 90 * 60:
            record["alerts"].append(f"ingest_stale ({int(age/60)} min)")

    if not record["mcp_rpc"].get("ok"):
        record["alerts"].append("mcp_transport_broken")

    if not record["x402_challenge"].get("ok"):
        record["alerts"].append("x402_misconfig")

    w = record["wallet"]
    if w.get("ok"):
        # Only a successful read may move the stored baseline.
        cur = w["balance_atomic"]
        if cur > 0 and not prev_first_payment:
            record["alerts"].append(f"FIRST_PAYMENT_RECEIVED ({w['balance_usdc']} USDC)")
            state["first_payment_received"] = True
        elif prev_balance is not None and cur != prev_balance:
            delta = (cur - prev_balance) / 1_000_000
            record["alerts"].append(f"balance_changed (delta={delta:+f} USDC)")
        state["last_balance_atomic"] = cur
        state["last_balance_check"] = record["ts"]

    _save_state(state)

    with LOG.open("a") as f:
        f.write(json.dumps(record, separators=(",", ":")) + "\n")

    # human-readable summary to stdout
    summary = (
        f"[{record['ts']}] "
        f"health={h.get('ok')} "
        f"cves={h.get('cves')} "
        f"ingest_age={int(h.get('ingest_age_seconds') or 0)}s "
        f"mcp={record['mcp_rpc'].get('ok')} "
        f"x402={record['x402_challenge'].get('ok')} "
        f"usdc={w.get('balance_usdc') if w.get('ok') else 'ERR'} "
        f"alerts={record['alerts'] or 'none'}"
    )
    print(summary)

    # Payment/balance alerts are informational and must not fail the unit;
    # all() is True for an empty alert list, so "no alerts" also exits 0.
    informational = ("FIRST_PAYMENT", "balance_changed")
    only_informational = all(a.startswith(informational) for a in record["alerts"])
    return 0 if only_informational else 1


if __name__ == "__main__":
    sys.exit(main())