"""mitmproxy addon for a local Mattermost Desktop mirror.

This addon is intentionally narrow:

- allowlist a Mattermost host
- inspect only /api/v4 REST and WebSocket traffic
- redact secrets
- normalize posts into date-rotated JSONL files for AI context

The output under ai/inbox/ is raw evidence, not canonical project memory.

Configuration is read from environment variables when the addon is created:

- ``MATTERMOST_MIRROR_DIR``: output directory (default ``ai/inbox/mattermost-mirror``,
  resolved against the current working directory).
- ``MATTERMOST_MIRROR_HOST_ALLOW``: host to capture, including its subdomains. When
  unset, any host whose name contains "mattermost" is captured.
- ``MATTERMOST_MIRROR_CHANNEL_IDS``: comma- or newline-separated channel IDs to keep;
  empty keeps every channel.
- ``MATTERMOST_MIRROR_LATEST_LIMIT``: number of posts kept in ``latest.jsonl`` and
  ``latest.md`` (default 200; 0 or less keeps none).
- ``MATTERMOST_MIRROR_WRITE_RAW``: when truthy ("1", "true", "yes", "on"), also append
  raw REST flows and WebSocket events under ``raw/``. Request headers and URL query
  secrets are redacted there; response bodies are stored as received.
"""

from __future__ import annotations

import json
import os
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse

from mitmproxy import http

DEFAULT_OUT_DIR = "ai/inbox/mattermost-mirror"
# Shape enforced for post IDs: exactly 26 lowercase ASCII letters/digits.
POST_ID_RE = re.compile(r"^[a-z0-9]{26}$")
# Placeholder written in place of secret values.
REDACTED = "[REDACTED]"
# Request header names (lowercase) whose values are never written to raw records.
SENSITIVE_HEADERS = frozenset(
    {"authorization", "proxy-authorization", "cookie", "set-cookie", "x-csrf-token"}
)
# Query parameter names (lowercase) whose values are redacted from recorded URLs.
SENSITIVE_QUERY_KEYS = frozenset({"access_token", "token", "auth_token", "password", "secret"})


