542 lines
24 KiB
Python
542 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""Read-only AI Workspace context MCP server.
|
|
|
|
This server intentionally exposes bounded local evidence and canonical memory. It
|
|
does not capture traffic, send messages, or promote memory. Capture lifecycle is
|
|
owned by the AI Workspace Service Manager.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.parse
|
|
from datetime import date, datetime, timedelta
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Workspace root: the ancestor at index 3 of this file's resolved path
# (``parents[0]`` is the directory that contains this file).
ROOT = Path(__file__).resolve().parents[3]
# MCP protocol revision reported by ``initialize`` and in the
# ``MCP-Protocol-Version`` response header.
PROTOCOL_VERSION = "2025-06-18"
# Identity reported in ``serverInfo`` and by the ``/health`` endpoint.
SERVER_NAME = "aiw-context-mcp"
SERVER_VERSION = "0.1.0"
# Dotenv-style file that ``load_local_env`` reads by default.
LOCAL_ENV = ROOT / "scripts" / "mattermost-proxy" / ".env"
|
|
|
|
|
|
def load_local_env(path: Path = LOCAL_ENV) -> None:
    """Seed ``os.environ`` from a dotenv-style file.

    Blank lines, ``#`` comments and lines without ``=`` are skipped. A leading
    ``export `` is dropped, surrounding quote characters are stripped from the
    value, and variables that are already set in the environment are left
    untouched. A missing file is silently ignored.
    """
    if not path.is_file():
        return
    for entry in map(str.strip, path.read_text(encoding="utf-8").splitlines()):
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        if entry.startswith("export "):
            entry = entry[len("export "):].strip()
        name, _, raw_value = entry.partition("=")
        name = name.strip()
        if name:
            # setdefault keeps any value the process environment already has.
            os.environ.setdefault(name, raw_value.strip().strip("'\""))
|
|
|
|
|
|
def profile_dir(profile: str) -> Path:
    """Return the directory that holds a profile's files.

    ``"fidelity"`` maps to the workspace root. Any other name maps to
    ``ROOT / "profiles" / <name>`` when that path exists, otherwise to the
    workspace root.

    The profile name arrives from tool arguments and resource URIs, so it is
    treated as untrusted: it must be a single plain path component. Empty
    names, ``.``/``..`` and names containing path separators or NUL fall back
    to the workspace root, exactly like an unknown profile, so a request can
    never select a directory outside ``ROOT / "profiles"``.
    """
    if profile == "fidelity":
        return ROOT
    is_plain_name = (
        bool(profile)
        and profile not in {".", ".."}
        and not any(ch in profile for ch in ("/", "\\", "\0"))
    )
    if not is_plain_name:
        return ROOT
    candidate = ROOT / "profiles" / profile
    return candidate if candidate.exists() else ROOT
|
|
|
|
|
|
def knowledge_dir(profile: str) -> Path:
    """Return the profile's ``project-knowledge`` directory.

    Falls back to ``ROOT / "project-knowledge"`` when the profile directory
    has no such sub-directory.
    """
    preferred = profile_dir(profile) / "project-knowledge"
    if preferred.exists():
        return preferred
    return ROOT / "project-knowledge"
|
|
|
|
|
|
def inbox_dir(profile: str) -> Path:
    """Return the profile's ``ai/inbox`` directory.

    Falls back to ``ROOT / "ai" / "inbox"`` when the profile directory has no
    inbox of its own.
    """
    preferred = profile_dir(profile) / "ai" / "inbox"
    if preferred.exists():
        return preferred
    return ROOT / "ai" / "inbox"
|
|
|
|
|
|
def mattermost_mirror_dir(profile: str) -> Path:
    """Return the directory holding the Mattermost mirror files.

    ``MATTERMOST_MIRROR_DIR`` wins when set: ``~`` is expanded and a relative
    value is anchored at ``ROOT``. Otherwise the mirror lives in
    ``mattermost-mirror`` under the profile's inbox directory.
    """
    override = os.getenv("MATTERMOST_MIRROR_DIR", "").strip()
    if not override:
        return inbox_dir(profile) / "mattermost-mirror"
    location = Path(override).expanduser()
    if location.is_absolute():
        return location
    return ROOT / location
|
|
|
|
|
|
def profile_context_channels(profile: str, source: str = "mattermost") -> set[str]:
    """Return the lower-cased context channels configured for a profile.

    Reads ``ROOT/profiles/<profile>/context-sources.json`` and looks up
    ``communication_sources -> <source> -> context_channels``.

    Returns an empty set when the file is missing, unreadable, not valid
    UTF-8/JSON, or does not have the expected object nesting. A lone string
    value is treated as a single channel name rather than being iterated
    character by character.
    """
    path = ROOT / "profiles" / profile / "context-sources.json"
    if not path.is_file():
        return set()
    try:
        node: Any = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return set()
    for key in ("communication_sources", source, "context_channels"):
        if not isinstance(node, dict):
            return set()
        node = node.get(key)
    if isinstance(node, str):
        node = [node]
    if not isinstance(node, (list, tuple, set)):
        return set()
    return {str(item).strip().lower() for item in node if str(item).strip()}
