feat: Implement Slack export importer and enhance README documentation for historical context recovery

This commit is contained in:
2026-04-09 16:01:30 -06:00
parent 332fbca1c5
commit dab9dabd92
7 changed files with 584 additions and 0 deletions

View File

@@ -0,0 +1,411 @@
#!/usr/bin/env python3
import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence
# Jira-style issue keys: an uppercase letter, one or more uppercase
# letters/digits, a dash, then digits (for example "ABC-123").
JIRA_RE = re.compile(r"\b[A-Z][A-Z0-9]+-\d+\b")

# Regex sources for words and phrases that raise a message's relevance score.
_HIGH_SIGNAL_SOURCES = (
    r"\broot cause\b",
    r"\bapproved?\b",
    r"\bpoints?\b",
    r"\bepic\b",
    r"\bregression\b",
    r"\bauth(?:enticated)?\b",
    r"\breproduc(?:e|ible|ibility)\b",
    r"\bgraphql\b",
    r"\bapollo\b",
    r"\brest\b",
    r"\bxflow\b",
    r"\bfid4\b",
    r"\bfeature flag\b",
    r"\btitle\b",
    r"\bscope\b",
    r"\bdone\b",
    r"\bin progress\b",
    r"\bblocked?\b",
)

# Compiled once at import time; every pattern is matched case-insensitively.
HIGH_SIGNAL_PATTERNS = [
    re.compile(source, re.IGNORECASE) for source in _HIGH_SIGNAL_SOURCES
]
def parse_args() -> argparse.Namespace:
    """Parse the importer's command-line options.

    Returns:
        The namespace produced by ``argparse`` with the attributes
        ``export_path``, ``channels``, ``channel_prefix``, ``all_channels``,
        ``since``, ``until``, ``max_messages``, ``recent_days`` and
        ``output_dir``.
    """
    # One (flag, add_argument keyword arguments) pair per option, kept in the
    # order they should appear in ``--help``.
    option_table = [
        ("--export-path", {"required": True, "help": "Path to the Slack export root."}),
        (
            "--channels",
            {
                "default": "",
                "help": "Comma-separated channel names to import. When omitted, auto-detect channels by prefix.",
            },
        ),
        (
            "--channel-prefix",
            {
                "default": "fidelity",
                "help": "Default channel prefix to auto-detect when --channels is omitted.",
            },
        ),
        (
            "--all-channels",
            {
                "action": "store_true",
                "help": "Import every channel folder instead of using prefix-based auto-detection.",
            },
        ),
        ("--since", {"default": "", "help": "Optional lower bound date in YYYY-MM-DD."}),
        ("--until", {"default": "", "help": "Optional upper bound date in YYYY-MM-DD."}),
        (
            "--max-messages",
            {
                "type": int,
                "default": 0,
                "help": "Maximum number of messages to emit after filtering. 0 means auto-tune based on export size.",
            },
        ),
        (
            "--recent-days",
            {
                "type": int,
                "default": 180,
                "help": "Recent window used for prioritizing current relevance when selecting from very large archives.",
            },
        ),
        (
            "--output-dir",
            {
                "default": "scripts/slack/generated",
                "help": "Directory where generated artifacts will be written.",
            },
        ),
    ]

    cli = argparse.ArgumentParser(
        description="Import a Slack export into workspace-friendly JSONL and summary artifacts."
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def parse_date(raw: str) -> Optional[datetime]:
    """Convert a ``YYYY-MM-DD`` string into a naive ``datetime`` at midnight.

    Args:
        raw: Date text; an empty value means "no bound".

    Returns:
        The parsed ``datetime``, or ``None`` when ``raw`` is empty.

    Raises:
        ValueError: If ``raw`` is non-empty and not in ``YYYY-MM-DD`` form.
    """
    return datetime.strptime(raw, "%Y-%m-%d") if raw else None
def load_users(export_root: Path) -> Dict[str, str]:
    """Build a user-id to name lookup from the export's ``users.json``.

    The name for each user is the first non-empty value among the top-level
    ``name``, ``profile.display_name``, ``profile.real_name`` and finally the
    user id itself. Entries without an id are ignored.

    Loading is best-effort: a missing, unreadable, undecodable or malformed
    file, or one whose top-level value is not a list, yields an empty
    mapping instead of aborting the import. Individual entries that are not
    objects are skipped.

    Args:
        export_root: Root directory of the Slack export.

    Returns:
        Mapping from user id to the chosen name; empty when nothing usable
        was found.
    """
    users_path = export_root / "users.json"
    if not users_path.exists():
        return {}
    try:
        payload = json.loads(users_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    if not isinstance(payload, list):
        return {}

    user_map: Dict[str, str] = {}
    for user in payload:
        if not isinstance(user, dict):
            continue
        user_id = user.get("id", "")
        if not user_id:
            continue
        profile = user.get("profile")
        if not isinstance(profile, dict):
            # Covers a missing, null or otherwise unusable profile.
            profile = {}
        user_map[user_id] = (
            user.get("name")
            or profile.get("display_name")
            or profile.get("real_name")
            or user_id
        )
    return user_map
def available_channels(export_root: Path) -> List[str]:
    """Return the names of all sub-directories of ``export_root``, sorted.

    Each sub-directory of the export root is treated as one channel folder;
    plain files at the root are ignored.
    """
    channel_names = [entry.name for entry in export_root.iterdir() if entry.is_dir()]
    channel_names.sort()
    return channel_names
def resolve_channels(export_root: Path, channels: List[str], channel_prefix: str, all_channels: bool) -> List[str]:
    """Decide which channel folders to import.

    Precedence:
      1. An explicit, non-empty ``channels`` list is returned unchanged.
      2. With ``all_channels`` set, every channel folder is returned.
      3. Otherwise folders whose name starts with ``channel_prefix``
         (surrounding whitespace and leading ``#`` removed, compared
         case-insensitively) are returned.
      4. If the prefix is empty or matches nothing, every channel folder is
         returned.
    """
    if channels:
        return channels

    detected = available_channels(export_root)
    if all_channels:
        return detected

    wanted = channel_prefix.strip().lstrip("#").lower()
    if not wanted:
        return detected

    hits = [name for name in detected if name.lower().startswith(wanted)]
    # An unmatched prefix falls back to importing everything.
    return hits or detected
def iter_channel_files(export_root: Path, channels: Sequence[str]) -> Iterable[tuple[str, Path]]:
    """Yield ``(channel_name, day_file)`` pairs for the requested channels.

    Args:
        export_root: Root directory of the Slack export.
        channels: Channel folder names to visit; when empty, every
            sub-directory of ``export_root`` is visited.

    Yields:
        One pair per ``*.json`` file, ordered by channel directory path and
        then by file path. Requested names that are not directories are
        skipped.
    """
    if channels:
        channel_dirs = sorted(export_root / name for name in channels)
    else:
        channel_dirs = sorted(entry for entry in export_root.iterdir() if entry.is_dir())

    for channel_dir in channel_dirs:
        if channel_dir.is_dir():
            yield from ((channel_dir.name, day_file) for day_file in sorted(channel_dir.glob("*.json")))
def date_in_range(day_file: Path, since: Optional[datetime], until: Optional[datetime]) -> bool:
    """Tell whether a day file's date lies within the inclusive bounds.

    The date is taken from the file stem, which must be ``YYYY-MM-DD``;
    files named otherwise are reported as out of range. A ``None`` bound is
    treated as unbounded on that side.
    """
    try:
        file_day = datetime.strptime(day_file.stem, "%Y-%m-%d")
    except ValueError:
        return False

    before_window = bool(since) and file_day < since
    after_window = bool(until) and file_day > until
    return not (before_window or after_window)
def resolve_username(message: Dict[str, Any], user_map: Dict[str, str]) -> str:
    """Pick the best available author name for a raw message.

    Preference order: the ``user_map`` entry for the message's ``user`` id,
    then the message's own ``username`` field (stringified), then the raw
    user id, and finally the literal ``"unknown"``.
    """
    user_id = message.get("user", "")
    if user_id and user_id in user_map:
        return user_map[user_id]

    inline_name = message.get("username")
    if inline_name:
        return str(inline_name)
    return user_id if user_id else "unknown"
def message_timestamp(ts_value: str) -> str:
    """Render a Slack ``ts`` value as an ISO-8601 string in local time.

    Args:
        ts_value: Epoch seconds, normally a decimal string such as
            ``"1700000000.000100"``.

    Returns:
        A timezone-aware ISO-8601 timestamp in the machine's local zone, or
        an empty string when the value is missing, non-numeric, NaN,
        infinite, or outside the range the platform can represent.
    """
    try:
        # float() rejects missing/garbled values; fromtimestamp()/astimezone()
        # reject NaN, infinities and out-of-range epochs. All are handled so
        # one corrupt record cannot abort the whole import.
        return datetime.fromtimestamp(float(ts_value)).astimezone().isoformat()
    except (TypeError, ValueError, OverflowError, OSError):
        return ""
def message_datetime(ts_value: str) -> Optional[datetime]:
    """Convert a Slack ``ts`` value into a timezone-aware local ``datetime``.

    Args:
        ts_value: Epoch seconds, normally a decimal string such as
            ``"1700000000.000100"``.

    Returns:
        The aware ``datetime`` in the machine's local zone, or ``None`` when
        the value is missing, non-numeric, NaN, infinite, or outside the
        range the platform can represent.
    """
    try:
        # Conversion errors (NaN, infinity, out-of-range epochs) are handled
        # alongside parse errors so a corrupt record yields None, not a crash.
        return datetime.fromtimestamp(float(ts_value)).astimezone()
    except (TypeError, ValueError, OverflowError, OSError):
        return None
def normalize_message(channel: str, raw: Dict[str, Any], user_map: Dict[str, str]) -> Optional[Dict[str, Any]]:
    """Turn one raw Slack export entry into the importer's record shape.

    Args:
        channel: Name of the channel the entry was read from.
        raw: One element of a day file's message list.
        user_map: User-id to name lookup used to resolve the author.

    Returns:
        A dict with the keys ``source``, ``channel``, ``timestamp``,
        ``username``, ``message``, ``type`` and ``thread_ts``, plus
        ``reply_count`` and ``message_id`` when the raw entry provides
        ``reply_count`` / ``ts``. Returns ``None`` when the entry is not an
        object or carries no non-blank string text, so malformed entries
        are skipped instead of aborting the import.
    """
    if not isinstance(raw, dict):
        return None
    text = raw.get("text")
    if not isinstance(text, str):
        return None
    text = text.strip()
    if not text:
        return None

    record: Dict[str, Any] = {
        "source": "slack",
        "channel": channel,
        "timestamp": message_timestamp(str(raw.get("ts", ""))),
        "username": resolve_username(raw, user_map),
        "message": text,
        # Entries without a subtype are labelled as plain messages.
        "type": raw.get("subtype", "") or "message",
        "thread_ts": raw.get("thread_ts") or None,
    }
    reply_count = raw.get("reply_count")
    if reply_count is not None:
        record["reply_count"] = reply_count
    if raw.get("ts"):
        # The raw ts string is kept verbatim as the message identifier.
        record["message_id"] = str(raw["ts"])
    return record
def auto_max_messages(total_messages: int) -> int:
    """Choose an output cap from the number of collected messages.

    Up to 1500 messages are kept in full. Larger archives are capped at
    2500 (up to 8000 messages), 4000 (up to 25000) or 6000 (beyond that).
    """
    if total_messages <= 1500:
        return total_messages
    # (largest archive size for the tier, cap applied to that tier)
    for ceiling, cap in ((8000, 2500), (25000, 4000)):
        if total_messages <= ceiling:
            return cap
    return 6000
def score_message(item: Dict[str, Any], recent_cutoff: Optional[datetime]) -> int:
    """Compute a relevance score for a normalized message.

    Points awarded:
      * 8 per Jira key found in the text
      * 4 per high-signal pattern that matches the text
      * 1 if the message has a ``thread_ts``
      * 1 if it has a truthy ``reply_count``
      * 1 if the author is known (non-empty and not ``"unknown"``)
      * 6 if its timestamp parses and is at or after ``recent_cutoff``
      * 2 if the text is at least 120 characters long

    Args:
        item: Normalized message record.
        recent_cutoff: Moment from which a message counts as recent, or
            ``None`` to disable the recency bonus.

    Returns:
        The total score.
    """
    text = item.get("message", "")
    author = item.get("username", "")
    stamp = item.get("timestamp", "")

    total = 8 * len(JIRA_RE.findall(text))
    total += 4 * sum(1 for pattern in HIGH_SIGNAL_PATTERNS if pattern.search(text))
    total += 1 if item.get("thread_ts") else 0
    total += 1 if item.get("reply_count") else 0
    total += 1 if author and author != "unknown" else 0

    try:
        sent_at = datetime.fromisoformat(stamp) if stamp else None
    except ValueError:
        # An unparseable timestamp simply forfeits the recency bonus.
        sent_at = None
    if recent_cutoff and sent_at and sent_at >= recent_cutoff:
        total += 6

    if len(text) >= 120:
        total += 2
    return total
def select_messages(messages: List[Dict[str, Any]], max_messages: int, recent_days: int) -> List[Dict[str, Any]]:
    """Trim ``messages`` to at most ``max_messages`` entries.

    When the input already fits, it is returned unchanged. Otherwise the
    messages are split at ``now - recent_days``:

    * Recent messages get a budget of ``max(max_messages // 2, 1000)``,
      never more than ``max_messages`` itself, and the last ones in input
      order are kept.
    * Whatever budget remains is filled with older messages ranked by
      ``score_message`` (ties broken by timestamp, newest first).

    Messages whose timestamp is missing or unparseable are treated as older.

    Args:
        messages: Normalized records, expected in chronological order.
        max_messages: Upper bound on the number of records returned.
        recent_days: Size of the "recent" window in days.

    Returns:
        The selected records sorted by their ``timestamp`` string.
    """
    if len(messages) <= max_messages:
        return messages

    recent_cutoff = datetime.now().astimezone() - timedelta(days=recent_days)
    recent_messages: List[Dict[str, Any]] = []
    older_messages: List[Dict[str, Any]] = []
    for item in messages:
        try:
            message_dt = datetime.fromisoformat(item.get("timestamp", ""))
        except (TypeError, ValueError):
            message_dt = None
        if message_dt is not None and message_dt.tzinfo is None:
            # Interpret naive timestamps as local time so they can be
            # compared with the timezone-aware cutoff.
            message_dt = message_dt.astimezone()
        if message_dt is not None and message_dt >= recent_cutoff:
            recent_messages.append(item)
        else:
            older_messages.append(item)

    budget = max(max_messages, 0)
    # The 1000-message floor must never exceed the overall cap; without the
    # clamp a small max_messages produced a negative remaining budget, and
    # the negative slice below returned almost every older message.
    recent_budget = min(len(recent_messages), budget, max(budget // 2, 1000))
    # Slice from an explicit start index: recent_messages[-0:] would return
    # the whole list rather than nothing.
    selected_recent = recent_messages[len(recent_messages) - recent_budget:]

    remaining_budget = budget - recent_budget
    selected_older: List[Dict[str, Any]] = []
    if remaining_budget > 0:
        ranked_older = sorted(
            older_messages,
            key=lambda item: (score_message(item, recent_cutoff), item.get("timestamp", "")),
            reverse=True,
        )
        selected_older = ranked_older[:remaining_budget]

    selected = selected_recent + selected_older
    selected.sort(key=lambda item: item.get("timestamp", ""))
    return selected
def collect_messages(
    export_root: Path,
    channels: Sequence[str],
    since: Optional[datetime],
    until: Optional[datetime],
    max_messages: int,
    recent_days: int,
) -> List[Dict[str, Any]]:
    """Read, normalize and down-select messages from a Slack export.

    Day files outside the ``since``/``until`` range are ignored. A day file
    that cannot be read, decoded or parsed is skipped with a warning on
    stderr so one corrupt file does not abort the import. Files whose
    top-level value is not a list, and list entries that are not objects,
    are skipped silently.

    Args:
        export_root: Root directory of the Slack export.
        channels: Channel folder names to read.
        since: Inclusive lower date bound, or ``None``.
        until: Inclusive upper date bound, or ``None``.
        max_messages: Output cap; ``0`` selects a cap automatically from the
            number of collected messages.
        recent_days: Size of the "recent" window used during selection.

    Returns:
        The selected normalized messages, ordered by timestamp string.
    """
    user_map = load_users(export_root)
    messages: List[Dict[str, Any]] = []
    for channel, day_file in iter_channel_files(export_root, channels):
        if not date_in_range(day_file, since, until):
            continue
        try:
            payload = json.loads(day_file.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Skipping unreadable day file {day_file}: {exc}", file=sys.stderr)
            continue
        if not isinstance(payload, list):
            continue
        for raw in payload:
            if not isinstance(raw, dict):
                continue
            record = normalize_message(channel, raw, user_map)
            if record:
                messages.append(record)

    messages.sort(key=lambda item: item.get("timestamp", ""))
    tuned_max = max_messages or auto_max_messages(len(messages))
    return select_messages(messages, tuned_max, recent_days)
def write_jsonl(messages: List[Dict[str, Any]], path: Path) -> None:
    """Write ``messages`` to ``path`` as UTF-8 JSON Lines.

    Each record becomes one newline-terminated line with non-ASCII
    characters kept as-is. Missing parent directories are created, and an
    empty message list produces an empty file.
    """
    # Serialize everything first so a failure leaves the filesystem untouched.
    body = "".join(json.dumps(record, ensure_ascii=False) + "\n" for record in messages)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(body, encoding="utf-8")
def build_summary(messages: List[Dict[str, Any]], channels: Sequence[str]) -> str:
    """Render a Markdown summary of the imported messages.

    The report lists the message count, the imported channels, the ten
    busiest channels, the ten most active participants, up to twenty of the
    most frequently mentioned Jira keys, and a fixed guidance section.

    Args:
        messages: Normalized records; each must carry ``channel`` and
            ``username`` keys.
        channels: Channel names that were imported; when empty the report
            says ``all detected channels``.

    Returns:
        The Markdown document, terminated by a newline.
    """
    channel_totals = Counter([item["channel"] for item in messages])
    participant_totals = Counter([item["username"] for item in messages])
    jira_totals = Counter(
        key for item in messages for key in JIRA_RE.findall(item.get("message", ""))
    )

    channel_label = ", ".join(channels) if channels else "all detected channels"
    report = [
        "# Slack Import Summary",
        "",
        f"- Messages imported: {len(messages)}",
        f"- Channels imported: {channel_label}",
        "",
        "## Top Channels",
    ]
    report += [f"- {channel}: {count}" for channel, count in channel_totals.most_common(10)]

    report += ["", "## Top Participants"]
    report += [f"- {username}: {count}" for username, count in participant_totals.most_common(10)]

    report += ["", "## Jira IDs Mentioned"]
    jira_lines = [f"- {jira_id}: {count}" for jira_id, count in jira_totals.most_common(20)]
    report += jira_lines or ["- None detected"]

    report += [
        "",
        "## Guidance",
        "- Treat this archive as historical context, not current truth.",
        "- Prefer promoting durable patterns, repeated approvals, role mappings, Jira references, and architectural context.",
        "- Avoid promoting outdated status unless it still affects current understanding.",
    ]
    return "\n".join(report) + "\n"
def main() -> int:
    """Run the importer from the command line.

    Parses the options, validates the export path and date bounds, collects
    and selects messages, then writes ``slack_context.jsonl`` and
    ``slack_summary.md`` into the output directory.

    Returns:
        ``0`` on success; ``1`` when the export path is missing or is not a
        directory, or when ``--since`` / ``--until`` is not a valid
        ``YYYY-MM-DD`` date. Problems are reported on stderr.
    """
    args = parse_args()

    export_root = Path(args.export_path).expanduser().resolve()
    if not export_root.exists():
        print(f"Export path not found: {export_root}", file=sys.stderr)
        return 1
    if not export_root.is_dir():
        # A path that exists but is not a directory (for example an archive
        # that was never extracted) would otherwise fail later when the
        # channel folders are listed.
        print(f"Export path is not a directory: {export_root}", file=sys.stderr)
        return 1

    try:
        since = parse_date(args.since)
        until = parse_date(args.until)
    except ValueError as exc:
        print(f"Invalid date, expected YYYY-MM-DD: {exc}", file=sys.stderr)
        return 1

    # Accept "#channel" as well as "channel" and ignore empty list entries.
    channels = [item.strip().lstrip("#") for item in args.channels.split(",") if item.strip()]
    channels = resolve_channels(export_root, channels, args.channel_prefix, args.all_channels)

    messages = collect_messages(
        export_root=export_root,
        channels=channels,
        since=since,
        until=until,
        max_messages=args.max_messages,
        recent_days=args.recent_days,
    )

    output_dir = Path(args.output_dir).expanduser().resolve()
    jsonl_path = output_dir / "slack_context.jsonl"
    summary_path = output_dir / "slack_summary.md"
    # write_jsonl creates the output directory, so it must run before the
    # summary is written into the same directory.
    write_jsonl(messages, jsonl_path)
    summary_path.write_text(build_summary(messages, channels), encoding="utf-8")

    print(f"Imported {len(messages)} Slack messages")
    print(f"Wrote JSONL: {jsonl_path}")
    print(f"Wrote summary: {summary_path}")
    return 0
# Script entry point: run the importer and use its return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())