#!/usr/bin/env python3
"""Read-only AI Workspace context MCP server.

This server intentionally exposes bounded local evidence and canonical memory.
It does not capture traffic, send messages, or promote memory. Capture lifecycle
is owned by the AI Workspace Service Manager.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import urllib.parse
from datetime import date, datetime, timedelta
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any


def _workspace_root() -> Path:
    """Return the ancestor three levels above this file's directory.

    When the file lives in a shallower location the highest available
    ancestor is used instead, so importing the module never raises
    ``IndexError``.
    """
    ancestors = Path(__file__).resolve().parents
    return ancestors[min(3, len(ancestors) - 1)]


ROOT = _workspace_root()
PROTOCOL_VERSION = "2025-06-18"
SERVER_NAME = "aiw-context-mcp"
SERVER_VERSION = "0.1.0"
LOCAL_ENV = ROOT / "scripts" / "mattermost-proxy" / ".env"

# File suffixes (lower-case) that photos_latest reports.
_PHOTO_SUFFIXES = frozenset({".jpg", ".jpeg", ".png", ".heic"})


def load_local_env(path: Path = LOCAL_ENV) -> None:
    """Load ``KEY=VALUE`` lines from *path* into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are ignored; a leading
    ``export `` is stripped, as are surrounding quotes on the value. Variables
    already present in the environment are never overwritten. A missing file
    is silently ignored.
    """
    if not path.is_file():
        return
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        if line.startswith("export "):
            line = line[len("export "):].strip()
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip().strip("'\"")
        if key and key not in os.environ:
            os.environ[key] = value


def _is_safe_component(value: str) -> bool:
    """Return True when *value* is usable as exactly one path component.

    Tool arguments arrive from the MCP client and are joined into filesystem
    paths, so separators, ``.``/``..`` and NUL bytes must be rejected to keep
    reads inside the intended directory.
    """
    if not value or value in {".", ".."}:
        return False
    return not any(marker in value for marker in ("/", "\\", "\x00"))


def profile_dir(profile: str) -> Path:
    """Return the directory for *profile*.

    ``"fidelity"``, unknown profiles, and names that are not a single safe
    path component all resolve to ``ROOT``.
    """
    if profile == "fidelity" or not _is_safe_component(profile):
        return ROOT
    candidate = ROOT / "profiles" / profile
    return candidate if candidate.exists() else ROOT


def knowledge_dir(profile: str) -> Path:
    """Return the profile's ``project-knowledge`` dir, or the one under ROOT."""
    candidate = profile_dir(profile) / "project-knowledge"
    return candidate if candidate.exists() else ROOT / "project-knowledge"


def inbox_dir(profile: str) -> Path:
    """Return the profile's ``ai/inbox`` dir, or the one under ROOT."""
    candidate = profile_dir(profile) / "ai" / "inbox"
    return candidate if candidate.exists() else ROOT / "ai" / "inbox"


def mattermost_mirror_dir(profile: str) -> Path:
    """Return the Mattermost mirror directory.

    ``MATTERMOST_MIRROR_DIR`` wins when set (relative values are taken
    relative to ROOT); otherwise ``<inbox>/mattermost-mirror`` is used.
    """
    configured = os.getenv("MATTERMOST_MIRROR_DIR", "").strip()
    if configured:
        path = Path(configured).expanduser()
        return path if path.is_absolute() else ROOT / path
    return inbox_dir(profile) / "mattermost-mirror"


def photo_inbox_dir(profile: str) -> Path:
    """Return the photo inbox directory.

    Order of preference: ``PHOTO_INBOX_DIR``, an existing ``<inbox>/photos``,
    then ``~/Pictures/Photo Inbox``.
    """
    configured = os.getenv("PHOTO_INBOX_DIR", "").strip()
    if configured:
        return Path(configured).expanduser()
    linked = inbox_dir(profile) / "photos"
    if linked.exists():
        return linked
    return Path.home() / "Pictures" / "Photo Inbox"


