feat: Enhance Mattermost extractor to support channel names and improve configuration options
This commit is contained in:
@@ -21,15 +21,19 @@ REQUEST_TIMEOUT = 15
|
||||
|
||||
LOGGER = logging.getLogger("mattermost_context")
|
||||
USER_CACHE: Dict[str, Dict[str, Any]] = {}
|
||||
CHANNEL_CACHE: Dict[str, Dict[str, Any]] = {}
|
||||
TEAM_CACHE: List[Dict[str, Any]] | None = None
|
||||
|
||||
MATTERMOST_URL = ""
|
||||
CHANNEL_IDS: List[str] = []
|
||||
CHANNEL_SPECS: List[Dict[str, str]] = []
|
||||
WINDOW_HOURS = DEFAULT_WINDOW_HOURS
|
||||
MAX_MESSAGES = DEFAULT_MAX_MESSAGES
|
||||
CUTOFF_TIMESTAMP_MS = 0
|
||||
OUTPUT_FILE = DEFAULT_OUTPUT_FILE
|
||||
REQUEST_HEADERS: Dict[str, str] = {}
|
||||
SSL_CONTEXT: ssl.SSLContext | None = None
|
||||
MATTERMOST_TEAM_NAME = ""
|
||||
MATTERMOST_TEAM_ID = ""
|
||||
|
||||
|
||||
class MattermostAPIError(RuntimeError):
|
||||
@@ -84,6 +88,36 @@ def parse_channel_ids(raw_value: str) -> List[str]:
|
||||
return channel_ids
|
||||
|
||||
|
||||
def looks_like_channel_id(value: str) -> bool:
    """Return True when *value* has the shape of a channel ID.

    The check is purely syntactic: exactly 26 characters, every one of them
    an ASCII letter or digit. Anything else is treated by callers as a
    channel name.

    NOTE(review): the 26-character length is taken from the existing check;
    confirm it matches the ID format of the target Mattermost server.
    """
    # str.isalnum() on its own also accepts non-ASCII letters and digits
    # (e.g. accented letters, superscript digits); such a value would be
    # interpolated into an API path as an "ID", so require ASCII as well.
    return len(value) == 26 and value.isascii() and value.isalnum()
|
||||
|
||||
|
||||
def parse_channel_specs() -> List[Dict[str, str]]:
    """Build the list of channel specs from the environment.

    Reads ``CHANNELS``, ``CHANNEL_NAMES`` and ``CHANNEL_IDS`` (in that order),
    splits each non-empty value with ``parse_channel_ids`` and classifies every
    entry with ``looks_like_channel_id``.

    Returns:
        A list of ``{"kind": "id" | "name", "value": ...}`` dicts in
        configuration order. Leading ``#`` characters are stripped from names.
        Entries that repeat an earlier (kind, value) pair, and names that are
        empty once ``#`` is stripped, are dropped.

    Raises:
        ValueError: if no usable channel entry is configured.
    """
    entries: List[str] = []
    for env_name in ("CHANNELS", "CHANNEL_NAMES", "CHANNEL_IDS"):
        raw_value = os.getenv(env_name, "").strip()
        if raw_value:
            entries.extend(parse_channel_ids(raw_value))

    specs: List[Dict[str, str]] = []
    seen: set[tuple[str, str]] = set()
    for entry in entries:
        if looks_like_channel_id(entry):
            kind, value = "id", entry
        else:
            kind, value = "name", entry.lstrip("#")
            if not value:
                # An entry such as "#" leaves no name to look up.
                continue

        # The same channel may be listed in more than one variable; keep only
        # its first occurrence so it is not processed twice downstream.
        if (kind, value) in seen:
            continue
        seen.add((kind, value))
        specs.append({"kind": kind, "value": value})

    if not specs:
        raise ValueError("Configure at least one of CHANNELS, CHANNEL_NAMES, or CHANNEL_IDS.")

    return specs
