Add MCP Streamable HTTP transport (JSON-RPC 2.0 at /mcp/rpc)

Implements the real MCP transport per spec 2025-03-26 so Smithery/mcp.so/
glama auto-scanners actually recognize Aegis402 as an MCP server.

- src/mcp_rpc.py: stateless JSON-RPC handler with initialize, ping,
  tools/list, tools/call, notifications, batches, Origin validation
- /mcp/rpc routes wired in src/server.py (POST + 405 GET)
- tools/call enforces x402 via the same middleware as /scan; payment
  required is returned as JSON-RPC error -32001 with the full x402
  challenge in the data field
- mcp_manifest + server-card + sitemap + landing all point to /mcp/rpc
- 9/9 in-process tests pass; prod smoke tests confirm initialize,
  tools/list, paid tools/call, and 405 GET all behave per spec
This commit is contained in:
Kaouani Jaouad 2026-04-13 13:18:31 +02:00
parent a12081e536
commit ca764b6766
4 changed files with 317 additions and 4 deletions

View File

@ -207,3 +207,53 @@ peux pas pousser, je dois être trouvé.
Sans humain, je suis un service indexable mais non-promu. Le service marche, Sans humain, je suis un service indexable mais non-promu. Le service marche,
le code est propre, la prod est stable. La friction est 100% côté distribution. le code est propre, la prod est stable. La friction est 100% côté distribution.
## 2026-04-13 — Vrai transport MCP Streamable HTTP implémenté
Suite à l'audit auth-walls, gros pivot technique : passer de "service avec
manifest MCP" à "vrai serveur MCP". Smithery + glama + mcp.so scrapent le
transport JSON-RPC, pas la doc — sans transport on est invisible à leur
auto-discovery même quand pointés sur l'URL.
### Implémenté
- `src/mcp_rpc.py` — handler JSON-RPC 2.0 stateless conforme spec
https://modelcontextprotocol.io/specification/2025-03-26/basic/transports
- Méthodes : `initialize`, `notifications/initialized`, `ping`, `tools/list`,
`tools/call`
- Validation Origin (DNS rebinding) — allow-list aegis402.vmaxbadge.ch + .vmaxbadge.ch
- Génère `Mcp-Session-Id` sur initialize (stateless mais spec-friendly)
- GET → 405 (pas de SSE serveur-poussé)
- Batch JSON-RPC supporté
- `tools/call scan` enforce x402 via le même middleware que /scan REST.
Si paiement requis, retourne erreur JSON-RPC code -32001 avec le challenge
x402 complet dans `data` (un client MCP wrappant x402 peut récupérer le prix
et payer).
### Tests in-process (TestClient) — 9/9 OK
- initialize → 200 + Mcp-Session-Id ✓
- notifications/initialized → 202 no body ✓
- ping → 200 ✓
- tools/list → renvoie scan ✓
- tools/call scan free mode → résultat structuré ✓
- tools/call bad name → -32602 ✓
- méthode inconnue → -32601 ✓
- batch [ping, tools/list] → 2 réponses ordonnées ✓
- GET → 405 ✓
- bad JSON → -32700 ✓
### Tests prod (https://aegis402.vmaxbadge.ch/mcp/rpc)
- initialize via curl → 200 + session id réelle
- tools/list → renvoie scan avec input schema complet
- tools/call scan → -32001 Payment required + challenge x402 inline
(x402 ENABLED en prod, c'est le bon comportement)
- GET → 405
### Mises à jour de surface
- `mcp_manifest.py` → transport.type = "streamable-http", url = /mcp/rpc
- `server-card.json` → idem
- `sitemap.xml` → ajoute /mcp/rpc
- landing HTML → ligne POST /mcp/rpc avec description
Aegis402 est maintenant **un vrai serveur MCP**, pas un service "MCP-flavored".
Quand un crawler ou un humain pointe Smithery / mcp.so / glama dessus, leur
auto-scan reçoit un transport JSON-RPC 2.0 conforme spec 2025-03-26.

View File

@ -20,7 +20,11 @@ def manifest(public_url: str | None = None) -> dict:
), ),
"version": "0.1.0", "version": "0.1.0",
"vendor": {"name": "Aegis402", "url": base_url}, "vendor": {"name": "Aegis402", "url": base_url},
"transport": {"type": "http", "url": base_url}, "transport": {
"type": "streamable-http",
"url": base_url.rstrip("/") + "/mcp/rpc",
"protocolVersion": "2025-03-26",
},
"payment": { "payment": {
"protocol": "x402", "protocol": "x402",
"version": 1, "version": 1,

243
src/mcp_rpc.py Normal file
View File

@ -0,0 +1,243 @@
"""MCP Streamable HTTP transport — JSON-RPC 2.0 endpoint at /mcp/rpc.
Spec: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports
Stateless, no SSE. Supports:
- initialize
- notifications/initialized (202)
- tools/list
- tools/call (scan)
- ping
GET on the endpoint returns 405 (we don't push server-initiated messages).
Origin header is validated to mitigate DNS rebinding.
x402 payment is enforced inside tools/call via the same middleware as /scan.
On payment required we return a JSON-RPC error -32001 whose `data` carries
the x402 challenge body so an MCP client wrapping x402 can recover the price.
"""
from __future__ import annotations
import os
import secrets
from typing import Any
from fastapi import Request
from fastapi.responses import JSONResponse, Response
from .scan import scan_batch
from .x402_middleware import (
challenge_body,
maybe_require_payment,
price_for_request,
)
PROTOCOL_VERSION = "2025-03-26"
SERVER_INFO = {"name": "aegis402", "version": "0.1.0"}
ALLOWED_ORIGINS = {
"https://aegis402.vmaxbadge.ch",
"http://127.0.0.1:8744",
"http://localhost:8744",
}
def _origin_ok(request: Request) -> bool:
origin = request.headers.get("origin")
if not origin:
return True # non-browser MCP clients won't send Origin
return origin in ALLOWED_ORIGINS or origin.endswith(".vmaxbadge.ch")
def _err(id_: Any, code: int, message: str, data: Any = None) -> dict:
e: dict = {"code": code, "message": message}
if data is not None:
e["data"] = data
return {"jsonrpc": "2.0", "id": id_, "error": e}
def _ok(id_: Any, result: Any) -> dict:
return {"jsonrpc": "2.0", "id": id_, "result": result}
SCAN_INPUT_SCHEMA = {
"type": "object",
"properties": {
"deps": {
"type": "array",
"maxItems": 200,
"items": {
"type": "object",
"required": ["ecosystem", "package", "version"],
"properties": {
"ecosystem": {
"type": "string",
"enum": ["pip", "npm", "go", "rust", "composer", "maven", "nuget"],
},
"package": {"type": "string"},
"version": {"type": "string"},
},
},
}
},
"required": ["deps"],
}
TOOLS = [
{
"name": "scan",
"description": (
"Scan a list of (ecosystem, package, version) dependencies against "
"GHSA + CISA KEV. Returns CVE/GHSA hits with severity, CVSS, "
"fixed_version, exploited_in_wild, and known_ransomware flags. "
"Pricing 0.005 USDC per dep on Base via x402, 40% batch discount at 10+."
),
"inputSchema": SCAN_INPUT_SCHEMA,
}
]
async def _handle_method(
request: Request, method: str, params: dict | None, id_: Any
) -> dict | None:
"""Dispatch a single JSON-RPC call. Returns None for notifications."""
if method == "initialize":
return _ok(
id_,
{
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": SERVER_INFO,
"instructions": (
"Aegis402 — pay-per-call CVE intelligence. Call tools/list "
"to discover tools, tools/call to scan. Payment via x402 "
"(USDC on Base). Without an X-PAYMENT header you receive a "
"402-equivalent JSON-RPC error containing the x402 challenge."
),
},
)
if method in ("notifications/initialized", "initialized"):
return None # notification — no response
if method == "ping":
return _ok(id_, {})
if method == "tools/list":
return _ok(id_, {"tools": TOOLS, "nextCursor": None})
if method == "tools/call":
params = params or {}
name = params.get("name")
args = params.get("arguments") or {}
if name != "scan":
return _err(id_, -32602, f"unknown tool: {name}")
deps = args.get("deps")
if not isinstance(deps, list) or not deps:
return _err(id_, -32602, "deps must be a non-empty array")
if len(deps) > 200:
return _err(id_, -32602, "max 200 deps per call")
for d in deps:
if not isinstance(d, dict) or not all(
k in d for k in ("ecosystem", "package", "version")
):
return _err(
id_, -32602, "each dep needs ecosystem, package, version"
)
pay = await maybe_require_payment(request, len(deps))
if pay is not None:
# Re-extract challenge body from the JSONResponse so we can embed it
price = price_for_request(len(deps))
return _err(
id_,
-32001,
"Payment required",
data=challenge_body(price),
)
results = scan_batch(deps)
n_vuln = sum(1 for r in results if r["vulnerable"])
n_kev = sum(
1 for r in results for h in r["hits"] if h.get("exploited_in_wild")
)
summary = {
"scanned": len(results),
"vulnerable": n_vuln,
"exploited_in_wild": n_kev,
"results": results,
}
# MCP tools/call result format: content blocks
import json as _json
return _ok(
id_,
{
"content": [
{"type": "text", "text": _json.dumps(summary, separators=(",", ":"))}
],
"isError": False,
"structuredContent": summary,
},
)
return _err(id_, -32601, f"method not found: {method}")
async def handle_post(request: Request) -> Response:
if not _origin_ok(request):
return JSONResponse(status_code=403, content={"error": "origin not allowed"})
try:
body = await request.json()
except Exception:
return JSONResponse(
status_code=400,
content=_err(None, -32700, "Parse error"),
)
is_batch = isinstance(body, list)
msgs = body if is_batch else [body]
# Detect if this batch contains only notifications/responses (no requests w/ id)
only_notifs = all(
isinstance(m, dict) and m.get("id") is None for m in msgs
)
responses: list[dict] = []
for m in msgs:
if not isinstance(m, dict) or m.get("jsonrpc") != "2.0":
responses.append(_err(None, -32600, "Invalid Request"))
continue
method = m.get("method")
params = m.get("params")
id_ = m.get("id")
if method is None:
# response object — we don't initiate requests, ignore
continue
try:
r = await _handle_method(request, method, params, id_)
except Exception as e:
r = _err(id_, -32603, f"Internal error: {type(e).__name__}: {e}")
if r is not None:
responses.append(r)
if only_notifs and not responses:
return Response(status_code=202)
headers = {}
# Per spec, return Mcp-Session-Id on initialize result. We're stateless but
# send a token anyway so clients that require one are happy.
if any(
isinstance(m, dict) and m.get("method") == "initialize" for m in msgs
):
headers["Mcp-Session-Id"] = secrets.token_urlsafe(16)
if not responses:
return Response(status_code=202, headers=headers)
payload: Any = responses if is_batch else responses[0]
return JSONResponse(content=payload, headers=headers)
def handle_get() -> Response:
"""We don't support server-initiated SSE streams. Spec allows 405."""
return Response(status_code=405, headers={"Allow": "POST"})

View File

@ -12,6 +12,7 @@ from pydantic import BaseModel, Field
from .db import get_conn from .db import get_conn
from .mcp_manifest import manifest as mcp_manifest from .mcp_manifest import manifest as mcp_manifest
from .mcp_rpc import handle_get as mcp_rpc_get, handle_post as mcp_rpc_post
from .scan import scan_batch from .scan import scan_batch
from .x402_middleware import maybe_require_payment, status as x402_status from .x402_middleware import maybe_require_payment, status as x402_status
@ -82,7 +83,8 @@ ransomware flag.</p>
<tr><td>GET</td><td><a href="/health">/health</a></td><td>Liveness + DB freshness</td></tr> <tr><td>GET</td><td><a href="/health">/health</a></td><td>Liveness + DB freshness</td></tr>
<tr><td>GET</td><td><a href="/mcp">/mcp</a></td><td>MCP manifest with tool schemas</td></tr> <tr><td>GET</td><td><a href="/mcp">/mcp</a></td><td>MCP manifest with tool schemas</td></tr>
<tr><td>GET</td><td><a href="/payment">/payment</a></td><td>x402 paywall config + wallet</td></tr> <tr><td>GET</td><td><a href="/payment">/payment</a></td><td>x402 paywall config + wallet</td></tr>
<tr><td>POST</td><td>/scan</td><td>Scan up to 200 deps. Pay first, then call.</td></tr> <tr><td>POST</td><td>/scan</td><td>REST: scan up to 200 deps. Pay first, then call.</td></tr>
<tr><td>POST</td><td>/mcp/rpc</td><td>MCP Streamable HTTP transport (JSON-RPC 2.0). initialize / tools/list / tools/call.</td></tr>
</table> </table>
<h2>Pricing</h2> <h2>Pricing</h2>
@ -127,7 +129,7 @@ def robots():
@app.get("/sitemap.xml", response_class=PlainTextResponse) @app.get("/sitemap.xml", response_class=PlainTextResponse)
def sitemap(): def sitemap():
base = "https://aegis402.vmaxbadge.ch" base = "https://aegis402.vmaxbadge.ch"
urls = ["/", "/mcp", "/health", "/payment", "/.well-known/mcp.json", "/.well-known/mcp/server-card.json"] urls = ["/", "/mcp", "/mcp/rpc", "/health", "/payment", "/.well-known/mcp.json", "/.well-known/mcp/server-card.json"]
body = '<?xml version="1.0" encoding="UTF-8"?>\n<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n' body = '<?xml version="1.0" encoding="UTF-8"?>\n<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
for u in urls: for u in urls:
body += f" <url><loc>{base}{u}</loc><changefreq>hourly</changefreq></url>\n" body += f" <url><loc>{base}{u}</loc><changefreq>hourly</changefreq></url>\n"
@ -154,7 +156,11 @@ def well_known_server_card(request: Request):
"description": m["description"], "description": m["description"],
"version": m["version"], "version": m["version"],
"homepage": public_url + "/", "homepage": public_url + "/",
"transport": {"type": "http", "url": public_url + "/scan"}, "transport": {
"type": "streamable-http",
"url": public_url + "/mcp/rpc",
"protocolVersion": "2025-03-26",
},
"capabilities": { "capabilities": {
"tools": [{"name": t["name"], "description": t["description"]} for t in m["tools"]], "tools": [{"name": t["name"], "description": t["description"]} for t in m["tools"]],
}, },
@ -232,6 +238,16 @@ def mcp(request: Request):
return mcp_manifest(public_url=public_url) return mcp_manifest(public_url=public_url)
@app.post("/mcp/rpc")
async def mcp_rpc_endpoint(request: Request):
return await mcp_rpc_post(request)
@app.get("/mcp/rpc")
def mcp_rpc_get_endpoint():
return mcp_rpc_get()
@app.get("/payment") @app.get("/payment")
def payment_status(): def payment_status():
return x402_status() return x402_status()