def parse_channels(raw: str | None) -> set[str]:
    """Parse a comma-separated channel list into a lower-cased set.

    When *raw* is empty the value falls back to
    ``AIW_MATTERMOST_CONTEXT_CHANNELS`` and then
    ``AIW_MATTERMOST_PROJECT_CHANNELS``.
    """
    if not raw:
        raw = os.getenv("AIW_MATTERMOST_CONTEXT_CHANNELS", "") or os.getenv(
            "AIW_MATTERMOST_PROJECT_CHANNELS", ""
        )
    return {item.strip().lower() for item in (raw or "").split(",") if item.strip()}


def previous_workday(today: date) -> date:
    """Return the closest Monday-to-Friday date strictly before *today*."""
    day = today - timedelta(days=1)
    while day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        day -= timedelta(days=1)
    return day


def read_jsonl(path: Path, limit: int | None = None) -> list[dict[str, Any]]:
    """Read JSON objects from a JSON Lines file.

    Blank lines, lines that are not valid JSON, and valid JSON values that are
    not objects are skipped, so callers can rely on every record being a dict.
    A missing file yields an empty list. With a positive *limit* only the last
    *limit* records are returned.
    """
    if not path.is_file():
        return []
    records: list[dict[str, Any]] = []
    for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Downstream code calls .get() on every record; drop arrays/scalars.
        if isinstance(record, dict):
            records.append(record)
    if limit and limit > 0:
        return records[-limit:]
    return records


def filter_channels(records: list[dict[str, Any]], channels: set[str]) -> list[dict[str, Any]]:
    """Keep records whose ``channel_name`` or ``channel_id`` is in *channels*.

    Comparison is done on the lower-cased field value. An empty *channels*
    set disables filtering.
    """
    if not channels:
        return records
    return [
        item
        for item in records
        if str(item.get("channel_name", "")).lower() in channels
        or str(item.get("channel_id", "")).lower() in channels
    ]


def daily_by_date_path(profile: str, day: date) -> Path:
    """Return ``<mirror>/by-date/YYYY/MM/YYYY-MM-DD.jsonl`` for *day*."""
    return (
        mattermost_mirror_dir(profile)
        / "by-date"
        / f"{day:%Y}"
        / f"{day:%m}"
        / f"{day:%Y-%m-%d}.jsonl"
    )


def daily_channel_path(profile: str, channel: str, day: date) -> Path:
    """Return ``<mirror>/channels/<channel>/YYYY/MM/YYYY-MM-DD.jsonl``."""
    return (
        mattermost_mirror_dir(profile)
        / "channels"
        / channel
        / f"{day:%Y}"
        / f"{day:%m}"
        / f"{day:%Y-%m-%d}.jsonl"
    )


def tool_result(data: Any, text_prefix: str | None = None) -> dict[str, Any]:
    """Wrap *data* as a successful tool result (text plus structured content)."""
    text = json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True)
    if text_prefix:
        text = f"{text_prefix}\n{text}"
    return {
        "content": [{"type": "text", "text": text}],
        "structuredContent": data,
        "isError": False,
    }


def tool_error(message: str, data: Any | None = None) -> dict[str, Any]:
    """Wrap an error *message* (and optional *data*) as a failed tool result."""
    payload = {"error": message, "data": data}
    return {
        "content": [
            {
                "type": "text",
                "text": json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True),
            }
        ],
        "structuredContent": payload,
        "isError": True,
    }


def as_date(raw: str | None) -> date:
    """Parse ``YYYY-MM-DD``; a falsy *raw* means today.

    Raises:
        ValueError: if *raw* does not match ``%Y-%m-%d``.
    """
    return datetime.strptime(raw, "%Y-%m-%d").date() if raw else date.today()


def clamp_limit(value: Any, default: int = 80, maximum: int = 300) -> int:
    """Coerce *value* to an int in ``1..maximum``.

    Anything that cannot be converted (including infinities) or is not
    positive yields *default*.
    """
    try:
        limit = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")), reachable via JSON "Infinity".
        limit = default
    if limit <= 0:
        return default
    return min(limit, maximum)


