# Source file: fidelity-ai-workspace/scripts/mattermost-proxy/mattermost_mirror.py
# (Python, 515 lines, 22 KiB as reported by the repository viewer)
"""mitmproxy addon for a local Mattermost Desktop mirror.
This addon is intentionally narrow:
- allowlist a Mattermost host
- inspect only /api/v4 REST and WebSocket traffic
- redact secrets
- normalize posts into date-rotated JSONL files for AI context
The output under the profile inbox is raw evidence, not canonical project memory.
"""
from __future__ import annotations
import json
import os
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from mitmproxy import http
# Output directory used when MATTERMOST_MIRROR_DIR is not set; it is resolved
# against the current working directory by MattermostMirror.__init__.
DEFAULT_OUT_DIR = "mattermost-mirror"
# A post id is accepted only if it is exactly 26 lowercase alphanumerics;
# _normalize_post drops posts whose id does not match.
POST_ID_RE = re.compile(r"^[a-z0-9]{26}$")
# Matches runs of characters outside [a-zA-Z0-9._-]; _safe_name replaces each
# run with "-" to build path components.
SAFE_NAME_RE = re.compile(r"[^a-zA-Z0-9._-]+")
def env_bool(name: str, default: bool = False) -> bool:
    """Read environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the value is
    trimmed, lower-cased, and considered true only if it is one of
    ``1``, ``true``, ``yes`` or ``on``; any other value is false.
    """
    value = os.getenv(name)
    if value is not None:
        return value.strip().lower() in {"1", "true", "yes", "on"}
    return default
def split_csv(raw: str) -> set[str]:
    """Split *raw* on commas and newlines into a set of non-empty tokens.

    Each token is stripped of surrounding whitespace; blank tokens are
    discarded, so an empty string yields an empty set.
    """
    pieces = (piece.strip() for piece in raw.replace("\n", ",").split(","))
    return {piece for piece in pieces if piece}