def env_bool(name: str, default: bool = False) -> bool:
    """Return whether environment variable *name* is set to a truthy value.

    "1", "true", "yes" and "on" (case-insensitive, surrounding whitespace ignored)
    are truthy; any other set value is falsy. An unset variable yields *default*.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def split_csv(raw: str) -> set[str]:
    """Split a comma- and/or newline-separated list into a set of non-empty, stripped items."""
    return {item.strip() for item in raw.replace("\n", ",").split(",") if item.strip()}


def _positive_int(value: Any) -> int | None:
    """Return *value* converted with ``int()`` if the result is greater than zero, else ``None``.

    Used for millisecond timestamps that may be missing, zero, or malformed in
    captured payloads or in previously written files.
    """
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if number > 0 else None


class MattermostMirror:
    """mitmproxy addon that mirrors Mattermost posts into JSONL and Markdown files.

    Output layout under ``out_dir``:

    - ``messages/YYYY/MM/YYYY-MM-DD.jsonl``: normalized posts, one JSON object per
      line, bucketed by the post's creation date in UTC.
    - ``latest.jsonl`` / ``latest.md``: the newest ``latest_limit`` posts.
    - ``state.json``: per-channel high-water marks plus the user-id -> name map.
    - ``index.json``: dates seen and, per channel, the daily files it was written to.
    - ``raw/YYYY/MM/YYYY-MM-DD-<kind>.jsonl``: raw records, only when raw capture is on.

    Posts are deduplicated by post ID, so later versions of an already-recorded
    post (e.g. edits) are not written again.
    """

    def __init__(self) -> None:
        """Read configuration from the environment, create directories and restore state."""
        self.out_dir = Path(os.getenv("MATTERMOST_MIRROR_DIR", DEFAULT_OUT_DIR)).resolve()
        self.host_allow = os.getenv("MATTERMOST_MIRROR_HOST_ALLOW", "").strip().lower()
        self.channel_allow = split_csv(os.getenv("MATTERMOST_MIRROR_CHANNEL_IDS", ""))
        # Clamp at 0: a negative value would make the tail slice in _update_latest
        # drop the oldest records instead of keeping the newest ones.
        self.latest_limit = max(0, int(os.getenv("MATTERMOST_MIRROR_LATEST_LIMIT", "200")))
        self.write_raw = env_bool("MATTERMOST_MIRROR_WRITE_RAW", default=False)

        self.messages_dir = self.out_dir / "messages"
        self.raw_dir = self.out_dir / "raw"
        self.state_path = self.out_dir / "state.json"
        self.index_path = self.out_dir / "index.json"
        self.latest_jsonl_path = self.out_dir / "latest.jsonl"
        self.latest_md_path = self.out_dir / "latest.md"

        # In-memory dedupe set; seeded from disk so restarts do not duplicate posts.
        self.seen_post_ids: set[str] = set()
        # user_id -> display handle, channel_id -> channel name, learned from traffic.
        self.users: dict[str, str] = {}
        self.channels: dict[str, str] = {}
        self.state: dict[str, Any] = {"channels": {}, "users": {}, "updated_at": None}

        self._ensure_dirs()
        self._load_state()
        self._load_recent_seen_ids()

    # ------------------------------------------------------------------ storage

    def _ensure_dirs(self) -> None:
        """Create the output directories up front (``raw/`` only when raw capture is enabled)."""
        self.messages_dir.mkdir(parents=True, exist_ok=True)
        if self.write_raw:
            self.raw_dir.mkdir(parents=True, exist_ok=True)

    def _read_json_object(self, path: Path) -> dict[str, Any] | None:
        """Return the JSON object stored in *path*, or ``None`` if missing, unreadable or not an object."""
        try:
            loaded = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            return None
        return loaded if isinstance(loaded, dict) else None

    def _load_state(self) -> None:
        """Restore channel high-water marks and the user/channel name maps from state.json.

        A missing, unreadable, or non-object state file leaves the default empty state.
        """
        loaded = self._read_json_object(self.state_path)
        if loaded is None:
            return
        channels = loaded.get("channels")
        if not isinstance(channels, dict):
            # _update_state stores per-channel entries in this mapping.
            channels = loaded["channels"] = {}
        users = loaded.get("users")
        self.users = dict(users) if isinstance(users, dict) else {}
        for channel_id, value in channels.items():
            if isinstance(value, dict):
                name = value.get("channel_name") or value.get("name")
                if name:
                    self.channels[channel_id] = name
        self.state = loaded

    def _load_recent_seen_ids(self) -> None:
        """Seed the dedupe set from latest.jsonl and today's (UTC) daily messages file.

        Only these two files are scanned to bound startup work: latest.jsonl holds the
        rolling dedupe window and today's daily file protects same-day restarts.
        """
        today_path = self._daily_messages_path(datetime.now(timezone.utc))
        for path in (self.latest_jsonl_path, today_path):
            for record in self._read_jsonl(path):
                post_id = record.get("post_id")
                if post_id:
                    self.seen_post_ids.add(post_id)

    def _atomic_write_text(self, path: Path, text: str) -> None:
        """Replace *path* with *text* by writing a temp file in the same directory and renaming it.

        ``Path.replace`` (``os.replace``) is atomic on POSIX, so readers see either the
        old or the new content. The temp file is removed if writing or renaming fails.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path: Path | None = None
        try:
            with tempfile.NamedTemporaryFile(
                "w",
                encoding="utf-8",
                dir=str(path.parent),
                prefix=f".{path.name}.",
                suffix=".tmp",
                delete=False,
            ) as tmp:
                tmp_path = Path(tmp.name)
                tmp.write(text)
            tmp_path.replace(path)
            tmp_path = None  # Renamed into place; nothing left to clean up.
        finally:
            if tmp_path is not None:
                try:
                    tmp_path.unlink()
                except OSError:
                    pass

    def _append_jsonl(self, path: Path, obj: dict[str, Any]) -> None:
        """Append *obj* to *path* as one JSON line (keys sorted, non-ASCII kept as-is)."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(obj, ensure_ascii=False, sort_keys=True) + "\n")

    def _read_jsonl(self, path: Path) -> list[dict[str, Any]]:
        """Return the JSON objects stored one per line in *path*.

        A missing file yields ``[]``. Blank, malformed, or non-object lines are skipped
        individually, so one corrupt line (e.g. a partial append after a crash) does not
        discard the rest of the file. If reading fails midway, the records parsed so far
        are returned.
        """
        records: list[dict[str, Any]] = []
        try:
            with path.open("r", encoding="utf-8") as handle:
                for line in handle:
                    if not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                    except ValueError:
                        continue
                    if isinstance(obj, dict):
                        records.append(obj)
        except (OSError, ValueError):
            # OSError: missing/unreadable file; ValueError: UnicodeDecodeError mid-file.
            return records
        return records

    def _dt_from_ms(self, value: Any) -> datetime:
        """Convert epoch milliseconds to an aware UTC datetime; fall back to "now" if invalid."""
        ms = _positive_int(value)
        if ms is not None:
            try:
                return datetime.fromtimestamp(ms / 1000, timezone.utc)
            except (OverflowError, OSError, ValueError):
                pass
        return datetime.now(timezone.utc)

    def _daily_messages_path(self, dt: datetime) -> Path:
        """Return ``messages/YYYY/MM/YYYY-MM-DD.jsonl`` for *dt*."""
        return self.messages_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}.jsonl"

    def _daily_raw_path(self, dt: datetime, suffix: str) -> Path:
        """Return ``raw/YYYY/MM/YYYY-MM-DD-<suffix>.jsonl`` for *dt*."""
        return self.raw_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}-{suffix}.jsonl"

    # ------------------------------------------------------------ filtering

    def _safe_url(self, url: str) -> str:
        """Return *url* without its fragment and with secret query values redacted.

        Parameters named in SENSITIVE_QUERY_KEYS (case-insensitive) keep their key but
        get the value "[REDACTED]". Other parameters are kept so raw records still show
        paging/filter arguments. The query is re-encoded only when something was redacted.
        """
        parsed = urlparse(url)
        query = parsed.query
        if query:
            pairs = parse_qsl(query, keep_blank_values=True)
            if any(key.lower() in SENSITIVE_QUERY_KEYS for key, _ in pairs):
                query = urlencode(
                    [(key, REDACTED if key.lower() in SENSITIVE_QUERY_KEYS else value) for key, value in pairs],
                    safe="[]",
                )
        return parsed._replace(query=query, fragment="").geturl()

    def _is_allowed_host(self, host: str) -> bool:
        """Return True for the configured host or its subdomains.

        Without MATTERMOST_MIRROR_HOST_ALLOW this falls back to a broad substring match
        on "mattermost" (which also accepts e.g. "notmattermost.example").
        """
        host = host.lower()
        if self.host_allow:
            return host == self.host_allow or host.endswith(f".{self.host_allow}")
        return "mattermost" in host

    def _is_allowed_channel(self, channel_id: str | None) -> bool:
        """Return True when no channel allowlist is set or *channel_id* is on it."""
        if not self.channel_allow:
            return True
        return bool(channel_id and channel_id in self.channel_allow)

    def _capture_flow(self, flow: http.HTTPFlow) -> bool:
        """Return True for allowed-host requests whose path contains ``/api/v4/``."""
        return self._is_allowed_host(flow.request.pretty_host) and "/api/v4/" in flow.request.path

    def _redact_headers(self, headers: Any) -> dict[str, str]:
        """Copy *headers* into a plain dict, replacing values of SENSITIVE_HEADERS with "[REDACTED]"."""
        return {
            key: REDACTED if key.lower() in SENSITIVE_HEADERS else str(value)
            for key, value in headers.items()
        }

    # --------------------------------------------------------- name lookups

    def _remember_user(self, user: dict[str, Any]) -> None:
        """Map a user object's id to its username (or nickname / first name / id as fallback)."""
        user_id = user.get("id")
        if not user_id:
            return
        self.users[user_id] = user.get("username") or user.get("nickname") or user.get("first_name") or user_id

    def _remember_channel(self, channel: dict[str, Any]) -> None:
        """Map a channel object's id to its name (or display name / id as fallback)."""
        channel_id = channel.get("id")
        if not channel_id:
            return
        self.channels[channel_id] = channel.get("name") or channel.get("display_name") or channel_id

    def _ingest_reference_payload(self, payload: Any) -> None:
        """Harvest user and channel names from an arbitrary REST JSON payload.

        Lists are walked recursively. A dict is treated as a user when it has an id and a
        username/first_name, and as a channel when it has an id, a ``type`` and a
        display_name/team_id. Nested ``users`` (dict or list) and ``channels`` (list)
        collections are also scanned.
        """
        if isinstance(payload, list):
            for item in payload:
                self._ingest_reference_payload(item)
            return
        if not isinstance(payload, dict):
            return

        if payload.get("id") and ("username" in payload or "first_name" in payload):
            self._remember_user(payload)
        if payload.get("id") and ("display_name" in payload or "team_id" in payload) and "type" in payload:
            self._remember_channel(payload)

        users = payload.get("users")
        if isinstance(users, dict):
            users = list(users.values())
        if isinstance(users, list):
            for user in users:
                if isinstance(user, dict):
                    self._remember_user(user)

        channels = payload.get("channels")
        if isinstance(channels, list):
            for channel in channels:
                if isinstance(channel, dict):
                    self._remember_channel(channel)

    # ------------------------------------------------------------- posts

    def _normalize_post(
        self, post: dict[str, Any], source: str, raw_event: str | None = None
    ) -> dict[str, Any] | None:
        """Convert a Mattermost post object into the mirror's record format.

        Returns ``None`` when the post id does not match POST_ID_RE or the channel is not
        allowed. A missing or invalid ``create_at`` falls back to the capture time, and
        ``created_at`` / ``created_at_ms`` always describe the same instant.
        """
        post_id = post.get("id")
        channel_id = post.get("channel_id")
        if not post_id or not POST_ID_RE.match(str(post_id)):
            return None
        if not self._is_allowed_channel(channel_id):
            return None

        created_ms = _positive_int(post.get("create_at"))
        created_dt = self._dt_from_ms(created_ms)
        if created_ms is None:
            created_ms = int(created_dt.timestamp() * 1000)
        root_id = post.get("root_id") or None
        user_id = post.get("user_id") or None
        return {
            "source": source,
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "created_at": created_dt.isoformat(),
            "created_at_ms": created_ms,
            "updated_at_ms": _positive_int(post.get("update_at")) or 0,
            "channel_id": channel_id,
            "channel_name": self.channels.get(channel_id) if channel_id else None,
            "post_id": post_id,
            "root_id": root_id,
            "thread_id": root_id or post_id,
            "user_id": user_id,
            "username": self.users.get(user_id) if user_id else None,
            "message": post.get("message") or "",
            "type": "thread_reply" if root_id else "channel_post",
            "raw_event": raw_event,
            "props": post.get("props") or {},
        }

    def _write_message(self, msg: dict[str, Any]) -> None:
        """Persist a normalized post once: daily JSONL, state, latest files and index."""
        post_id = msg["post_id"]
        if post_id in self.seen_post_ids:
            return
        self.seen_post_ids.add(post_id)
        created_dt = self._dt_from_ms(msg.get("created_at_ms"))
        self._append_jsonl(self._daily_messages_path(created_dt), msg)
        self._update_state(msg)
        self._update_latest(msg)
        self._update_index(created_dt, msg)

    def _update_state(self, msg: dict[str, Any]) -> None:
        """Advance the channel's high-water mark for *msg* and rewrite state.json."""
        channel_id = msg.get("channel_id") or "unknown"
        channels = self.state.get("channels")
        if not isinstance(channels, dict):
            channels = self.state["channels"] = {}
        entry = channels.get(channel_id)
        if not isinstance(entry, dict):
            entry = channels[channel_id] = {}
        if msg.get("channel_name"):
            entry["channel_name"] = msg["channel_name"]

        created_ms = _positive_int(msg.get("created_at_ms")) or 0
        previous_ms = _positive_int(entry.get("last_seen_create_at")) or 0
        # REST history pages can deliver older posts after newer ones; only move the
        # mark (and its post id, so the two stay paired) forward.
        if created_ms >= previous_ms:
            entry["last_seen_create_at"] = created_ms
            entry["last_seen_post_id"] = msg.get("post_id")

        self.state["users"] = self.users
        self.state["updated_at"] = datetime.now(timezone.utc).isoformat()
        self._atomic_write_text(
            self.state_path, json.dumps(self.state, ensure_ascii=False, indent=2, sort_keys=True) + "\n"
        )

    def _update_latest(self, msg: dict[str, Any]) -> None:
        """Merge *msg* into latest.jsonl and re-render latest.md.

        Records are keyed by post id, ordered by ``created_at_ms`` and trimmed to the
        newest ``latest_limit`` entries (none when the limit is 0).
        """
        by_id: dict[str, dict[str, Any]] = {
            item["post_id"]: item for item in self._read_jsonl(self.latest_jsonl_path) if item.get("post_id")
        }
        by_id[msg["post_id"]] = msg
        ordered = sorted(by_id.values(), key=lambda item: _positive_int(item.get("created_at_ms")) or 0)
        # A bare `[-limit:]` slice would return every record when limit == 0.
        latest = ordered[-self.latest_limit :] if self.latest_limit > 0 else []
        jsonl = "".join(json.dumps(item, ensure_ascii=False, sort_keys=True) + "\n" for item in latest)
        self._atomic_write_text(self.latest_jsonl_path, jsonl)
        self._atomic_write_text(self.latest_md_path, self._render_latest_md(latest))

    def _render_latest_md(self, records: list[dict[str, Any]]) -> str:
        """Render *records* as a Markdown list, emitting a ``##`` heading whenever the channel changes."""
        lines = ["# Latest Mattermost Mirror", "", "Generated from local proxy mirror evidence.", ""]
        current_channel = None
        for item in records:
            channel = item.get("channel_name") or item.get("channel_id") or "unknown-channel"
            if channel != current_channel:
                lines.extend([f"## {channel}", ""])
                current_channel = channel
            author = item.get("username") or item.get("user_id") or "unknown-user"
            created = item.get("created_at") or "unknown-time"
            prefix = "reply" if item.get("type") == "thread_reply" else "post"
            # Indent continuation lines so multi-line messages stay inside their list item.
            text = "\n  ".join(str(item.get("message") or "").strip().splitlines())
            lines.append(f"- {created} {author} ({prefix} `{item.get('post_id')}`): {text}")
        lines.append("")
        return "\n".join(lines)

    def _update_index(self, dt: datetime, msg: dict[str, Any]) -> None:
        """Record *dt*'s date and the daily file used for *msg*'s channel in index.json."""
        index = self._read_json_object(self.index_path) or {"dates": [], "channels": {}, "updated_at": None}
        date_key = f"{dt:%Y-%m-%d}"
        rel_path = str(self._daily_messages_path(dt).relative_to(self.out_dir))

        dates = set(index.get("dates") or [])
        dates.add(date_key)
        index["dates"] = sorted(dates)

        channel_key = msg.get("channel_name") or msg.get("channel_id") or "unknown-channel"
        channels = index.get("channels")
        if not isinstance(channels, dict):
            channels = index["channels"] = {}
        channel_entry = channels.get(channel_key)
        if not isinstance(channel_entry, dict):
            channel_entry = channels[channel_key] = {"files": []}
        files = set(channel_entry.get("files") or [])
        files.add(rel_path)
        channel_entry["files"] = sorted(files)

        index["updated_at"] = datetime.now(timezone.utc).isoformat()
        self._atomic_write_text(
            self.index_path, json.dumps(index, ensure_ascii=False, indent=2, sort_keys=True) + "\n"
        )

    def _write_raw(self, suffix: str, obj: dict[str, Any]) -> None:
        """Append *obj* to today's (UTC) raw file for *suffix* when raw capture is enabled."""
        if not self.write_raw:
            return
        self._append_jsonl(self._daily_raw_path(datetime.now(timezone.utc), suffix), obj)

    # ------------------------------------------------------------- hooks

    def response(self, flow: http.HTTPFlow) -> None:
        """mitmproxy ``response`` hook: learn names and mirror posts from /api/v4 JSON responses."""
        if not self._capture_flow(flow) or not flow.response:
            return
        content_type = flow.response.headers.get("content-type", "")
        if "json" not in content_type:
            return
        try:
            payload = flow.response.json()
        except Exception:
            # Best effort at the hook boundary: undecodable bodies are skipped, not fatal.
            return

        self._ingest_reference_payload(payload)
        raw_record = {
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "method": flow.request.method,
            "url": self._safe_url(flow.request.pretty_url),
            "path": flow.request.path,
            "status_code": flow.response.status_code,
            "request_headers": self._redact_headers(flow.request.headers),
            # NOTE(review): the response body is stored unredacted; any secrets an endpoint
            # returns in its body would land in raw/ when raw capture is enabled.
            "response": payload,
        }
        self._write_raw("rest-flows", raw_record)

        if not isinstance(payload, dict):
            return
        posts = payload.get("posts")
        if isinstance(posts, dict):
            # Post-list shape: {"order": [...], "posts": {post_id: {...}}}.
            for post in posts.values():
                if isinstance(post, dict):
                    normalized = self._normalize_post(post, source="rest", raw_event="posts")
                    if normalized:
                        self._write_message(normalized)
        elif payload.get("id") and payload.get("message") is not None:
            # A single post object.
            normalized = self._normalize_post(payload, source="rest", raw_event="post")
            if normalized:
                self._write_message(normalized)

    @staticmethod
    def _decode_ws_post(post_raw: Any) -> dict[str, Any] | None:
        """Return the post object carried in a ``posted`` event's ``data.post`` field.

        A JSON string/bytes value is parsed; an already-decoded dict is accepted as-is.
        Anything else, or a parse result that is not an object, yields ``None``.
        """
        if isinstance(post_raw, dict):
            return post_raw
        if not isinstance(post_raw, (str, bytes)) or not post_raw:
            return None
        try:
            post = json.loads(post_raw)
        except (ValueError, RecursionError):
            return None
        return post if isinstance(post, dict) else None

    def _remember_ws_channel_name(self, post: dict[str, Any], data: dict[str, Any]) -> None:
        """Learn a channel name from a ``posted`` event when no name is known for the channel yet."""
        # NOTE(review): assumes the event's data carries the channel's `channel_name`;
        # confirm against the Mattermost WebSocket event reference.
        channel_id = post.get("channel_id")
        channel_name = data.get("channel_name")
        if isinstance(channel_id, str) and channel_id and isinstance(channel_name, str) and channel_name:
            self.channels.setdefault(channel_id, channel_name)

    def websocket_message(self, flow: http.HTTPFlow) -> None:
        """mitmproxy ``websocket_message`` hook: mirror server-sent ``posted`` events.

        Only the newest message on the flow is inspected. Non-JSON frames and non-object
        payloads are ignored. With raw capture on, every server-sent JSON object is
        recorded before filtering by event type.
        """
        if not self._is_allowed_host(flow.request.pretty_host):
            return
        if "/api/v4/websocket" not in flow.request.path:
            return
        if not flow.websocket or not flow.websocket.messages:
            return
        message = flow.websocket.messages[-1]
        if message.from_client:
            return
        try:
            content = message.content
            text = content.decode("utf-8") if isinstance(content, bytes) else str(content)
            payload = json.loads(text)
        except (ValueError, RecursionError):
            # ValueError covers both UnicodeDecodeError and json.JSONDecodeError.
            return
        if not isinstance(payload, dict):
            return

        event = payload.get("event")
        self._write_raw(
            "websocket",
            {
                "captured_at": datetime.now(timezone.utc).isoformat(),
                "url": self._safe_url(flow.request.pretty_url),
                "event": event,
                "seq": payload.get("seq"),
                "data": payload.get("data"),
                "broadcast": payload.get("broadcast"),
            },
        )
        if event != "posted":
            return
        data = payload.get("data")
        if not isinstance(data, dict):
            return
        post = self._decode_ws_post(data.get("post"))
        if post is None:
            return
        self._remember_ws_channel_name(post, data)
        normalized = self._normalize_post(post, source="websocket", raw_event=event)
        if normalized:
            self._write_message(normalized)


addons = [MattermostMirror()]