def list_profiles(_: dict[str, Any]) -> dict[str, Any]:
    """List directories under ``ROOT/profiles`` that contain ``profile.md``."""
    profiles_root = ROOT / "profiles"
    profiles: list[str] = []
    if profiles_root.is_dir():  # a workspace without profiles is not an error
        profiles = sorted(
            path.name for path in profiles_root.iterdir() if (path / "profile.md").is_file()
        )
    return tool_result(
        {"profiles": profiles, "active_default": os.getenv("AIW_PROJECT_PROFILE", "fidelity")}
    )


def communication_latest(args: dict[str, Any]) -> dict[str, Any]:
    """Return the last ``limit`` records of the mirror's ``latest.jsonl``."""
    profile = str(args.get("profile") or "fidelity")
    limit = clamp_limit(args.get("limit"), default=50)
    records = read_jsonl(mattermost_mirror_dir(profile) / "latest.jsonl", limit=limit)
    return tool_result(
        {
            "profile": profile,
            "source": "mattermost",
            "evidence_type": "communication",
            "canonical": False,
            "records": records,
        }
    )


def communication_date_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return one day's mirror records, optionally filtered by channels."""
    profile = str(args.get("profile") or "fidelity")
    day = as_date(args.get("date"))
    limit = clamp_limit(args.get("limit"), default=100)
    channels = parse_channels(args.get("channels"))
    # Filter first, then truncate, so the limit applies to matching records.
    records = filter_channels(read_jsonl(daily_by_date_path(profile, day), limit=None), channels)
    return tool_result(
        {
            "profile": profile,
            "source": "mattermost",
            "date": day.isoformat(),
            "channels": sorted(channels),
            "records": records[-limit:],
            "canonical": False,
        }
    )


def communication_standup_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return mirror records for today and the previous workday."""
    profile = str(args.get("profile") or "fidelity")
    today = as_date(args.get("today") or args.get("date"))
    previous = previous_workday(today)
    limit = clamp_limit(args.get("limit"), default=80)
    channels = parse_channels(args.get("channels"))
    previous_records = filter_channels(
        read_jsonl(daily_by_date_path(profile, previous)), channels
    )[-limit:]
    today_records = filter_channels(
        read_jsonl(daily_by_date_path(profile, today)), channels
    )[-limit:]
    return tool_result(
        {
            "profile": profile,
            "source": "mattermost",
            "mode": "standup",
            "canonical": False,
            "channels": sorted(channels),
            "previous_workday": previous.isoformat(),
            "today": today.isoformat(),
            "previous_records": previous_records,
            "today_records": today_records,
        }
    )


def communication_channel_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return one channel's mirror records for one date.

    The channel name is rejected unless it is a single path component, since
    it becomes part of the file path that is read.
    """
    profile = str(args.get("profile") or "fidelity")
    channel = str(args.get("channel") or "").strip()
    if not channel:
        return tool_error("channel is required")
    if not _is_safe_component(channel):
        return tool_error("channel must be a single path component")
    day = as_date(args.get("date"))
    limit = clamp_limit(args.get("limit"), default=100)
    records = read_jsonl(daily_channel_path(profile, channel, day), limit=limit)
    return tool_result(
        {
            "profile": profile,
            "source": "mattermost",
            "channel": channel,
            "date": day.isoformat(),
            "records": records,
            "canonical": False,
        }
    )