class MattermostMirror:
    """mitmproxy addon that mirrors Mattermost posts into local JSONL files.

    Configuration is read from environment variables at construction time:

    - ``MATTERMOST_MIRROR_DIR``: output directory (default ``DEFAULT_OUT_DIR``).
    - ``MATTERMOST_MIRROR_HOST_ALLOW``: host to capture (subdomains included);
      when empty every proxied host is accepted.
    - ``MATTERMOST_MIRROR_CHANNEL_IDS``: comma/newline separated channel ids
      to keep; when empty every channel is kept.
    - ``MATTERMOST_MIRROR_LATEST_LIMIT``: number of posts kept in
      ``latest.jsonl`` / ``latest.md`` (default 200).
    - ``MATTERMOST_MIRROR_WRITE_RAW``: also append raw REST/WebSocket records.

    The ``response`` and ``websocket_message`` methods are the entry points;
    everything else is internal bookkeeping.
    """

    # Request headers whose values are replaced by "[REDACTED]" in raw records.
    _SENSITIVE_HEADERS = frozenset({"authorization", "cookie", "set-cookie", "x-csrf-token"})

    # Query parameters whose names look like credentials. Only the value is
    # replaced, so the rest of the query string is kept byte-for-byte.
    _SENSITIVE_QUERY_RE = re.compile(
        r"(^|&)([^=&]*(?:token|secret|password|passwd|api[_-]?key)[^=&]*)=[^&]*",
        re.IGNORECASE,
    )

    def __init__(self) -> None:
        """Read configuration, create output directories and warm the dedupe cache."""
        self.out_dir = Path(os.getenv("MATTERMOST_MIRROR_DIR", DEFAULT_OUT_DIR)).resolve()
        self.host_allow = os.getenv("MATTERMOST_MIRROR_HOST_ALLOW", "").strip().lower()
        self.channel_allow = split_csv(os.getenv("MATTERMOST_MIRROR_CHANNEL_IDS", ""))
        # A non-integer value raises ValueError here, which aborts addon loading.
        self.latest_limit = int(os.getenv("MATTERMOST_MIRROR_LATEST_LIMIT", "200"))
        self.write_raw = env_bool("MATTERMOST_MIRROR_WRITE_RAW", default=False)

        self.channels_dir = self.out_dir / "channels"
        self.by_date_dir = self.out_dir / "by-date"
        self.threads_dir = self.out_dir / "threads"
        self.refs_dir = self.out_dir / "refs"
        self.raw_dir = self.out_dir / "raw"
        self.state_path = self.out_dir / "state.json"
        self.index_path = self.out_dir / "index.json"
        self.latest_jsonl_path = self.out_dir / "latest.jsonl"
        self.latest_md_path = self.out_dir / "latest.md"

        # Post ids already written during this process (plus the hot window
        # loaded at startup) and per-file id sets loaded on demand.
        self.seen_post_ids: set[str] = set()
        self.seen_by_file: dict[Path, set[str]] = {}
        # user id -> display label, channel id -> display label,
        # channel id -> last channel object seen for that id.
        self.users: dict[str, str] = {}
        self.channels: dict[str, str] = {}
        self.channel_meta: dict[str, dict[str, Any]] = {}
        self.state: dict[str, Any] = {"channels": {}, "users": {}, "updated_at": None}

        self._ensure_dirs()
        self._load_state()
        self._load_recent_seen_ids()

    # ------------------------------------------------------------------
    # Startup helpers
    # ------------------------------------------------------------------

    def _ensure_dirs(self) -> None:
        """Create every output sub-directory if it does not exist yet."""
        for directory in (
            self.channels_dir,
            self.by_date_dir,
            self.threads_dir,
            self.refs_dir,
            self.raw_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    def _load_state(self) -> None:
        """Restore users, channel metadata and channel labels from ``state.json``.

        A missing, unreadable or structurally unexpected file leaves the
        mirror with an empty default state instead of raising.
        """
        if not self.state_path.exists():
            return
        try:
            loaded = json.loads(self.state_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            loaded = None
        if not isinstance(loaded, dict):
            self.state = {"channels": {}, "users": {}, "updated_at": None}
            return

        users = loaded.get("users")
        channel_meta = loaded.get("channel_meta")
        channels = loaded.get("channels")
        if not isinstance(channels, dict):
            # Keep the rest of the state but make "channels" usable again.
            channels = {}
            loaded["channels"] = channels

        self.state = loaded
        self.users = dict(users) if isinstance(users, dict) else {}
        self.channel_meta = dict(channel_meta) if isinstance(channel_meta, dict) else {}
        for channel_id, value in channels.items():
            if isinstance(value, dict):
                name = value.get("channel_name") or value.get("name")
                if name:
                    self.channels[channel_id] = name

    def _load_recent_seen_ids(self) -> None:
        """Seed ``seen_post_ids`` from ``latest.jsonl`` and today's by-date file."""
        # Bound startup work: latest.jsonl contains the hot dedupe window. Daily
        # files are loaded lazily when older/backfilled messages are encountered.
        today = datetime.now(timezone.utc)
        for path in (self.latest_jsonl_path, self._daily_by_date_path(today)):
            if path.exists():
                self.seen_post_ids.update(self._load_seen_ids_for_file(path))

    def _load_seen_ids_for_file(self, path: Path) -> set[str]:
        """Return the (cached, mutable) set of post ids recorded in *path*.

        Unparseable lines are skipped individually, so one truncated line
        (for example after a crash mid-append) does not discard the ids of
        every other line and re-open the door to duplicate appends.
        """
        cached = self.seen_by_file.get(path)
        if cached is not None:
            return cached
        ids: set[str] = set()
        try:
            with path.open("r", encoding="utf-8") as handle:
                for line in handle:
                    if not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                    except ValueError:
                        continue
                    if not isinstance(obj, dict):
                        continue
                    post_id = obj.get("post_id")
                    if post_id and isinstance(post_id, str):
                        ids.add(post_id)
        except (OSError, ValueError):
            # Missing or unreadable file (ValueError covers decode errors):
            # keep whatever was parsed before the failure.
            pass
        self.seen_by_file[path] = ids
        return ids

    # ------------------------------------------------------------------
    # File helpers
    # ------------------------------------------------------------------

    def _atomic_write_text(self, path: Path, text: str) -> None:
        """Write *text* to *path* via a temp file in the same directory plus rename.

        The temp file is removed if writing or renaming fails, so failed
        writes do not leave stray files next to the target.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path: Path | None = None
        try:
            with tempfile.NamedTemporaryFile(
                "w", encoding="utf-8", dir=str(path.parent), delete=False
            ) as tmp:
                tmp_path = Path(tmp.name)
                tmp.write(text)
            tmp_path.replace(path)
        except BaseException:
            if tmp_path is not None:
                try:
                    tmp_path.unlink()
                except OSError:
                    pass
            raise

    def _append_jsonl(self, path: Path, obj: dict[str, Any]) -> None:
        """Append *obj* as one sorted-key JSON line to *path*, creating parents."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(obj, ensure_ascii=False, sort_keys=True) + "\n")

    def _read_jsonl(self, path: Path) -> list[dict[str, Any]]:
        """Return the dict records stored in the JSONL file *path*.

        Blank, unparseable and non-object lines are skipped rather than
        making the whole file read as empty; a missing or unreadable file
        yields whatever was parsed so far (possibly nothing).
        """
        records: list[dict[str, Any]] = []
        try:
            with path.open("r", encoding="utf-8") as handle:
                for line in handle:
                    if not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                    except ValueError:
                        continue
                    if isinstance(obj, dict):
                        records.append(obj)
        except (OSError, ValueError):
            pass
        return records

    # ------------------------------------------------------------------
    # Value helpers
    # ------------------------------------------------------------------

    def _coerce_int(self, value: Any, default: int = 0) -> int:
        """Return ``int(value)``, or *default* when the value is empty or invalid."""
        if not value:
            return default
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    def _dt_from_ms(self, value: Any) -> datetime:
        """Convert epoch milliseconds to an aware UTC datetime.

        Non-numeric, non-positive or out-of-range values fall back to the
        current UTC time.
        """
        try:
            ms = int(value)
            if ms > 0:
                return datetime.fromtimestamp(ms / 1000, timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            pass
        return datetime.now(timezone.utc)

    def _safe_name(self, value: str | None, fallback: str = "unknown") -> str:
        """Turn *value* into a path-safe component, or *fallback* if nothing is left."""
        raw = str(value or fallback).strip() or fallback
        safe = SAFE_NAME_RE.sub("-", raw).strip("-._")
        return safe or fallback

    # ------------------------------------------------------------------
    # Output paths
    # ------------------------------------------------------------------

    def _daily_channel_path(self, dt: datetime, channel_name: str | None, channel_id: str | None) -> Path:
        """Return ``channels/<slug>/<YYYY>/<MM>/<YYYY-MM-DD>.jsonl`` for a post.

        The slug is the sanitised channel name. When the name sanitises to
        nothing (for example a name made only of non-ASCII characters) the
        channel id is used instead, so such channels do not all collapse
        into a shared ``unknown-channel`` directory.
        """
        channel_slug = self._safe_name(channel_name, fallback="") or self._safe_name(
            channel_id, fallback="unknown-channel"
        )
        return self.channels_dir / channel_slug / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}.jsonl"

    def _daily_by_date_path(self, dt: datetime) -> Path:
        """Return ``by-date/<YYYY>/<MM>/<YYYY-MM-DD>.jsonl`` for *dt*."""
        return self.by_date_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}.jsonl"

    def _thread_path(self, thread_id: str | None) -> Path | None:
        """Return ``threads/<thread_id>.jsonl``, or ``None`` without a thread id."""
        if not thread_id:
            return None
        return self.threads_dir / f"{self._safe_name(thread_id)}.jsonl"

    def _daily_raw_path(self, dt: datetime, suffix: str) -> Path:
        """Return ``raw/<YYYY>/<MM>/<YYYY-MM-DD>-<suffix>.jsonl`` for *dt*."""
        return self.raw_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}-{suffix}.jsonl"

    # ------------------------------------------------------------------
    # Filtering and redaction
    # ------------------------------------------------------------------

    def _safe_url(self, url: str) -> str:
        """Return *url* without its fragment and with credential-like query values redacted.

        Only the values of parameters whose name contains ``token``,
        ``secret``, ``password``/``passwd`` or ``api_key`` are replaced by
        ``[REDACTED]``; every other parameter is left untouched.
        """
        parsed = urlparse(url)
        query = self._SENSITIVE_QUERY_RE.sub(r"\1\2=[REDACTED]", parsed.query)
        return parsed._replace(query=query, fragment="").geturl()

    def _is_allowed_host(self, host: str) -> bool:
        """Return True if *host* equals the allowlisted host or is one of its subdomains."""
        host = host.lower()
        if self.host_allow:
            return host == self.host_allow or host.endswith(f".{self.host_allow}")
        # The launched Mattermost Desktop app is already scoped to this proxy.
        # Some company hosts do not include "mattermost" in the hostname
        # (for example, mm.example.com), so default to allowing the proxied
        # app's /api/v4 traffic when no explicit host allowlist is configured.
        return True

    def _is_allowed_channel(self, channel_id: str | None) -> bool:
        """Return True if *channel_id* passes the channel allowlist (empty list allows all)."""
        if not self.channel_allow:
            return True
        return bool(channel_id and channel_id in self.channel_allow)

    def _capture_flow(self, flow: http.HTTPFlow) -> bool:
        """Return True for flows to an allowed host whose path contains ``/api/v4/``."""
        return self._is_allowed_host(flow.request.pretty_host) and "/api/v4/" in flow.request.path

    def _redact_headers(self, headers: Any) -> dict[str, str]:
        """Copy *headers* into a plain dict, replacing sensitive values with ``[REDACTED]``."""
        redacted: dict[str, str] = {}
        for key, value in headers.items():
            if key.lower() in self._SENSITIVE_HEADERS:
                redacted[key] = "[REDACTED]"
            else:
                redacted[key] = str(value)
        return redacted

    # ------------------------------------------------------------------
    # User / channel reference tracking
    # ------------------------------------------------------------------

    def _store_user(self, user: dict[str, Any]) -> bool:
        """Record the label for *user* in memory; return True if it changed.

        Non-string ids are ignored: mixing key types in ``self.users`` would
        make the ``sort_keys=True`` JSON dumps of the refs/state files raise.
        """
        user_id = user.get("id")
        if not user_id or not isinstance(user_id, str):
            return False
        username = user.get("username") or user.get("nickname") or user.get("first_name") or user_id
        if self.users.get(user_id) == username:
            return False
        self.users[user_id] = username
        return True

    def _store_channel(self, channel: dict[str, Any]) -> bool:
        """Record metadata and label for *channel* in memory; return True if the label changed."""
        channel_id = channel.get("id")
        if not channel_id or not isinstance(channel_id, str):
            return False
        self.channel_meta[channel_id] = channel
        label = self._channel_label(channel)
        if self.channels.get(channel_id) == label:
            return False
        self.channels[channel_id] = label
        return True

    def _remember_user(self, user: dict[str, Any]) -> None:
        """Record *user* and rewrite the refs files if its label changed."""
        if self._store_user(user):
            self._write_refs()

    def _remember_channel(self, channel: dict[str, Any]) -> None:
        """Record *channel* and rewrite the refs files if its label changed."""
        if self._store_channel(channel):
            self._write_refs()

    def _user_label(self, user_id: str | None) -> str | None:
        """Return the known label for *user_id*, the id itself if unknown, or None."""
        if not user_id:
            return None
        return self.users.get(user_id) or user_id

    def _channel_label(self, channel: dict[str, Any]) -> str:
        """Build a human-readable label for a channel object.

        Type ``"D"`` channels become ``dm-<user>--<user>`` and type ``"G"``
        channels ``group-<display name or users>``, where the users come from
        splitting the channel ``name`` on ``__``. Everything else uses the
        display name, then the name, then the id.
        """
        channel_id = channel.get("id") or "unknown-channel"
        channel_type = channel.get("type")
        display_name = channel.get("display_name")
        display_name = display_name.strip() if isinstance(display_name, str) else ""
        name = channel.get("name")
        name = name.strip() if isinstance(name, str) else ""

        if channel_type in ("D", "G"):
            if channel_type == "G" and display_name:
                return "group-" + display_name
            labels = [self._user_label(user_id) or user_id for user_id in name.split("__") if user_id]
            if labels:
                prefix = "dm-" if channel_type == "D" else "group-"
                return prefix + "--".join(labels)
        return display_name or name or channel_id

    def _relabel_channels(self) -> bool:
        """Recompute every channel label from stored metadata; return True if any changed."""
        changed = False
        for channel_id, meta in self.channel_meta.items():
            if not isinstance(meta, dict):
                continue
            label = self._channel_label(meta)
            if label and self.channels.get(channel_id) != label:
                self.channels[channel_id] = label
                changed = True
        return changed

    def _refresh_channel_labels(self) -> None:
        """Recompute channel labels (user names may have been learned since) and persist changes."""
        if self._relabel_channels():
            self._write_refs()

    def _write_refs(self) -> None:
        """Write ``refs/users.json`` and ``refs/channels.json`` atomically."""
        users_path = self.refs_dir / "users.json"
        channels_path = self.refs_dir / "channels.json"
        self._atomic_write_text(users_path, json.dumps(self.users, ensure_ascii=False, indent=2, sort_keys=True) + "\n")
        self._atomic_write_text(channels_path, json.dumps(self.channels, ensure_ascii=False, indent=2, sort_keys=True) + "\n")

    def _collect_references(self, payload: Any) -> bool:
        """Walk *payload* and store every user/channel object found; return True on any change."""
        if isinstance(payload, list):
            changed = False
            for item in payload:
                changed = self._collect_references(item) or changed
            return changed
        if not isinstance(payload, dict):
            return False

        changed = False
        if payload.get("id"):
            # Shape heuristics: a user object carries a username/first_name,
            # a channel object carries a type plus display_name or team_id.
            if "username" in payload or "first_name" in payload:
                changed = self._store_user(payload) or changed
            if ("display_name" in payload or "team_id" in payload) and "type" in payload:
                changed = self._store_channel(payload) or changed

        users = payload.get("users")
        if isinstance(users, dict):
            user_items = list(users.values())
        elif isinstance(users, list):
            user_items = users
        else:
            user_items = []
        for user in user_items:
            if isinstance(user, dict):
                changed = self._store_user(user) or changed

        channels = payload.get("channels")
        if isinstance(channels, list):
            for channel in channels:
                if isinstance(channel, dict):
                    changed = self._store_channel(channel) or changed
        return changed

    def _ingest_reference_payload(self, payload: Any) -> None:
        """Learn user and channel labels from an arbitrary API payload.

        The payload is scanned once, labels are refreshed once, and the refs
        files are rewritten at most once, instead of once per user/channel.
        """
        changed = self._collect_references(payload)
        relabelled = self._relabel_channels()
        if changed or relabelled:
            self._write_refs()

    # ------------------------------------------------------------------
    # Post normalisation and persistence
    # ------------------------------------------------------------------

    def _normalize_post(self, post: dict[str, Any], source: str, raw_event: str | None = None) -> dict[str, Any] | None:
        """Convert a Mattermost post object into the mirror's flat record.

        Returns ``None`` when the post id does not match ``POST_ID_RE`` or
        the channel is not allowlisted. Malformed ``create_at`` /
        ``update_at`` values no longer raise: the creation time falls back
        to the capture time and the update time to 0.
        """
        post_id = post.get("id")
        channel_id = post.get("channel_id")
        if not post_id or not POST_ID_RE.match(str(post_id)):
            return None
        if not self._is_allowed_channel(channel_id):
            return None

        created_dt = self._dt_from_ms(post.get("create_at"))
        created_ms = self._coerce_int(post.get("create_at"))
        if created_ms <= 0:
            # Keep created_at and created_at_ms consistent when the source
            # timestamp is missing or unusable.
            created_ms = int(created_dt.timestamp() * 1000)
        root_id = post.get("root_id") or None
        user_id = post.get("user_id") or None
        return {
            "source": source,
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "created_at": created_dt.isoformat(),
            "created_at_ms": created_ms,
            "updated_at_ms": self._coerce_int(post.get("update_at")),
            "channel_id": channel_id,
            "channel_name": self.channels.get(channel_id) if channel_id else None,
            "post_id": post_id,
            "root_id": root_id,
            "thread_id": root_id or post_id,
            "user_id": user_id,
            "username": self.users.get(user_id) if user_id else None,
            "message": post.get("message") or "",
            "type": "thread_reply" if root_id else "channel_post",
            "raw_event": raw_event,
            "props": post.get("props") or {},
        }

    def _write_message(self, msg: dict[str, Any]) -> None:
        """Append *msg* to its channel, by-date and thread files unless already seen.

        Also refreshes ``state.json``, ``latest.*`` and ``index.json``.
        """
        post_id = msg["post_id"]
        created_dt = self._dt_from_ms(msg.get("created_at_ms"))
        channel_path = self._daily_channel_path(created_dt, msg.get("channel_name"), msg.get("channel_id"))
        by_date_path = self._daily_by_date_path(created_dt)
        thread_path = self._thread_path(msg.get("thread_id"))

        channel_seen = self._load_seen_ids_for_file(channel_path)
        by_date_seen = self._load_seen_ids_for_file(by_date_path)
        if post_id in self.seen_post_ids or post_id in channel_seen or post_id in by_date_seen:
            return

        self.seen_post_ids.add(post_id)
        channel_seen.add(post_id)
        by_date_seen.add(post_id)
        self._append_jsonl(channel_path, msg)
        self._append_jsonl(by_date_path, msg)
        if thread_path:
            thread_seen = self._load_seen_ids_for_file(thread_path)
            if post_id not in thread_seen:
                thread_seen.add(post_id)
                self._append_jsonl(thread_path, msg)

        self._update_state(msg)
        self._update_latest(msg)
        self._update_index(created_dt, msg)

    def _update_state(self, msg: dict[str, Any]) -> None:
        """Record *msg* as channel progress in ``state.json`` and rewrite the refs files.

        ``last_seen_post_id`` is only advanced when the post is at least as
        new as the newest one recorded, so a backfilled older post no longer
        leaves the id pointing at a different post than
        ``last_seen_create_at``.
        """
        channel_id = msg.get("channel_id") or "unknown"
        channels = self.state.get("channels")
        if not isinstance(channels, dict):
            channels = {}
            self.state["channels"] = channels
        entry = channels.get(channel_id)
        if not isinstance(entry, dict):
            entry = {}
            channels[channel_id] = entry

        if msg.get("channel_name"):
            entry["channel_name"] = msg.get("channel_name")
        previous_ms = self._coerce_int(entry.get("last_seen_create_at"))
        created_ms = self._coerce_int(msg.get("created_at_ms"))
        if created_ms >= previous_ms or not entry.get("last_seen_post_id"):
            entry["last_seen_post_id"] = msg.get("post_id")
        entry["last_seen_create_at"] = max(previous_ms, created_ms)

        self.state["users"] = self.users
        self.state["channel_meta"] = self.channel_meta
        self.state["updated_at"] = datetime.now(timezone.utc).isoformat()
        self._atomic_write_text(self.state_path, json.dumps(self.state, ensure_ascii=False, indent=2, sort_keys=True) + "\n")
        self._write_refs()

    def _update_latest(self, msg: dict[str, Any]) -> None:
        """Merge *msg* into ``latest.jsonl`` / ``latest.md``, keeping the newest posts.

        Records are keyed by post id, sorted by creation time and trimmed to
        ``latest_limit``. A non-positive limit keeps every record (zero
        already behaved that way; a negative value used to drop the oldest
        records instead).
        """
        records = self._read_jsonl(self.latest_jsonl_path)
        by_id: dict[str, dict[str, Any]] = {item.get("post_id"): item for item in records if item.get("post_id")}
        by_id[msg["post_id"]] = msg
        latest = sorted(by_id.values(), key=lambda item: self._coerce_int(item.get("created_at_ms")))
        if self.latest_limit > 0:
            latest = latest[-self.latest_limit :]
        jsonl = "".join(json.dumps(item, ensure_ascii=False, sort_keys=True) + "\n" for item in latest)
        self._atomic_write_text(self.latest_jsonl_path, jsonl)
        self._atomic_write_text(self.latest_md_path, self._render_latest_md(latest))

    def _render_latest_md(self, records: list[dict[str, Any]]) -> str:
        """Render *records* as Markdown with a heading whenever the channel changes."""
        lines = ["# Latest Mattermost Mirror", "", "Generated from local proxy mirror evidence.", ""]
        current_channel = None
        for item in records:
            channel = item.get("channel_name") or item.get("channel_id") or "unknown-channel"
            if channel != current_channel:
                lines.extend([f"## {channel}", ""])
                current_channel = channel
            author = item.get("username") or item.get("user_id") or "unknown-user"
            created = item.get("created_at") or "unknown-time"
            prefix = "reply" if item.get("type") == "thread_reply" else "post"
            text = (item.get("message") or "").strip()
            lines.append(f"- {created} {author} ({prefix} `{item.get('post_id')}`): {text}")
        lines.append("")
        return "\n".join(lines)

    def _update_index(self, dt: datetime, msg: dict[str, Any]) -> None:
        """Register the files touched by *msg* in ``index.json``.

        An unreadable or non-object index file is replaced by a fresh one.
        """
        index: dict[str, Any] = {"dates": [], "channels": {}, "updated_at": None}
        if self.index_path.exists():
            try:
                loaded = json.loads(self.index_path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                loaded = None
            if isinstance(loaded, dict):
                index = loaded

        date_key = f"{dt:%Y-%m-%d}"
        channel_path = self._daily_channel_path(dt, msg.get("channel_name"), msg.get("channel_id"))
        by_date_path = self._daily_by_date_path(dt)
        thread_path = self._thread_path(msg.get("thread_id"))
        channel_rel_path = str(channel_path.relative_to(self.out_dir))
        by_date_rel_path = str(by_date_path.relative_to(self.out_dir))

        dates = set(index.get("dates") or [])
        dates.add(date_key)
        index["dates"] = sorted(dates)
        by_date = index.setdefault("by_date", {})
        by_date[date_key] = by_date_rel_path

        channel_key = msg.get("channel_name") or msg.get("channel_id") or "unknown-channel"
        channels = index.setdefault("channels", {})
        channel_entry = channels.setdefault(channel_key, {"channel_id": msg.get("channel_id"), "files": []})
        channel_entry["channel_id"] = msg.get("channel_id")
        files = set(channel_entry.get("files") or [])
        files.add(channel_rel_path)
        channel_entry["files"] = sorted(files)

        if thread_path:
            threads = index.setdefault("threads", {})
            threads[msg.get("thread_id")] = str(thread_path.relative_to(self.out_dir))

        index["updated_at"] = datetime.now(timezone.utc).isoformat()
        self._atomic_write_text(self.index_path, json.dumps(index, ensure_ascii=False, indent=2, sort_keys=True) + "\n")

    def _write_raw(self, suffix: str, obj: dict[str, Any]) -> None:
        """Append *obj* to today's raw file for *suffix* when raw capture is enabled."""
        if not self.write_raw:
            return
        self._append_jsonl(self._daily_raw_path(datetime.now(timezone.utc), suffix), obj)

    # ------------------------------------------------------------------
    # mitmproxy event hooks
    # ------------------------------------------------------------------

    def response(self, flow: http.HTTPFlow) -> None:
        """Handle a completed HTTP response: learn references and mirror posts.

        Only JSON responses of captured ``/api/v4/`` flows are inspected.
        """
        if not self._capture_flow(flow) or not flow.response:
            return
        content_type = flow.response.headers.get("content-type", "")
        if "json" not in content_type:
            return
        try:
            payload = flow.response.json()
        except Exception:
            # Best effort: a body that does not decode as JSON is ignored.
            return

        self._ingest_reference_payload(payload)
        path = flow.request.path
        raw_record = {
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "method": flow.request.method,
            "url": self._safe_url(flow.request.pretty_url),
            "path": path,
            "status_code": flow.response.status_code,
            "request_headers": self._redact_headers(flow.request.headers),
            "response": payload,
        }
        self._write_raw("rest-flows", raw_record)

        if not isinstance(payload, dict):
            return
        # Mattermost post-list shape: { order: [...], posts: {post_id: {...}} }
        if isinstance(payload.get("posts"), dict):
            for post in payload["posts"].values():
                if isinstance(post, dict):
                    normalized = self._normalize_post(post, source="rest", raw_event="posts")
                    if normalized:
                        self._write_message(normalized)
        elif payload.get("id") and payload.get("message") is not None:
            normalized = self._normalize_post(payload, source="rest", raw_event="post")
            if normalized:
                self._write_message(normalized)

    def websocket_message(self, flow: http.HTTPFlow) -> None:
        """Handle the newest server-to-client WebSocket frame and mirror ``posted`` events.

        Frames that are not JSON objects, and events whose ``data`` or
        ``post`` do not have the expected shape, are ignored instead of
        raising inside the hook.
        """
        if not self._is_allowed_host(flow.request.pretty_host):
            return
        if "/api/v4/websocket" not in flow.request.path:
            return
        if not flow.websocket or not flow.websocket.messages:
            return
        message = flow.websocket.messages[-1]
        if message.from_client:
            return
        try:
            text = message.content.decode("utf-8") if isinstance(message.content, bytes) else str(message.content)
            payload = json.loads(text)
        except ValueError:
            # Covers both undecodable bytes and invalid JSON.
            return
        if not isinstance(payload, dict):
            return

        event = payload.get("event")
        self._write_raw("websocket", {
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "url": self._safe_url(flow.request.pretty_url),
            "event": event,
            "seq": payload.get("seq"),
            "data": payload.get("data"),
            "broadcast": payload.get("broadcast"),
        })
        if event != "posted":
            return

        data = payload.get("data")
        if not isinstance(data, dict):
            return
        post_raw = data.get("post")
        if not post_raw:
            return
        # The post is handled as a JSON-encoded string; an already-decoded
        # object is accepted as well.
        if isinstance(post_raw, dict):
            post = post_raw
        else:
            try:
                post = json.loads(post_raw)
            except (TypeError, ValueError):
                return
        if not isinstance(post, dict):
            return
        normalized = self._normalize_post(post, source="websocket", raw_event=event)
        if normalized:
            self._write_message(normalized)
# mitmproxy picks up addon instances from this module-level ``addons`` list.
# Constructing the instance here runs MattermostMirror.__init__ at import
# time, which reads the environment and creates the output directories.
addons = [MattermostMirror()]