|
||||
|
||||
|
||||
def build_ssl_context() -> ssl.SSLContext:
|
||||
ca_bundle = os.getenv("MATTERMOST_CA_BUNDLE", "").strip()
|
||||
skip_tls_verify = parse_bool_env("MATTERMOST_SKIP_TLS_VERIFY", default=False)
|
||||
@@ -100,16 +134,19 @@ def build_ssl_context() -> ssl.SSLContext:
|
||||
|
||||
|
||||
def configure() -> None:
|
||||
global MATTERMOST_URL, CHANNEL_IDS, WINDOW_HOURS, MAX_MESSAGES, CUTOFF_TIMESTAMP_MS, OUTPUT_FILE
|
||||
global MATTERMOST_URL, CHANNEL_SPECS, WINDOW_HOURS, MAX_MESSAGES, CUTOFF_TIMESTAMP_MS, OUTPUT_FILE
|
||||
global REQUEST_HEADERS, SSL_CONTEXT
|
||||
global MATTERMOST_TEAM_NAME, MATTERMOST_TEAM_ID
|
||||
|
||||
load_dotenv_file()
|
||||
MATTERMOST_URL = require_env("MATTERMOST_URL").rstrip("/")
|
||||
token = require_env("MATTERMOST_TOKEN")
|
||||
CHANNEL_IDS = parse_channel_ids(require_env("CHANNEL_IDS"))
|
||||
CHANNEL_SPECS = parse_channel_specs()
|
||||
WINDOW_HOURS = int(os.getenv("MESSAGE_WINDOW_HOURS", str(DEFAULT_WINDOW_HOURS)))
|
||||
MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", str(DEFAULT_MAX_MESSAGES)))
|
||||
OUTPUT_FILE = os.getenv("MATTERMOST_OUTPUT_FILE", DEFAULT_OUTPUT_FILE).strip() or DEFAULT_OUTPUT_FILE
|
||||
MATTERMOST_TEAM_NAME = os.getenv("MATTERMOST_TEAM_NAME", "").strip()
|
||||
MATTERMOST_TEAM_ID = os.getenv("MATTERMOST_TEAM_ID", "").strip()
|
||||
|
||||
if WINDOW_HOURS <= 0:
|
||||
raise ValueError("MESSAGE_WINDOW_HOURS must be greater than 0.")
|
||||
@@ -193,6 +230,110 @@ def get_channel_posts(channel_id: str) -> List[Dict[str, Any]]:
|
||||
return collected
|
||||
|
||||
|
||||
def get_channel_by_id(channel_id: str) -> Dict[str, Any]:
    """Return the channel object for *channel_id*, fetching it at most once.

    The result of ``GET /api/v4/channels/{channel_id}`` is memoised in
    ``CHANNEL_CACHE`` under the key ``"id:<channel_id>"``.
    """
    key = f"id:{channel_id}"
    if key not in CHANNEL_CACHE:
        # Cache miss: ask the server and remember the answer.
        CHANNEL_CACHE[key] = api_get_json(f"/api/v4/channels/{channel_id}")
    return CHANNEL_CACHE[key]
|
||||
|
||||
|
||||
def get_channel_by_name(channel_name: str) -> Dict[str, Any]:
    """Resolve *channel_name* to a channel object, with caching.

    When ``MATTERMOST_TEAM_ID`` or ``MATTERMOST_TEAM_NAME`` is set the lookup
    is scoped to that team via ``get_channel_by_name_for_team``; otherwise the
    current user's teams are searched with ``find_channel_across_user_teams``.

    The result is cached in ``CHANNEL_CACHE`` under ``"name:<channel_name>"``
    and, when the channel carries an ID, also under ``"id:<id>"``.

    Raises:
        Whatever the underlying lookup helper raises; nothing is cached in
        that case.
    """
    cache_key = f"name:{channel_name}"
    if cache_key in CHANNEL_CACHE:
        return CHANNEL_CACHE[cache_key]

    if MATTERMOST_TEAM_ID or MATTERMOST_TEAM_NAME:
        channel_data = get_channel_by_name_for_team(channel_name)
    else:
        channel_data = find_channel_across_user_teams(channel_name)

    CHANNEL_CACHE[cache_key] = channel_data

    # Only cross-register by ID when there is one; otherwise the entry would
    # land under the meaningless key "id:".
    channel_id = channel_data.get("id", "")
    if channel_id:
        CHANNEL_CACHE[f"id:{channel_id}"] = channel_data

    return channel_data
