Files
fidelity-ai-workspace/scripts/slack/import_slack_export.py
david.delagneau 8026da5719 Refactor AI workspace for improved context management and communication integration
- Introduced new commands and skills for workspace memory curation, professional communication, and status reporting.
- Updated existing commands to utilize new skills and improve clarity in instructions.
- Created a new workspace context command to load reusable core and active project profile.
- Enhanced Mattermost inbox integration with support for generic environment variables.
- Established a clear separation between project-independent core logic and project-specific profiles.
- Improved documentation across various files to reflect changes in workflow and command usage.
- Added operational memory management rules to ensure accurate context promotion and correction.
- Updated README and workflow documents to guide users in utilizing the new structure effectively.
2026-04-16 08:35:53 -06:00

647 lines
22 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import os
import re
import sys
from collections import Counter
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence
# Jira-style issue key: one uppercase letter, one or more uppercase letters or
# digits, a hyphen, then digits (for example "ABC-123"), delimited by word
# boundaries. Matching is case-sensitive.
JIRA_RE = re.compile(r"\b[A-Z][A-Z0-9]+-\d+\b")
# Regex sources for words and phrases treated as high-signal. They are kept
# as plain strings here and compiled case-insensitively below.
_HIGH_SIGNAL_SOURCES = (
    r"\bdecision\b",
    r"\broot cause\b",
    r"\bapproved?\b",
    r"\bpoints?\b",
    r"\bepic\b",
    r"\bregression\b",
    r"\bauth(?:enticated)?\b",
    r"\breproduc(?:e|ible|ibility)\b",
    r"\bgraphql\b",
    r"\bapollo\b",
    r"\brest\b",
    r"\bxflow\b",
    r"\bfid4\b",
    r"\bfeature flag\b",
    r"\btitle\b",
    r"\bscope\b",
    r"\bowner(?:ship)?\b",
    r"\brollout\b",
    r"\bmigration\b",
    r"\bdependency\b",
    r"\bcontract\b",
    r"\blifecycle\b",
    r"\bswiftui\b",
    r"\bbug\b",
    r"\bissue\b",
    r"\bincident\b",
    r"\bfix(?:ed)?\b",
    r"\bvalidation\b",
    r"\bdob\b",
    r"\bteenidentitycheck\b",
    r"\bdone\b",
    r"\bin progress\b",
    r"\bblocked?\b",
)

