From d3e909d39ee63a9b622d266079e3b3983e1a19d8 Mon Sep 17 00:00:00 2001 From: "david.delagneau" Date: Wed, 20 May 2026 14:57:54 -0600 Subject: [PATCH] feat: implement AI Workspace context MCP server with read-only access and add related tests --- README.md | 3 +- profiles/fidelity/services.json | 6 +- scripts/README.md | 7 + scripts/aiw/README.md | 2 +- scripts/mcp/aiw-context-mcp/README.md | 53 +++ scripts/mcp/aiw-context-mcp/server.py | 442 +++++++++++++++++++++ scripts/mcp/aiw-context-mcp/test_server.py | 87 ++++ 7 files changed, 595 insertions(+), 5 deletions(-) create mode 100644 scripts/mcp/aiw-context-mcp/README.md create mode 100644 scripts/mcp/aiw-context-mcp/server.py create mode 100644 scripts/mcp/aiw-context-mcp/test_server.py diff --git a/README.md b/README.md index 571b4da..2ba143a 100644 --- a/README.md +++ b/README.md @@ -150,7 +150,8 @@ Repeatable working guides for: Helpers for automation around memory access, context generation, communication drafting, and imports. -- `scripts/aiw/` -> local AI Workspace service manager for profile services such as the Mattermost proxy mirror, Photo Inbox, and future context MCP +- `scripts/aiw/` -> local AI Workspace service manager for profile services such as the Mattermost proxy mirror, Photo Inbox, and context MCP +- `scripts/mcp/` -> local MCP servers that expose bounded read-only workspace context to AI clients - `scripts/memory/` -> project-agnostic interface for canonical memory - `scripts/obsidian/` -> current Obsidian adapter and URI helpers - `scripts/mattermost/` -> live communication connector diff --git a/profiles/fidelity/services.json b/profiles/fidelity/services.json index bc61f38..ab1cab4 100644 --- a/profiles/fidelity/services.json +++ b/profiles/fidelity/services.json @@ -3,10 +3,10 @@ "description": "Local AI Workspace services for the Fidelity profile.", "services": { "aiw-context-mcp": { - "enabled": false, + "enabled": true, "kind": "mcp", - "description": "Future read-only AI Workspace context MCP server.", - "command": ["python3", "scripts/mcp/aiw-context-mcp/server.py"], + "description": "Read-only AI Workspace context MCP server.", + "command": ["python3", "scripts/mcp/aiw-context-mcp/server.py", "--transport", "http"], "groups": ["mcp", "context"], "restart": "on-failure", "doctor": { diff --git a/scripts/README.md b/scripts/README.md index 148b04f..4701497 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -26,6 +26,13 @@ The service manager reads `profiles//services.json` and manages local services such as the Mattermost proxy mirror and Photo Inbox. Runtime PID/state and logs stay under `.aiw/runtime/`. +The local context MCP server lives in: + +- `scripts/mcp/aiw-context-mcp/` + +It exposes read-only workspace context to local AI clients and is started as the +`aiw-context-mcp` service for profiles that enable it. + Recommended memory commands: ```bash diff --git a/scripts/aiw/README.md b/scripts/aiw/README.md index 18284a4..6d60553 100644 --- a/scripts/aiw/README.md +++ b/scripts/aiw/README.md @@ -26,7 +26,7 @@ python3 scripts/aiw/services.py start --profile fidelity --group inbox - `mattermost-proxy`: runs the local Mattermost proxy mirror. - `mattermost-desktop`: launches Mattermost Desktop through the proxy. - `photo-inbox`: runs the local HTTP photo receiver. -- `aiw-context-mcp`: reserved placeholder for the future read-only context MCP server. +- `aiw-context-mcp`: read-only context MCP server for local AI clients. The service manager unifies startup and status. It does not move capture behavior into the MCP. diff --git a/scripts/mcp/aiw-context-mcp/README.md b/scripts/mcp/aiw-context-mcp/README.md new file mode 100644 index 0000000..38ffcc8 --- /dev/null +++ b/scripts/mcp/aiw-context-mcp/README.md @@ -0,0 +1,53 @@ +# AIW Context MCP + +Read-only local Model Context Protocol server for AI Workspace context. + +The server exposes bounded local evidence and canonical Markdown context to MCP clients. It does not capture traffic, send messages, mutate files, or promote memory. + +## HTTP transport + +The service manager starts the HTTP transport by default: + +```bash +python3 scripts/aiw/services.py start aiw-context-mcp --profile fidelity +``` + +Endpoint: + +```text +http://127.0.0.1:8765/mcp +``` + +Health: + +```text +http://127.0.0.1:8765/health +``` + +## stdio transport + +For clients that require stdio, launch: + +```bash +python3 scripts/mcp/aiw-context-mcp/server.py --transport stdio +``` + +## Tools + +- `context_profiles` +- `communication_latest` +- `communication_date_context` +- `communication_standup_context` +- `communication_channel_context` +- `communication_thread_context` +- `project_current_context` +- `project_search_memory` +- `photos_latest` + +All tools are read-only. Mattermost tools read `ai/inbox/mattermost-mirror/`; photo tools list local Photo Inbox files without embedding image data; project tools read canonical Markdown under `project-knowledge/`. + +## Tests + +```bash +python3 scripts/mcp/aiw-context-mcp/test_server.py +``` diff --git a/scripts/mcp/aiw-context-mcp/server.py b/scripts/mcp/aiw-context-mcp/server.py new file mode 100644 index 0000000..8b45008 --- /dev/null +++ b/scripts/mcp/aiw-context-mcp/server.py @@ -0,0 +1,442 @@ +#!/usr/bin/env python3 +"""Read-only AI Workspace context MCP server. + +This server intentionally exposes bounded local evidence and canonical memory. It +does not capture traffic, send messages, or promote memory. Capture lifecycle is +owned by the AI Workspace Service Manager. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import urllib.parse +from datetime import date, datetime, timedelta +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any + + +ROOT = Path(__file__).resolve().parents[3] +PROTOCOL_VERSION = "2025-06-18" +SERVER_NAME = "aiw-context-mcp" +SERVER_VERSION = "0.1.0" +LOCAL_ENV = ROOT / "scripts" / "mattermost-proxy" / ".env" + + +def load_local_env(path: Path = LOCAL_ENV) -> None: + if not path.is_file(): + return + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + if line.startswith("export "): + line = line[len("export ") :].strip() + key, value = line.split("=", 1) + key = key.strip() + value = value.strip().strip("'\"") + if key and key not in os.environ: + os.environ[key] = value + + +def profile_dir(profile: str) -> Path: + if profile == "fidelity": + return ROOT + candidate = ROOT / "profiles" / profile + return candidate if candidate.exists() else ROOT + + +def knowledge_dir(profile: str) -> Path: + base = profile_dir(profile) + candidate = base / "project-knowledge" + return candidate if candidate.exists() else ROOT / "project-knowledge" + + +def inbox_dir(profile: str) -> Path: + base = profile_dir(profile) + candidate = base / "ai" / "inbox" + return candidate if candidate.exists() else ROOT / "ai" / "inbox" + + +def mattermost_mirror_dir(profile: str) -> Path: + configured = os.getenv("MATTERMOST_MIRROR_DIR", "").strip() + if configured: + path = Path(configured).expanduser() + return path if path.is_absolute() else ROOT / path + return inbox_dir(profile) / "mattermost-mirror" + + +def photo_inbox_dir(profile: str) -> Path: + configured = os.getenv("PHOTO_INBOX_DIR", "").strip() + if configured: + return Path(configured).expanduser() + linked = inbox_dir(profile) / "photos" + if linked.exists(): + return linked + return Path.home() / "Pictures" / "Photo Inbox" + + +def parse_channels(raw: str | None) -> set[str]: + if not raw: + raw = os.getenv("AIW_MATTERMOST_CONTEXT_CHANNELS", "") or os.getenv("AIW_MATTERMOST_PROJECT_CHANNELS", "") + return {item.strip().lower() for item in (raw or "").split(",") if item.strip()} + + +def previous_workday(today: date) -> date: + day = today - timedelta(days=1) + while day.weekday() >= 5: + day -= timedelta(days=1) + return day + + +def read_jsonl(path: Path, limit: int | None = None) -> list[dict[str, Any]]: + if not path.is_file(): + return [] + records: list[dict[str, Any]] = [] + for line in path.read_text(encoding="utf-8", errors="replace").splitlines(): + line = line.strip() + if not line: + continue + try: + records.append(json.loads(line)) + except json.JSONDecodeError: + continue + if limit and limit > 0: + return records[-limit:] + return records + + +def filter_channels(records: list[dict[str, Any]], channels: set[str]) -> list[dict[str, Any]]: + if not channels: + return records + return [ + item + for item in records + if str(item.get("channel_name", "")).lower() in channels + or str(item.get("channel_id", "")).lower() in channels + ] + + +def daily_by_date_path(profile: str, day: date) -> Path: + return mattermost_mirror_dir(profile) / "by-date" / f"{day:%Y}" / f"{day:%m}" / f"{day:%Y-%m-%d}.jsonl" + + +def daily_channel_path(profile: str, channel: str, day: date) -> Path: + return mattermost_mirror_dir(profile) / "channels" / channel / f"{day:%Y}" / f"{day:%m}" / f"{day:%Y-%m-%d}.jsonl" + + +def tool_result(data: Any, text_prefix: str | None = None) -> dict[str, Any]: + text = json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True) + if text_prefix: + text = f"{text_prefix}\n{text}" + return { + "content": [{"type": "text", "text": text}], + "structuredContent": data, + "isError": False, + } + + +def tool_error(message: str, data: Any | None = None) -> dict[str, Any]: + payload = {"error": message, "data": data} + return { + "content": [{"type": "text", "text": json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)}], + "structuredContent": payload, + "isError": True, + } + + +def as_date(raw: str | None) -> date: + return datetime.strptime(raw, "%Y-%m-%d").date() if raw else date.today() + + +def clamp_limit(value: Any, default: int = 80, maximum: int = 300) -> int: + try: + limit = int(value) + except (TypeError, ValueError): + limit = default + if limit <= 0: + return default + return min(limit, maximum) + + +def list_profiles(_: dict[str, Any]) -> dict[str, Any]: + profiles = sorted(path.name for path in (ROOT / "profiles").iterdir() if (path / "profile.md").is_file()) + return tool_result({"profiles": profiles, "active_default": os.getenv("AIW_PROJECT_PROFILE", "fidelity")}) + + +def communication_latest(args: dict[str, Any]) -> dict[str, Any]: + profile = str(args.get("profile") or "fidelity") + limit = clamp_limit(args.get("limit"), default=50) + records = read_jsonl(mattermost_mirror_dir(profile) / "latest.jsonl", limit=limit) + return tool_result({"profile": profile, "source": "mattermost", "evidence_type": "communication", "canonical": False, "records": records}) + + +def communication_date_context(args: dict[str, Any]) -> dict[str, Any]: + profile = str(args.get("profile") or "fidelity") + day = as_date(args.get("date")) + limit = clamp_limit(args.get("limit"), default=100) + channels = parse_channels(args.get("channels")) + records = filter_channels(read_jsonl(daily_by_date_path(profile, day), limit=None), channels)[-limit:] + return tool_result({"profile": profile, "source": "mattermost", "date": day.isoformat(), "channels": sorted(channels), "records": records, "canonical": False}) + + +def communication_standup_context(args: dict[str, Any]) -> dict[str, Any]: + profile = str(args.get("profile") or "fidelity") + today = as_date(args.get("today") or args.get("date")) + previous = previous_workday(today) + limit = clamp_limit(args.get("limit"), default=80) + channels = parse_channels(args.get("channels")) + previous_records = filter_channels(read_jsonl(daily_by_date_path(profile, previous)), channels)[-limit:] + today_records = filter_channels(read_jsonl(daily_by_date_path(profile, today)), channels)[-limit:] + return tool_result({ + "profile": profile, + "source": "mattermost", + "mode": "standup", + "canonical": False, + "channels": sorted(channels), + "previous_workday": previous.isoformat(), + "today": today.isoformat(), + "previous_records": previous_records, + "today_records": today_records, + }) + + +def communication_channel_context(args: dict[str, Any]) -> dict[str, Any]: + profile = str(args.get("profile") or "fidelity") + channel = str(args.get("channel") or "").strip() + if not channel: + return tool_error("channel is required") + day = as_date(args.get("date")) + limit = clamp_limit(args.get("limit"), default=100) + records = read_jsonl(daily_channel_path(profile, channel, day), limit=limit) + return tool_result({"profile": profile, "source": "mattermost", "channel": channel, "date": day.isoformat(), "records": records, "canonical": False}) + + +def communication_thread_context(args: dict[str, Any]) -> dict[str, Any]: + profile = str(args.get("profile") or "fidelity") + thread_id = str(args.get("thread_id") or "").strip() + if not thread_id: + return tool_error("thread_id is required") + limit = clamp_limit(args.get("limit"), default=100) + path = mattermost_mirror_dir(profile) / "threads" / f"{thread_id}.jsonl" + records = read_jsonl(path, limit=limit) + return tool_result({"profile": profile, "source": "mattermost", "thread_id": thread_id, "records": records, "canonical": False}) + + +def project_current_context(args: dict[str, Any]) -> dict[str, Any]: + profile = str(args.get("profile") or "fidelity") + base = knowledge_dir(profile) + files = [base / "01-current" / "current-work.md", base / "01-current" / "work-items.md"] + result = [] + for path in files: + if path.is_file(): + result.append({"path": str(path.relative_to(ROOT)), "text": path.read_text(encoding="utf-8", errors="replace")}) + return tool_result({"profile": profile, "canonical": True, "files": result}) + + +def project_search_memory(args: dict[str, Any]) -> dict[str, Any]: + profile = str(args.get("profile") or "fidelity") + query = str(args.get("query") or "").strip().lower() + if not query: + return tool_error("query is required") + limit = clamp_limit(args.get("limit"), default=10, maximum=50) + base = knowledge_dir(profile) + matches: list[dict[str, Any]] = [] + for path in sorted(base.rglob("*.md")): + rel = path.relative_to(base) + if str(rel).startswith("09-templates/"): + continue + text = path.read_text(encoding="utf-8", errors="replace") + lowered = text.lower() + index = lowered.find(query) + if index < 0: + continue + start = max(0, index - 220) + end = min(len(text), index + len(query) + 220) + matches.append({"path": str(path.relative_to(ROOT)), "snippet": text[start:end].strip()}) + if len(matches) >= limit: + break + return tool_result({"profile": profile, "canonical": True, "query": query, "matches": matches}) + + +def photos_latest(args: dict[str, Any]) -> dict[str, Any]: + profile = str(args.get("profile") or "fidelity") + limit = clamp_limit(args.get("limit"), default=20, maximum=100) + base = photo_inbox_dir(profile) + photos = [] + if base.exists(): + candidates = [path for path in base.iterdir() if path.is_file() and path.suffix.lower() in {".jpg", ".jpeg", ".png", ".heic"}] + for path in sorted(candidates, key=lambda item: item.stat().st_mtime)[-limit:]: + photos.append({"path": str(path), "modified_at": datetime.fromtimestamp(path.stat().st_mtime).isoformat(), "bytes": path.stat().st_size}) + return tool_result({"profile": profile, "source": "photo-inbox", "canonical": False, "photos": photos}) + + +TOOLS: dict[str, dict[str, Any]] = { + "context_profiles": {"handler": list_profiles, "description": "List AI Workspace project profiles.", "properties": {}}, + "communication_latest": {"handler": communication_latest, "description": "Read bounded latest Mattermost mirror evidence.", "properties": {"profile": {"type": "string"}, "limit": {"type": "integer"}}}, + "communication_date_context": {"handler": communication_date_context, "description": "Read Mattermost mirror evidence for one date, optionally filtered by channels.", "properties": {"profile": {"type": "string"}, "date": {"type": "string"}, "channels": {"type": "string"}, "limit": {"type": "integer"}}}, + "communication_standup_context": {"handler": communication_standup_context, "description": "Read previous-workday and today Mattermost evidence for standup drafting.", "properties": {"profile": {"type": "string"}, "today": {"type": "string"}, "channels": {"type": "string"}, "limit": {"type": "integer"}}}, + "communication_channel_context": {"handler": communication_channel_context, "description": "Read Mattermost mirror evidence for a channel and date.", "properties": {"profile": {"type": "string"}, "channel": {"type": "string"}, "date": {"type": "string"}, "limit": {"type": "integer"}}}, + "communication_thread_context": {"handler": communication_thread_context, "description": "Read Mattermost mirror evidence for a thread id.", "properties": {"profile": {"type": "string"}, "thread_id": {"type": "string"}, "limit": {"type": "integer"}}}, + "project_current_context": {"handler": project_current_context, "description": "Read canonical current-work and work-items context.", "properties": {"profile": {"type": "string"}}}, + "project_search_memory": {"handler": project_search_memory, "description": "Search canonical project-knowledge Markdown files.", "properties": {"profile": {"type": "string"}, "query": {"type": "string"}, "limit": {"type": "integer"}}}, + "photos_latest": {"handler": photos_latest, "description": "List recent local Photo Inbox files without embedding image data.", "properties": {"profile": {"type": "string"}, "limit": {"type": "integer"}}}, +} + + +def tool_definitions() -> list[dict[str, Any]]: + definitions = [] + for name, item in TOOLS.items(): + definitions.append({ + "name": name, + "title": name.replace("_", " ").title(), + "description": item["description"], + "inputSchema": {"type": "object", "properties": item["properties"], "additionalProperties": False}, + "annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False}, + }) + return definitions + + +def jsonrpc_response(request_id: Any, result: Any) -> dict[str, Any]: + return {"jsonrpc": "2.0", "id": request_id, "result": result} + + +def jsonrpc_error(request_id: Any, code: int, message: str, data: Any | None = None) -> dict[str, Any]: + error: dict[str, Any] = {"code": code, "message": message} + if data is not None: + error["data"] = data + return {"jsonrpc": "2.0", "id": request_id, "error": error} + + +def handle_request(message: dict[str, Any]) -> dict[str, Any] | None: + method = message.get("method") + request_id = message.get("id") + params = message.get("params") or {} + if request_id is None: + return None + if method == "initialize": + requested = str(params.get("protocolVersion") or PROTOCOL_VERSION) + return jsonrpc_response(request_id, { + "protocolVersion": requested if requested in {PROTOCOL_VERSION, "2025-03-26"} else PROTOCOL_VERSION, + "capabilities": {"tools": {"listChanged": False}}, + "serverInfo": {"name": SERVER_NAME, "title": "AI Workspace Context MCP", "version": SERVER_VERSION}, + "instructions": "Read-only local AI Workspace context. Evidence is not canonical memory unless promoted by the agent/user.", + }) + if method == "ping": + return jsonrpc_response(request_id, {}) + if method == "tools/list": + return jsonrpc_response(request_id, {"tools": tool_definitions()}) + if method == "tools/call": + name = str(params.get("name") or "") + arguments = params.get("arguments") or {} + tool = TOOLS.get(name) + if tool is None: + return jsonrpc_error(request_id, -32602, f"Unknown tool: {name}") + try: + return jsonrpc_response(request_id, tool["handler"](arguments)) + except Exception as error: # Keep protocol alive; report tool failure. + return jsonrpc_response(request_id, tool_error(str(error))) + return jsonrpc_error(request_id, -32601, f"Method not found: {method}") + + +class MCPHandler(BaseHTTPRequestHandler): + server_version = "AIWContextMCP/0.1" + + def do_GET(self) -> None: + if self.path == "/health": + self.send_json(HTTPStatus.OK, {"status": "ok", "server": SERVER_NAME, "version": SERVER_VERSION}) + return + parsed = urllib.parse.urlparse(self.path) + if parsed.path == "/mcp": + self.send_error(HTTPStatus.METHOD_NOT_ALLOWED, "SSE GET stream is not implemented") + return + self.send_error(HTTPStatus.NOT_FOUND) + + def do_POST(self) -> None: + parsed = urllib.parse.urlparse(self.path) + if parsed.path != "/mcp": + self.send_error(HTTPStatus.NOT_FOUND) + return + if not self.origin_allowed(): + self.send_error(HTTPStatus.FORBIDDEN, "origin not allowed") + return + length = int(self.headers.get("Content-Length") or 0) + if length <= 0 or length > 2_000_000: + self.send_error(HTTPStatus.BAD_REQUEST, "invalid body length") + return + try: + message = json.loads(self.rfile.read(length).decode("utf-8")) + except json.JSONDecodeError: + self.send_json(HTTPStatus.BAD_REQUEST, jsonrpc_error(None, -32700, "Parse error")) + return + if not isinstance(message, dict): + self.send_json(HTTPStatus.BAD_REQUEST, jsonrpc_error(None, -32600, "Invalid Request")) + return + response = handle_request(message) + if response is None: + self.send_response(HTTPStatus.ACCEPTED) + self.end_headers() + return + self.send_json(HTTPStatus.OK, response, mcp=True) + + def origin_allowed(self) -> bool: + origin = self.headers.get("Origin") + if not origin: + return True + parsed = urllib.parse.urlparse(origin) + return parsed.hostname in {"127.0.0.1", "localhost", "::1"} + + def send_json(self, status: HTTPStatus, payload: dict[str, Any], mcp: bool = False) -> None: + encoded = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(encoded))) + if mcp: + self.send_header("MCP-Protocol-Version", PROTOCOL_VERSION) + self.end_headers() + self.wfile.write(encoded) + + def log_message(self, format: str, *args: object) -> None: + print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True) + + +def run_http(host: str, port: int) -> None: + load_local_env() + server = ThreadingHTTPServer((host, port), MCPHandler) + print(f"{SERVER_NAME} listening on http://{host}:{port}/mcp", file=sys.stderr, flush=True) + server.serve_forever() + + +def run_stdio() -> None: + load_local_env() + for line in sys.stdin: + line = line.strip() + if not line: + continue + try: + message = json.loads(line) + response = handle_request(message) + except Exception as error: + response = jsonrpc_error(None, -32603, str(error)) + if response is not None: + print(json.dumps(response, ensure_ascii=False, separators=(",", ":")), flush=True) + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--transport", choices=["http", "stdio"], default="http") + parser.add_argument("--host", default=os.getenv("AIW_CONTEXT_MCP_HOST", "127.0.0.1")) + parser.add_argument("--port", type=int, default=int(os.getenv("AIW_CONTEXT_MCP_PORT", "8765"))) + args = parser.parse_args() + if args.transport == "stdio": + run_stdio() + else: + run_http(args.host, args.port) + + +if __name__ == "__main__": + main() diff --git a/scripts/mcp/aiw-context-mcp/test_server.py b/scripts/mcp/aiw-context-mcp/test_server.py new file mode 100644 index 0000000..0743743 --- /dev/null +++ b/scripts/mcp/aiw-context-mcp/test_server.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import importlib.util +import json +import sys +import tempfile +import unittest +from datetime import date +from pathlib import Path +from unittest.mock import patch + + +SERVER_PATH = Path(__file__).with_name("server.py") +SPEC = importlib.util.spec_from_file_location("aiw_context_mcp", SERVER_PATH) +server = importlib.util.module_from_spec(SPEC) +assert SPEC.loader is not None +sys.modules[SPEC.name] = server +SPEC.loader.exec_module(server) + + +class ContextMCPTests(unittest.TestCase): + def test_initialize_response_declares_tools(self) -> None: + response = server.handle_request({"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": server.PROTOCOL_VERSION}}) + + self.assertEqual(response["result"]["protocolVersion"], server.PROTOCOL_VERSION) + self.assertIn("tools", response["result"]["capabilities"]) + + def test_tools_list_includes_project_search(self) -> None: + response = server.handle_request({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}) + + names = {tool["name"] for tool in response["result"]["tools"]} + self.assertIn("project_search_memory", names) + self.assertIn("communication_latest", names) + + def test_unknown_tool_returns_protocol_error(self) -> None: + response = server.handle_request({"jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {"name": "missing", "arguments": {}}}) + + self.assertEqual(response["error"]["code"], -32602) + + def test_notification_returns_none(self) -> None: + response = server.handle_request({"jsonrpc": "2.0", "method": "notifications/initialized"}) + + self.assertIsNone(response) + + def test_communication_latest_reads_bounded_records(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + latest = root / "ai" / "inbox" / "mattermost-mirror" / "latest.jsonl" + latest.parent.mkdir(parents=True) + for index in range(3): + latest.write_text("", encoding="utf-8") if index == 0 else None + latest.write_text( + "\n".join(json.dumps({"post_id": str(index), "message": f"m{index}"}) for index in range(3)) + "\n", + encoding="utf-8", + ) + + with patch.object(server, "ROOT", root): + result = server.communication_latest({"profile": "fidelity", "limit": 2})["structuredContent"] + + self.assertEqual([item["message"] for item in result["records"]], ["m1", "m2"]) + + def test_project_search_skips_templates(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + real = root / "project-knowledge" / "03-context" / "project.md" + template = root / "project-knowledge" / "09-templates" / "daily.md" + real.parent.mkdir(parents=True) + template.parent.mkdir(parents=True) + real.write_text("Important XFlow context", encoding="utf-8") + template.write_text("Important XFlow template", encoding="utf-8") + + with patch.object(server, "ROOT", root): + result = server.project_search_memory({"profile": "fidelity", "query": "XFlow"})["structuredContent"] + + self.assertEqual(len(result["matches"]), 1) + self.assertIn("03-context/project.md", result["matches"][0]["path"]) + + def test_previous_workday_skips_weekend(self) -> None: + monday = date(2026, 5, 18) + + self.assertEqual(server.previous_workday(monday), date(2026, 5, 15)) + + +if __name__ == "__main__": + unittest.main()