def communication_thread_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return the mirror records stored for one thread id.

    The thread id is rejected unless it is a single path component, since it
    becomes part of the file path that is read.
    """
    profile = str(args.get("profile") or "fidelity")
    thread_id = str(args.get("thread_id") or "").strip()
    if not thread_id:
        return tool_error("thread_id is required")
    if not _is_safe_component(thread_id):
        return tool_error("thread_id must be a single path component")
    limit = clamp_limit(args.get("limit"), default=100)
    path = mattermost_mirror_dir(profile) / "threads" / f"{thread_id}.jsonl"
    records = read_jsonl(path, limit=limit)
    return tool_result(
        {
            "profile": profile,
            "source": "mattermost",
            "thread_id": thread_id,
            "records": records,
            "canonical": False,
        }
    )


def project_current_context(args: dict[str, Any]) -> dict[str, Any]:
    """Return the text of ``current-work.md`` and ``work-items.md`` if present."""
    profile = str(args.get("profile") or "fidelity")
    current = knowledge_dir(profile) / "01-current"
    files = [
        {"path": str(path.relative_to(ROOT)), "text": path.read_text(encoding="utf-8", errors="replace")}
        for path in (current / "current-work.md", current / "work-items.md")
        if path.is_file()
    ]
    return tool_result({"profile": profile, "canonical": True, "files": files})


def project_search_memory(args: dict[str, Any]) -> dict[str, Any]:
    """Case-insensitive substring search over project-knowledge Markdown.

    Files under the top-level ``09-templates`` directory are skipped. Each
    match reports the first hit in the file with up to 220 characters of
    context on either side; the search stops once ``limit`` files matched.
    """
    profile = str(args.get("profile") or "fidelity")
    query = str(args.get("query") or "").strip().lower()
    if not query:
        return tool_error("query is required")
    limit = clamp_limit(args.get("limit"), default=10, maximum=50)
    base = knowledge_dir(profile)
    matches: list[dict[str, Any]] = []
    for path in sorted(base.rglob("*.md")):
        if not path.is_file():  # a directory may also be named *.md
            continue
        rel = path.relative_to(base)
        # Compare path parts rather than a "/"-joined string so the check
        # does not depend on the platform's separator.
        if len(rel.parts) > 1 and rel.parts[0] == "09-templates":
            continue
        text = path.read_text(encoding="utf-8", errors="replace")
        index = text.lower().find(query)
        if index < 0:
            continue
        start = max(0, index - 220)
        end = min(len(text), index + len(query) + 220)
        matches.append({"path": str(path.relative_to(ROOT)), "snippet": text[start:end].strip()})
        if len(matches) >= limit:
            break
    return tool_result({"profile": profile, "canonical": True, "query": query, "matches": matches})


def photos_latest(args: dict[str, Any]) -> dict[str, Any]:
    """List the most recently modified photo files (metadata only).

    Results are ordered oldest to newest and contain at most ``limit``
    entries with path, modification time and size.
    """
    profile = str(args.get("profile") or "fidelity")
    limit = clamp_limit(args.get("limit"), default=20, maximum=100)
    base = photo_inbox_dir(profile)
    stamped: list[tuple[float, Path, int]] = []
    if base.is_dir():
        for path in base.iterdir():
            if path.suffix.lower() not in _PHOTO_SUFFIXES or not path.is_file():
                continue
            try:
                info = path.stat()  # one stat per file keeps mtime and size consistent
            except OSError:
                continue  # file disappeared between listing and stat
            stamped.append((info.st_mtime, path, info.st_size))
    stamped.sort(key=lambda entry: entry[0])
    photos = [
        {
            "path": str(path),
            "modified_at": datetime.fromtimestamp(mtime).isoformat(),
            "bytes": size,
        }
        for mtime, path, size in stamped[-limit:]
    ]
    return tool_result(
        {"profile": profile, "source": "photo-inbox", "canonical": False, "photos": photos}
    )


# Tool registry: name -> handler, human description and JSON-schema properties.
TOOLS: dict[str, dict[str, Any]] = {
    "context_profiles": {
        "handler": list_profiles,
        "description": "List AI Workspace project profiles.",
        "properties": {},
    },
    "communication_latest": {
        "handler": communication_latest,
        "description": "Read bounded latest Mattermost mirror evidence.",
        "properties": {"profile": {"type": "string"}, "limit": {"type": "integer"}},
    },
    "communication_date_context": {
        "handler": communication_date_context,
        "description": "Read Mattermost mirror evidence for one date, optionally filtered by channels.",
        "properties": {
            "profile": {"type": "string"},
            "date": {"type": "string"},
            "channels": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
    "communication_standup_context": {
        "handler": communication_standup_context,
        "description": "Read previous-workday and today Mattermost evidence for standup drafting.",
        "properties": {
            "profile": {"type": "string"},
            "today": {"type": "string"},
            "channels": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
    "communication_channel_context": {
        "handler": communication_channel_context,
        "description": "Read Mattermost mirror evidence for a channel and date.",
        "properties": {
            "profile": {"type": "string"},
            "channel": {"type": "string"},
            "date": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
    "communication_thread_context": {
        "handler": communication_thread_context,
        "description": "Read Mattermost mirror evidence for a thread id.",
        "properties": {
            "profile": {"type": "string"},
            "thread_id": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
    "project_current_context": {
        "handler": project_current_context,
        "description": "Read canonical current-work and work-items context.",
        "properties": {"profile": {"type": "string"}},
    },
    "project_search_memory": {
        "handler": project_search_memory,
        "description": "Search canonical project-knowledge Markdown files.",
        "properties": {
            "profile": {"type": "string"},
            "query": {"type": "string"},
            "limit": {"type": "integer"},
        },
    },
    "photos_latest": {
        "handler": photos_latest,
        "description": "List recent local Photo Inbox files without embedding image data.",
        "properties": {"profile": {"type": "string"}, "limit": {"type": "integer"}},
    },
}


def tool_definitions() -> list[dict[str, Any]]:
    """Build the ``tools/list`` payload from the TOOLS registry."""
    return [
        {
            "name": name,
            "title": name.replace("_", " ").title(),
            "description": item["description"],
            "inputSchema": {
                "type": "object",
                "properties": item["properties"],
                "additionalProperties": False,
            },
            "annotations": {
                "readOnlyHint": True,
                "destructiveHint": False,
                "openWorldHint": False,
            },
        }
        for name, item in TOOLS.items()
    ]


def jsonrpc_response(request_id: Any, result: Any) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 success envelope."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def jsonrpc_error(
    request_id: Any, code: int, message: str, data: Any | None = None
) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 error envelope; ``data`` is included only if given."""
    error: dict[str, Any] = {"code": code, "message": message}
    if data is not None:
        error["data"] = data
    return {"jsonrpc": "2.0", "id": request_id, "error": error}