|
||||
|
||||
|
||||
def get_channel_by_name_for_team(channel_name: str) -> Dict[str, Any]:
    """Fetch a channel by name within the configured team.

    ``MATTERMOST_TEAM_ID`` takes precedence over ``MATTERMOST_TEAM_NAME`` when
    both are set. Every path segment taken from configuration or input is
    percent-encoded before being placed in the URL.

    Returns:
        The decoded JSON response from ``api_get_json``.
    """
    quoted_channel = parse.quote(channel_name, safe="")
    if MATTERMOST_TEAM_ID:
        # The team ID comes from the environment just like the team name, so
        # it gets the same quoting instead of being interpolated raw.
        quoted_team_id = parse.quote(MATTERMOST_TEAM_ID, safe="")
        api_path = f"/api/v4/teams/{quoted_team_id}/channels/name/{quoted_channel}"
    else:
        quoted_team_name = parse.quote(MATTERMOST_TEAM_NAME, safe="")
        api_path = f"/api/v4/teams/name/{quoted_team_name}/channels/name/{quoted_channel}"
    return api_get_json(api_path)
|
||||
|
||||
|
||||
def get_user_teams() -> List[Dict[str, Any]]:
    """Return the teams of the current user, fetched once per process.

    The first successful call stores the response of
    ``GET /api/v4/users/me/teams`` in ``TEAM_CACHE``; later calls reuse it.

    Raises:
        MattermostAPIError: if the response is not a JSON list.
    """
    global TEAM_CACHE

    if TEAM_CACHE is None:
        response = api_get_json("/api/v4/users/me/teams")
        if not isinstance(response, list):
            raise MattermostAPIError("Unexpected response while listing user teams.")
        TEAM_CACHE = response

    return TEAM_CACHE
|
||||
|
||||
|
||||
def get_user_channels_for_team(team_id: str) -> List[Dict[str, Any]]:
    """List the current user's channels in the team identified by *team_id*.

    Raises:
        MattermostAPIError: if the response is not a JSON list.
    """
    response = api_get_json(f"/api/v4/users/me/teams/{team_id}/channels")
    if isinstance(response, list):
        return response
    raise MattermostAPIError(f"Unexpected response while listing channels for team {team_id}.")
|
||||
|
||||
|
||||
def find_channel_across_user_teams(channel_name: str) -> Dict[str, Any]:
    """Find the single channel called *channel_name* among the user's teams.

    Every team returned by ``get_user_teams`` is scanned. Teams whose channel
    listing fails are logged and skipped, so one failing team does not abort
    the search.

    Returns:
        A copy of the matching channel dict, extended with
        ``_resolved_team_name`` and ``_resolved_team_id``.

    Raises:
        MattermostAPIError: if no channel matches, or if channels in more than
            one team match (the name is then ambiguous).
    """
    matches: List[Dict[str, Any]] = []

    for team in get_user_teams():
        team_id = team.get("id", "")
        team_name = team.get("name", "")
        if not team_id:
            continue

        try:
            channels = get_user_channels_for_team(team_id)
        except MattermostAPIError as exc:
            LOGGER.warning("Could not list channels for team %s: %s", team_name or team_id, exc)
            continue

        for channel in channels:
            if channel.get("name") == channel_name:
                # Copy before annotating so the listed dict is left untouched.
                match = dict(channel)
                match["_resolved_team_name"] = team_name
                match["_resolved_team_id"] = team_id
                matches.append(match)

    if not matches:
        raise MattermostAPIError(
            f"Unable to find channel named '{channel_name}' in the current user's accessible teams."
        )

    if len(matches) > 1:
        # "_resolved_team_name" is always present but may be empty, so a
        # dict.get() default would never kick in; fall back with `or`.
        team_labels = {
            match["_resolved_team_name"] or match["_resolved_team_id"] or "unknown"
            for match in matches
        }
        teams = ", ".join(sorted(team_labels))
        raise MattermostAPIError(
            f"Channel name '{channel_name}' is ambiguous across teams: {teams}. Set MATTERMOST_TEAM_NAME or MATTERMOST_TEAM_ID."
        )

    return matches[0]