|
|
|
|
|
|
def photo_inbox_dir(profile: str) -> Path:
    """Return the directory to scan for inbox photos.

    Resolution order: the ``PHOTO_INBOX_DIR`` environment variable (with ``~``
    expanded), then ``photos`` inside the profile's inbox directory if it
    exists, then ``~/Pictures/Photo Inbox``.
    """
    override = os.getenv("PHOTO_INBOX_DIR", "").strip()
    if override:
        return Path(override).expanduser()
    inbox_photos = inbox_dir(profile) / "photos"
    if not inbox_photos.exists():
        return Path.home() / "Pictures" / "Photo Inbox"
    return inbox_photos
|
|
|
|
|
|
def parse_channels(raw: str | None, profile: str | None = None) -> set[str]:
    """Resolve the set of lower-cased channel names to filter on.

    Sources, in order: the comma-separated ``raw`` string, the
    ``AIW_MATTERMOST_CONTEXT_CHANNELS`` / ``AIW_MATTERMOST_PROJECT_CHANNELS``
    environment variables, and finally the profile's configured context
    channels (only when ``profile`` is given). Returns an empty set when none
    of them yields a channel.
    """
    spec = raw
    if not spec:
        spec = os.getenv("AIW_MATTERMOST_CONTEXT_CHANNELS", "") or os.getenv("AIW_MATTERMOST_PROJECT_CHANNELS", "")
    names = {piece.strip().lower() for piece in (spec or "").split(",") if piece.strip()}
    if names:
        return names
    if not profile:
        return set()
    return profile_context_channels(profile)
|
|
|
|
|
|
def should_use_all_channels(args: dict[str, Any]) -> bool:
    """Tell whether the caller asked to bypass channel filtering.

    True when either ``include_all_channels`` or ``all_channels`` is truthy.
    """
    return any(bool(args.get(flag)) for flag in ("include_all_channels", "all_channels"))
|
|
|
|
|
|
def previous_workday(today: date) -> date:
    """Return the most recent Monday-to-Friday date strictly before ``today``."""
    one_day = timedelta(days=1)
    candidate = today - one_day
    # weekday() is 5 for Saturday and 6 for Sunday.
    while candidate.weekday() in (5, 6):
        candidate -= one_day
    return candidate
|
|
|
|
|
|
def read_jsonl(path: Path, limit: int | None = None) -> list[dict[str, Any]]:
    """Read a JSON Lines file and return its object records.

    Args:
        path: File to read. A missing file yields an empty list.
        limit: When positive, only the last ``limit`` records are returned;
            ``None`` or a non-positive value returns everything.

    Returns:
        The decoded JSON objects in file order. Blank lines, lines that are
        not valid JSON, and JSON values that are not objects (numbers, lists,
        strings, ...) are skipped, so callers can rely on ``dict`` items.
    """
    if not path.is_file():
        return []
    records: list[dict[str, Any]] = []
    text = path.read_text(encoding="utf-8", errors="replace")
    # Split on "\n" only: str.splitlines() would also break on U+2028, U+0085
    # and friends, which may legally appear unescaped inside JSON strings and
    # would cut such records in half. Universal-newline decoding has already
    # normalised "\r\n" and "\r" to "\n".
    for raw_line in text.split("\n"):
        line = raw_line.strip()
        if not line:
            continue
        try:
            value = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(value, dict):
            records.append(value)
    if limit and limit > 0:
        return records[-limit:]
    return records
|
|
|
|
|
|
def filter_channels(records: list[dict[str, Any]], channels: set[str]) -> list[dict[str, Any]]:
    """Keep the records that belong to one of ``channels``.

    A record matches when its lower-cased ``channel_name`` or ``channel_id``
    is in ``channels``. An empty ``channels`` set disables filtering and the
    input list is returned as-is.
    """
    if not channels:
        return records

    def _in_scope(record: dict[str, Any]) -> bool:
        return any(
            str(record.get(field, "")).lower() in channels
            for field in ("channel_name", "channel_id")
        )

    return [record for record in records if _in_scope(record)]
|
|
|
|
|
|
def daily_by_date_path(profile: str, day: date) -> Path:
    """Return ``<mirror>/by-date/YYYY/MM/YYYY-MM-DD.jsonl`` for ``day``."""
    year = f"{day:%Y}"
    month = f"{day:%m}"
    file_name = f"{day:%Y-%m-%d}.jsonl"
    return mattermost_mirror_dir(profile) / "by-date" / year / month / file_name
