#!/usr/bin/env python3 """Read local Mattermost mirror evidence for agent prompts. The proxy mirror under ai/inbox/mattermost-mirror is the preferred Mattermost evidence source when present. This reader gives commands a stable way to load a small focused view and fall back to the older sync artifacts when the mirror is not available. """ from __future__ import annotations import argparse import json from datetime import date, datetime, timedelta from pathlib import Path ROOT = Path(__file__).resolve().parents[2] MIRROR_DIR = ROOT / "ai" / "inbox" / "mattermost-mirror" LEGACY_LATEST = ROOT / "ai" / "inbox" / "mattermost-latest.md" LEGACY_GENERATED = ROOT / "scripts" / "mattermost" / "generated" / "mattermost_context.jsonl" def previous_workday(today: date) -> date: day = today - timedelta(days=1) while day.weekday() >= 5: day -= timedelta(days=1) return day def daily_by_date_path(day: date) -> Path: return MIRROR_DIR / "by-date" / f"{day:%Y}" / f"{day:%m}" / f"{day:%Y-%m-%d}.jsonl" def print_file(path: Path) -> bool: if path.is_file() and path.stat().st_size > 0: print(path.read_text(encoding="utf-8")) return True return False def read_jsonl(path: Path) -> list[dict]: records = [] if not path.is_file(): return records for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line: continue try: records.append(json.loads(line)) except json.JSONDecodeError: continue return records def print_jsonl(records: list[dict]) -> bool: if not records: return False for record in records: print(json.dumps(record, ensure_ascii=False, sort_keys=True)) return True def fallback() -> None: if print_file(LEGACY_LATEST): return if print_file(LEGACY_GENERATED): return print("No Mattermost context available.") def mode_latest() -> None: print("## Mattermost mirror latest context") if print_file(MIRROR_DIR / "latest.md"): return records = read_jsonl(MIRROR_DIR / "latest.jsonl") if print_jsonl(records): return print("No proxy mirror latest context available; falling back to legacy sync artifacts.") fallback() def mode_previous_workday(today_raw: str | None) -> None: today = datetime.strptime(today_raw, "%Y-%m-%d").date() if today_raw else date.today() day = previous_workday(today) path = daily_by_date_path(day) print(f"## Mattermost mirror previous-workday context ({day.isoformat()})") records = read_jsonl(path) if print_jsonl(records): return print("No proxy mirror previous-workday context available; falling back to latest context.") mode_latest() def mode_standup(today_raw: str | None) -> None: mode_previous_workday(today_raw) print("\n## Mattermost mirror latest context") latest_md = MIRROR_DIR / "latest.md" if latest_md.is_file() and latest_md.stat().st_size > 0: print(latest_md.read_text(encoding="utf-8")) def mode_focused() -> None: records = read_jsonl(MIRROR_DIR / "latest.jsonl") if not records: records = read_jsonl(LEGACY_GENERATED) if not records: print("No Mattermost context available.") return manager_names = {"jeff", "jeff.dewitte"} manager_records = [record for record in records if str(record.get("username", "")).lower() in manager_names] focused = manager_records[-10:] if manager_records else records[-15:] print_jsonl(focused) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--mode", choices=["latest", "previous-workday", "standup", "focused"], default="latest") parser.add_argument("--today", default="") args = parser.parse_args() if args.mode == "latest": mode_latest() elif args.mode == "previous-workday": mode_previous_workday(args.today or None) elif args.mode == "standup": mode_standup(args.today or None) elif args.mode == "focused": mode_focused() if __name__ == "__main__": main()