|
||||
|
||||
|
||||
def resolve_channels() -> List[Dict[str, Any]]:
    """Resolve every entry of ``CHANNEL_SPECS`` to a channel object.

    Specs of kind ``"id"`` go through ``get_channel_by_id``; all others are
    treated as names and go through ``get_channel_by_name``.

    Returns:
        Channel dicts in configuration order. A channel that is referenced
        more than once (for example by name and by ID) appears only once,
        at the position of its first reference.
    """
    resolved: List[Dict[str, Any]] = []
    seen_ids: set[str] = set()

    for spec in CHANNEL_SPECS:
        if spec["kind"] == "id":
            channel_data = get_channel_by_id(spec["value"])
        else:
            channel_data = get_channel_by_name(spec["value"])

        # extract_messages fetches posts once per returned channel, so a
        # repeated channel would otherwise be fetched twice. Channels
        # without an ID cannot be compared and are always kept.
        channel_id = channel_data.get("id", "")
        if channel_id:
            if channel_id in seen_ids:
                continue
            seen_ids.add(channel_id)

        resolved.append(channel_data)

    return resolved
|
||||
|
||||
|
||||
def get_user_info(user_id: str) -> Dict[str, Any]:
|
||||
if not user_id:
|
||||
return {"id": "", "username": "unknown"}
|
||||
@@ -237,7 +378,10 @@ def is_system_message(post: Dict[str, Any]) -> bool:
|
||||
def extract_messages() -> List[Dict[str, Any]]:
|
||||
all_messages: List[Dict[str, Any]] = []
|
||||
|
||||
for channel_id in CHANNEL_IDS:
|
||||
for channel in resolve_channels():
|
||||
channel_id = channel.get("id", "")
|
||||
channel_name = channel.get("name", "") or channel_id
|
||||
channel_display_name = channel.get("display_name", "") or channel_name
|
||||
raw_posts = get_channel_posts(channel_id)
|
||||
for post in raw_posts:
|
||||
if is_system_message(post):
|
||||
@@ -250,7 +394,9 @@ def extract_messages() -> List[Dict[str, Any]]:
|
||||
all_messages.append(
|
||||
{
|
||||
"channel_id": channel_id,
|
||||
"channel_ref": channel_id,
|
||||
"channel_ref": channel_name,
|
||||
"channel_name": channel_name,
|
||||
"channel_display_name": channel_display_name,
|
||||
"post_id": post.get("id", ""),
|
||||
"user_id": post.get("user_id", ""),
|
||||
"create_at": int(post.get("create_at", 0)),
|
||||
@@ -295,6 +441,7 @@ def format_messages(messages: List[Dict[str, Any]]) -> str:
|
||||
"source": "mattermost",
|
||||
"channel": channel_ref,
|
||||
"channel_id": message.get("channel_id", ""),
|
||||
"channel_display_name": message.get("channel_display_name", channel_ref),
|
||||
"post_id": post_id,
|
||||
"thread_id": thread_id,
|
||||
"root_id": root_id or None,
|
||||
|
||||
Reference in New Issue
Block a user