|
|
|
|
|
|
def daily_channel_path(profile: str, channel: str, day: date) -> Path:
    """Return ``<mirror>/channels/<channel>/YYYY/MM/YYYY-MM-DD.jsonl`` for ``day``."""
    year = f"{day:%Y}"
    month = f"{day:%m}"
    file_name = f"{day:%Y-%m-%d}.jsonl"
    return mattermost_mirror_dir(profile) / "channels" / channel / year / month / file_name
|
|
|
|
|
|
def tool_result(data: Any, text_prefix: str | None = None) -> dict[str, Any]:
    """Build a successful tool-call result.

    ``data`` is returned as ``structuredContent`` and also rendered as
    pretty-printed, key-sorted JSON in a single text content item, optionally
    preceded by ``text_prefix`` on its own line.
    """
    rendered = json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True)
    body = f"{text_prefix}\n{rendered}" if text_prefix else rendered
    text_item = {"type": "text", "text": body}
    return {"content": [text_item], "structuredContent": data, "isError": False}
|
|
|
|
|
|
def tool_error(message: str, data: Any | None = None) -> dict[str, Any]:
    """Build a failed tool-call result.

    The ``{"error": ..., "data": ...}`` payload is returned both as
    ``structuredContent`` and as pretty-printed JSON text, with ``isError``
    set to True.
    """
    payload = {"error": message, "data": data}
    rendered = json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)
    text_item = {"type": "text", "text": rendered}
    return {"content": [text_item], "structuredContent": payload, "isError": True}
|
|
|
|
|
|
def as_date(raw: str | None) -> date:
    """Parse a ``YYYY-MM-DD`` string; an empty or missing value means today.

    Raises:
        ValueError: if ``raw`` is non-empty but not in ``YYYY-MM-DD`` form.
    """
    if not raw:
        return date.today()
    return datetime.strptime(raw, "%Y-%m-%d").date()
|
|
|
|
|
|
def clamp_limit(value: Any, default: int = 80, maximum: int = 300) -> int:
    """Coerce a caller-supplied limit into the range ``1..maximum``.

    Args:
        value: Raw limit from the tool arguments (any JSON value).
        default: Returned when ``value`` is missing, not convertible to an
            integer, or not positive.
        maximum: Upper bound applied to a valid positive limit.

    Returns:
        ``min(int(value), maximum)`` for a positive value, else ``default``.
    """
    try:
        limit = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")). json.loads accepts the literal
        # ``Infinity``, so an infinite limit can arrive in a request.
        limit = default
    if limit <= 0:
        return default
    return min(limit, maximum)
|
|
|
|
|
|
def list_profiles(_: dict[str, Any]) -> dict[str, Any]:
    """List the available project profiles.

    A profile is a direct child of ``ROOT/profiles`` that contains a
    ``profile.md`` file. The result also reports the default profile taken
    from ``AIW_PROJECT_PROFILE`` (``"fidelity"`` when unset). A workspace
    without a ``profiles`` directory yields an empty list instead of an error.
    """
    profiles_root = ROOT / "profiles"
    if profiles_root.is_dir():
        profiles = sorted(
            entry.name for entry in profiles_root.iterdir() if (entry / "profile.md").is_file()
        )
    else:
        profiles = []
    return tool_result({"profiles": profiles, "active_default": os.getenv("AIW_PROJECT_PROFILE", "fidelity")})
|
|
|
|
|
|
def communication_latest(args: dict[str, Any]) -> dict[str, Any]:
    """Return the newest records from the mirror's ``latest.jsonl``.

    Arguments read from ``args``: ``profile`` (default ``"fidelity"``),
    ``limit`` (default 50), ``channels`` and the all-channels flags. Records
    are channel-filtered first and then cut to the last ``limit`` entries.
    """
    profile = str(args.get("profile") or "fidelity")
    limit = clamp_limit(args.get("limit"), default=50)
    if should_use_all_channels(args):
        channels: set[str] = set()
    else:
        channels = parse_channels(args.get("channels"), profile=profile)
    mirror_file = mattermost_mirror_dir(profile) / "latest.jsonl"
    everything = read_jsonl(mirror_file, limit=None)
    records = filter_channels(everything, channels)[-limit:]
    payload = {
        "profile": profile,
        "source": "mattermost",
        "evidence_type": "communication",
        "canonical": False,
        "channel_scope": "profile" if channels else "all",
        "channels": sorted(channels),
        "records": records,
    }
    return tool_result(payload)
