feat: split Mattermost mirror output into channels/, by-date/, threads/, and refs/ directories for easier organization and analysis

This commit is contained in:
2026-05-19 16:32:05 -06:00
parent 3816487bec
commit d318701899
2 changed files with 82 additions and 18 deletions

View File

@@ -25,6 +25,7 @@ from mitmproxy import http
# Relative path of the mirror's output directory.
# NOTE(review): presumably the fallback when no output directory is configured — confirm at the use site.
DEFAULT_OUT_DIR = "ai/inbox/mattermost-mirror"
# Anchored pattern: 26 lowercase ASCII letters or digits and nothing else.
POST_ID_RE = re.compile(r"^[a-z0-9]{26}$")
# Matches each run of characters outside [a-zA-Z0-9._-]; _safe_name replaces
# every such run with a single "-" to build file and directory names.
SAFE_NAME_RE = re.compile(r"[^a-zA-Z0-9._-]+")
def env_bool(name: str, default: bool = False) -> bool:
@@ -46,7 +47,10 @@ class MattermostMirror:
self.latest_limit = int(os.getenv("MATTERMOST_MIRROR_LATEST_LIMIT", "200"))
self.write_raw = env_bool("MATTERMOST_MIRROR_WRITE_RAW", default=False)
self.messages_dir = self.out_dir / "messages"
self.channels_dir = self.out_dir / "channels"
self.by_date_dir = self.out_dir / "by-date"
self.threads_dir = self.out_dir / "threads"
self.refs_dir = self.out_dir / "refs"
self.raw_dir = self.out_dir / "raw"
self.state_path = self.out_dir / "state.json"
self.index_path = self.out_dir / "index.json"
@@ -64,7 +68,10 @@ class MattermostMirror:
self._load_recent_seen_ids()
# Create each output subdirectory used by the mirror. parents=True builds any
# missing ancestors, and exist_ok=True leaves already-existing directories alone.
def _ensure_dirs(self) -> None:
# NOTE(review): messages_dir is created here next to the channels/by-date/threads
# layout — confirm it is still written to before keeping this call.
self.messages_dir.mkdir(parents=True, exist_ok=True)
self.channels_dir.mkdir(parents=True, exist_ok=True)
self.by_date_dir.mkdir(parents=True, exist_ok=True)
self.threads_dir.mkdir(parents=True, exist_ok=True)
self.refs_dir.mkdir(parents=True, exist_ok=True)
self.raw_dir.mkdir(parents=True, exist_ok=True)
def _load_state(self) -> None:
@@ -84,7 +91,8 @@ class MattermostMirror:
def _load_recent_seen_ids(self) -> None:
# Bound startup work: latest.jsonl contains the hot dedupe window. Daily
# files are loaded lazily when older/backfilled messages are encountered.
for path in [self.latest_jsonl_path, self._daily_messages_path(datetime.now(timezone.utc))]:
today = datetime.now(timezone.utc)
for path in [self.latest_jsonl_path, self._daily_by_date_path(today)]:
if not path.exists():
continue
try:
@@ -133,8 +141,22 @@ class MattermostMirror:
pass
return datetime.now(timezone.utc)
def _daily_messages_path(self, dt: datetime) -> Path:
return self.messages_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}.jsonl"
def _safe_name(self, value: str | None, fallback: str = "unknown") -> str:
    """Reduce *value* to a name usable as a file or directory component.

    A missing, empty, or whitespace-only *value* is replaced by *fallback*.
    Every run of characters outside ``[a-zA-Z0-9._-]`` becomes a single
    ``-``, then leading/trailing ``-``, ``.`` and ``_`` are stripped. If
    nothing is left after cleaning, *fallback* is returned as given.
    """
    candidate = value if value else fallback
    candidate = candidate.strip()
    if not candidate:
        candidate = fallback
    cleaned = SAFE_NAME_RE.sub("-", candidate).strip("-._")
    if cleaned:
        return cleaned
    return fallback
def _daily_channel_path(self, dt: datetime, channel_name: str | None, channel_id: str | None) -> Path:
    """Return the per-day JSONL path for one channel under ``channels_dir``.

    The channel directory is named after *channel_name*, falling back to
    *channel_id* and then to ``unknown-channel``, after sanitising via
    ``_safe_name``. Layout:
    ``<channels_dir>/<slug>/<YYYY>/<MM>/<YYYY-MM-DD>.jsonl``.
    """
    label = channel_name if channel_name else channel_id
    slug = self._safe_name(label, fallback="unknown-channel")
    day_file = dt.strftime("%Y-%m-%d") + ".jsonl"
    return self.channels_dir.joinpath(slug, dt.strftime("%Y"), dt.strftime("%m"), day_file)
