aegis402/deploy/monitor.py

226 lines
7.7 KiB
Python

"""Aegis402 — autonomous monitor.
Runs every 5 min via systemd timer on the VPS. Each run:
1. Hits /health and parses JSON
2. Hits /mcp/rpc with an initialize, checks status + serverInfo name, and records Mcp-Session-Id
3. Hits /scan without payment header (x402 ON in prod) → expects 402
4. Reads on-chain USDC balance of the receive wallet (Base mainnet)
5. Appends one JSON line to data/monitor.jsonl with all metrics + alerts
Alerts (raised by appending to the record's "alerts" list):
- service_down            /health request failed, returned non-200, or reported ok=false
- ingest_stale            last ingest > 90 min ago
- mcp_transport_broken    /mcp/rpc initialize fails
- x402_misconfig          /scan without payment header does not return a valid 402 challenge
- FIRST_PAYMENT_RECEIVED  USDC balance > 0 for the first time
- balance_changed         USDC balance differs from previous run
This script is fully self-contained — only stdlib + httpx (already installed).
Output is human-readable to stdout AND machine-readable to monitor.jsonl.
"""
from __future__ import annotations
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import httpx
# Two directory levels above this file; all runtime artefacts live under it.
ROOT = Path(__file__).resolve().parent.parent
DATA = ROOT / "data"
# Import-time side effect: make sure the data directory exists before any
# function tries to write the log or the state file.
DATA.mkdir(exist_ok=True)
# Append-only run log, one JSON object per line (written by main()).
LOG = DATA / "monitor.jsonl"
# Small JSON document carried from one run to the next (last balance, flags).
STATE = DATA / "monitor_state.json"
# Base URL of the service under test; trailing slashes are stripped so paths
# can be appended with a single "/".
BASE_URL = os.environ.get("AEGIS402_PUBLIC_URL", "https://aegis402.vmaxbadge.ch").rstrip("/")
# Address whose USDC balance is watched; overridable via the environment.
WALLET = os.environ.get("AEGIS402_WALLET", "0x3D1F0F7E51392f85877dB107696d5b3f591E4ff6")
# Token contract queried with balanceOf() in check_usdc_balance().
USDC_CONTRACT = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"
# JSON-RPC endpoint used for the on-chain balance read.
BASE_RPC = os.environ.get("BASE_RPC", "https://mainnet.base.org")
# Per-request timeout, in seconds, passed to every HTTP call.
HTTP_TIMEOUT = 12.0
def _now() -> str:
    """Return the current time in UTC as an ISO-8601 string with offset."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def _load_state() -> dict:
    """Load the state persisted by the previous run from ``STATE``.

    Returns:
        The decoded JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or holds JSON that is not an object.
    """
    try:
        loaded = json.loads(STATE.read_text())
    except (OSError, ValueError):
        # Best-effort by design: a missing or corrupt state file must never
        # stop a monitoring run, so fall back to a fresh state.
        # (json.JSONDecodeError and UnicodeDecodeError are ValueErrors.)
        return {}
    # Callers use ``.get`` on the result; valid JSON such as ``[]`` or
    # ``null`` would otherwise crash them with an AttributeError.
    return loaded if isinstance(loaded, dict) else {}
def _save_state(s: dict) -> None:
    """Persist the monitor state to ``STATE`` as indented JSON.

    The JSON is written to a sibling temporary file and then renamed over
    ``STATE``, so an interrupted run cannot leave a truncated state file
    behind for the next run to trip over.

    Args:
        s: JSON-serialisable state mapping.
    """
    payload = json.dumps(s, indent=2)
    scratch = STATE.with_name(STATE.name + ".tmp")
    scratch.write_text(payload)
    # Same directory as STATE, so the rename stays on one filesystem.
    scratch.replace(STATE)
def check_health(cli: httpx.Client) -> dict:
    """Probe ``GET /health`` and summarise what the service reports.

    Args:
        cli: HTTP client used to issue the request.

    Returns:
        A dict that always contains ``ok``. ``status`` is added once a
        response arrives; on HTTP 200 the fields ``cves``,
        ``affected_packages``, ``ingest_age_seconds`` and ``ingest_status``
        are copied from the JSON body and ``ok`` mirrors the body's ``ok``
        flag. ``error`` is added if the request or decoding raised.
    """
    result = {"ok": False}
    copied_fields = (
        "cves",
        "affected_packages",
        "ingest_age_seconds",
        "ingest_status",
    )
    try:
        resp = cli.get(f"{BASE_URL}/health", timeout=HTTP_TIMEOUT)
        result["status"] = resp.status_code
        if resp.status_code == 200:
            payload = resp.json()
            result["ok"] = bool(payload.get("ok"))
            for field in copied_fields:
                result[field] = payload.get(field)
    except Exception as exc:
        # Any failure is folded into the result so the run can continue.
        result["error"] = f"{type(exc).__name__}: {exc}"
    return result
def check_mcp_rpc(cli: httpx.Client) -> dict:
    """Send a JSON-RPC ``initialize`` to ``/mcp/rpc`` and inspect the reply.

    Args:
        cli: HTTP client used to issue the request.

    Returns:
        A dict that always contains ``ok``. ``status`` and ``session_id``
        (the ``Mcp-Session-Id`` response header, possibly ``None``) are added
        once a response arrives; on HTTP 200 ``protocol_version`` and
        ``server_name`` are taken from the JSON-RPC result. ``ok`` is true
        only when the reported server name is ``aegis402``; the session id is
        recorded but does not influence ``ok``. ``error`` is added if the
        request or decoding raised.
    """
    result = {"ok": False}
    client_info = {"name": "aegis-monitor", "version": "0.1"}
    init_params = {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": client_info,
    }
    request_payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": init_params,
    }
    accept_header = {"Accept": "application/json, text/event-stream"}
    try:
        resp = cli.post(
            f"{BASE_URL}/mcp/rpc",
            json=request_payload,
            headers=accept_header,
            timeout=HTTP_TIMEOUT,
        )
        result["status"] = resp.status_code
        result["session_id"] = resp.headers.get("Mcp-Session-Id")
        if resp.status_code == 200:
            rpc_result = resp.json().get("result") or {}
            result["protocol_version"] = rpc_result.get("protocolVersion")
            server_info = rpc_result.get("serverInfo") or {}
            result["server_name"] = server_info.get("name")
            result["ok"] = result["server_name"] == "aegis402"
    except Exception as exc:
        # Any failure is folded into the result so the run can continue.
        result["error"] = f"{type(exc).__name__}: {exc}"
    return result
def check_x402_challenge(cli: httpx.Client) -> dict:
    """POST to ``/scan`` without a payment header and expect a 402 challenge.

    Args:
        cli: HTTP client used to issue the request.

    Returns:
        A dict that always contains ``ok``. ``status`` is added once a
        response arrives. On HTTP 402 with a non-empty ``accepts`` list,
        ``price_atomic`` and ``pay_to`` are copied from its first entry and
        ``ok`` is true when that entry's ``network`` is ``base``. On HTTP 200
        an ``error`` explains that the scan was served without payment.
        ``error`` is also set if the request or decoding raised.
    """
    result = {"ok": False}
    probe = {"deps": [{"ecosystem": "npm", "package": "mathjs", "version": "15.1.0"}]}
    try:
        resp = cli.post(f"{BASE_URL}/scan", json=probe, timeout=HTTP_TIMEOUT)
        code = resp.status_code
        result["status"] = code
        if code == 200:
            # x402 disabled — the service is giving away free scans.
            result["error"] = "x402 disabled (scan returned 200 without payment)"
        elif code == 402:
            offers = resp.json().get("accepts") or []
            if offers:
                primary = offers[0]
                result["ok"] = primary.get("network") == "base"
                result["price_atomic"] = primary.get("maxAmountRequired")
                result["pay_to"] = primary.get("payTo")
    except Exception as exc:
        # Any failure is folded into the result so the run can continue.
        result["error"] = f"{type(exc).__name__}: {exc}"
    return result
def check_usdc_balance(cli: httpx.Client) -> dict:
    """Read ``USDC.balanceOf(WALLET)`` through a JSON-RPC ``eth_call``.

    Args:
        cli: HTTP client used to POST to ``BASE_RPC``.

    Returns:
        A dict that always contains ``ok`` and ``wallet``. ``status`` is added
        once a response arrives. On success ``balance_atomic`` (integer token
        units) and ``balance_usdc`` (``balance_atomic / 1_000_000``) are set
        and ``ok`` is true. ``error`` is set, and ``ok`` stays false, when the
        request raised or the node answered without a usable result.
    """
    out = {"ok": False, "wallet": WALLET}
    # ERC20 balanceOf(address) selector + 32-byte left-padded address.
    selector = "0x70a08231"
    wallet_hex = WALLET.lower()
    if wallet_hex.startswith("0x"):
        wallet_hex = wallet_hex[2:]
    call_data = selector + wallet_hex.rjust(64, "0")
    body = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_call",
        "params": [{"to": USDC_CONTRACT, "data": call_data}, "latest"],
    }
    try:
        r = cli.post(BASE_RPC, json=body, timeout=HTTP_TIMEOUT)
        out["status"] = r.status_code
        if r.status_code == 200:
            reply = r.json()
            rpc_error = reply.get("error")
            hex_val = reply.get("result")
            if rpc_error is not None:
                # JSON-RPC failures (rate limits, reverted calls) arrive as
                # HTTP 200 with an "error" member; reporting them as a zero
                # balance would raise a bogus balance_changed alert.
                out["error"] = f"rpc error: {rpc_error}"
            elif not isinstance(hex_val, str):
                out["error"] = f"rpc response has no usable result: {hex_val!r}"
            else:
                # An empty return ("0x") is still read as a zero balance.
                atomic = int(hex_val, 16) if hex_val not in ("", "0x") else 0
                out["balance_atomic"] = atomic
                out["balance_usdc"] = atomic / 1_000_000
                out["ok"] = True
    except Exception as e:
        # Any failure is folded into the result so the run can continue.
        out["error"] = f"{type(e).__name__}: {e}"
    return out
def main() -> int:
    """Run one monitoring pass: probe, derive alerts, persist, and report.

    Loads the previous state, runs the four checks with a single HTTP
    client, turns their results into alert strings, saves the wallet state
    when the balance read succeeded, appends the full record as one JSON
    line to ``LOG`` and prints a one-line summary.

    Returns:
        0 when there are no alerts or only first-payment / balance-change
        notifications, 1 otherwise.
    """
    state = _load_state()
    last_known_balance = state.get("last_balance_atomic")
    already_paid = state.get("first_payment_received", False)

    record: dict = {"ts": _now(), "alerts": []}
    alerts = record["alerts"]

    with httpx.Client() as cli:
        record["health"] = check_health(cli)
        record["mcp_rpc"] = check_mcp_rpc(cli)
        record["x402_challenge"] = check_x402_challenge(cli)
        record["wallet"] = check_usdc_balance(cli)

    health = record["health"]
    mcp = record["mcp_rpc"]
    x402 = record["x402_challenge"]
    wallet = record["wallet"]

    # Staleness is only judged when the service itself is healthy.
    if health.get("ok"):
        age = health.get("ingest_age_seconds") or 0
        if age > 90 * 60:
            alerts.append(f"ingest_stale ({int(age/60)} min)")
    else:
        alerts.append("service_down")

    if not mcp.get("ok"):
        alerts.append("mcp_transport_broken")
    if not x402.get("ok"):
        alerts.append("x402_misconfig")

    # Wallet state is only touched when the balance read succeeded.
    if wallet.get("ok"):
        balance = wallet["balance_atomic"]
        if balance > 0 and not already_paid:
            alerts.append(f"FIRST_PAYMENT_RECEIVED ({wallet['balance_usdc']} USDC)")
            state["first_payment_received"] = True
        elif last_known_balance is not None and balance != last_known_balance:
            delta = (balance - last_known_balance) / 1_000_000
            alerts.append(f"balance_changed (delta={delta:+f} USDC)")
        state["last_balance_atomic"] = balance
        state["last_balance_check"] = record["ts"]
        _save_state(state)

    with LOG.open("a") as f:
        f.write(json.dumps(record, separators=(",", ":")) + "\n")

    # Human-readable summary to stdout.
    summary_parts = [
        f"[{record['ts']}]",
        f"health={health.get('ok')}",
        f"cves={health.get('cves')}",
        f"ingest_age={int(health.get('ingest_age_seconds') or 0)}s",
        f"mcp={mcp.get('ok')}",
        f"x402={x402.get('ok')}",
        f"usdc={wallet.get('balance_usdc') if wallet.get('ok') else 'ERR'}",
        f"alerts={alerts or 'none'}",
    ]
    print(" ".join(summary_parts))

    # Payment notifications are informational and must not fail the run.
    informational = ("FIRST_PAYMENT", "balance_changed")
    actionable = [a for a in alerts if not a.startswith(informational)]
    return 1 if actionable else 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())