417 lines
17 KiB
Python
417 lines
17 KiB
Python
"""mitmproxy addon for a local Mattermost Desktop mirror.

This addon is intentionally narrow:

- allowlist a Mattermost host
- inspect only /api/v4 REST and WebSocket traffic
- redact secrets
- normalize posts into date-rotated JSONL files for AI context

The output under ai/inbox/ is raw evidence, not canonical project memory.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import urlparse
|
|
|
|
from mitmproxy import http
|
|
|
|
|
|
# Default output root, used when MATTERMOST_MIRROR_DIR is not set.
DEFAULT_OUT_DIR = "ai/inbox/mattermost-mirror"
|
|
# Accepted post-ID shape: exactly 26 lowercase ASCII letters or digits.
POST_ID_RE = re.compile(r"^[a-z0-9]{26}$")
|
|
|
|
|
|
def env_bool(name: str, default: bool = False) -> bool:
    """Interpret environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the value is
    trimmed and lower-cased, and only ``1``, ``true``, ``yes`` and ``on``
    count as true; any other value (including the empty string) is false.
    """
    value = os.getenv(name)
    if value is not None:
        return value.strip().lower() in ("1", "true", "yes", "on")
    return default
|
|
|
|
|
|
def split_csv(raw: str) -> set[str]:
    """Split a comma- or newline-separated string into a set of trimmed items.

    Newlines are treated as commas; items that are empty after trimming are
    dropped, so an empty or all-separator input yields an empty set.
    """
    pieces = (chunk.strip() for chunk in raw.replace("\n", ",").split(","))
    return {piece for piece in pieces if piece}
|
|
|
|
|
|
class MattermostMirror:
    """Mirror Mattermost posts seen by the proxy into local JSONL files.

    Configuration is read once, in ``__init__``, from environment variables:

    - ``MATTERMOST_MIRROR_DIR``: output root (default ``DEFAULT_OUT_DIR``).
    - ``MATTERMOST_MIRROR_HOST_ALLOW``: host to capture; matches the host
      itself and any subdomain. When empty, any host containing
      ``mattermost`` is captured.
    - ``MATTERMOST_MIRROR_CHANNEL_IDS``: comma/newline separated channel
      allowlist; empty means every channel.
    - ``MATTERMOST_MIRROR_LATEST_LIMIT``: maximum records kept in
      ``latest.jsonl`` (default 200; a non-positive value keeps none).
    - ``MATTERMOST_MIRROR_WRITE_RAW``: also record raw REST/WebSocket payloads.
    """

    # Header names (lower-case) whose values must never be written to disk.
    _SECRET_HEADERS = frozenset(
        {"authorization", "cookie", "proxy-authorization", "set-cookie", "x-csrf-token"}
    )

    def __init__(self) -> None:
        """Read configuration, create output directories and load prior state."""
        self.out_dir = Path(os.getenv("MATTERMOST_MIRROR_DIR", DEFAULT_OUT_DIR)).resolve()
        self.host_allow = os.getenv("MATTERMOST_MIRROR_HOST_ALLOW", "").strip().lower()
        self.channel_allow = split_csv(os.getenv("MATTERMOST_MIRROR_CHANNEL_IDS", ""))
        # A malformed limit must not prevent the addon from loading.
        self.latest_limit = self._to_int(os.getenv("MATTERMOST_MIRROR_LATEST_LIMIT"), default=200)
        self.write_raw = env_bool("MATTERMOST_MIRROR_WRITE_RAW", default=False)

        self.messages_dir = self.out_dir / "messages"
        self.raw_dir = self.out_dir / "raw"
        self.state_path = self.out_dir / "state.json"
        self.index_path = self.out_dir / "index.json"
        self.latest_jsonl_path = self.out_dir / "latest.jsonl"
        self.latest_md_path = self.out_dir / "latest.md"

        self.seen_post_ids: set[str] = set()
        self.seen_by_file: dict[Path, set[str]] = {}
        self.users: dict[str, str] = {}
        self.channels: dict[str, str] = {}
        self.state: dict[str, Any] = self._empty_state()

        self._ensure_dirs()
        self._load_state()
        self._load_recent_seen_ids()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_state() -> dict[str, Any]:
        """Return a fresh, empty state document."""
        return {"channels": {}, "users": {}, "updated_at": None}

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Return ``int(value)``, or *default* when *value* is None or not convertible."""
        if value is None:
            return default
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    @staticmethod
    def _string_set(value: Any) -> set[str]:
        """Return the string members of *value* when it is a list, else an empty set."""
        if not isinstance(value, list):
            return set()
        return {item for item in value if isinstance(item, str)}

    @staticmethod
    def _parse_jsonl_line(line: str) -> dict[str, Any] | None:
        """Decode one JSONL line; return None for blank, invalid or non-object lines."""
        if not line.strip():
            return None
        try:
            record = json.loads(line)
        except ValueError:
            return None
        return record if isinstance(record, dict) else None

    # ------------------------------------------------------------------
    # Startup
    # ------------------------------------------------------------------

    def _ensure_dirs(self) -> None:
        """Create the ``messages`` and ``raw`` output directories if missing."""
        self.messages_dir.mkdir(parents=True, exist_ok=True)
        self.raw_dir.mkdir(parents=True, exist_ok=True)

    def _load_state(self) -> None:
        """Load ``state.json`` and seed the user and channel name caches.

        A missing file leaves the current state untouched. An unreadable,
        corrupt or wrongly shaped file resets the state to empty.
        """
        if not self.state_path.exists():
            return
        try:
            loaded = json.loads(self.state_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            self.state = self._empty_state()
            return
        if not isinstance(loaded, dict):
            self.state = self._empty_state()
            return

        # Normalise the two sections later code indexes into.
        users = loaded.get("users")
        channels = loaded.get("channels")
        loaded["users"] = users if isinstance(users, dict) else {}
        loaded["channels"] = channels if isinstance(channels, dict) else {}

        self.state = loaded
        self.users = dict(loaded["users"])
        for channel_id, value in loaded["channels"].items():
            if isinstance(value, dict):
                name = value.get("channel_name") or value.get("name")
                if name:
                    self.channels[channel_id] = name

    def _load_recent_seen_ids(self) -> None:
        """Prime the dedupe set from ``latest.jsonl`` and today's daily file."""
        # Bound startup work: latest.jsonl contains the hot dedupe window. Daily
        # files are loaded lazily when older/backfilled messages are encountered.
        today_path = self._daily_messages_path(datetime.now(timezone.utc))
        for path in (self.latest_jsonl_path, today_path):
            if path.exists():
                self.seen_post_ids.update(self._load_seen_ids_for_file(path))

    def _load_seen_ids_for_file(self, path: Path) -> set[str]:
        """Return the (cached) set of post IDs already recorded in *path*.

        The returned set is the cached object itself, so callers may add to
        it to keep the cache current. Unparseable lines are skipped instead
        of discarding the IDs from the rest of the file.
        """
        cached = self.seen_by_file.get(path)
        if cached is not None:
            return cached
        ids: set[str] = set()
        for record in self._read_jsonl(path):
            post_id = record.get("post_id")
            if isinstance(post_id, str) and post_id:
                ids.add(post_id)
        self.seen_by_file[path] = ids
        return ids

    # ------------------------------------------------------------------
    # File primitives
    # ------------------------------------------------------------------

    def _atomic_write_text(self, path: Path, text: str) -> None:
        """Write *text* to *path* via a temp file in the same directory plus rename.

        The temp file is removed if writing or renaming fails; the original
        exception is re-raised.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path: Path | None = None
        try:
            with tempfile.NamedTemporaryFile(
                "w", encoding="utf-8", dir=str(path.parent), delete=False
            ) as tmp:
                tmp_path = Path(tmp.name)
                tmp.write(text)
            tmp_path.replace(path)
        except BaseException:
            if tmp_path is not None:
                tmp_path.unlink(missing_ok=True)
            raise

    def _append_jsonl(self, path: Path, obj: dict[str, Any]) -> None:
        """Append *obj* to *path* as one sorted-key JSON line."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(obj, ensure_ascii=False, sort_keys=True) + "\n")

    def _read_jsonl(self, path: Path) -> list[dict[str, Any]]:
        """Return the JSON objects stored in *path*, one per line.

        A missing or unreadable file yields an empty list. Blank, invalid and
        non-object lines are skipped so one damaged line does not hide the
        rest of the file.
        """
        records: list[dict[str, Any]] = []
        try:
            with path.open("r", encoding="utf-8") as handle:
                for line in handle:
                    record = self._parse_jsonl_line(line)
                    if record is not None:
                        records.append(record)
        except (OSError, UnicodeDecodeError):
            # Keep whatever was read before the failure.
            pass
        return records

    # ------------------------------------------------------------------
    # Paths, URLs and filters
    # ------------------------------------------------------------------

    def _dt_from_ms(self, value: Any) -> datetime:
        """Convert epoch milliseconds to an aware UTC datetime.

        Falls back to the current UTC time when *value* is missing,
        non-numeric, non-positive or out of range.
        """
        try:
            ms = int(value)
            if ms > 0:
                return datetime.fromtimestamp(ms / 1000, timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            pass
        return datetime.now(timezone.utc)

    def _daily_messages_path(self, dt: datetime) -> Path:
        """Return ``messages/YYYY/MM/YYYY-MM-DD.jsonl`` for *dt*."""
        return self.messages_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}.jsonl"

    def _daily_raw_path(self, dt: datetime, suffix: str) -> Path:
        """Return ``raw/YYYY/MM/YYYY-MM-DD-<suffix>.jsonl`` for *dt*."""
        return self.raw_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}-{suffix}.jsonl"

    def _safe_url(self, url: str) -> str:
        """Return *url* without its query string and fragment.

        Query parameters can carry credentials, so they are dropped before
        the URL is written to a raw record.
        """
        return urlparse(url)._replace(query="", fragment="").geturl()

    def _is_allowed_host(self, host: str) -> bool:
        """Return True when *host* should be captured.

        With an allowlisted host configured, the host itself and its
        subdomains match; otherwise any host containing ``mattermost`` does.
        """
        host = host.lower()
        if self.host_allow:
            return host == self.host_allow or host.endswith(f".{self.host_allow}")
        return "mattermost" in host

    def _is_allowed_channel(self, channel_id: str | None) -> bool:
        """Return True when *channel_id* passes the channel allowlist (empty allows all)."""
        if not self.channel_allow:
            return True
        return bool(channel_id and channel_id in self.channel_allow)

    def _capture_flow(self, flow: http.HTTPFlow) -> bool:
        """Return True for flows to an allowed host whose path contains ``/api/v4/``."""
        return self._is_allowed_host(flow.request.pretty_host) and "/api/v4/" in flow.request.path

    def _redact_headers(self, headers: Any) -> dict[str, str]:
        """Copy *headers* into a plain dict, replacing secret values with ``[REDACTED]``."""
        return {
            key: "[REDACTED]" if key.lower() in self._SECRET_HEADERS else str(value)
            for key, value in headers.items()
        }

    # ------------------------------------------------------------------
    # Reference data (user and channel names)
    # ------------------------------------------------------------------

    def _remember_user(self, user: dict[str, Any]) -> None:
        """Cache a display name for ``user["id"]``; users without an id are ignored."""
        user_id = user.get("id")
        if not user_id:
            return
        self.users[user_id] = (
            user.get("username") or user.get("nickname") or user.get("first_name") or user_id
        )

    def _remember_channel(self, channel: dict[str, Any]) -> None:
        """Cache a name for ``channel["id"]``; channels without an id are ignored."""
        channel_id = channel.get("id")
        if not channel_id:
            return
        self.channels[channel_id] = channel.get("name") or channel.get("display_name") or channel_id

    def _ingest_reference_payload(self, payload: Any) -> None:
        """Harvest user and channel names from an arbitrary decoded JSON payload.

        Lists are walked recursively. A dict is treated as a user when it has
        an ``id`` plus ``username`` or ``first_name``, and as a channel when it
        has an ``id``, a ``type`` and ``display_name`` or ``team_id``. Nested
        ``users`` (dict or list) and ``channels`` (list) collections are
        harvested as well.
        """
        if isinstance(payload, list):
            for item in payload:
                self._ingest_reference_payload(item)
            return
        if not isinstance(payload, dict):
            return

        if payload.get("id"):
            if "username" in payload or "first_name" in payload:
                self._remember_user(payload)
            if "type" in payload and ("display_name" in payload or "team_id" in payload):
                self._remember_channel(payload)

        users = payload.get("users")
        if isinstance(users, dict):
            users = list(users.values())
        if isinstance(users, list):
            for user in users:
                if isinstance(user, dict):
                    self._remember_user(user)

        channels = payload.get("channels")
        if isinstance(channels, list):
            for channel in channels:
                if isinstance(channel, dict):
                    self._remember_channel(channel)

    # ------------------------------------------------------------------
    # Normalisation and persistence
    # ------------------------------------------------------------------

    def _normalize_post(self, post: dict[str, Any], source: str, raw_event: str | None = None) -> dict[str, Any] | None:
        """Convert a raw post object into the mirror's record format.

        Returns None when *post* is not a dict, its id does not match
        ``POST_ID_RE``, or its channel is not allowed. Malformed timestamps
        do not raise: ``created_at_ms`` falls back to the capture time and
        ``updated_at_ms`` to 0.
        """
        if not isinstance(post, dict):
            return None
        post_id = post.get("id")
        channel_id = post.get("channel_id")
        if not post_id or not POST_ID_RE.match(str(post_id)):
            return None
        if not self._is_allowed_channel(channel_id):
            return None

        created_dt = self._dt_from_ms(post.get("create_at"))
        created_ms = self._to_int(post.get("create_at"))
        if created_ms <= 0:
            # Keep the millisecond field consistent with the fallback datetime.
            created_ms = round(created_dt.timestamp() * 1000)
        root_id = post.get("root_id") or None
        user_id = post.get("user_id") or None

        return {
            "source": source,
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "created_at": created_dt.isoformat(),
            "created_at_ms": created_ms,
            "updated_at_ms": self._to_int(post.get("update_at")),
            "channel_id": channel_id,
            "channel_name": self.channels.get(channel_id) if channel_id else None,
            "post_id": post_id,
            "root_id": root_id,
            "thread_id": root_id or post_id,
            "user_id": user_id,
            "username": self.users.get(user_id) if user_id else None,
            "message": post.get("message") or "",
            "type": "thread_reply" if root_id else "channel_post",
            "raw_event": raw_event,
            "props": post.get("props") or {},
        }

    def _write_message(self, msg: dict[str, Any]) -> None:
        """Persist *msg* once: daily file, state, latest window and index.

        Posts whose id is already known (globally or in the target daily
        file) are ignored.
        """
        post_id = msg["post_id"]
        created_dt = self._dt_from_ms(msg.get("created_at_ms"))
        daily_path = self._daily_messages_path(created_dt)
        daily_seen = self._load_seen_ids_for_file(daily_path)
        if post_id in self.seen_post_ids or post_id in daily_seen:
            return

        # Append before marking as seen, so a failed write can be retried the
        # next time the same post is observed.
        self._append_jsonl(daily_path, msg)
        self.seen_post_ids.add(post_id)
        daily_seen.add(post_id)

        self._update_state(msg)
        self._update_latest(msg)
        self._update_index(created_dt, msg)

    def _update_state(self, msg: dict[str, Any]) -> None:
        """Record *msg* in the per-channel state and rewrite ``state.json``."""
        channel_id = msg.get("channel_id") or "unknown"
        channels = self.state.get("channels")
        if not isinstance(channels, dict):
            channels = self.state["channels"] = {}
        entry = channels.get(channel_id)
        if not isinstance(entry, dict):
            entry = channels[channel_id] = {}

        if msg.get("channel_name"):
            entry["channel_name"] = msg.get("channel_name")
        entry["last_seen_create_at"] = max(
            self._to_int(entry.get("last_seen_create_at")),
            self._to_int(msg.get("created_at_ms")),
        )
        # NOTE(review): this is the most recently *captured* post, which for
        # backfilled history may be older than last_seen_create_at.
        entry["last_seen_post_id"] = msg.get("post_id")
        self.state["users"] = self.users
        self.state["updated_at"] = datetime.now(timezone.utc).isoformat()
        self._atomic_write_text(
            self.state_path,
            json.dumps(self.state, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
        )

    def _update_latest(self, msg: dict[str, Any]) -> None:
        """Merge *msg* into ``latest.jsonl`` and regenerate ``latest.md``.

        Records are keyed by post id, ordered by ``created_at_ms`` and trimmed
        to the newest ``latest_limit`` entries; a non-positive limit keeps none.
        """
        by_id: dict[str, dict[str, Any]] = {}
        for record in self._read_jsonl(self.latest_jsonl_path):
            record_id = record.get("post_id")
            if isinstance(record_id, str) and record_id:
                by_id[record_id] = record
        by_id[msg["post_id"]] = msg

        ordered = sorted(by_id.values(), key=lambda item: self._to_int(item.get("created_at_ms")))
        # ``ordered[-0:]`` would return the whole list, so guard the slice.
        latest = ordered[-self.latest_limit:] if self.latest_limit > 0 else []

        jsonl = "".join(json.dumps(item, ensure_ascii=False, sort_keys=True) + "\n" for item in latest)
        self._atomic_write_text(self.latest_jsonl_path, jsonl)
        self._atomic_write_text(self.latest_md_path, self._render_latest_md(latest))

    def _render_latest_md(self, records: list[dict[str, Any]]) -> str:
        """Render *records* as a Markdown digest.

        A ``##`` heading is emitted whenever the channel differs from the
        previous record's channel, followed by one bullet per record.
        """
        lines = ["# Latest Mattermost Mirror", "", "Generated from local proxy mirror evidence.", ""]
        current_channel = None
        for item in records:
            channel = item.get("channel_name") or item.get("channel_id") or "unknown-channel"
            if channel != current_channel:
                lines.extend([f"## {channel}", ""])
                current_channel = channel
            author = item.get("username") or item.get("user_id") or "unknown-user"
            created = item.get("created_at") or "unknown-time"
            prefix = "reply" if item.get("type") == "thread_reply" else "post"
            text = (item.get("message") or "").strip()
            lines.append(f"- {created} {author} ({prefix} `{item.get('post_id')}`): {text}")
        lines.append("")
        return "\n".join(lines)

    def _update_index(self, dt: datetime, msg: dict[str, Any]) -> None:
        """Add the date and daily file of *msg* to ``index.json``.

        A missing, corrupt or wrongly shaped index is rebuilt from empty.
        """
        index: Any = None
        try:
            index = json.loads(self.index_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            pass
        if not isinstance(index, dict):
            index = {"dates": [], "channels": {}, "updated_at": None}

        dates = self._string_set(index.get("dates"))
        dates.add(f"{dt:%Y-%m-%d}")
        index["dates"] = sorted(dates)

        channels = index.get("channels")
        if not isinstance(channels, dict):
            channels = index["channels"] = {}
        channel_key = msg.get("channel_name") or msg.get("channel_id") or "unknown-channel"
        channel_entry = channels.get(channel_key)
        if not isinstance(channel_entry, dict):
            channel_entry = channels[channel_key] = {"files": []}

        files = self._string_set(channel_entry.get("files"))
        files.add(str(self._daily_messages_path(dt).relative_to(self.out_dir)))
        channel_entry["files"] = sorted(files)

        index["updated_at"] = datetime.now(timezone.utc).isoformat()
        self._atomic_write_text(
            self.index_path,
            json.dumps(index, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
        )

    def _write_raw(self, suffix: str, obj: dict[str, Any]) -> None:
        """Append *obj* to today's raw file for *suffix* when raw capture is enabled."""
        if not self.write_raw:
            return
        self._append_jsonl(self._daily_raw_path(datetime.now(timezone.utc), suffix), obj)

    # ------------------------------------------------------------------
    # mitmproxy hooks
    # ------------------------------------------------------------------

    def response(self, flow: http.HTTPFlow) -> None:
        """Handle a completed HTTP response: harvest names and mirror posts.

        Only JSON responses of captured flows are inspected. Both the
        post-list shape (``{"posts": {id: post}}``) and a single post object
        are recognised.
        """
        if not flow.response or not self._capture_flow(flow):
            return
        content_type = flow.response.headers.get("content-type", "")
        if "json" not in content_type.lower():
            return
        try:
            payload = flow.response.json()
        except Exception:
            # Best effort: a body that cannot be decoded is simply not mirrored.
            return

        self._ingest_reference_payload(payload)

        # NOTE(review): request.path is recorded as-is and may still carry the
        # query string that _safe_url removes from "url" — confirm whether it
        # should be stripped here too.
        path = flow.request.path
        self._write_raw("rest-flows", {
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "method": flow.request.method,
            "url": self._safe_url(flow.request.pretty_url),
            "path": path,
            "status_code": flow.response.status_code,
            "request_headers": self._redact_headers(flow.request.headers),
            "response": payload,
        })

        if not isinstance(payload, dict):
            return
        posts = payload.get("posts")
        if isinstance(posts, dict):
            # Mattermost post-list shape: { order: [...], posts: {post_id: {...}} }
            candidates = [(post, "posts") for post in posts.values() if isinstance(post, dict)]
        elif payload.get("id") and payload.get("message") is not None:
            candidates = [(payload, "post")]
        else:
            candidates = []

        for post, raw_event in candidates:
            normalized = self._normalize_post(post, source="rest", raw_event=raw_event)
            if normalized:
                self._write_message(normalized)

    def websocket_message(self, flow: http.HTTPFlow) -> None:
        """Handle the newest WebSocket message: mirror server-sent ``posted`` events.

        Client-sent frames, non-JSON frames and JSON values that are not
        objects are ignored.
        """
        if not self._is_allowed_host(flow.request.pretty_host):
            return
        if "/api/v4/websocket" not in flow.request.path:
            return
        if not flow.websocket or not flow.websocket.messages:
            return
        message = flow.websocket.messages[-1]
        if message.from_client:
            return

        try:
            content = message.content
            text = content.decode("utf-8") if isinstance(content, bytes) else str(content)
            payload = json.loads(text)
        except ValueError:
            # Covers both UnicodeDecodeError and JSONDecodeError.
            return
        if not isinstance(payload, dict):
            return

        event = payload.get("event")
        self._write_raw("websocket", {
            "captured_at": datetime.now(timezone.utc).isoformat(),
            "url": self._safe_url(flow.request.pretty_url),
            "event": event,
            "seq": payload.get("seq"),
            "data": payload.get("data"),
            "broadcast": payload.get("broadcast"),
        })

        if event != "posted":
            return
        data = payload.get("data")
        if not isinstance(data, dict):
            return

        # The post is handled whether it arrives JSON-encoded or already decoded.
        post = data.get("post")
        if isinstance(post, (str, bytes, bytearray)):
            try:
                post = json.loads(post)
            except ValueError:
                return
        if not isinstance(post, dict):
            return

        normalized = self._normalize_post(post, source="websocket", raw_event=event)
        if normalized:
            self._write_message(normalized)
|
|
|
|
|
|
# Module-level addon list. Constructing the mirror here runs its __init__
# (directory creation, state loading) as soon as this module is loaded.
addons = [MattermostMirror()]
|