#!/usr/bin/env python3 """Read-only AI Workspace context MCP server. This server intentionally exposes bounded local evidence and canonical memory. It does not capture traffic, send messages, or promote memory. Capture lifecycle is owned by the AI Workspace Service Manager. """ from __future__ import annotations import argparse import hashlib import json import os import re import sys import urllib.parse from datetime import date, datetime, timedelta from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[3] PROTOCOL_VERSION = "2025-06-18" SERVER_NAME = "aiw-context-mcp" SERVER_VERSION = "0.1.0" LOCAL_ENV = ROOT / "scripts" / "mattermost-proxy" / ".env" AIW_SCRIPT_DIR = ROOT / "scripts" / "aiw" sys.path.insert(0, str(AIW_SCRIPT_DIR)) import profile as aiw_profile # noqa: E402 def load_local_env(path: Path = LOCAL_ENV) -> None: if not path.is_file(): return for raw_line in path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue if line.startswith("export "): line = line[len("export ") :].strip() key, value = line.split("=", 1) key = key.strip() value = value.strip().strip("'\"") if key and key not in os.environ: os.environ[key] = value def profile_dir(profile: str) -> Path: candidate = ROOT / "profiles" / profile return candidate if candidate.exists() else ROOT def knowledge_dir(profile: str) -> Path: return aiw_profile.knowledge_dir(profile, root=ROOT) def inbox_dir(profile: str) -> Path: return aiw_profile.inbox_dir(profile, root=ROOT) def rel(path: Path) -> str: return aiw_profile.relative_to_root(path, root=ROOT) def mattermost_mirror_dir(profile: str) -> Path: configured = os.getenv("MATTERMOST_MIRROR_DIR", "").strip() if configured: path = Path(configured).expanduser() return path if path.is_absolute() else ROOT / path return inbox_dir(profile) / "mattermost-mirror" def profile_context_channels(profile: str, source: str = "mattermost") -> set[str]: path = ROOT / "profiles" / profile / "context-sources.json" if not path.is_file(): return set() try: config = json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError: return set() channels = (((config.get("communication_sources") or {}).get(source) or {}).get("context_channels") or []) return {str(item).strip().lower() for item in channels if str(item).strip()} def photo_inbox_dir(profile: str) -> Path: configured = os.getenv("PHOTO_INBOX_DIR", "").strip() if configured: return Path(configured).expanduser() linked = inbox_dir(profile) / "photos" if linked.exists(): return linked return Path.home() / "Pictures" / "Photo Inbox" def parse_channels(raw: str | None, profile: str | None = None) -> set[str]: if not raw: raw = os.getenv("AIW_MATTERMOST_CONTEXT_CHANNELS", "") or os.getenv("AIW_MATTERMOST_PROJECT_CHANNELS", "") channels = {item.strip().lower() for item in (raw or "").split(",") if item.strip()} if channels: return channels return profile_context_channels(profile or "fidelity") if profile else set() def should_use_all_channels(args: dict[str, Any]) -> bool: return bool(args.get("include_all_channels") or args.get("all_channels")) def previous_workday(today: date) -> date: day = today - timedelta(days=1) while day.weekday() >= 5: day -= timedelta(days=1) return day def read_jsonl(path: Path, limit: int | None = None) -> list[dict[str, Any]]: if not path.is_file(): return [] records: list[dict[str, Any]] = [] for line in path.read_text(encoding="utf-8", errors="replace").splitlines(): line = line.strip() if not line: continue try: records.append(json.loads(line)) except json.JSONDecodeError: continue if limit and limit > 0: return records[-limit:] return records def filter_channels(records: list[dict[str, Any]], channels: set[str]) -> list[dict[str, Any]]: if not channels: return records return [ item for item in records if str(item.get("channel_name", "")).lower() in channels or str(item.get("channel_id", "")).lower() in channels ] def daily_by_date_path(profile: str, day: date) -> Path: return mattermost_mirror_dir(profile) / "by-date" / f"{day:%Y}" / f"{day:%m}" / f"{day:%Y-%m-%d}.jsonl" def daily_channel_path(profile: str, channel: str, day: date) -> Path: return mattermost_mirror_dir(profile) / "channels" / channel / f"{day:%Y}" / f"{day:%m}" / f"{day:%Y-%m-%d}.jsonl" def tool_result(data: Any, text_prefix: str | None = None) -> dict[str, Any]: text = json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True) if text_prefix: text = f"{text_prefix}\n{text}" return { "content": [{"type": "text", "text": text}], "structuredContent": data, "isError": False, } def tool_error(message: str, data: Any | None = None) -> dict[str, Any]: payload = {"error": message, "data": data} return { "content": [{"type": "text", "text": json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)}], "structuredContent": payload, "isError": True, } def as_date(raw: str | None) -> date: return datetime.strptime(raw, "%Y-%m-%d").date() if raw else date.today() def clamp_limit(value: Any, default: int = 80, maximum: int = 300) -> int: try: limit = int(value) except (TypeError, ValueError): limit = default if limit <= 0: return default return min(limit, maximum) def list_profiles(_: dict[str, Any]) -> dict[str, Any]: profiles = sorted(path.name for path in (ROOT / "profiles").iterdir() if (path / "profile.md").is_file()) return tool_result({"profiles": profiles, "active_default": os.getenv("AIW_PROJECT_PROFILE", "fidelity")}) def communication_latest(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") limit = clamp_limit(args.get("limit"), default=50) channels = set() if should_use_all_channels(args) else parse_channels(args.get("channels"), profile=profile) records = filter_channels(read_jsonl(mattermost_mirror_dir(profile) / "latest.jsonl", limit=None), channels)[-limit:] return tool_result({"profile": profile, "source": "mattermost", "evidence_type": "communication", "canonical": False, "channel_scope": "all" if not channels else "profile", "channels": sorted(channels), "records": records}) def communication_date_context(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") day = as_date(args.get("date")) limit = clamp_limit(args.get("limit"), default=100) channels = set() if should_use_all_channels(args) else parse_channels(args.get("channels"), profile=profile) records = filter_channels(read_jsonl(daily_by_date_path(profile, day), limit=None), channels)[-limit:] return tool_result({"profile": profile, "source": "mattermost", "date": day.isoformat(), "channels": sorted(channels), "records": records, "canonical": False}) def communication_standup_context(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") today = as_date(args.get("today") or args.get("date")) previous = previous_workday(today) limit = clamp_limit(args.get("limit"), default=80) channels = set() if should_use_all_channels(args) else parse_channels(args.get("channels"), profile=profile) previous_records = filter_channels(read_jsonl(daily_by_date_path(profile, previous)), channels)[-limit:] today_records = filter_channels(read_jsonl(daily_by_date_path(profile, today)), channels)[-limit:] return tool_result({ "profile": profile, "source": "mattermost", "mode": "standup", "canonical": False, "channels": sorted(channels), "previous_workday": previous.isoformat(), "today": today.isoformat(), "previous_records": previous_records, "today_records": today_records, }) def communication_channel_context(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") channel = str(args.get("channel") or "").strip() if not channel: return tool_error("channel is required") day = as_date(args.get("date")) limit = clamp_limit(args.get("limit"), default=100) records = read_jsonl(daily_channel_path(profile, channel, day), limit=limit) return tool_result({"profile": profile, "source": "mattermost", "channel": channel, "date": day.isoformat(), "records": records, "canonical": False}) def communication_thread_context(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") thread_id = str(args.get("thread_id") or "").strip() if not thread_id: return tool_error("thread_id is required") limit = clamp_limit(args.get("limit"), default=100) path = mattermost_mirror_dir(profile) / "threads" / f"{thread_id}.jsonl" records = read_jsonl(path, limit=limit) return tool_result({"profile": profile, "source": "mattermost", "thread_id": thread_id, "records": records, "canonical": False}) def project_current_context(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") base = knowledge_dir(profile) files = [base / "01-current" / "current-work.md", base / "01-current" / "work-items.md"] result = [] for path in files: if path.is_file(): result.append({"path": rel(path), "text": path.read_text(encoding="utf-8", errors="replace")}) return tool_result({"profile": profile, "canonical": True, "files": result}) def project_search_memory(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") query = str(args.get("query") or "").strip().lower() if not query: return tool_error("query is required") limit = clamp_limit(args.get("limit"), default=10, maximum=50) base = knowledge_dir(profile) matches: list[dict[str, Any]] = [] for path in sorted(base.rglob("*.md")): relative_to_base = path.relative_to(base) if str(relative_to_base).startswith("09-templates/"): continue text = path.read_text(encoding="utf-8", errors="replace") lowered = text.lower() index = lowered.find(query) if index < 0: continue start = max(0, index - 220) end = min(len(text), index + len(query) + 220) matches.append({"path": rel(path), "snippet": text[start:end].strip()}) if len(matches) >= limit: break return tool_result({"profile": profile, "canonical": True, "query": query, "matches": matches}) def index_path(profile: str) -> Path: return aiw_profile.index_dir(profile, root=ROOT) / "project-knowledge.jsonl" def index_manifest_path(profile: str) -> Path: return aiw_profile.index_dir(profile, root=ROOT) / "manifest.json" def search_tokens(text: str) -> set[str]: return {item for item in re.findall(r"[a-z0-9][a-z0-9_-]{1,}", text.lower()) if len(item) > 1} def read_project_index(profile: str) -> list[dict[str, Any]]: path = index_path(profile) if not path.is_file(): return [] rows: list[dict[str, Any]] = [] for line in path.read_text(encoding="utf-8", errors="replace").splitlines(): if not line.strip(): continue try: rows.append(json.loads(line)) except json.JSONDecodeError: continue return rows def indexed_snippet(query: str, text: str, width: int = 520) -> str: lowered = text.lower() index = lowered.find(query.lower()) if query else -1 if index < 0: positions = [lowered.find(term) for term in search_tokens(query) if lowered.find(term) >= 0] index = min(positions) if positions else 0 start = max(0, index - width // 2) end = min(len(text), start + width) return re.sub(r"\s+", " ", text[start:end]).strip() def score_index_row(query: str, query_tokens: set[str], row: dict[str, Any]) -> float: text = str(row.get("text") or "") haystack = f"{row.get('path', '')} {row.get('heading', '')} {text}".lower() exact = haystack.count(query.lower()) overlap = len(query_tokens & search_tokens(haystack)) if exact == 0 and overlap == 0: return 0.0 heading_bonus = 1.5 if query.lower() in str(row.get("heading") or "").lower() else 0.0 path_bonus = 1.0 if query.lower() in str(row.get("path") or "").lower() else 0.0 return exact * 5.0 + overlap * 1.25 + heading_bonus + path_bonus def read_index_manifest(profile: str) -> dict[str, Any]: path = index_manifest_path(profile) if not path.is_file(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError: return {} def memory_hybrid_search(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") query = str(args.get("query") or "").strip() if not query: return tool_error("query is required") limit = clamp_limit(args.get("limit"), default=10, maximum=50) rows = read_project_index(profile) if not rows: fallback = project_search_memory({"profile": profile, "query": query, "limit": limit})["structuredContent"] fallback["source"] = "live-project-knowledge-fallback" fallback["index_available"] = False return tool_result(fallback) query_tokens = search_tokens(query) scored = [] for row in rows: score = score_index_row(query, query_tokens, row) if score > 0: scored.append((score, row)) scored.sort(key=lambda item: (-item[0], item[1].get("path", ""), item[1].get("chunk_id", ""))) matches = [] for score, row in scored[:limit]: text = str(row.get("text") or "") matches.append({ "score": round(score, 3), "path": row.get("path"), "heading": row.get("heading"), "chunk_id": row.get("chunk_id") or hashlib.sha256(text.encode("utf-8")).hexdigest()[:16], "snippet": indexed_snippet(query, text), "mtime": row.get("mtime"), "sha256": row.get("sha256"), }) return tool_result({"profile": profile, "canonical": False, "source": "derived-project-knowledge-index", "index_available": True, "manifest": read_index_manifest(profile), "query": query, "matches": matches}) def photos_latest(args: dict[str, Any]) -> dict[str, Any]: profile = str(args.get("profile") or "fidelity") limit = clamp_limit(args.get("limit"), default=20, maximum=100) base = photo_inbox_dir(profile) photos = [] if base.exists(): candidates = [path for path in base.iterdir() if path.is_file() and path.suffix.lower() in {".jpg", ".jpeg", ".png", ".heic"}] for path in sorted(candidates, key=lambda item: item.stat().st_mtime)[-limit:]: photos.append({"path": str(path), "modified_at": datetime.fromtimestamp(path.stat().st_mtime).isoformat(), "bytes": path.stat().st_size}) return tool_result({"profile": profile, "source": "photo-inbox", "canonical": False, "photos": photos}) def resource_definitions() -> list[dict[str, Any]]: resources: list[dict[str, Any]] = [] for profile in sorted(path.name for path in (ROOT / "profiles").iterdir() if (path / "profile.md").is_file()): resources.extend([ { "uri": f"aiw://profiles/{profile}/current-work", "name": f"{profile}-current-work", "title": f"{profile} Current Work", "description": "Canonical current work context.", "mimeType": "text/markdown", "annotations": {"audience": ["assistant"], "priority": 0.95}, }, { "uri": f"aiw://profiles/{profile}/work-items", "name": f"{profile}-work-items", "title": f"{profile} Active Work Items", "description": "Canonical active work-item summary.", "mimeType": "text/markdown", "annotations": {"audience": ["assistant"], "priority": 0.9}, }, { "uri": f"aiw://profiles/{profile}/mattermost/latest", "name": f"{profile}-mattermost-latest", "title": f"{profile} Mattermost Latest", "description": "Profile-filtered latest Mattermost mirror evidence as JSON.", "mimeType": "application/json", "annotations": {"audience": ["assistant"], "priority": 0.75}, }, { "uri": f"aiw://profiles/{profile}/photos/latest", "name": f"{profile}-photos-latest", "title": f"{profile} Photo Inbox Latest", "description": "Latest local Photo Inbox file metadata as JSON; image data is not embedded.", "mimeType": "application/json", "annotations": {"audience": ["assistant"], "priority": 0.45}, }, ]) return resources def read_resource(uri: str) -> dict[str, Any] | None: parsed = urllib.parse.urlparse(uri) if parsed.scheme != "aiw": return None parts = [part for part in parsed.path.split("/") if part] if parsed.netloc != "profiles" or len(parts) < 2: return None profile = parts[0] resource = "/".join(parts[1:]) base = knowledge_dir(profile) if resource == "current-work": path = base / "01-current" / "current-work.md" if not path.is_file(): return None return {"uri": uri, "mimeType": "text/markdown", "text": path.read_text(encoding="utf-8", errors="replace")} if resource == "work-items": path = base / "01-current" / "work-items.md" if not path.is_file(): return None return {"uri": uri, "mimeType": "text/markdown", "text": path.read_text(encoding="utf-8", errors="replace")} if resource == "mattermost/latest": data = communication_latest({"profile": profile, "limit": 80})["structuredContent"] return {"uri": uri, "mimeType": "application/json", "text": json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True)} if resource == "photos/latest": data = photos_latest({"profile": profile, "limit": 30})["structuredContent"] return {"uri": uri, "mimeType": "application/json", "text": json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True)} return None TOOLS: dict[str, dict[str, Any]] = { "context_profiles": {"handler": list_profiles, "description": "List AI Workspace project profiles.", "properties": {}}, "communication_latest": {"handler": communication_latest, "description": "Read bounded latest Mattermost mirror evidence, filtered to the profile's context channels by default.", "properties": {"profile": {"type": "string"}, "channels": {"type": "string"}, "include_all_channels": {"type": "boolean"}, "limit": {"type": "integer"}}}, "communication_date_context": {"handler": communication_date_context, "description": "Read Mattermost mirror evidence for one date, filtered to profile context channels by default unless include_all_channels is true.", "properties": {"profile": {"type": "string"}, "date": {"type": "string"}, "channels": {"type": "string"}, "include_all_channels": {"type": "boolean"}, "limit": {"type": "integer"}}}, "communication_standup_context": {"handler": communication_standup_context, "description": "Read previous-workday and today Mattermost evidence for standup drafting, filtered to profile context channels by default.", "properties": {"profile": {"type": "string"}, "today": {"type": "string"}, "channels": {"type": "string"}, "include_all_channels": {"type": "boolean"}, "limit": {"type": "integer"}}}, "communication_channel_context": {"handler": communication_channel_context, "description": "Read Mattermost mirror evidence for a channel and date.", "properties": {"profile": {"type": "string"}, "channel": {"type": "string"}, "date": {"type": "string"}, "limit": {"type": "integer"}}}, "communication_thread_context": {"handler": communication_thread_context, "description": "Read Mattermost mirror evidence for a thread id.", "properties": {"profile": {"type": "string"}, "thread_id": {"type": "string"}, "limit": {"type": "integer"}}}, "project_current_context": {"handler": project_current_context, "description": "Read canonical current-work and work-items context.", "properties": {"profile": {"type": "string"}}}, "project_search_memory": {"handler": project_search_memory, "description": "Search canonical project-knowledge Markdown files.", "properties": {"profile": {"type": "string"}, "query": {"type": "string"}, "limit": {"type": "integer"}}}, "memory_hybrid_search": {"handler": memory_hybrid_search, "description": "Search the derived local project-knowledge index with lexical scoring and source citations; falls back to live Markdown search if no index exists.", "properties": {"profile": {"type": "string"}, "query": {"type": "string"}, "limit": {"type": "integer"}}}, "photos_latest": {"handler": photos_latest, "description": "List recent local Photo Inbox files without embedding image data.", "properties": {"profile": {"type": "string"}, "limit": {"type": "integer"}}}, } def tool_definitions() -> list[dict[str, Any]]: definitions = [] for name, item in TOOLS.items(): definitions.append({ "name": name, "title": name.replace("_", " ").title(), "description": item["description"], "inputSchema": {"type": "object", "properties": item["properties"], "additionalProperties": False}, "annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False}, }) return definitions def jsonrpc_response(request_id: Any, result: Any) -> dict[str, Any]: return {"jsonrpc": "2.0", "id": request_id, "result": result} def jsonrpc_error(request_id: Any, code: int, message: str, data: Any | None = None) -> dict[str, Any]: error: dict[str, Any] = {"code": code, "message": message} if data is not None: error["data"] = data return {"jsonrpc": "2.0", "id": request_id, "error": error} def handle_request(message: dict[str, Any]) -> dict[str, Any] | None: method = message.get("method") request_id = message.get("id") params = message.get("params") or {} if request_id is None: return None if method == "initialize": requested = str(params.get("protocolVersion") or PROTOCOL_VERSION) return jsonrpc_response(request_id, { "protocolVersion": requested if requested in {PROTOCOL_VERSION, "2025-03-26"} else PROTOCOL_VERSION, "capabilities": {"tools": {"listChanged": False}, "resources": {"listChanged": False}}, "serverInfo": {"name": SERVER_NAME, "title": "AI Workspace Context MCP", "version": SERVER_VERSION}, "instructions": "Read-only local AI Workspace context. Evidence is not canonical memory unless promoted by the agent/user.", }) if method == "ping": return jsonrpc_response(request_id, {}) if method == "tools/list": return jsonrpc_response(request_id, {"tools": tool_definitions()}) if method == "resources/list": return jsonrpc_response(request_id, {"resources": resource_definitions()}) if method == "resources/read": uri = str(params.get("uri") or "") content = read_resource(uri) if content is None: return jsonrpc_error(request_id, -32002, "Resource not found", {"uri": uri}) return jsonrpc_response(request_id, {"contents": [content]}) if method == "resources/templates/list": return jsonrpc_response(request_id, {"resourceTemplates": []}) if method == "tools/call": name = str(params.get("name") or "") arguments = params.get("arguments") or {} tool = TOOLS.get(name) if tool is None: return jsonrpc_error(request_id, -32602, f"Unknown tool: {name}") try: return jsonrpc_response(request_id, tool["handler"](arguments)) except Exception as error: # Keep protocol alive; report tool failure. return jsonrpc_response(request_id, tool_error(str(error))) return jsonrpc_error(request_id, -32601, f"Method not found: {method}") class MCPHandler(BaseHTTPRequestHandler): server_version = "AIWContextMCP/0.1" def do_GET(self) -> None: if self.path == "/health": self.send_json(HTTPStatus.OK, {"status": "ok", "server": SERVER_NAME, "version": SERVER_VERSION}) return parsed = urllib.parse.urlparse(self.path) if parsed.path == "/mcp": self.send_error(HTTPStatus.METHOD_NOT_ALLOWED, "SSE GET stream is not implemented") return self.send_error(HTTPStatus.NOT_FOUND) def do_POST(self) -> None: parsed = urllib.parse.urlparse(self.path) if parsed.path != "/mcp": self.send_error(HTTPStatus.NOT_FOUND) return if not self.origin_allowed(): self.send_error(HTTPStatus.FORBIDDEN, "origin not allowed") return length = int(self.headers.get("Content-Length") or 0) if length <= 0 or length > 2_000_000: self.send_error(HTTPStatus.BAD_REQUEST, "invalid body length") return try: message = json.loads(self.rfile.read(length).decode("utf-8")) except json.JSONDecodeError: self.send_json(HTTPStatus.BAD_REQUEST, jsonrpc_error(None, -32700, "Parse error")) return if not isinstance(message, dict): self.send_json(HTTPStatus.BAD_REQUEST, jsonrpc_error(None, -32600, "Invalid Request")) return response = handle_request(message) if response is None: self.send_response(HTTPStatus.ACCEPTED) self.end_headers() return self.send_json(HTTPStatus.OK, response, mcp=True) def origin_allowed(self) -> bool: origin = self.headers.get("Origin") if not origin: return True parsed = urllib.parse.urlparse(origin) return parsed.hostname in {"127.0.0.1", "localhost", "::1"} def send_json(self, status: HTTPStatus, payload: dict[str, Any], mcp: bool = False) -> None: encoded = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(encoded))) if mcp: self.send_header("MCP-Protocol-Version", PROTOCOL_VERSION) self.end_headers() self.wfile.write(encoded) def log_message(self, format: str, *args: object) -> None: print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True) def run_http(host: str, port: int) -> None: load_local_env() server = ThreadingHTTPServer((host, port), MCPHandler) print(f"{SERVER_NAME} listening on http://{host}:{port}/mcp", file=sys.stderr, flush=True) server.serve_forever() def run_stdio() -> None: load_local_env() for line in sys.stdin: line = line.strip() if not line: continue try: message = json.loads(line) response = handle_request(message) except Exception as error: response = jsonrpc_error(None, -32603, str(error)) if response is not None: print(json.dumps(response, ensure_ascii=False, separators=(",", ":")), flush=True) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--transport", choices=["http", "stdio"], default="http") parser.add_argument("--host", default=os.getenv("AIW_CONTEXT_MCP_HOST", "127.0.0.1")) parser.add_argument("--port", type=int, default=int(os.getenv("AIW_CONTEXT_MCP_PORT", "8765"))) args = parser.parse_args() if args.transport == "stdio": run_stdio() else: run_http(args.host, args.port) if __name__ == "__main__": main()