226 lines
7.7 KiB
Python
226 lines
7.7 KiB
Python
"""Aegis402 — autonomous monitor.

Runs every 5 min via systemd timer on the VPS. Each run:
  1. Hits /health and parses JSON
  2. Hits /mcp/rpc with an initialize and checks status + server name
     (the Mcp-Session-Id header is recorded)
  3. Hits /scan without payment header (x402 ON in prod) → expects 402
  4. Reads on-chain USDC balance of the receive wallet (Base mainnet)
  5. Appends one JSON line to data/monitor.jsonl with all metrics + alerts

Alerts (raised by setting "alerts": [...]):
  - service_down             health 5xx or non-200, or "ok" false in the body
  - ingest_stale             last ingest > 90 min ago
  - mcp_transport_broken     /mcp/rpc initialize fails
  - x402_misconfig           /scan without payment header returns != 402
  - FIRST_PAYMENT_RECEIVED   USDC balance > 0 for the first time
  - balance_changed          USDC balance differs from previous run

This script is fully self-contained — only stdlib + httpx (already installed).
Output is human-readable to stdout AND machine-readable to monitor.jsonl.
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
# --- Filesystem layout -------------------------------------------------------
# ROOT is two directories above this script; all output lives in ROOT/data.
ROOT = Path(__file__).resolve().parent.parent
DATA = ROOT.joinpath("data")
DATA.mkdir(exist_ok=True)  # created at import time so later writes cannot fail on a missing dir
LOG = DATA.joinpath("monitor.jsonl")  # one JSON record appended per run
STATE = DATA.joinpath("monitor_state.json")  # small state carried between runs

# --- Targets (overridable through the environment) ---------------------------
# Public base URL of the service, normalised to have no trailing slash.
BASE_URL = os.getenv("AEGIS402_PUBLIC_URL", "https://aegis402.vmaxbadge.ch").rstrip("/")
# Wallet whose USDC balance is read.
WALLET = os.getenv("AEGIS402_WALLET", "0x3D1F0F7E51392f85877dB107696d5b3f591E4ff6")
# Contract address used as the `to` of the balanceOf eth_call.
USDC_CONTRACT = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"
# JSON-RPC endpoint the balance query is POSTed to.
BASE_RPC = os.getenv("BASE_RPC", "https://mainnet.base.org")
# Passed as `timeout=` to every HTTP call made by the checks.
HTTP_TIMEOUT = 12.0
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _load_state() -> dict:
    """Load the persisted monitor state from ``STATE``.

    Returns:
        The decoded JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not hold a JSON object.
    """
    try:
        loaded = json.loads(STATE.read_text())
    except (OSError, ValueError):
        # Missing file, permission problem, undecodable bytes or malformed
        # JSON: start from a clean state instead of aborting the run.
        return {}
    # Callers use .get() and item assignment on the result, so any other
    # JSON value (null, list, number, ...) is treated like a missing state.
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def _save_state(s: dict) -> None:
    """Persist the monitor state ``s`` to ``STATE`` as indented JSON.

    The payload is first written to a sibling ``.tmp`` file and then renamed
    over ``STATE``, so a run that is interrupted mid-write cannot leave a
    truncated state file behind.

    Args:
        s: JSON-serialisable state mapping.
    """
    tmp_path = STATE.with_name(STATE.name + ".tmp")
    tmp_path.write_text(json.dumps(s, indent=2))
    # Rename within the same directory replaces the old file in one step.
    tmp_path.replace(STATE)
|
|
|
|
|
|
def check_health(cli: httpx.Client) -> dict:
    """Probe ``/health`` and summarise the response.

    Args:
        cli: HTTP client used for the GET request.

    Returns:
        A dict with ``ok`` (False unless the endpoint answered 200 with a
        truthy ``ok`` field), ``status`` once a response arrived, the
        ``cves`` / ``affected_packages`` / ``ingest_age_seconds`` /
        ``ingest_status`` fields copied from a 200 body, and ``error`` when
        the request or JSON decoding raised.
    """
    report = {"ok": False}
    try:
        resp = cli.get(f"{BASE_URL}/health", timeout=HTTP_TIMEOUT)
        report["status"] = resp.status_code
        if resp.status_code != 200:
            return report
        payload = resp.json()
        report["ok"] = bool(payload.get("ok"))
        for field in ("cves", "affected_packages", "ingest_age_seconds", "ingest_status"):
            report[field] = payload.get(field)
    except Exception as exc:
        # Any failure (network, timeout, bad JSON) is reported, never raised.
        report["error"] = f"{type(exc).__name__}: {exc}"
    return report
|
|
|
|
|
|
def check_mcp_rpc(cli: httpx.Client) -> dict:
    """Send an ``initialize`` JSON-RPC request to ``/mcp/rpc``.

    Args:
        cli: HTTP client used for the POST request.

    Returns:
        A dict with ``ok`` (True only when a 200 response names the server
        ``aegis402``), ``status`` and ``session_id`` (the ``Mcp-Session-Id``
        response header) once a response arrived, ``protocol_version`` and
        ``server_name`` from a 200 body, and ``error`` when anything raised.
    """
    report = {"ok": False}
    init_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "aegis-monitor", "version": "0.1"},
        },
    }
    accept_header = {"Accept": "application/json, text/event-stream"}
    try:
        resp = cli.post(
            f"{BASE_URL}/mcp/rpc",
            json=init_request,
            headers=accept_header,
            timeout=HTTP_TIMEOUT,
        )
        report["status"] = resp.status_code
        report["session_id"] = resp.headers.get("Mcp-Session-Id")
        if resp.status_code == 200:
            # A missing or null "result" is treated as an empty object.
            result = resp.json().get("result") or {}
            report["protocol_version"] = result.get("protocolVersion")
            server_info = result.get("serverInfo") or {}
            report["server_name"] = server_info.get("name")
            report["ok"] = report["server_name"] == "aegis402"
    except Exception as exc:
        # Any failure (network, timeout, bad JSON) is reported, never raised.
        report["error"] = f"{type(exc).__name__}: {exc}"
    return report
|
|
|
|
|
|
def check_x402_challenge(cli: httpx.Client) -> dict:
    """POST to ``/scan`` without a payment header and inspect the reply.

    Args:
        cli: HTTP client used for the POST request.

    Returns:
        A dict with ``ok`` (True only for a 402 whose first ``accepts`` entry
        has network ``base``), ``status`` once a response arrived,
        ``price_atomic`` and ``pay_to`` taken from that first entry, and
        ``error`` when the scan was served for free (200) or something raised.
    """
    report = {"ok": False}
    probe = {"deps": [{"ecosystem": "npm", "package": "mathjs", "version": "15.1.0"}]}
    try:
        resp = cli.post(f"{BASE_URL}/scan", json=probe, timeout=HTTP_TIMEOUT)
        code = resp.status_code
        report["status"] = code
        if code == 402:
            offers = resp.json().get("accepts") or []
            if offers:
                first_offer = offers[0]
                report["ok"] = first_offer.get("network") == "base"
                report["price_atomic"] = first_offer.get("maxAmountRequired")
                report["pay_to"] = first_offer.get("payTo")
        elif code == 200:
            # x402 disabled — service is giving away free scans
            report["error"] = "x402 disabled (scan returned 200 without payment)"
    except Exception as exc:
        # Any failure (network, timeout, bad JSON) is reported, never raised.
        report["error"] = f"{type(exc).__name__}: {exc}"
    return report
|
|
|
|
|
|
def check_usdc_balance(cli: httpx.Client) -> dict:
    """Read USDC.balanceOf(WALLET) on Base via public RPC.

    Args:
        cli: HTTP client used to POST the JSON-RPC ``eth_call`` to
            ``BASE_RPC``.

    Returns:
        A dict that always has ``ok`` and ``wallet``; ``status`` once a
        response arrived; ``balance_atomic`` and ``balance_usdc`` on success;
        ``error`` when the request raised, the node answered with a JSON-RPC
        error, or the reply carried no hex ``result``. ``ok`` is True only
        when a balance was actually decoded.
    """
    out = {"ok": False, "wallet": WALLET}
    # ERC20 balanceOf(address) selector + 32-byte padded address
    selector = "0x70a08231"
    addr = WALLET.lower().replace("0x", "").rjust(64, "0")
    body = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_call",
        "params": [{"to": USDC_CONTRACT, "data": selector + addr}, "latest"],
    }
    try:
        r = cli.post(BASE_RPC, json=body, timeout=HTTP_TIMEOUT)
        out["status"] = r.status_code
        if r.status_code != 200:
            return out
        j = r.json()
        rpc_error = j.get("error")
        hex_val = j.get("result")
        if rpc_error is not None:
            # JSON-RPC failures arrive with HTTP 200; reporting them as a
            # zero balance would raise false balance alerts downstream.
            out["error"] = f"rpc error: {rpc_error}"
        elif not isinstance(hex_val, str):
            out["error"] = "rpc response has no result"
        else:
            # An empty return value ("0x" or "") is read as a zero balance.
            atomic = int(hex_val, 16) if hex_val and hex_val != "0x" else 0
            out["balance_atomic"] = atomic
            out["balance_usdc"] = atomic / 1_000_000
            out["ok"] = True
    except Exception as e:
        # Any failure (network, timeout, bad JSON/hex) is reported, never raised.
        out["error"] = f"{type(e).__name__}: {e}"
    return out
|
|
|
|
|
|
def main() -> int:
    """Run every check once, record the outcome and derive the exit code.

    Loads the previous state, runs the four probes with one shared HTTP
    client, collects alerts, saves the updated state, appends the record as
    one JSON line to ``LOG`` and prints a one-line summary.

    Returns:
        0 when there are no alerts or only balance-related ones
        (``FIRST_PAYMENT...`` / ``balance_changed...``), otherwise 1.
    """
    state = _load_state()
    last_seen_atomic = state.get("last_balance_atomic")
    already_paid = state.get("first_payment_received", False)

    alerts: list = []
    record: dict = {"ts": _now(), "alerts": alerts}

    # Probe order also fixes the key order of the logged record.
    probes = (
        ("health", check_health),
        ("mcp_rpc", check_mcp_rpc),
        ("x402_challenge", check_x402_challenge),
        ("wallet", check_usdc_balance),
    )
    with httpx.Client() as cli:
        for key, probe in probes:
            record[key] = probe(cli)

    health = record["health"]
    if health.get("ok"):
        # Staleness is only evaluated while the service itself is healthy.
        ingest_age = health.get("ingest_age_seconds") or 0
        if ingest_age > 90 * 60:
            alerts.append(f"ingest_stale ({int(ingest_age/60)} min)")
    else:
        alerts.append("service_down")

    for key, label in (
        ("mcp_rpc", "mcp_transport_broken"),
        ("x402_challenge", "x402_misconfig"),
    ):
        if not record[key].get("ok"):
            alerts.append(label)

    wallet = record["wallet"]
    if wallet.get("ok"):
        balance = wallet["balance_atomic"]
        if balance > 0 and not already_paid:
            alerts.append(f"FIRST_PAYMENT_RECEIVED ({wallet['balance_usdc']} USDC)")
            state["first_payment_received"] = True
        elif last_seen_atomic is not None and balance != last_seen_atomic:
            delta = (balance - last_seen_atomic) / 1_000_000
            alerts.append(f"balance_changed (delta={delta:+f} USDC)")
        # The stored balance only moves forward on a successful read.
        state["last_balance_atomic"] = balance
        state["last_balance_check"] = record["ts"]

    _save_state(state)

    with LOG.open("a") as sink:
        sink.write(json.dumps(record, separators=(",", ":")) + "\n")

    # human-readable summary to stdout
    usdc_shown = wallet.get("balance_usdc") if wallet.get("ok") else "ERR"
    summary = " ".join(
        (
            f"[{record['ts']}]",
            f"health={health.get('ok')}",
            f"cves={health.get('cves')}",
            f"ingest_age={int(health.get('ingest_age_seconds') or 0)}s",
            f"mcp={record['mcp_rpc'].get('ok')}",
            f"x402={record['x402_challenge'].get('ok')}",
            f"usdc={usdc_shown}",
            f"alerts={alerts or 'none'}",
        )
    )
    print(summary)

    # Balance notifications are informational and must not fail the run;
    # all() over an empty alert list is True, so "no alerts" also yields 0.
    informational = ("FIRST_PAYMENT", "balance_changed")
    if all(alert.startswith(informational) for alert in alerts):
        return 0
    return 1
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
|