#!/usr/bin/env python3 import json import logging import os import re import ssl import sys from argparse import ArgumentParser, Namespace from datetime import date, datetime, time, timedelta from pathlib import Path from typing import Any, Dict, List from urllib import error, parse, request SCRIPT_DIR = Path(__file__).resolve().parent DEFAULT_WINDOW_HOURS = 24 DEFAULT_MAX_MESSAGES = 200 MAX_PER_PAGE = 200 DEFAULT_OUTPUT_FILE = str(SCRIPT_DIR / "generated" / "mattermost_context.jsonl") REQUEST_TIMEOUT = 15 DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") LOGGER = logging.getLogger("mattermost_context") USER_CACHE: Dict[str, Dict[str, Any]] = {} CHANNEL_CACHE: Dict[str, Dict[str, Any]] = {} TEAM_CACHE: List[Dict[str, Any]] | None = None MATTERMOST_URL = "" CHANNEL_SPECS: List[Dict[str, str]] = [] WINDOW_HOURS = DEFAULT_WINDOW_HOURS MAX_MESSAGES = DEFAULT_MAX_MESSAGES CUTOFF_TIMESTAMP_MS = 0 RANGE_START_TIMESTAMP_MS = 0 RANGE_END_TIMESTAMP_MS = 0 OUTPUT_FILE = DEFAULT_OUTPUT_FILE REQUEST_HEADERS: Dict[str, str] = {} SSL_CONTEXT: ssl.SSLContext | None = None MATTERMOST_TEAM_NAME = "" MATTERMOST_TEAM_ID = "" class MattermostAPIError(RuntimeError): pass def parse_args() -> Namespace: parser = ArgumentParser(description="Extract Mattermost messages as JSONL context.") parser.add_argument( "--previous-workday", action="store_true", help="Fetch the latest prior calendar day with Mattermost activity instead of a fixed recent window.", ) parser.add_argument( "--today", default=date.today().isoformat(), help="Reference date in YYYY-MM-DD format. Defaults to today.", ) parser.add_argument( "--max-lookback-days", type=int, default=int(os.getenv("MATTERMOST_MAX_LOOKBACK_DAYS", "7")), help="Maximum days to search backward with --previous-workday.", ) parser.add_argument( "--window-hours", type=int, default=0, help="Override MESSAGE_WINDOW_HOURS for normal recent-window mode.", ) parser.add_argument( "--output-file", default="", help="Override MATTERMOST_OUTPUT_FILE.", ) return parser.parse_args() def parse_iso_date(raw_value: str) -> date: if not DATE_RE.match(raw_value): raise ValueError(f"Invalid date '{raw_value}'. Use YYYY-MM-DD.") return datetime.strptime(raw_value, "%Y-%m-%d").date() def parse_bool_env(name: str, default: bool = False) -> bool: raw_value = os.getenv(name) if raw_value is None: return default return raw_value.strip().lower() in {"1", "true", "yes", "on"} def load_dotenv_file(path: Path | None = None) -> None: dotenv_path = path or (SCRIPT_DIR / ".env") if not dotenv_path.exists(): return with dotenv_path.open("r", encoding="utf-8") as file_handle: for raw_line in file_handle: line = raw_line.strip() if not line or line.startswith("#"): continue if line.startswith("export "): line = line[len("export ") :].strip() if "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip() if not key: continue if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}: value = value[1:-1] os.environ.setdefault(key, value) def require_env(name: str) -> str: value = os.getenv(name, "").strip() if not value: raise ValueError(f"Missing required environment variable: {name}") return value def parse_channel_ids(raw_value: str) -> List[str]: normalized = raw_value.replace("\n", ",") channel_ids = [item.strip() for item in normalized.split(",") if item.strip()] if not channel_ids: raise ValueError("CHANNEL_IDS must contain at least one channel id.") return channel_ids def looks_like_channel_id(value: str) -> bool: return len(value) == 26 and value.isalnum() def parse_channel_specs() -> List[Dict[str, str]]: raw_channels = os.getenv("CHANNELS", "").strip() raw_channel_names = os.getenv("CHANNEL_NAMES", "").strip() raw_channel_ids = os.getenv("CHANNEL_IDS", "").strip() entries: List[str] = [] if raw_channels: entries.extend(parse_channel_ids(raw_channels)) if raw_channel_names: entries.extend(parse_channel_ids(raw_channel_names)) if raw_channel_ids: entries.extend(parse_channel_ids(raw_channel_ids)) if not entries: raise ValueError("Configure at least one of CHANNELS, CHANNEL_NAMES, or CHANNEL_IDS.") specs: List[Dict[str, str]] = [] for entry in entries: if looks_like_channel_id(entry): specs.append({"kind": "id", "value": entry}) else: specs.append({"kind": "name", "value": entry.lstrip("#")}) return specs def build_ssl_context() -> ssl.SSLContext: ca_bundle = os.getenv("MATTERMOST_CA_BUNDLE", "").strip() skip_tls_verify = parse_bool_env("MATTERMOST_SKIP_TLS_VERIFY", default=False) if skip_tls_verify: LOGGER.warning("TLS certificate verification is disabled via MATTERMOST_SKIP_TLS_VERIFY.") return ssl._create_unverified_context() if ca_bundle: LOGGER.info("Using custom CA bundle from MATTERMOST_CA_BUNDLE: %s", ca_bundle) return ssl.create_default_context(cafile=ca_bundle) return ssl.create_default_context() def configure(args: Namespace) -> None: global MATTERMOST_URL, CHANNEL_SPECS, WINDOW_HOURS, MAX_MESSAGES, CUTOFF_TIMESTAMP_MS, OUTPUT_FILE global RANGE_START_TIMESTAMP_MS, RANGE_END_TIMESTAMP_MS, REQUEST_HEADERS, SSL_CONTEXT global MATTERMOST_TEAM_NAME, MATTERMOST_TEAM_ID load_dotenv_file() MATTERMOST_URL = require_env("MATTERMOST_URL").rstrip("/") token = require_env("MATTERMOST_TOKEN") CHANNEL_SPECS = parse_channel_specs() WINDOW_HOURS = args.window_hours or int(os.getenv("MESSAGE_WINDOW_HOURS", str(DEFAULT_WINDOW_HOURS))) MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", str(DEFAULT_MAX_MESSAGES))) OUTPUT_FILE = args.output_file or os.getenv("MATTERMOST_OUTPUT_FILE", DEFAULT_OUTPUT_FILE).strip() or DEFAULT_OUTPUT_FILE MATTERMOST_TEAM_NAME = os.getenv("MATTERMOST_TEAM_NAME", "").strip() MATTERMOST_TEAM_ID = os.getenv("MATTERMOST_TEAM_ID", "").strip() if WINDOW_HOURS <= 0: raise ValueError("MESSAGE_WINDOW_HOURS must be greater than 0.") if MAX_MESSAGES <= 0: raise ValueError("MAX_MESSAGES must be greater than 0.") cutoff = datetime.now().astimezone() - timedelta(hours=WINDOW_HOURS) CUTOFF_TIMESTAMP_MS = int(cutoff.timestamp() * 1000) RANGE_START_TIMESTAMP_MS = 0 RANGE_END_TIMESTAMP_MS = 0 REQUEST_HEADERS = { "Authorization": f"Bearer {token}", "Accept": "application/json", "Content-Type": "application/json", } SSL_CONTEXT = build_ssl_context() def api_get_json(api_path: str, params: Dict[str, Any] | None = None) -> Dict[str, Any]: query = f"?{parse.urlencode(params)}" if params else "" url = f"{MATTERMOST_URL}{api_path}{query}" req = request.Request(url, headers=REQUEST_HEADERS, method="GET") try: with request.urlopen(req, timeout=REQUEST_TIMEOUT, context=SSL_CONTEXT) as response: charset = response.headers.get_content_charset() or "utf-8" payload = response.read().decode(charset) except error.HTTPError as exc: body = "" try: body = exc.read().decode("utf-8", errors="replace") except Exception: body = "" raise MattermostAPIError(f"HTTP {exc.code} for {api_path}: {body or exc.reason}") from exc except error.URLError as exc: reason = exc.reason if hasattr(exc, "reason") else str(exc) raise MattermostAPIError(f"Request failed for {api_path}: {reason}") from exc try: return json.loads(payload) except json.JSONDecodeError as exc: raise MattermostAPIError(f"Invalid JSON returned by {api_path}") from exc def get_channel_posts(channel_id: str) -> List[Dict[str, Any]]: collected: List[Dict[str, Any]] = [] page = 0 per_page = min(MAX_PER_PAGE, MAX_MESSAGES) start_timestamp_ms = RANGE_START_TIMESTAMP_MS or CUTOFF_TIMESTAMP_MS end_timestamp_ms = RANGE_END_TIMESTAMP_MS while len(collected) < MAX_MESSAGES: payload = api_get_json( f"/api/v4/channels/{channel_id}/posts", {"page": page, "per_page": per_page}, ) order = payload.get("order", []) posts_by_id = payload.get("posts", {}) if not order: break reached_cutoff = False for post_id in order: post = posts_by_id.get(post_id) if not post: continue created_at = int(post.get("create_at", 0)) if created_at < start_timestamp_ms: reached_cutoff = True continue if end_timestamp_ms and created_at >= end_timestamp_ms: continue collected.append(post) if len(collected) >= MAX_MESSAGES: break if reached_cutoff or len(order) < per_page: break page += 1 LOGGER.info("Fetched %s raw posts from channel %s", len(collected), channel_id) return collected def get_channel_by_id(channel_id: str) -> Dict[str, Any]: cache_key = f"id:{channel_id}" if cache_key in CHANNEL_CACHE: return CHANNEL_CACHE[cache_key] channel_data = api_get_json(f"/api/v4/channels/{channel_id}") CHANNEL_CACHE[cache_key] = channel_data return channel_data def get_channel_by_name(channel_name: str) -> Dict[str, Any]: cache_key = f"name:{channel_name}" if cache_key in CHANNEL_CACHE: return CHANNEL_CACHE[cache_key] channel_data: Dict[str, Any] if MATTERMOST_TEAM_ID or MATTERMOST_TEAM_NAME: channel_data = get_channel_by_name_for_team(channel_name) else: channel_data = find_channel_across_user_teams(channel_name) CHANNEL_CACHE[cache_key] = channel_data CHANNEL_CACHE[f"id:{channel_data.get('id', '')}"] = channel_data return channel_data def get_channel_by_name_for_team(channel_name: str) -> Dict[str, Any]: if MATTERMOST_TEAM_ID: api_path = f"/api/v4/teams/{MATTERMOST_TEAM_ID}/channels/name/{parse.quote(channel_name, safe='')}" else: api_path = f"/api/v4/teams/name/{parse.quote(MATTERMOST_TEAM_NAME, safe='')}/channels/name/{parse.quote(channel_name, safe='')}" return api_get_json(api_path) def get_user_teams() -> List[Dict[str, Any]]: global TEAM_CACHE if TEAM_CACHE is not None: return TEAM_CACHE teams = api_get_json("/api/v4/users/me/teams") if not isinstance(teams, list): raise MattermostAPIError("Unexpected response while listing user teams.") TEAM_CACHE = teams return TEAM_CACHE def get_user_channels_for_team(team_id: str) -> List[Dict[str, Any]]: channels = api_get_json(f"/api/v4/users/me/teams/{team_id}/channels") if not isinstance(channels, list): raise MattermostAPIError(f"Unexpected response while listing channels for team {team_id}.") return channels def find_channel_across_user_teams(channel_name: str) -> Dict[str, Any]: matches: List[Dict[str, Any]] = [] for team in get_user_teams(): team_id = team.get("id", "") team_name = team.get("name", "") if not team_id: continue try: channels = get_user_channels_for_team(team_id) except MattermostAPIError as exc: LOGGER.warning("Could not list channels for team %s: %s", team_name or team_id, exc) continue for channel in channels: if channel.get("name") == channel_name: channel = dict(channel) channel["_resolved_team_name"] = team_name channel["_resolved_team_id"] = team_id matches.append(channel) if not matches: raise MattermostAPIError( f"Unable to find channel named '{channel_name}' in the current user's accessible teams." ) if len(matches) > 1: teams = ", ".join(sorted({match.get("_resolved_team_name", match.get("_resolved_team_id", "unknown")) for match in matches})) raise MattermostAPIError( f"Channel name '{channel_name}' is ambiguous across teams: {teams}. Set MATTERMOST_TEAM_NAME or MATTERMOST_TEAM_ID." ) return matches[0] def resolve_channels() -> List[Dict[str, Any]]: resolved: List[Dict[str, Any]] = [] for spec in CHANNEL_SPECS: if spec["kind"] == "id": channel_data = get_channel_by_id(spec["value"]) else: channel_data = get_channel_by_name(spec["value"]) resolved.append(channel_data) return resolved def get_user_info(user_id: str) -> Dict[str, Any]: if not user_id: return {"id": "", "username": "unknown"} if user_id in USER_CACHE: return USER_CACHE[user_id] try: user_data = api_get_json(f"/api/v4/users/{user_id}") except MattermostAPIError as exc: LOGGER.error("Could not fetch user %s: %s", user_id, exc) fallback = {"id": user_id, "username": user_id} USER_CACHE[user_id] = fallback return fallback USER_CACHE[user_id] = user_data return user_data def build_user_map(messages: List[Dict[str, Any]]) -> Dict[str, str]: user_map: Dict[str, str] = {} user_ids = sorted({message["user_id"] for message in messages if message.get("user_id")}) for user_id in user_ids: user_info = get_user_info(user_id) username = ( user_info.get("username") or user_info.get("nickname") or user_info.get("first_name") or user_id ) user_map[user_id] = username return user_map def is_system_message(post: Dict[str, Any]) -> bool: post_type = (post.get("type") or "").strip() return post_type.startswith("system_") def extract_messages(resolved_channels: List[Dict[str, Any]] | None = None) -> List[Dict[str, Any]]: all_messages: List[Dict[str, Any]] = [] for channel in resolved_channels or resolve_channels(): channel_id = channel.get("id", "") channel_name = channel.get("name", "") or channel_id channel_display_name = channel.get("display_name", "") or channel_name raw_posts = get_channel_posts(channel_id) for post in raw_posts: if is_system_message(post): continue message = (post.get("message") or "").strip() if not message: continue all_messages.append( { "channel_id": channel_id, "channel_ref": channel_name, "channel_name": channel_name, "channel_display_name": channel_display_name, "post_id": post.get("id", ""), "user_id": post.get("user_id", ""), "create_at": int(post.get("create_at", 0)), "message": message.replace("\r\n", "\n"), "root_id": post.get("root_id", ""), "reply_count": int(post.get("reply_count", 0)), } ) all_messages.sort(key=lambda item: item["create_at"]) if len(all_messages) > MAX_MESSAGES: all_messages = all_messages[-MAX_MESSAGES:] user_map = build_user_map(all_messages) for message in all_messages: message["username"] = user_map.get(message["user_id"], message["user_id"] or "unknown") LOGGER.info("Prepared %s messages after filtering", len(all_messages)) return all_messages def day_range_ms(day: date) -> tuple[int, int]: start = datetime.combine(day, time.min).astimezone() end = start + timedelta(days=1) return int(start.timestamp() * 1000), int(end.timestamp() * 1000) def set_fetch_range(start_ms: int, end_ms: int) -> None: global RANGE_START_TIMESTAMP_MS, RANGE_END_TIMESTAMP_MS RANGE_START_TIMESTAMP_MS = start_ms RANGE_END_TIMESTAMP_MS = end_ms def extract_previous_workday_messages(args: Namespace) -> tuple[List[Dict[str, Any]], date | None, int]: today = parse_iso_date(args.today) resolved_channels = resolve_channels() max_lookback_days = args.max_lookback_days if max_lookback_days <= 0: raise ValueError("--max-lookback-days must be greater than 0.") for skipped_days in range(max_lookback_days): candidate_day = today - timedelta(days=skipped_days + 1) start_ms, end_ms = day_range_ms(candidate_day) set_fetch_range(start_ms, end_ms) messages = extract_messages(resolved_channels) if messages: return messages, candidate_day, skipped_days LOGGER.info("No messages found for %s; expanding lookback.", candidate_day.isoformat()) return [], None, max_lookback_days def format_messages(messages: List[Dict[str, Any]]) -> str: lines: List[str] = [] for message in messages: timestamp = datetime.fromtimestamp(message["create_at"] / 1000).astimezone() username = message.get("username", "unknown") post_id = message.get("post_id", "") root_id = message.get("root_id", "") thread_id = root_id or post_id or "unknown" reply_count = int(message.get("reply_count", 0)) if root_id: message_kind = "thread_reply" elif reply_count > 0: message_kind = "thread_root" else: message_kind = "channel_post" channel_ref = message.get("channel_ref", message.get("channel_id", "unknown")) record = { "source": "mattermost", "channel": channel_ref, "channel_id": message.get("channel_id", ""), "channel_display_name": message.get("channel_display_name", channel_ref), "post_id": post_id, "thread_id": thread_id, "root_id": root_id or None, "type": message_kind, "timestamp": timestamp.isoformat(), "username": username, "message": message["message"], } if root_id: record["reply_to"] = root_id if reply_count > 0: record["reply_count"] = reply_count lines.append(json.dumps(record, ensure_ascii=False, sort_keys=False)) return "\n".join(lines) def save_to_file(text: str) -> None: output_path = Path(OUTPUT_FILE).expanduser().resolve() output_path.parent.mkdir(parents=True, exist_ok=True) if text and not text.endswith("\n"): text = f"{text}\n" output_path.write_text(text, encoding="utf-8") def main() -> int: logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") try: args = parse_args() configure(args) if args.previous_workday: messages, selected_day, skipped_days = extract_previous_workday_messages(args) if selected_day: LOGGER.info( "Selected previous workday %s after skipping %s inactive calendar day(s).", selected_day.isoformat(), skipped_days, ) else: LOGGER.info("No previous workday messages found within %s day(s).", skipped_days) else: messages = extract_messages() output = format_messages(messages) print(output) save_to_file(output) LOGGER.info("Saved context to %s", OUTPUT_FILE) except ValueError as exc: LOGGER.error("%s", exc) return 1 except MattermostAPIError as exc: LOGGER.error("%s", exc) return 1 except Exception as exc: LOGGER.exception("Unexpected error: %s", exc) return 1 return 0 if __name__ == "__main__": sys.exit(main())