|
|
|
|
|
|
def communication_date_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return the mirror records of a single day.

    Arguments read from ``args``: ``profile`` (default ``"fidelity"``),
    ``date`` (``YYYY-MM-DD``, default today), ``limit`` (default 100),
    ``channels`` and the all-channels flags. Records are channel-filtered
    first and then cut to the last ``limit`` entries.
    """
    profile = str(args.get("profile") or "fidelity")
    day = as_date(args.get("date"))
    limit = clamp_limit(args.get("limit"), default=100)
    if should_use_all_channels(args):
        channels: set[str] = set()
    else:
        channels = parse_channels(args.get("channels"), profile=profile)
    day_records = read_jsonl(daily_by_date_path(profile, day), limit=None)
    records = filter_channels(day_records, channels)[-limit:]
    payload = {
        "profile": profile,
        "source": "mattermost",
        "date": day.isoformat(),
        "channels": sorted(channels),
        "records": records,
        "canonical": False,
    }
    return tool_result(payload)
|
|
|
|
|
|
def communication_standup_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return mirror records for today and the previous workday.

    Arguments read from ``args``: ``profile`` (default ``"fidelity"``),
    ``today`` or ``date`` (``YYYY-MM-DD``, default today), ``limit``
    (default 80, applied to each day separately), ``channels`` and the
    all-channels flags.
    """
    profile = str(args.get("profile") or "fidelity")
    today = as_date(args.get("today") or args.get("date"))
    previous = previous_workday(today)
    limit = clamp_limit(args.get("limit"), default=80)
    if should_use_all_channels(args):
        channels: set[str] = set()
    else:
        channels = parse_channels(args.get("channels"), profile=profile)

    def _records_for(day: date) -> list[dict[str, Any]]:
        return filter_channels(read_jsonl(daily_by_date_path(profile, day)), channels)[-limit:]

    previous_records = _records_for(previous)
    today_records = _records_for(today)
    payload = {
        "profile": profile,
        "source": "mattermost",
        "mode": "standup",
        "canonical": False,
        "channels": sorted(channels),
        "previous_workday": previous.isoformat(),
        "today": today.isoformat(),
        "previous_records": previous_records,
        "today_records": today_records,
    }
    return tool_result(payload)
|
|
|
|
|
|
def communication_channel_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return the mirror records of one channel on one day.

    Arguments read from ``args``: ``profile`` (default ``"fidelity"``),
    ``channel`` (required), ``date`` (``YYYY-MM-DD``, default today) and
    ``limit`` (default 100, newest records kept).

    ``channel`` becomes a directory name inside the mirror, so it must be a
    single plain path component. Values such as ``..`` or anything containing
    a path separator are rejected with a tool error instead of being allowed
    to point outside the mirror directory.
    """
    profile = str(args.get("profile") or "fidelity")
    channel = str(args.get("channel") or "").strip()
    if not channel:
        return tool_error("channel is required")
    if channel in {".", ".."} or any(ch in channel for ch in ("/", "\\", "\0")):
        return tool_error("channel must be a plain channel name without path separators")
    day = as_date(args.get("date"))
    limit = clamp_limit(args.get("limit"), default=100)
    records = read_jsonl(daily_channel_path(profile, channel, day), limit=limit)
    return tool_result({"profile": profile, "source": "mattermost", "channel": channel, "date": day.isoformat(), "records": records, "canonical": False})
|
|
|
|
|
|
def communication_thread_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return the mirror records of one thread.

    Arguments read from ``args``: ``profile`` (default ``"fidelity"``),
    ``thread_id`` (required) and ``limit`` (default 100, newest records kept).

    ``thread_id`` becomes the file name ``<mirror>/threads/<thread_id>.jsonl``,
    so it must be a single plain path component. Values such as ``..`` or
    anything containing a path separator are rejected with a tool error
    instead of being allowed to address files outside the threads directory.
    """
    profile = str(args.get("profile") or "fidelity")
    thread_id = str(args.get("thread_id") or "").strip()
    if not thread_id:
        return tool_error("thread_id is required")
    if thread_id in {".", ".."} or any(ch in thread_id for ch in ("/", "\\", "\0")):
        return tool_error("thread_id must be a plain identifier without path separators")
    limit = clamp_limit(args.get("limit"), default=100)
    path = mattermost_mirror_dir(profile) / "threads" / f"{thread_id}.jsonl"
    records = read_jsonl(path, limit=limit)
    return tool_result({"profile": profile, "source": "mattermost", "thread_id": thread_id, "records": records, "canonical": False})