def handle_request(message: dict[str, Any]) -> dict[str, Any] | None:
    """Dispatch one JSON-RPC message.

    Returns:
        The response envelope, or ``None`` when the message carries no ``id``
        (a notification, which gets no reply).
    """
    method = message.get("method")
    request_id = message.get("id")
    if request_id is None:
        return None

    params = message.get("params") or {}
    if not isinstance(params, dict):
        # Positional (array) or scalar params would otherwise blow up on .get().
        return jsonrpc_error(request_id, -32602, "Invalid params: expected an object")

    if method == "initialize":
        requested = str(params.get("protocolVersion") or PROTOCOL_VERSION)
        supported = {PROTOCOL_VERSION, "2025-03-26"}
        return jsonrpc_response(
            request_id,
            {
                "protocolVersion": requested if requested in supported else PROTOCOL_VERSION,
                "capabilities": {"tools": {"listChanged": False}},
                "serverInfo": {
                    "name": SERVER_NAME,
                    "title": "AI Workspace Context MCP",
                    "version": SERVER_VERSION,
                },
                "instructions": "Read-only local AI Workspace context. Evidence is not canonical memory unless promoted by the agent/user.",
            },
        )
    if method == "ping":
        return jsonrpc_response(request_id, {})
    if method == "tools/list":
        return jsonrpc_response(request_id, {"tools": tool_definitions()})
    if method == "tools/call":
        name = str(params.get("name") or "")
        arguments = params.get("arguments") or {}
        tool = TOOLS.get(name)
        if tool is None:
            return jsonrpc_error(request_id, -32602, f"Unknown tool: {name}")
        if not isinstance(arguments, dict):
            return jsonrpc_error(request_id, -32602, "Invalid params: arguments must be an object")
        try:
            return jsonrpc_response(request_id, tool["handler"](arguments))
        except Exception as error:  # Keep protocol alive; report tool failure.
            return jsonrpc_response(request_id, tool_error(str(error)))
    return jsonrpc_error(request_id, -32601, f"Method not found: {method}")