def _daily_by_date_path(self, dt: datetime) -> Path:
return self.by_date_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}.jsonl"
def _thread_path(self, thread_id: str | None) -> Path | None:
if not thread_id:
return None
return self.threads_dir / f"{self._safe_name(thread_id)}.jsonl"
def _daily_raw_path(self, dt: datetime, suffix: str) -> Path:
return self.raw_dir / f"{dt:%Y}" / f"{dt:%m}" / f"{dt:%Y-%m-%d}-{suffix}.jsonl"
@@ -177,6 +199,7 @@ class MattermostMirror:
return
username = user.get("username") or user.get("nickname") or user.get("first_name") or user_id
self.users[user_id] = username
self._write_refs()
def _remember_channel(self, channel: dict[str, Any]) -> None:
channel_id = channel.get("id")
@@ -184,6 +207,13 @@ class MattermostMirror:
return
name = channel.get("name") or channel.get("display_name") or channel_id
self.channels[channel_id] = name
self._write_refs()
def _write_refs(self) -> None:
    """Write the user and channel lookup tables as JSON files in ``refs_dir``.

    ``users.json`` is written first, then ``channels.json``. Each file is
    serialised with sorted keys, two-space indentation, non-ASCII characters
    kept as-is, and a trailing newline, and is handed to
    ``_atomic_write_text``.
    """
    for file_name, mapping in (("users.json", self.users), ("channels.json", self.channels)):
        payload = json.dumps(mapping, ensure_ascii=False, indent=2, sort_keys=True)
        self._atomic_write_text(self.refs_dir / file_name, payload + "\n")
def _ingest_reference_payload(self, payload: Any) -> None:
if isinstance(payload, list):
@@ -250,14 +280,24 @@ class MattermostMirror:
def _write_message(self, msg: dict[str, Any]) -> None:
post_id = msg["post_id"]
created_dt = self._dt_from_ms(msg.get("created_at_ms"))
daily_path = self._daily_messages_path(created_dt)
daily_seen = self._load_seen_ids_for_file(daily_path)
if post_id in self.seen_post_ids or post_id in daily_seen:
channel_path = self._daily_channel_path(created_dt, msg.get("channel_name"), msg.get("channel_id"))
by_date_path = self._daily_by_date_path(created_dt)
thread_path = self._thread_path(msg.get("thread_id"))
channel_seen = self._load_seen_ids_for_file(channel_path)
by_date_seen = self._load_seen_ids_for_file(by_date_path)
if post_id in self.seen_post_ids or post_id in channel_seen or post_id in by_date_seen:
return
self.seen_post_ids.add(post_id)
daily_seen.add(post_id)
self._append_jsonl(daily_path, msg)
channel_seen.add(post_id)
by_date_seen.add(post_id)
self._append_jsonl(channel_path, msg)
self._append_jsonl(by_date_path, msg)
if thread_path:
thread_seen = self._load_seen_ids_for_file(thread_path)
if post_id not in thread_seen:
thread_seen.add(post_id)
self._append_jsonl(thread_path, msg)
self._update_state(msg)
self._update_latest(msg)
self._update_index(created_dt, msg)
@@ -273,6 +313,7 @@ class MattermostMirror:
self.state["users"] = self.users
self.state["updated_at"] = datetime.now(timezone.utc).isoformat()
self._atomic_write_text(self.state_path, json.dumps(self.state, ensure_ascii=False, indent=2, sort_keys=True) + "\n")
self._write_refs()
def _read_jsonl(self, path: Path) -> list[dict[str, Any]]:
if not path.exists():
@@ -320,17 +361,27 @@ class MattermostMirror:
except Exception:
pass
date_key = f"{dt:%Y-%m-%d}"
rel_path = str(self._daily_messages_path(dt).relative_to(self.out_dir))
channel_path = self._daily_channel_path(dt, msg.get("channel_name"), msg.get("channel_id"))
by_date_path = self._daily_by_date_path(dt)
thread_path = self._thread_path(msg.get("thread_id"))
channel_rel_path = str(channel_path.relative_to(self.out_dir))
by_date_rel_path = str(by_date_path.relative_to(self.out_dir))
dates = set(index.get("dates") or [])
dates.add(date_key)
index["dates"] = sorted(dates)
by_date = index.setdefault("by_date", {})
by_date[date_key] = by_date_rel_path
channel_key = msg.get("channel_name") or msg.get("channel_id") or "unknown-channel"
channels = index.setdefault("channels", {})
channel_entry = channels.setdefault(channel_key, {"files": []})
channel_entry = channels.setdefault(channel_key, {"channel_id": msg.get("channel_id"), "files": []})
channel_entry["channel_id"] = msg.get("channel_id")
files = set(channel_entry.get("files") or [])
files.add(rel_path)
files.add(channel_rel_path)
channel_entry["files"] = sorted(files)
if thread_path:
threads = index.setdefault("threads", {})
threads[msg.get("thread_id")] = str(thread_path.relative_to(self.out_dir))
index["updated_at"] = datetime.now(timezone.utc).isoformat()
self._atomic_write_text(self.index_path, json.dumps(index, ensure_ascii=False, indent=2, sort_keys=True) + "\n")