# One compiled pattern per source, in the same order.
HIGH_SIGNAL_PATTERNS = [
    re.compile(source, re.IGNORECASE) for source in _HIGH_SIGNAL_SOURCES
]
# Topic name -> case-insensitive pattern. A message is tagged with every
# topic whose pattern is found anywhere in its text.
TOPIC_PATTERNS = {
    "xflow_swiftui": re.compile(
        r"\b(xflow|swiftui|viewmaker|delegate|lifecycle|navigation|next button|markdown modal|validation)\b",
        re.IGNORECASE,
    ),
    "rest_graphql": re.compile(
        r"\b(rest|graphql|apollo|feature flag|transport)\b",
        re.IGNORECASE,
    ),
    "pipeline_ci": re.compile(
        r"\b(apex|apexkit|pipeline|ci|preview macro|analytics|build|archive|sampleapp)\b",
        re.IGNORECASE,
    ),
    "auth_repro": re.compile(
        # "reproduc\w*" rather than a bare "reproduc": the alternation is
        # closed by \b, so the bare stem could never match "reproduce",
        # "reproduced", "reproducible", and so on.
        r"\b(auth|authenticated|non-auth|reproduc\w*|teenidentitycheck|dob|regression|external report)\b",
        re.IGNORECASE,
    ),
    "process_communication": re.compile(
        r"\b(approved|title|description|scope|points|jira|pr|wording|send this|manager)\b",
        re.IGNORECASE,
    ),
}
# Role label -> case-insensitive pattern of phrases associated with that role.
# Built from (label, regex source) pairs so the compile flags live in one place.
ROLE_HINT_PATTERNS = {
    role: re.compile(source, re.IGNORECASE)
    for role, source in (
        (
            "manager_or_lead",
            r"\b(approved|use this for the description|send it to|did you make the story|can start on this|estimate|points)\b",
        ),
        (
            "xflow_ios_engineer",
            r"\b(xflow|swiftui|viewmaker|delegate|lifecycle|navigation|validation|next button)\b",
        ),
        (
            "build_pipeline_engineer",
            r"\b(apex|apexkit|pipeline|ci|analytics|preview macro|archive|sampleapp|jenkins|sonarqube)\b",
        ),
    )
}
def parse_args() -> argparse.Namespace:
    """Build the importer's argument parser and parse the process arguments.

    Returns:
        Namespace with ``export_path``, ``channels``, ``channel_prefix``,
        ``all_channels``, ``since``, ``until``, ``max_messages``,
        ``recent_days`` and ``output_dir``.
    """
    cli = argparse.ArgumentParser(
        description="Import a Slack export into workspace-friendly JSONL and summary artifacts."
    )
    # Options are declared as data and registered in one loop; the order here
    # is the order in which they are added to the parser.
    option_table = (
        (
            "--export-path",
            {"required": True, "help": "Path to the Slack export root."},
        ),
        (
            "--channels",
            {
                "default": "",
                "help": "Comma-separated channel names to import. When omitted, auto-detect channels by prefix.",
            },
        ),
        (
            "--channel-prefix",
            {
                # The environment variable is read when the parser is built.
                "default": os.getenv("AIW_CHANNEL_PREFIX", "fidelity"),
                "help": "Default channel prefix to auto-detect when --channels is omitted.",
            },
        ),
        (
            "--all-channels",
            {
                "action": "store_true",
                "help": "Import every channel folder instead of using prefix-based auto-detection.",
            },
        ),
        (
            "--since",
            {"default": "", "help": "Optional lower bound date in YYYY-MM-DD."},
        ),
        (
            "--until",
            {"default": "", "help": "Optional upper bound date in YYYY-MM-DD."},
        ),
        (
            "--max-messages",
            {
                "type": int,
                "default": 0,
                "help": "Maximum number of messages to emit after filtering. 0 means auto-tune based on export size.",
            },
        ),
        (
            "--recent-days",
            {
                "type": int,
                "default": 180,
                "help": "Recent window used for prioritizing current relevance when selecting from very large archives.",
            },
        ),
        (
            "--output-dir",
            {
                "default": "scripts/slack/generated",
                "help": "Directory where generated artifacts will be written.",
            },
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def parse_date(raw: str) -> Optional[datetime]:
    """Convert a ``YYYY-MM-DD`` string into a naive ``datetime`` at midnight.

    Args:
        raw: Date text; an empty value means "no bound".

    Returns:
        The parsed ``datetime``, or ``None`` when ``raw`` is empty.

    Raises:
        ValueError: If ``raw`` is non-empty and not in ``YYYY-MM-DD`` form.
    """
    return datetime.strptime(raw, "%Y-%m-%d") if raw else None
def load_users(export_root: Path) -> Dict[str, Dict[str, str]]:
    """Read ``users.json`` from the export root into a lookup keyed by user id.

    Loading is best-effort: a missing, unreadable, mis-encoded or malformed
    file, or a payload that is not a list, yields an empty mapping. Entries
    that are not objects or that carry no ``id`` are skipped.

    Args:
        export_root: Root directory of the Slack export.

    Returns:
        Mapping of user id to ``{"handle": ..., "display_name": ...}``.
    """
    users_path = export_root / "users.json"
    if not users_path.exists():
        return {}
    try:
        payload = json.loads(users_path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        return {}
    # Iterating a dict would hand back its keys (strings), so reject anything
    # that is not a list of user objects.
    if not isinstance(payload, list):
        return {}

    user_map: Dict[str, Dict[str, str]] = {}
    for user in payload:
        if not isinstance(user, dict):
            continue
        user_id = user.get("id", "")
        if not user_id:
            continue
        profile = user.get("profile")
        if not isinstance(profile, dict):
            profile = {}
        # Fall back from the most descriptive name to the bare id.
        handle = user.get("name") or user_id
        display_name = profile.get("real_name") or profile.get("display_name") or handle
        user_map[user_id] = {
            "handle": handle,
            "display_name": display_name,
        }
    return user_map
def available_channels(export_root: Path) -> List[str]:
    """List the names of the directories directly under ``export_root``, sorted.

    Args:
        export_root: Root directory of the Slack export.

    Returns:
        Alphabetically sorted directory names; plain files are ignored.
    """
    names = [entry.name for entry in export_root.iterdir() if entry.is_dir()]
    names.sort()
    return names
def resolve_channels(export_root: Path, channels: List[str], channel_prefix: str, all_channels: bool) -> List[str]:
    """Decide which channel folders to import.

    Precedence: an explicit ``channels`` list wins; then ``all_channels``;
    then folders whose name starts with ``channel_prefix`` (case-insensitive,
    leading ``#`` ignored). If the prefix is empty or matches nothing, every
    available folder is returned.

    Args:
        export_root: Root directory of the Slack export.
        channels: Explicitly requested channel names, possibly empty.
        channel_prefix: Prefix used for auto-detection.
        all_channels: When true, skip prefix matching entirely.

    Returns:
        The channel names to import.
    """
    if channels:
        return channels

    discovered = available_channels(export_root)
    if all_channels:
        return discovered

    wanted = channel_prefix.strip().lstrip("#")
    if not wanted:
        return discovered

    wanted_lower = wanted.lower()
    hits = [name for name in discovered if name.lower().startswith(wanted_lower)]
    return hits if hits else discovered
def iter_channel_files(export_root: Path, channels: Sequence[str]) -> Iterable[tuple[str, Path]]:
    """Yield ``(channel_name, json_file)`` pairs for the requested channels.

    Args:
        export_root: Root directory of the Slack export.
        channels: Channel folder names; when empty, every directory under
            ``export_root`` is used.

    Yields:
        One pair per ``*.json`` file, with channel directories and the files
        inside each directory both in sorted order. Names that are not
        directories are skipped.
    """
    channel_dirs = (
        [export_root / name for name in channels]
        if channels
        else [entry for entry in export_root.iterdir() if entry.is_dir()]
    )
    channel_dirs.sort()
    for folder in channel_dirs:
        if folder.is_dir():
            for day_file in sorted(folder.glob("*.json")):
                yield folder.name, day_file
def date_in_range(day_file: Path, since: Optional[datetime], until: Optional[datetime]) -> bool:
    """Tell whether a day file's name falls inside the requested date window.

    Args:
        day_file: File whose stem is expected to be a ``YYYY-MM-DD`` date.
        since: Inclusive lower bound, or ``None`` for no lower bound.
        until: Inclusive upper bound, or ``None`` for no upper bound.

    Returns:
        ``False`` when the stem is not a date or lies outside the window,
        ``True`` otherwise.
    """
    try:
        file_day = datetime.strptime(day_file.stem, "%Y-%m-%d")
    except ValueError:
        return False
    too_early = bool(since) and file_day < since
    too_late = bool(until) and file_day > until
    return not (too_early or too_late)
def resolve_user_identity(message: Dict[str, Any], user_map: Dict[str, Dict[str, str]]) -> Dict[str, str]:
    """Work out the handle and display name to attribute a raw message to.

    Args:
        message: Raw message object from the export.
        user_map: Known users keyed by user id.

    Returns:
        The ``user_map`` entry when the message's ``user`` id is known.
        Otherwise a dict whose ``handle`` and ``display_name`` are both the
        message's ``username``, else the raw user id, else ``"unknown"``.
    """
    user_id = message.get("user", "")
    if user_id and user_id in user_map:
        return user_map[user_id]

    if message.get("username"):
        label = str(message["username"])
    else:
        label = user_id or "unknown"
    return dict.fromkeys(("handle", "display_name"), label)
def message_timestamp(ts_value: str) -> str:
    """Render a Slack ``ts`` value as an ISO-8601 string in the local timezone.

    Args:
        ts_value: Epoch seconds as text (for example ``"1700000000.000100"``).

    Returns:
        The timezone-aware ISO timestamp, or ``""`` when the value is missing,
        not numeric, not finite, or outside the range the platform can convert.
    """
    try:
        # float() accepts "nan"/"inf" and huge magnitudes; the conversion
        # itself is what rejects them, so it must sit inside the try as well.
        moment = datetime.fromtimestamp(float(ts_value)).astimezone()
    except (TypeError, ValueError, OverflowError, OSError):
        return ""
    return moment.isoformat()
def message_datetime(ts_value: str) -> Optional[datetime]:
    """Convert a Slack ``ts`` value into a local, timezone-aware ``datetime``.

    Args:
        ts_value: Epoch seconds as text (for example ``"1700000000.000100"``).

    Returns:
        The aware ``datetime``, or ``None`` when the value is missing, not
        numeric, not finite, or outside the range the platform can convert.
    """
    try:
        # Keep the conversion inside the try: "nan", "inf" and out-of-range
        # values pass float() and only fail here.
        return datetime.fromtimestamp(float(ts_value)).astimezone()
    except (TypeError, ValueError, OverflowError, OSError):
        return None
def normalize_message(channel: str, raw: Dict[str, Any], user_map: Dict[str, Dict[str, str]]) -> Optional[Dict[str, Any]]:
    """Turn one raw export entry into the importer's flat record format.

    Args:
        channel: Name of the channel the entry was read from.
        raw: Raw message object from a day file.
        user_map: Known users keyed by user id.

    Returns:
        The normalized record, or ``None`` when the entry is not an object or
        has no non-blank string ``text``.
    """
    # Day files come from outside the program; skip entries of unexpected
    # shape instead of failing on a missing method.
    if not isinstance(raw, dict):
        return None
    raw_text = raw.get("text")
    if not isinstance(raw_text, str):
        return None
    text = raw_text.strip()
    if not text:
        return None

    subtype = raw.get("subtype", "")
    identity = resolve_user_identity(raw, user_map)
    record: Dict[str, Any] = {
        "source": "slack",
        "channel": channel,
        "timestamp": message_timestamp(str(raw.get("ts", ""))),
        "username": identity["display_name"],
        "slack_handle": identity["handle"],
        "message": text,
        "type": subtype or "message",
        "thread_ts": raw.get("thread_ts") or None,
    }
    # Optional fields are only written when the export supplies them.
    if raw.get("reply_count") is not None:
        record["reply_count"] = raw.get("reply_count")
    if raw.get("ts"):
        record["message_id"] = str(raw["ts"])
    return record
def auto_max_messages(total_messages: int) -> int:
    """Pick a default output cap from the number of messages collected.

    Args:
        total_messages: Count of messages found before selection.

    Returns:
        ``total_messages`` itself up to 1500, otherwise a fixed cap that
        grows with the archive size (2500, 4000, 6500, then 8000).
    """
    if total_messages <= 1500:
        return total_messages
    # (inclusive archive-size ceiling, cap) in ascending order.
    for ceiling, cap in ((8000, 2500), (25000, 4000), (80000, 6500)):
        if total_messages <= ceiling:
            return cap
    return 8000
def score_message(item: Dict[str, Any], recent_cutoff: Optional[datetime]) -> int:
    """Compute a relevance score for one normalized message.

    Points: 8 per Jira ID in the text, 4 per matching high-signal pattern,
    1 each for belonging to a thread, having replies and having a known
    author, 6 when the message is at or after ``recent_cutoff``, and 2 when
    the text is at least 120 characters long.

    Args:
        item: Normalized message record.
        recent_cutoff: Start of the "recent" window, or ``None`` to skip the
            recency bonus.

    Returns:
        The total score.
    """
    text = item.get("message", "")
    username = item.get("username", "")
    timestamp = item.get("timestamp", "")

    points = 8 * len(JIRA_RE.findall(text))
    points += 4 * sum(1 for pattern in HIGH_SIGNAL_PATTERNS if pattern.search(text))

    one_point_bonuses = (
        bool(item.get("thread_ts")),
        bool(item.get("reply_count")),
        bool(username) and username != "unknown",
    )
    points += sum(one_point_bonuses)

    # An empty or unparseable timestamp simply forfeits the recency bonus.
    message_dt = None
    if timestamp:
        try:
            message_dt = datetime.fromisoformat(timestamp)
        except ValueError:
            message_dt = None
    if recent_cutoff and message_dt and message_dt >= recent_cutoff:
        points += 6

    if len(text) >= 120:
        points += 2
    return points
def message_key(item: Dict[str, Any]) -> str:
    """Build the de-duplication key for a normalized message.

    Args:
        item: Normalized message record.

    Returns:
        ``"<channel>|<message_id>"`` when the record has a ``message_id``,
        otherwise channel, timestamp, username and message text joined by
        ``|``.
    """
    channel = item.get("channel", "")
    message_id = item.get("message_id")
    if message_id:
        # The id is the Slack ``ts``, which only identifies a message within
        # its channel; qualifying it keeps same-instant messages from
        # different channels from being mistaken for duplicates.
        return f"{channel}|{message_id}"
    return "|".join(
        [
            channel,
            item.get("timestamp", ""),
            item.get("username", ""),
            item.get("message", ""),
        ]
    )
def message_year(item: Dict[str, Any]) -> str:
    """Extract the calendar year from a record's ISO ``timestamp``.

    Args:
        item: Normalized message record.

    Returns:
        The four-digit year as text, or ``"unknown"`` when the timestamp is
        absent or cannot be parsed.
    """
    raw_stamp = item.get("timestamp", "")
    try:
        parsed = datetime.fromisoformat(raw_stamp)
    except ValueError:
        return "unknown"
    return str(parsed.year)
def add_unique_items(
    selected: List[Dict[str, Any]],
    seen: set[str],
    candidates: Sequence[Dict[str, Any]],
    limit: int,
) -> None:
    """Append not-yet-seen candidates to ``selected`` until it holds ``limit`` items.

    Both ``selected`` and ``seen`` are updated in place.

    Args:
        selected: Running selection to extend.
        seen: Keys of every message already selected.
        candidates: Messages to consider, in priority order.
        limit: Maximum total length of ``selected``; non-positive values make
            the call a no-op.
    """
    if limit <= 0:
        return
    for candidate in candidates:
        if len(selected) >= limit:
            break
        identity = message_key(candidate)
        if identity not in seen:
            seen.add(identity)
            selected.append(candidate)
def select_messages(messages: List[Dict[str, Any]], max_messages: int, recent_days: int) -> List[Dict[str, Any]]:
    """Trim ``messages`` to at most ``max_messages`` entries, favoring useful ones.

    When the input already fits it is returned unchanged. Otherwise the
    selection is filled in four passes, each skipping messages already taken:

    1. the newest messages inside the ``recent_days`` window,
    2. the best-scored messages of each (channel, year) bucket,
    3. the best-scored messages for each Jira ID, strongest IDs first,
    4. whatever is left, by descending score.

    Args:
        messages: Normalized records. The recent pass takes the tail of the
            list, so ascending timestamp order is expected.
        max_messages: Upper bound on the number of returned records.
        recent_days: Size of the "recent" window, in days before now.

    Returns:
        The chosen records, sorted by their ``timestamp`` string.
    """
    if len(messages) <= max_messages:
        return messages

    recent_cutoff = datetime.now().astimezone() - timedelta(days=recent_days)
    recent_messages: List[Dict[str, Any]] = []
    channel_year_buckets: Dict[tuple[str, str], List[Dict[str, Any]]] = {}
    jira_buckets: Dict[str, List[Dict[str, Any]]] = {}

    for item in messages:
        # "_score" is scratch data for the sorts below; it is removed again
        # before returning.
        item["_score"] = score_message(item, recent_cutoff)
        if item["_score"] > 0:
            bucket_key = (item.get("channel", ""), message_year(item))
            channel_year_buckets.setdefault(bucket_key, []).append(item)
        for jira_id in JIRA_RE.findall(item.get("message", "")):
            jira_buckets.setdefault(jira_id, []).append(item)
        try:
            message_dt = datetime.fromisoformat(item.get("timestamp", ""))
        except ValueError:
            message_dt = None
        if message_dt and message_dt >= recent_cutoff:
            recent_messages.append(item)

    def by_score_then_time(entry: Dict[str, Any]) -> tuple:
        return (entry.get("_score", 0), entry.get("timestamp", ""))

    selected: List[Dict[str, Any]] = []
    seen: set[str] = set()

    # Pass 1: recent context. The floor of 300 must not exceed the overall
    # cap, otherwise a small max_messages would be overshot.
    recent_budget = min(len(recent_messages), max_messages, max(max_messages // 5, 300))
    add_unique_items(selected, seen, recent_messages[-recent_budget:], recent_budget)

    # Pass 2: a few top messages from every (channel, year), oldest year first.
    channel_year_keys = sorted(channel_year_buckets.keys(), key=lambda key: (key[1], key[0]))
    remaining_after_recent = max_messages - len(selected)
    coverage_budget = min(
        remaining_after_recent,
        max(remaining_after_recent // 3, min(len(channel_year_keys) * 2, remaining_after_recent)),
    )
    if channel_year_keys and coverage_budget > 0:
        per_bucket = max(1, min(4, coverage_budget // len(channel_year_keys) or 1))
        for bucket_key in channel_year_keys:
            bucket_items = sorted(channel_year_buckets[bucket_key], key=by_score_then_time, reverse=True)
            add_unique_items(selected, seen, bucket_items, min(max_messages, len(selected) + per_bucket))

    # Pass 3: up to two messages per Jira ID, ranked by best score, then by
    # mention count, then by ID.
    remaining_after_coverage = max_messages - len(selected)
    jira_ranked = sorted(
        jira_buckets.items(),
        key=lambda pair: (
            max(item.get("_score", 0) for item in pair[1]),
            len(pair[1]),
            pair[0],
        ),
        reverse=True,
    )
    jira_budget = min(remaining_after_coverage, max(remaining_after_coverage // 4, 150))
    if jira_budget > 0:
        for _, bucket_items in jira_ranked:
            ranked_items = sorted(bucket_items, key=by_score_then_time, reverse=True)
            add_unique_items(selected, seen, ranked_items, min(max_messages, len(selected) + 2))
            if len(selected) >= recent_budget + coverage_budget + jira_budget:
                break

    # Pass 4: fill the remaining room with the highest-scoring leftovers.
    ranked_messages = sorted(messages, key=by_score_then_time, reverse=True)
    add_unique_items(selected, seen, ranked_messages, max_messages)

    # Strip the scratch key from every input record, not only the selected
    # ones, so the caller's dicts are left as they were.
    for item in messages:
        item.pop("_score", None)

    selected.sort(key=lambda item: item.get("timestamp", ""))
    return selected
def collect_messages(
    export_root: Path,
    channels: Sequence[str],
    since: Optional[datetime],
    until: Optional[datetime],
    max_messages: int,
    recent_days: int,
) -> List[Dict[str, Any]]:
    """Read, normalize and down-select the messages of the chosen channels.

    Reading is best-effort: day files that are outside the date window,
    cannot be read or decoded, are not valid JSON, or do not contain a list
    are skipped, as are list entries that are not objects.

    Args:
        export_root: Root directory of the Slack export.
        channels: Channel folder names to read.
        since: Inclusive lower date bound, or ``None``.
        until: Inclusive upper date bound, or ``None``.
        max_messages: Output cap; zero or a negative value selects an
            automatic cap based on how many messages were found.
        recent_days: Size of the "recent" window used during selection.

    Returns:
        The selected normalized records.
    """
    user_map = load_users(export_root)
    messages: List[Dict[str, Any]] = []
    for channel, day_file in iter_channel_files(export_root, channels):
        if not date_in_range(day_file, since, until):
            continue
        try:
            payload = json.loads(day_file.read_text(encoding="utf-8"))
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            # One damaged day file should not abort the whole import.
            continue
        if not isinstance(payload, list):
            continue
        for raw in payload:
            if not isinstance(raw, dict):
                continue
            record = normalize_message(channel, raw, user_map)
            if record:
                messages.append(record)

    # Selection expects ascending timestamp order.
    messages.sort(key=lambda item: item.get("timestamp", ""))
    tuned_max = max_messages if max_messages > 0 else auto_max_messages(len(messages))
    return select_messages(messages, tuned_max, recent_days)
def write_jsonl(messages: List[Dict[str, Any]], path: Path) -> None:
    """Write ``messages`` to ``path`` as UTF-8 JSON Lines.

    Each record becomes one line terminated by a newline; an empty list
    produces an empty file. Missing parent directories are created.

    Args:
        messages: Records to serialize.
        path: Destination file.
    """
    # Serialize before touching the filesystem.
    serialized = "".join(
        json.dumps(record, ensure_ascii=False) + "\n" for record in messages
    )
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(serialized, encoding="utf-8")
def build_summary(messages: List[Dict[str, Any]], channels: Sequence[str]) -> str:
    """Render a Markdown summary of the imported messages.

    The report lists message totals, the busiest channels and participants,
    Jira IDs, per-year coverage, topic matches, up to eight people ranked by
    signal, and fixed guidance text.

    Args:
        messages: Selected normalized records. The first and last entries are
            reported as the covered time span.
        channels: Channel names shown in the header; when empty the header
            says ``all detected channels``.

    Returns:
        The Markdown document, ending with a newline.
    """
    channel_counter = Counter(item["channel"] for item in messages)
    user_counter = Counter(item["username"] for item in messages)
    jira_counter = Counter()
    year_counter = Counter()
    topic_counter = Counter()
    user_topic_counter: Dict[str, Counter] = {}
    user_channel_counter: Dict[str, Counter] = {}
    user_year_counter: Dict[str, Counter] = {}
    user_signal_counter = Counter()
    user_examples: Dict[str, List[str]] = {}
    user_handles: Dict[str, str] = {}

    for item in messages:
        message = item.get("message", "")
        username = item.get("username", "")
        user_handles.setdefault(username, item.get("slack_handle", username))
        # Scan for Jira IDs once and reuse the result below.
        jira_ids = JIRA_RE.findall(message)
        jira_counter.update(jira_ids)
        year = message_year(item)
        year_counter.update([year])
        user_channel_counter.setdefault(username, Counter()).update([item["channel"]])
        user_year_counter.setdefault(username, Counter()).update([year])

        topics = [name for name, pattern in TOPIC_PATTERNS.items() if pattern.search(message)]
        if topics:
            topic_counter.update(topics)
            user_signal_counter[username] += len(topics) + len(jira_ids)
            user_topic_counter.setdefault(username, Counter()).update(topics)
            # Keep at most two distinct single-line excerpts per person.
            example = f"{item['timestamp']} {item['channel']}: {message[:140].replace(chr(10), ' ')}"
            examples = user_examples.setdefault(username, [])
            if len(examples) < 2 and example not in examples:
                examples.append(example)

        role_hits = sum(1 for pattern in ROLE_HINT_PATTERNS.values() if pattern.search(message))
        if role_hits:
            user_signal_counter[username] += role_hits

    first_timestamp = messages[0]["timestamp"] if messages else "n/a"
    last_timestamp = messages[-1]["timestamp"] if messages else "n/a"
    lines = [
        "# Slack Import Summary",
        "",
        f"- Messages imported: {len(messages)}",
        f"- Channels imported: {', '.join(channels) if channels else 'all detected channels'}",
        f"- Time span covered: {first_timestamp} -> {last_timestamp}",
        "",
        "## Top Channels",
    ]
    for channel, count in channel_counter.most_common(10):
        lines.append(f"- {channel}: {count}")

    lines.extend(["", "## Top Participants"])
    for username, count in user_counter.most_common(10):
        lines.append(f"- {username}: {count}")

    lines.extend(["", "## Jira IDs Mentioned"])
    if jira_counter:
        for jira_id, count in jira_counter.most_common(20):
            lines.append(f"- {jira_id}: {count}")
    else:
        lines.append("- None detected")

    lines.extend(["", "## Historical Coverage"])
    for year, count in year_counter.most_common():
        lines.append(f"- {year}: {count}")

    lines.extend(["", "## Topic Signals"])
    if topic_counter:
        for topic, count in topic_counter.most_common():
            lines.append(f"- {topic}: {count}")
    else:
        lines.append("- No topic patterns matched")

    lines.extend(["", "## People Worth Reviewing"])
    # Rank by signal, then channel breadth, then year breadth, then volume.
    ranked_people = sorted(
        user_counter,
        key=lambda username: (
            user_signal_counter[username],
            len(user_channel_counter.get(username, {})),
            len(user_year_counter.get(username, {})),
            user_counter[username],
        ),
        reverse=True,
    )
    candidate_count = 0
    for username in ranked_people:
        raw_handle = user_handles.get(username, username)
        handle = raw_handle.lower()
        if handle in {"uslackbot", "internal trackit report", "jirabot", "geekbot"}:
            continue
        channels_seen = len(user_channel_counter.get(username, {}))
        years_seen = len(user_year_counter.get(username, {}))
        signal = user_signal_counter[username]
        # Skip people with both little signal and few messages.
        if signal < 12 and user_counter[username] < 20:
            continue
        top_topics = ", ".join(
            topic for topic, _ in user_topic_counter.get(username, Counter()).most_common(3)
        ) or "general project discussion"
        # Compare the handle as stored as well as lowercased, so a name that
        # merely differs in case from itself is not repeated in brackets.
        same_identity = raw_handle == username or handle == username
        name_label = username if same_identity else f"{username} (Slack: {handle})"
        lines.append(
            f"- {name_label}: {user_counter[username]} messages, signal={signal}, channels={channels_seen}, years={years_seen}, topics={top_topics}"
        )
        for example in user_examples.get(username, []):
            lines.append(f" Evidence: {example}")
        candidate_count += 1
        if candidate_count >= 8:
            break

    lines.extend(["", "## Import Guidance"])
    lines.append("- Create or update person files for repeated humans with multi-channel or multi-year involvement.")
    lines.append("- Prefer storing exact role only when the archive states it clearly; otherwise store relationship and collaboration pattern.")
    lines.append("- Promote repeated Jira/title/scope/approval patterns when they still clarify current project understanding.")
    lines.append("- Keep old status-only updates archive-only unless they alter current context.")
    lines.extend(
        [
            "",
            "## Guidance",
            "- Treat this archive as historical context, not current truth.",
            "- The importer preserves recent context and older high-signal evidence across channels and years.",
            "- Prefer promoting durable patterns, repeated approvals, role mappings, Jira references, and architectural context.",
            "- Avoid promoting outdated status unless it still affects current understanding.",
        ]
    )
    return "\n".join(lines) + "\n"
def main() -> int:
    """Run the importer from the command line.

    Parses the arguments, collects the messages and writes
    ``slack_context.jsonl`` and ``slack_summary.md`` into the output
    directory.

    Returns:
        ``0`` on success; ``1`` when the export path is not a usable
        directory or a date argument is malformed.
    """
    args = parse_args()
    export_root = Path(args.export_path).expanduser().resolve()
    if not export_root.exists():
        print(f"Export path not found: {export_root}", file=sys.stderr)
        return 1
    # Channel discovery lists the root's children, so a plain file cannot work.
    if not export_root.is_dir():
        print(f"Export path is not a directory: {export_root}", file=sys.stderr)
        return 1

    # Validate the dates up front so a typo gives a short message rather
    # than a traceback.
    try:
        since = parse_date(args.since)
        until = parse_date(args.until)
    except ValueError as error:
        print(f"Invalid date, expected YYYY-MM-DD: {error}", file=sys.stderr)
        return 1

    channels = [item.strip().lstrip("#") for item in args.channels.split(",") if item.strip()]
    channels = resolve_channels(export_root, channels, args.channel_prefix, args.all_channels)

    messages = collect_messages(
        export_root=export_root,
        channels=channels,
        since=since,
        until=until,
        max_messages=args.max_messages,
        recent_days=args.recent_days,
    )

    output_dir = Path(args.output_dir).expanduser().resolve()
    # Create the directory here so the summary write does not rely on the
    # JSONL writer having created it first.
    output_dir.mkdir(parents=True, exist_ok=True)
    jsonl_path = output_dir / "slack_context.jsonl"
    summary_path = output_dir / "slack_summary.md"
    write_jsonl(messages, jsonl_path)
    summary_path.write_text(build_summary(messages, channels), encoding="utf-8")

    print(f"Imported {len(messages)} Slack messages")
    print(f"Wrote JSONL: {jsonl_path}")
    print(f"Wrote summary: {summary_path}")
    return 0
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())