|
|
|
|
|
|
def project_current_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return the text of the profile's current-work and work-items files.

    Reads ``01-current/current-work.md`` and ``01-current/work-items.md`` from
    the profile's knowledge directory; files that do not exist are omitted.
    Paths in the result are relative to ``ROOT``.
    """
    profile = str(args.get("profile") or "fidelity")
    current_dir = knowledge_dir(profile) / "01-current"
    wanted = (current_dir / "current-work.md", current_dir / "work-items.md")
    files = [
        {"path": str(doc.relative_to(ROOT)), "text": doc.read_text(encoding="utf-8", errors="replace")}
        for doc in wanted
        if doc.is_file()
    ]
    return tool_result({"profile": profile, "canonical": True, "files": files})
|
|
|
|
|
|
def project_search_memory(args: dict[str, Any]) -> dict[str, Any]:
    """Case-insensitive substring search over the profile's Markdown knowledge.

    Arguments read from ``args``: ``profile`` (default ``"fidelity"``),
    ``query`` (required) and ``limit`` (default 10, at most 50).

    Every ``*.md`` file below the knowledge directory is scanned in sorted
    order, except those under the top-level ``09-templates`` directory. Each
    matching file contributes one snippet of up to 220 characters on either
    side of the first hit. The query is echoed back lower-cased.
    """
    profile = str(args.get("profile") or "fidelity")
    query = str(args.get("query") or "").strip().lower()
    if not query:
        return tool_error("query is required")
    limit = clamp_limit(args.get("limit"), default=10, maximum=50)
    base = knowledge_dir(profile)
    matches: list[dict[str, Any]] = []
    for path in sorted(base.rglob("*.md")):
        rel = path.relative_to(base)
        # Compare path components rather than a "/"-joined string so the
        # exclusion also works where the separator is a backslash.
        if len(rel.parts) > 1 and rel.parts[0] == "09-templates":
            continue
        # rglob can also return directories whose name ends in ".md".
        if not path.is_file():
            continue
        text = path.read_text(encoding="utf-8", errors="replace")
        index = text.lower().find(query)
        if index < 0:
            continue
        start = max(0, index - 220)
        end = min(len(text), index + len(query) + 220)
        matches.append({"path": str(path.relative_to(ROOT)), "snippet": text[start:end].strip()})
        if len(matches) >= limit:
            break
    return tool_result({"profile": profile, "canonical": True, "query": query, "matches": matches})
|
|
|
|
|
|
def photos_latest(args: dict[str, Any]) -> dict[str, Any]:
    """List metadata of the most recently modified photos in the photo inbox.

    Arguments read from ``args``: ``profile`` (default ``"fidelity"``) and
    ``limit`` (default 20, at most 100).

    Only direct children with a ``.jpg``, ``.jpeg``, ``.png`` or ``.heic``
    suffix are considered. Entries are ordered oldest to newest by
    modification time and carry the path, an ISO timestamp built with
    ``datetime.fromtimestamp`` and the size in bytes. Image data is never read.
    """
    profile = str(args.get("profile") or "fidelity")
    limit = clamp_limit(args.get("limit"), default=20, maximum=100)
    base = photo_inbox_dir(profile)
    extensions = {".jpg", ".jpeg", ".png", ".heic"}
    photos = []
    if base.is_dir():
        stamped = []
        for path in base.iterdir():
            if path.suffix.lower() not in extensions:
                continue
            try:
                if not path.is_file():
                    continue
                # One stat() per file keeps mtime and size consistent.
                info = path.stat()
            except OSError:
                # The file vanished or became unreadable while listing; skip it.
                continue
            stamped.append((info.st_mtime, path, info.st_size))
        stamped.sort(key=lambda entry: entry[0])
        for mtime, path, size in stamped[-limit:]:
            photos.append({"path": str(path), "modified_at": datetime.fromtimestamp(mtime).isoformat(), "bytes": size})
    return tool_result({"profile": profile, "source": "photo-inbox", "canonical": False, "photos": photos})
|
|
|
|
|
|
def resource_definitions() -> list[dict[str, Any]]:
    """Describe the MCP resources offered for every profile.

    A profile is a direct child of ``ROOT/profiles`` containing ``profile.md``.
    Each profile gets four resources: current work, work items, latest
    Mattermost evidence and latest photo metadata. A workspace without a
    ``profiles`` directory yields an empty list; ``resources/list`` calls this
    function outside any exception handler, so it must not raise for that case.
    """
    profiles_root = ROOT / "profiles"
    if not profiles_root.is_dir():
        return []
    # (URI suffix, title suffix, description, MIME type, priority)
    specs = [
        ("current-work", "Current Work", "Canonical current work context.", "text/markdown", 0.95),
        ("work-items", "Active Work Items", "Canonical active work-item summary.", "text/markdown", 0.9),
        ("mattermost/latest", "Mattermost Latest", "Profile-filtered latest Mattermost mirror evidence as JSON.", "application/json", 0.75),
        ("photos/latest", "Photo Inbox Latest", "Latest local Photo Inbox file metadata as JSON; image data is not embedded.", "application/json", 0.45),
    ]
    profiles = sorted(
        entry.name for entry in profiles_root.iterdir() if (entry / "profile.md").is_file()
    )
    resources: list[dict[str, Any]] = []
    for profile in profiles:
        for suffix, title, description, mime_type, priority in specs:
            resources.append({
                "uri": f"aiw://profiles/{profile}/{suffix}",
                # Resource names use "-" where the URI suffix uses "/".
                "name": f"{profile}-{suffix.replace('/', '-')}",
                "title": f"{profile} {title}",
                "description": description,
                "mimeType": mime_type,
                "annotations": {"audience": ["assistant"], "priority": priority},
            })
    return resources
|
|
|
|
|
|
def read_resource(uri: str) -> dict[str, Any] | None:
    """Resolve an ``aiw://profiles/<profile>/<resource>`` URI to its content.

    Supported resources: ``current-work`` and ``work-items`` (Markdown files
    from the profile's ``01-current`` directory) and ``mattermost/latest`` /
    ``photos/latest`` (JSON renderings of the matching tool output).

    Returns:
        A ``{"uri", "mimeType", "text"}`` dict, or ``None`` when the URI is
        not recognised or the backing Markdown file does not exist.
    """
    parsed = urllib.parse.urlparse(uri)
    if parsed.scheme != "aiw":
        return None
    segments = [segment for segment in parsed.path.split("/") if segment]
    if parsed.netloc != "profiles" or len(segments) < 2:
        return None
    profile, resource = segments[0], "/".join(segments[1:])
    base = knowledge_dir(profile)

    markdown_files = {"current-work": "current-work.md", "work-items": "work-items.md"}
    if resource in markdown_files:
        path = base / "01-current" / markdown_files[resource]
        if not path.is_file():
            return None
        return {"uri": uri, "mimeType": "text/markdown", "text": path.read_text(encoding="utf-8", errors="replace")}

    json_sources = {"mattermost/latest": (communication_latest, 80), "photos/latest": (photos_latest, 30)}
    if resource in json_sources:
        handler, limit = json_sources[resource]
        data = handler({"profile": profile, "limit": limit})["structuredContent"]
        return {"uri": uri, "mimeType": "application/json", "text": json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True)}

    return None
|
|
|
|
|
|
# Registry of MCP tools: name -> handler callable, description, and the
# JSON-schema ``properties`` advertised for the tool's arguments.
TOOLS: dict[str, dict[str, Any]] = {
    "context_profiles": {
        "handler": list_profiles,
        "description": "List AI Workspace project profiles.",
        "properties": {},
    },
    "communication_latest": {
        "handler": communication_latest,
        "description": "Read bounded latest Mattermost mirror evidence, filtered to the profile's context channels by default.",
        "properties": {
            "profile": {"type": "string"},
            "channels": {"type": "string"},
            "include_all_channels": {"type": "boolean"},
            "limit": {"type": "integer"},
        },
    },
    "communication_date_context": {
        "handler": communication_date_context,
        "description": "Read Mattermost mirror evidence for one date, filtered to profile context channels by default unless include_all_channels is true.",
        "properties": {
            "profile": {"type": "string"},
            "date": {"type": "string"},
            "channels": {"type": "string"},
            "include_all_channels": {"type": "boolean"},
            "limit": {"type": "integer"},
        },
    },
    "communication_standup_context": {
        "handler": communication_standup_context,
        "description": "Read previous-workday and today Mattermost evidence for standup drafting, filtered to profile context channels by default.",
        "properties": {
            "profile": {"type": "string"},
            "today": {"type": "string"},
            "channels": {"type": "string"},
            "include_all_channels": {"type": "boolean"},
            "limit": {"type": "integer"},
        },
    },
    "communication_channel_context": {
        "handler": communication_channel_context,
        "description": "Read Mattermost mirror evidence for a channel and date.",
        "properties": {
            "profile": {"type": "string"},
            "channel": {"type": "string"},
            "date": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
    "communication_thread_context": {
        "handler": communication_thread_context,
        "description": "Read Mattermost mirror evidence for a thread id.",
        "properties": {
            "profile": {"type": "string"},
            "thread_id": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
    "project_current_context": {
        "handler": project_current_context,
        "description": "Read canonical current-work and work-items context.",
        "properties": {"profile": {"type": "string"}},
    },
    "project_search_memory": {
        "handler": project_search_memory,
        "description": "Search canonical project-knowledge Markdown files.",
        "properties": {
            "profile": {"type": "string"},
            "query": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
    "photos_latest": {
        "handler": photos_latest,
        "description": "List recent local Photo Inbox files without embedding image data.",
        "properties": {
            "profile": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
}
|
|
|
|
|
|
def tool_definitions() -> list[dict[str, Any]]:
    """Render the ``TOOLS`` registry as MCP ``tools/list`` entries.

    Every tool is advertised as read-only, non-destructive and closed-world,
    with an object input schema that forbids additional properties.
    """
    return [
        {
            "name": name,
            "title": name.replace("_", " ").title(),
            "description": spec["description"],
            "inputSchema": {"type": "object", "properties": spec["properties"], "additionalProperties": False},
            "annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
        }
        for name, spec in TOOLS.items()
    ]
|
|
|
|
|
|
def jsonrpc_response(request_id: Any, result: Any) -> dict[str, Any]:
    """Wrap ``result`` in a JSON-RPC 2.0 success envelope for ``request_id``."""
    envelope: dict[str, Any] = {"jsonrpc": "2.0"}
    envelope["id"] = request_id
    envelope["result"] = result
    return envelope
|
|
|
|
|
|
def jsonrpc_error(request_id: Any, code: int, message: str, data: Any | None = None) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 error envelope.

    The ``data`` member is included in the error object only when it is not
    ``None``.
    """
    detail: dict[str, Any] = {"code": code, "message": message}
    if data is None:
        return {"jsonrpc": "2.0", "id": request_id, "error": detail}
    detail["data"] = data
    return {"jsonrpc": "2.0", "id": request_id, "error": detail}
|
|
|
|
|
|
def handle_request(message: dict[str, Any]) -> dict[str, Any] | None:
    """Dispatch one decoded JSON-RPC message and build its response.

    Args:
        message: The decoded JSON-RPC request object.

    Returns:
        The JSON-RPC response envelope, or ``None`` when the message carries
        no ``id`` (it is then treated as a notification and ignored).

    Malformed ``params`` or tool ``arguments`` (anything that is not a JSON
    object) are answered with ``-32602 Invalid params`` rather than raising.
    Exceptions raised by a tool handler are reported as a tool error result so
    the protocol session stays alive.
    """
    method = message.get("method")
    request_id = message.get("id")
    if request_id is None:
        return None
    params = message.get("params") or {}
    if not isinstance(params, dict):
        return jsonrpc_error(request_id, -32602, "Invalid params: params must be an object")
    if method == "initialize":
        requested = str(params.get("protocolVersion") or PROTOCOL_VERSION)
        return jsonrpc_response(request_id, {
            # Echo the client's version only when it is one this server speaks.
            "protocolVersion": requested if requested in {PROTOCOL_VERSION, "2025-03-26"} else PROTOCOL_VERSION,
            "capabilities": {"tools": {"listChanged": False}, "resources": {"listChanged": False}},
            "serverInfo": {"name": SERVER_NAME, "title": "AI Workspace Context MCP", "version": SERVER_VERSION},
            "instructions": "Read-only local AI Workspace context. Evidence is not canonical memory unless promoted by the agent/user.",
        })
    if method == "ping":
        return jsonrpc_response(request_id, {})
    if method == "tools/list":
        return jsonrpc_response(request_id, {"tools": tool_definitions()})
    if method == "resources/list":
        return jsonrpc_response(request_id, {"resources": resource_definitions()})
    if method == "resources/read":
        uri = str(params.get("uri") or "")
        content = read_resource(uri)
        if content is None:
            return jsonrpc_error(request_id, -32002, "Resource not found", {"uri": uri})
        return jsonrpc_response(request_id, {"contents": [content]})
    if method == "resources/templates/list":
        return jsonrpc_response(request_id, {"resourceTemplates": []})
    if method == "tools/call":
        name = str(params.get("name") or "")
        arguments = params.get("arguments") or {}
        tool = TOOLS.get(name)
        if tool is None:
            return jsonrpc_error(request_id, -32602, f"Unknown tool: {name}")
        if not isinstance(arguments, dict):
            return jsonrpc_error(request_id, -32602, "Invalid params: arguments must be an object")
        try:
            return jsonrpc_response(request_id, tool["handler"](arguments))
        except Exception as error:  # Keep protocol alive; report tool failure.
            return jsonrpc_response(request_id, tool_error(str(error)))
    return jsonrpc_error(request_id, -32601, f"Method not found: {method}")
|
|
|
|
|
|
class MCPHandler(BaseHTTPRequestHandler):
    """HTTP transport for the MCP JSON-RPC endpoint.

    Routes:
      * ``GET /health`` -> small JSON status document.
      * ``GET /mcp``    -> 405, the SSE stream is not implemented.
      * ``POST /mcp``   -> one JSON-RPC message, answered via ``handle_request``.
    """

    server_version = "AIWContextMCP/0.1"

    def do_GET(self) -> None:
        """Serve the health document; reject every other GET."""
        # Route on the parsed path so a query string does not defeat matching.
        route = urllib.parse.urlparse(self.path).path
        if route == "/health":
            self.send_json(HTTPStatus.OK, {"status": "ok", "server": SERVER_NAME, "version": SERVER_VERSION})
            return
        if route == "/mcp":
            self.send_error(HTTPStatus.METHOD_NOT_ALLOWED, "SSE GET stream is not implemented")
            return
        self.send_error(HTTPStatus.NOT_FOUND)

    def do_POST(self) -> None:
        """Accept one JSON-RPC message on ``/mcp`` and write its response.

        Responds 404 for other paths, 403 for disallowed origins, 400 for a
        missing, malformed or oversized body, 202 with no body for
        notifications, and 200 with the JSON-RPC response otherwise.
        """
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path != "/mcp":
            self.send_error(HTTPStatus.NOT_FOUND)
            return
        if not self.origin_allowed():
            self.send_error(HTTPStatus.FORBIDDEN, "origin not allowed")
            return
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            # A non-numeric header is a client error, not a server crash.
            length = -1
        if length <= 0 or length > 2_000_000:
            self.send_error(HTTPStatus.BAD_REQUEST, "invalid body length")
            return
        try:
            message = json.loads(self.rfile.read(length).decode("utf-8"))
        except (ValueError, RecursionError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError;
            # RecursionError is raised for pathologically nested documents.
            self.send_json(HTTPStatus.BAD_REQUEST, jsonrpc_error(None, -32700, "Parse error"))
            return
        if not isinstance(message, dict):
            self.send_json(HTTPStatus.BAD_REQUEST, jsonrpc_error(None, -32600, "Invalid Request"))
            return
        response = handle_request(message)
        if response is None:
            # Notification: acknowledge without a body.
            self.send_response(HTTPStatus.ACCEPTED)
            self.send_header("Content-Length", "0")
            self.end_headers()
            return
        self.send_json(HTTPStatus.OK, response, mcp=True)

    def origin_allowed(self) -> bool:
        """Allow requests without an ``Origin`` header or from a loopback origin."""
        origin = self.headers.get("Origin")
        if not origin:
            return True
        parsed = urllib.parse.urlparse(origin)
        return parsed.hostname in {"127.0.0.1", "localhost", "::1"}

    def send_json(self, status: HTTPStatus, payload: dict[str, Any], mcp: bool = False) -> None:
        """Write ``payload`` as a compact JSON response with the given status.

        When ``mcp`` is true the ``MCP-Protocol-Version`` header is added.
        """
        encoded = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(encoded)))
        if mcp:
            self.send_header("MCP-Protocol-Version", PROTOCOL_VERSION)
        self.end_headers()
        self.wfile.write(encoded)

    def log_message(self, format: str, *args: object) -> None:
        """Write access and error log lines to stderr, flushed immediately."""
        print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)