class MCPHandler(BaseHTTPRequestHandler):
    """HTTP front end: ``GET /health`` and JSON-RPC over ``POST /mcp``."""

    server_version = "AIWContextMCP/0.1"

    def do_GET(self) -> None:
        """Serve the health probe; ``/mcp`` GET (SSE) is not implemented."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path == "/health":
            self.send_json(
                HTTPStatus.OK,
                {"status": "ok", "server": SERVER_NAME, "version": SERVER_VERSION},
            )
            return
        if parsed.path == "/mcp":
            self.send_error(HTTPStatus.METHOD_NOT_ALLOWED, "SSE GET stream is not implemented")
            return
        self.send_error(HTTPStatus.NOT_FOUND)

    def do_POST(self) -> None:
        """Handle one JSON-RPC message posted to ``/mcp``."""
        parsed = urllib.parse.urlparse(self.path)
        if parsed.path != "/mcp":
            self.send_error(HTTPStatus.NOT_FOUND)
            return
        if not self.origin_allowed():
            self.send_error(HTTPStatus.FORBIDDEN, "origin not allowed")
            return
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            length = 0  # non-numeric header: rejected by the range check below
        if length <= 0 or length > 2_000_000:
            self.send_error(HTTPStatus.BAD_REQUEST, "invalid body length")
            return
        try:
            message = json.loads(self.rfile.read(length).decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            self.send_json(HTTPStatus.BAD_REQUEST, jsonrpc_error(None, -32700, "Parse error"))
            return
        if not isinstance(message, dict):
            self.send_json(HTTPStatus.BAD_REQUEST, jsonrpc_error(None, -32600, "Invalid Request"))
            return
        response = handle_request(message)
        if response is None:
            # Notification: acknowledge without a body.
            self.send_response(HTTPStatus.ACCEPTED)
            self.end_headers()
            return
        self.send_json(HTTPStatus.OK, response, mcp=True)

    def origin_allowed(self) -> bool:
        """Allow requests without an Origin header or with a loopback Origin."""
        origin = self.headers.get("Origin")
        if not origin:
            return True
        try:
            hostname = urllib.parse.urlparse(origin).hostname
        except ValueError:
            return False  # malformed Origin (e.g. unbalanced IPv6 brackets)
        return hostname in {"127.0.0.1", "localhost", "::1"}

    def send_json(self, status: HTTPStatus, payload: dict[str, Any], mcp: bool = False) -> None:
        """Send *payload* as compact JSON; ``mcp=True`` adds the protocol header."""
        encoded = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(encoded)))
        if mcp:
            self.send_header("MCP-Protocol-Version", PROTOCOL_VERSION)
        self.end_headers()
        self.wfile.write(encoded)

    def log_message(self, format: str, *args: object) -> None:
        """Write the access log line to stderr."""
        print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)


def run_http(host: str, port: int) -> None:
    """Load the local env file and serve MCP over HTTP until interrupted."""
    load_local_env()
    server = ThreadingHTTPServer((host, port), MCPHandler)
    print(f"{SERVER_NAME} listening on http://{host}:{port}/mcp", file=sys.stderr, flush=True)
    server.serve_forever()


def run_stdio() -> None:
    """Serve newline-delimited JSON-RPC messages from stdin to stdout."""
    load_local_env()
    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        response: dict[str, Any] | None
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            response = jsonrpc_error(None, -32700, "Parse error")
        else:
            if not isinstance(message, dict):
                response = jsonrpc_error(None, -32600, "Invalid Request")
            else:
                try:
                    response = handle_request(message)
                except Exception as error:  # never let one message end the loop
                    response = jsonrpc_error(message.get("id"), -32603, str(error))
        if response is not None:
            print(json.dumps(response, ensure_ascii=False, separators=(",", ":")), flush=True)


def main() -> None:
    """Parse command-line options and start the selected transport."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--transport", choices=["http", "stdio"], default="http")
    parser.add_argument("--host", default=os.getenv("AIW_CONTEXT_MCP_HOST", "127.0.0.1"))
    parser.add_argument("--port", type=int, default=int(os.getenv("AIW_CONTEXT_MCP_PORT", "8765")))
    args = parser.parse_args()
    if args.transport == "stdio":
        run_stdio()
    else:
        run_http(args.host, args.port)


if __name__ == "__main__":
    main()