#!/usr/bin/env python3
"""Import a Slack export into a JSONL file plus a Markdown summary.

The script walks the per-channel day files (``<channel>/<YYYY-MM-DD>.json``)
under an export root, normalizes each message, and, when the archive is larger
than the message budget, keeps a mix of recent messages, per-channel/per-year
high scorers, and Jira-referencing messages.
"""
import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence

# Matches Jira-style issue keys such as "ABC-123".
JIRA_RE = re.compile(r"\b[A-Z][A-Z0-9]+-\d+\b")

# Case-insensitive keyword patterns; each pattern found in a message adds to
# its score in score_message().
HIGH_SIGNAL_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in [
        r"\bdecision\b",
        r"\broot cause\b",
        r"\bapproved?\b",
        r"\bpoints?\b",
        r"\bepic\b",
        r"\bregression\b",
        r"\bauth(?:enticated)?\b",
        r"\breproduc(?:e|ible|ibility)\b",
        r"\bgraphql\b",
        r"\bapollo\b",
        r"\brest\b",
        r"\bxflow\b",
        r"\bfid4\b",
        r"\bfeature flag\b",
        r"\btitle\b",
        r"\bscope\b",
        r"\bowner(?:ship)?\b",
        r"\brollout\b",
        r"\bmigration\b",
        r"\bdependency\b",
        r"\bcontract\b",
        r"\blifecycle\b",
        r"\bswiftui\b",
        r"\bbug\b",
        r"\bissue\b",
        r"\bincident\b",
        r"\bfix(?:ed)?\b",
        r"\bvalidation\b",
        r"\bdob\b",
        r"\bteenidentitycheck\b",
        r"\bdone\b",
        r"\bin progress\b",
        r"\bblocked?\b",
    ]
]


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the importer."""
    parser = argparse.ArgumentParser(
        description="Import a Slack export into workspace-friendly JSONL and summary artifacts."
    )
    parser.add_argument("--export-path", required=True, help="Path to the Slack export root.")
    parser.add_argument(
        "--channels",
        default="",
        help="Comma-separated channel names to import. When omitted, auto-detect channels by prefix.",
    )
    parser.add_argument(
        "--channel-prefix",
        default="fidelity",
        help="Default channel prefix to auto-detect when --channels is omitted.",
    )
    parser.add_argument(
        "--all-channels",
        action="store_true",
        help="Import every channel folder instead of using prefix-based auto-detection.",
    )
    parser.add_argument(
        "--since",
        default="",
        help="Optional lower bound date in YYYY-MM-DD.",
    )
    parser.add_argument(
        "--until",
        default="",
        help="Optional upper bound date in YYYY-MM-DD.",
    )
    parser.add_argument(
        "--max-messages",
        type=int,
        default=0,
        help="Maximum number of messages to emit after filtering. 0 means auto-tune based on export size.",
    )
    parser.add_argument(
        "--recent-days",
        type=int,
        default=180,
        help="Recent window used for prioritizing current relevance when selecting from very large archives.",
    )
    parser.add_argument(
        "--output-dir",
        default="scripts/slack/generated",
        help="Directory where generated artifacts will be written.",
    )
    return parser.parse_args()


def parse_date(raw: str) -> Optional[datetime]:
    """Parse a ``YYYY-MM-DD`` string into a naive datetime.

    Returns ``None`` for an empty string. Raises ``ValueError`` when the
    string is non-empty but does not match the format.
    """
    if not raw:
        return None
    return datetime.strptime(raw, "%Y-%m-%d")


def load_users(export_root: Path) -> Dict[str, str]:
    """Build a user-id -> display-name map from ``users.json``.

    Returns an empty map when the file is missing, is not valid JSON, or does
    not contain a list. Entries that are not objects or have no ``id`` are
    skipped.
    """
    users_path = export_root / "users.json"
    if not users_path.exists():
        return {}
    try:
        payload = json.loads(users_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    if not isinstance(payload, list):
        return {}

    user_map: Dict[str, str] = {}
    for user in payload:
        if not isinstance(user, dict):
            continue
        user_id = user.get("id", "")
        if not user_id:
            continue
        profile = user.get("profile", {}) or {}
        # Preference order: handle, display name, real name, then the raw id.
        user_map[user_id] = (
            user.get("name")
            or profile.get("display_name")
            or profile.get("real_name")
            or user_id
        )
    return user_map


def available_channels(export_root: Path) -> List[str]:
    """Return the sorted names of all directories directly under the export root."""
    return sorted(path.name for path in export_root.iterdir() if path.is_dir())


def resolve_channels(
    export_root: Path,
    channels: List[str],
    channel_prefix: str,
    all_channels: bool,
) -> List[str]:
    """Decide which channels to import.

    Explicit ``channels`` win. Otherwise every channel is returned when
    ``all_channels`` is set; else the channels whose name starts with
    ``channel_prefix`` (case-insensitive, leading ``#`` ignored). When the
    prefix is empty or matches nothing, every available channel is returned.
    """
    if channels:
        return channels
    available = available_channels(export_root)
    if all_channels:
        return available
    prefix = channel_prefix.strip().lstrip("#")
    if prefix:
        lowered = prefix.lower()
        matched = [name for name in available if name.lower().startswith(lowered)]
        if matched:
            return matched
    return available


def iter_channel_files(export_root: Path, channels: Sequence[str]) -> Iterable[tuple[str, Path]]:
    """Yield ``(channel_name, day_file)`` for every ``*.json`` file of each channel.

    With no ``channels`` given, every directory under the export root is
    used. Named channels that are not directories are silently skipped.
    """
    if channels:
        candidates = [export_root / name for name in channels]
    else:
        candidates = [path for path in export_root.iterdir() if path.is_dir()]
    for channel_dir in sorted(candidates):
        if not channel_dir.is_dir():
            continue
        for day_file in sorted(channel_dir.glob("*.json")):
            yield channel_dir.name, day_file


def date_in_range(day_file: Path, since: Optional[datetime], until: Optional[datetime]) -> bool:
    """Tell whether a day file's ``YYYY-MM-DD`` stem lies within [since, until].

    Files whose stem is not a date are rejected. Both bounds are inclusive
    and optional.
    """
    try:
        file_day = datetime.strptime(day_file.stem, "%Y-%m-%d")
    except ValueError:
        return False
    if since and file_day < since:
        return False
    if until and file_day > until:
        return False
    return True


def resolve_username(message: Dict[str, Any], user_map: Dict[str, str]) -> str:
    """Return the best available author name for a raw message.

    Order: mapped user id, the message's own ``username`` field, the raw user
    id, and finally ``"unknown"``.
    """
    user_id = message.get("user", "")
    if user_id and user_id in user_map:
        return user_map[user_id]
    if message.get("username"):
        return str(message["username"])
    return user_id or "unknown"


def message_datetime(ts_value: str) -> Optional[datetime]:
    """Convert an epoch-seconds string into a local, timezone-aware datetime.

    Returns ``None`` when the value is not numeric or cannot be represented
    (NaN, infinity, or out of the platform's supported range).
    """
    try:
        # The conversion stays inside the try block because fromtimestamp()
        # itself raises for NaN, infinity and out-of-range values.
        return datetime.fromtimestamp(float(ts_value)).astimezone()
    except (TypeError, ValueError, OverflowError, OSError):
        return None


def message_timestamp(ts_value: str) -> str:
    """Return the ISO-8601 form of an epoch-seconds string, or ``""`` if unusable."""
    moment = message_datetime(ts_value)
    return moment.isoformat() if moment is not None else ""


def normalize_message(channel: str, raw: Dict[str, Any], user_map: Dict[str, str]) -> Optional[Dict[str, Any]]:
    """Turn one raw export entry into an output record.

    Returns ``None`` when the entry is not an object or has no non-blank
    string ``text``. ``reply_count`` and ``message_id`` are only included
    when the raw entry carries ``reply_count`` / ``ts``.
    """
    if not isinstance(raw, dict):
        return None
    text = raw.get("text")
    if not isinstance(text, str):
        return None
    text = text.strip()
    if not text:
        return None

    subtype = raw.get("subtype", "")
    record: Dict[str, Any] = {
        "source": "slack",
        "channel": channel,
        "timestamp": message_timestamp(str(raw.get("ts", ""))),
        "username": resolve_username(raw, user_map),
        "message": text,
        "type": subtype or "message",
        "thread_ts": raw.get("thread_ts") or None,
    }
    if raw.get("reply_count") is not None:
        record["reply_count"] = raw.get("reply_count")
    if raw.get("ts"):
        record["message_id"] = str(raw["ts"])
    return record


def auto_max_messages(total_messages: int) -> int:
    """Pick a message budget from the archive size (stepwise, capped at 8000)."""
    if total_messages <= 1500:
        return total_messages
    if total_messages <= 8000:
        return 2500
    if total_messages <= 25000:
        return 4000
    if total_messages <= 80000:
        return 6500
    return 8000


def score_message(item: Dict[str, Any], recent_cutoff: Optional[datetime]) -> int:
    """Score a normalized message for selection priority.

    Points: 8 per Jira key, 4 per matching high-signal pattern, 1 each for
    being in a thread, having replies and having a known author, 6 for being
    at or after ``recent_cutoff``, and 2 for text of 120+ characters.
    """
    score = 0
    text = item.get("message", "")
    username = item.get("username", "")
    timestamp = item.get("timestamp", "")

    score += len(JIRA_RE.findall(text)) * 8
    for pattern in HIGH_SIGNAL_PATTERNS:
        if pattern.search(text):
            score += 4
    if item.get("thread_ts"):
        score += 1
    if item.get("reply_count"):
        score += 1
    if username and username != "unknown":
        score += 1

    try:
        message_dt = datetime.fromisoformat(timestamp) if timestamp else None
    except ValueError:
        message_dt = None
    if recent_cutoff and message_dt and message_dt >= recent_cutoff:
        score += 6
    if len(text) >= 120:
        score += 2
    return score


def message_key(item: Dict[str, Any]) -> str:
    """Return a de-duplication key: the message id, else channel|timestamp|user|text."""
    return item.get("message_id") or "|".join(
        [
            item.get("channel", ""),
            item.get("timestamp", ""),
            item.get("username", ""),
            item.get("message", ""),
        ]
    )


def message_year(item: Dict[str, Any]) -> str:
    """Return the year of the record's ISO timestamp, or ``"unknown"``."""
    timestamp = item.get("timestamp", "")
    try:
        return str(datetime.fromisoformat(timestamp).year)
    except ValueError:
        return "unknown"


def add_unique_items(
    selected: List[Dict[str, Any]],
    seen: set[str],
    candidates: Sequence[Dict[str, Any]],
    limit: int,
) -> None:
    """Append not-yet-seen candidates to ``selected`` until it holds ``limit`` items.

    ``limit`` is the target total length of ``selected``, not a number of
    items to add. Both ``selected`` and ``seen`` are mutated in place.
    """
    if limit <= 0:
        return
    for item in candidates:
        if len(selected) >= limit:
            return
        key = message_key(item)
        if key in seen:
            continue
        seen.add(key)
        selected.append(item)


def select_messages(messages: List[Dict[str, Any]], max_messages: int, recent_days: int) -> List[Dict[str, Any]]:
    """Reduce ``messages`` to at most ``max_messages`` prioritized records.

    When the input already fits, it is returned unchanged (same list object).
    Otherwise the budget is filled in four passes: newest messages within
    ``recent_days``, top scorers per (channel, year), top scorers per Jira
    key, then the globally best-scoring remainder. The result is sorted by
    timestamp string.

    ``messages`` is expected to be in chronological order, because the recent
    pass takes the tail of the recent messages.
    """
    if len(messages) <= max_messages:
        return messages

    recent_cutoff = datetime.now().astimezone() - timedelta(days=recent_days)
    recent_messages: List[Dict[str, Any]] = []
    channel_year_buckets: Dict[tuple[str, str], List[Dict[str, Any]]] = {}
    jira_buckets: Dict[str, List[Dict[str, Any]]] = {}

    for item in messages:
        # The score is stashed on the record temporarily so the sort keys
        # below can read it; it is stripped again before returning.
        item["_score"] = score_message(item, recent_cutoff)
        if item["_score"] > 0:
            bucket_key = (item.get("channel", ""), message_year(item))
            channel_year_buckets.setdefault(bucket_key, []).append(item)
        for jira_id in JIRA_RE.findall(item.get("message", "")):
            jira_buckets.setdefault(jira_id, []).append(item)
        try:
            message_dt = datetime.fromisoformat(item.get("timestamp", ""))
        except ValueError:
            message_dt = None
        if message_dt and message_dt >= recent_cutoff:
            recent_messages.append(item)

    def by_score(item: Dict[str, Any]) -> tuple:
        return (item.get("_score", 0), item.get("timestamp", ""))

    selected: List[Dict[str, Any]] = []
    seen: set[str] = set()

    # Pass 1: newest messages. The 300-message floor must itself be capped by
    # max_messages, otherwise a small limit would be exceeded here.
    recent_budget = max(0, min(len(recent_messages), max_messages, max(max_messages // 5, 300)))
    if recent_budget:
        add_unique_items(selected, seen, recent_messages[-recent_budget:], recent_budget)

    # Pass 2: a few top scorers from every (channel, year) bucket, oldest year first.
    channel_year_keys = sorted(channel_year_buckets.keys(), key=lambda key: (key[1], key[0]))
    remaining_after_recent = max_messages - len(selected)
    coverage_budget = min(
        remaining_after_recent,
        max(remaining_after_recent // 3, min(len(channel_year_keys) * 2, remaining_after_recent)),
    )
    if channel_year_keys and coverage_budget > 0:
        per_bucket = max(1, min(4, coverage_budget // len(channel_year_keys) or 1))
        for bucket_key in channel_year_keys:
            bucket_items = sorted(channel_year_buckets[bucket_key], key=by_score, reverse=True)
            add_unique_items(selected, seen, bucket_items, min(max_messages, len(selected) + per_bucket))

    # Pass 3: up to two top scorers per Jira key, best-ranked keys first.
    remaining_after_coverage = max_messages - len(selected)
    jira_ranked = sorted(
        jira_buckets.items(),
        key=lambda pair: (
            max(item.get("_score", 0) for item in pair[1]),
            len(pair[1]),
            pair[0],
        ),
        reverse=True,
    )
    jira_budget = min(remaining_after_coverage, max(remaining_after_coverage // 4, 150))
    if jira_budget > 0:
        for _, bucket_items in jira_ranked:
            ranked_items = sorted(bucket_items, key=by_score, reverse=True)
            add_unique_items(selected, seen, ranked_items, min(max_messages, len(selected) + 2))
            if len(selected) >= recent_budget + coverage_budget + jira_budget:
                break

    # Pass 4: fill whatever is left with the globally best-scoring messages.
    ranked_messages = sorted(messages, key=by_score, reverse=True)
    add_unique_items(selected, seen, ranked_messages, max_messages)

    # Strip the temporary score from every input record, selected or not, so
    # the caller's dicts are left as they were.
    for item in messages:
        item.pop("_score", None)
    selected.sort(key=lambda item: item.get("timestamp", ""))
    return selected


def collect_messages(
    export_root: Path,
    channels: Sequence[str],
    since: Optional[datetime],
    until: Optional[datetime],
    max_messages: int,
    recent_days: int,
) -> List[Dict[str, Any]]:
    """Load, normalize, and select messages from the export.

    Day files outside the date range, files that cannot be decoded or parsed
    as JSON, and files whose top-level value is not a list are skipped. A
    ``max_messages`` of zero or less means the budget is derived from the
    number of loaded messages.
    """
    user_map = load_users(export_root)
    messages: List[Dict[str, Any]] = []
    for channel, day_file in iter_channel_files(export_root, channels):
        if not date_in_range(day_file, since, until):
            continue
        try:
            payload = json.loads(day_file.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Best effort: one unreadable day file must not abort the import.
            continue
        if not isinstance(payload, list):
            continue
        for raw in payload:
            record = normalize_message(channel, raw, user_map)
            if record:
                messages.append(record)

    messages.sort(key=lambda item: item.get("timestamp", ""))
    tuned_max = max_messages if max_messages > 0 else auto_max_messages(len(messages))
    return select_messages(messages, tuned_max, recent_days)


def write_jsonl(messages: List[Dict[str, Any]], path: Path) -> None:
    """Write one JSON object per line to ``path``, creating parent directories."""
    lines = [json.dumps(item, ensure_ascii=False) for item in messages]
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8")


def build_summary(messages: List[Dict[str, Any]], channels: Sequence[str]) -> str:
    """Render a Markdown summary of the imported messages.

    The time span is taken from the first and last records, so ``messages``
    should already be in timestamp order.
    """
    channel_counter = Counter(item["channel"] for item in messages)
    user_counter = Counter(item["username"] for item in messages)
    jira_counter: Counter = Counter()
    year_counter: Counter = Counter()
    for item in messages:
        jira_counter.update(JIRA_RE.findall(item.get("message", "")))
        year_counter.update([message_year(item)])

    first_timestamp = messages[0]["timestamp"] if messages else "n/a"
    last_timestamp = messages[-1]["timestamp"] if messages else "n/a"

    lines = [
        "# Slack Import Summary",
        "",
        f"- Messages imported: {len(messages)}",
        f"- Channels imported: {', '.join(channels) if channels else 'all detected channels'}",
        f"- Time span covered: {first_timestamp} -> {last_timestamp}",
        "",
        "## Top Channels",
    ]
    for channel, count in channel_counter.most_common(10):
        lines.append(f"- {channel}: {count}")

    lines.extend(["", "## Top Participants"])
    for username, count in user_counter.most_common(10):
        lines.append(f"- {username}: {count}")

    lines.extend(["", "## Jira IDs Mentioned"])
    if jira_counter:
        for jira_id, count in jira_counter.most_common(20):
            lines.append(f"- {jira_id}: {count}")
    else:
        lines.append("- None detected")

    lines.extend(["", "## Historical Coverage"])
    for year, count in year_counter.most_common():
        lines.append(f"- {year}: {count}")

    lines.extend(
        [
            "",
            "## Guidance",
            "- Treat this archive as historical context, not current truth.",
            "- The importer preserves recent context and older high-signal evidence across channels and years.",
            "- Prefer promoting durable patterns, repeated approvals, role mappings, Jira references, and architectural context.",
            "- Avoid promoting outdated status unless it still affects current understanding.",
        ]
    )
    return "\n".join(lines) + "\n"


def main() -> int:
    """Run the import and return a process exit code (0 on success, 1 on bad input)."""
    args = parse_args()
    export_root = Path(args.export_path).expanduser().resolve()
    if not export_root.exists():
        print(f"Export path not found: {export_root}", file=sys.stderr)
        return 1
    if not export_root.is_dir():
        print(f"Export path is not a directory: {export_root}", file=sys.stderr)
        return 1

    channels = [item.strip().lstrip("#") for item in args.channels.split(",") if item.strip()]
    channels = resolve_channels(export_root, channels, args.channel_prefix, args.all_channels)

    try:
        since = parse_date(args.since)
        until = parse_date(args.until)
    except ValueError as exc:
        print(f"Invalid date (expected YYYY-MM-DD): {exc}", file=sys.stderr)
        return 1

    messages = collect_messages(
        export_root=export_root,
        channels=channels,
        since=since,
        until=until,
        max_messages=args.max_messages,
        recent_days=args.recent_days,
    )

    output_dir = Path(args.output_dir).expanduser().resolve()
    jsonl_path = output_dir / "slack_context.jsonl"
    summary_path = output_dir / "slack_summary.md"
    # write_jsonl creates output_dir, so the summary can be written afterwards.
    write_jsonl(messages, jsonl_path)
    summary_path.write_text(build_summary(messages, channels), encoding="utf-8")

    print(f"Imported {len(messages)} Slack messages")
    print(f"Wrote JSONL: {jsonl_path}")
    print(f"Wrote summary: {summary_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())