|
|
|
|
|
|
def run_http(host: str, port: int) -> None:
    """Load the local env file, then serve the MCP endpoint over HTTP forever."""
    load_local_env()
    address = (host, port)
    httpd = ThreadingHTTPServer(address, MCPHandler)
    banner = f"{SERVER_NAME} listening on http://{host}:{port}/mcp"
    print(banner, file=sys.stderr, flush=True)
    httpd.serve_forever()
|
|
|
|
|
|
def run_stdio() -> None:
    """Serve MCP over stdio: one JSON-RPC message per input line.

    Each non-blank line read from stdin is decoded and dispatched through
    ``handle_request``; any response is written to stdout as one compact JSON
    line and flushed. Error reporting mirrors the HTTP transport:

      * undecodable JSON        -> ``-32700 Parse error`` (id ``null``)
      * JSON that is no object  -> ``-32600 Invalid Request`` (id ``null``)
      * unexpected handler error -> ``-32603`` carrying the request's own id
    """
    load_local_env()
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except (ValueError, RecursionError):
            response = jsonrpc_error(None, -32700, "Parse error")
        else:
            if not isinstance(message, dict):
                response = jsonrpc_error(None, -32600, "Invalid Request")
            else:
                try:
                    response = handle_request(message)
                except Exception as error:  # Keep the session alive on handler bugs.
                    response = jsonrpc_error(message.get("id"), -32603, str(error))
        if response is not None:
            print(json.dumps(response, ensure_ascii=False, separators=(",", ":")), flush=True)
|
|
|
|
|
|
def main() -> None:
    """Parse command-line options and start the selected transport.

    Options: ``--transport`` (``http`` or ``stdio``, default ``http``),
    ``--host`` (default from ``AIW_CONTEXT_MCP_HOST`` or ``127.0.0.1``) and
    ``--port`` (default from ``AIW_CONTEXT_MCP_PORT`` or ``8765``).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--transport", choices=["http", "stdio"], default="http")
    parser.add_argument("--host", default=os.getenv("AIW_CONTEXT_MCP_HOST", "127.0.0.1"))
    # Pass the environment value as a string default: argparse converts string
    # defaults through ``type`` only when the option is not given, so an
    # invalid AIW_CONTEXT_MCP_PORT yields a normal usage error instead of a
    # traceback, and an explicit --port still works.
    parser.add_argument("--port", type=int, default=os.getenv("AIW_CONTEXT_MCP_PORT", "8765"))
    args = parser.parse_args()
    if args.transport == "stdio":
        run_stdio()
    else:
        run_http(args.host, args.port)
|
|
|
|
|
|
# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|