#!/usr/bin/env python3
"""Dependency-free local indexer for AI Workspace canonical Markdown memory.

This is intentionally a small lexical/hybrid-ready index. It keeps
`project-knowledge/` as the source of truth and writes a derived, disposable
JSONL index under `.aiw/indexes/<profile>/`.
"""

from __future__ import annotations

import argparse
import hashlib
import json
import re
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[2]
INDEX_ROOT = ROOT / ".aiw" / "indexes"
DEFAULT_PROFILE = "fidelity"
MAX_CHARS = 1800
OVERLAP_CHARS = 180

# Top-level directory (relative to the knowledge base) that is never indexed.
_TEMPLATES_DIR = "09-templates"
# A token is two or more characters: one alphanumeric, then alphanumerics,
# underscores or hyphens. Input is lower-cased before matching.
_TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_-]{1,}")
_WHITESPACE_RE = re.compile(r"\s+")


@dataclass(frozen=True)
class Chunk:
    """One indexed slice of a Markdown file; serialized as one JSONL row."""

    chunk_id: str  # first 16 hex chars of a SHA-256 over path, indices and text
    path: str  # source file path relative to ROOT
    heading: str  # title of the section the chunk came from ("" before any heading)
    text: str  # chunk body
    mtime: float  # st_mtime of the source file at build time
    sha256: str  # SHA-256 of the decoded file content, re-encoded as UTF-8


def project_knowledge_dir(profile: str) -> Path:
    """Return the profile's own `project-knowledge` directory if it exists,
    otherwise the shared one directly under ROOT."""
    candidate = ROOT / "profiles" / profile / "project-knowledge"
    return candidate if candidate.exists() else ROOT / "project-knowledge"


def index_dir(profile: str) -> Path:
    """Return the directory that holds the derived index for *profile*."""
    return INDEX_ROOT / profile


def index_path(profile: str) -> Path:
    """Return the path of the JSONL chunk index for *profile*."""
    return index_dir(profile) / "project-knowledge.jsonl"


def manifest_path(profile: str) -> Path:
    """Return the path of the index manifest for *profile*."""
    return index_dir(profile) / "manifest.json"


def normalize_space(text: str) -> str:
    """Collapse every whitespace run to a single space and trim both ends."""
    return _WHITESPACE_RE.sub(" ", text).strip()


def tokens(text: str) -> set[str]:
    """Return the set of lower-cased lexical tokens found in *text*.

    Single-character words never match because the pattern itself requires
    at least two characters.
    """
    return set(_TOKEN_RE.findall(text.lower()))


def iter_markdown_files(base: Path) -> list[Path]:
    """Return every `*.md` file under *base* in sorted order.

    Anything below the top-level `09-templates` directory is skipped.
    """
    files: list[Path] = []
    for path in sorted(base.rglob("*.md")):
        # Compare path components rather than a "/"-joined string so the
        # exclusion also holds where the separator is a backslash.
        if path.relative_to(base).parts[0] == _TEMPLATES_DIR:
            continue
        # A directory can be named "something.md"; it cannot be read as text.
        if not path.is_file():
            continue
        files.append(path)
    return files


def heading_for_line(line: str, current: str) -> str:
    """Return the heading title carried by *line*, or *current* if it has none.

    Any line whose first non-blank character is `#` counts as a heading; a
    bare `#` with no title leaves *current* unchanged.
    NOTE(review): `#` lines inside fenced code blocks are treated as headings
    too - confirm whether that matters for the indexed content.
    """
    stripped = line.strip()
    if stripped.startswith("#"):
        return stripped.lstrip("#").strip() or current
    return current


def split_sections(text: str) -> list[tuple[str, str]]:
    """Split Markdown *text* into `(heading, body)` sections.

    Text before the first heading gets the heading `""`. Each heading line
    with a title starts a new section and is kept as the first line of its
    body. Sections whose body is blank are dropped.
    """
    sections: list[tuple[str, list[str]]] = [("", [])]
    for line in text.splitlines():
        # With an empty fallback, a non-empty result means "this line is a
        # titled heading". Testing that directly (rather than comparing with
        # the previous title) keeps consecutive same-titled sections apart.
        title = heading_for_line(line, "")
        if title:
            sections.append((title, [line]))
        else:
            sections[-1][1].append(line)

    result: list[tuple[str, str]] = []
    for heading, lines in sections:
        body = "\n".join(lines).strip()
        if body:
            result.append((heading, body))
    return result


def chunk_text(
    section_text: str,
    max_chars: int = MAX_CHARS,
    overlap_chars: int = OVERLAP_CHARS,
) -> list[str]:
    """Split *section_text* into overlapping chunks of at most *max_chars*.

    A chunk is cut at the last paragraph break or sentence end inside its
    window when that boundary lies past the window's midpoint. Consecutive
    chunks share up to *overlap_chars* characters.

    Raises:
        ValueError: if *max_chars* is not positive and there is text to split.
    """
    text = section_text.strip()
    if not text:
        return []
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    if len(text) <= max_chars:
        return [text]

    overlap = max(0, overlap_chars)
    chunks: list[str] = []
    start = 0
    while start < len(text):
        end = min(len(text), start + max_chars)
        if end < len(text):
            boundary = max(text.rfind("\n\n", start, end), text.rfind(". ", start, end))
            if boundary > start + max_chars // 2:
                # Keep the period (or first newline) with the current chunk.
                end = boundary + 1
        piece = text[start:end].strip()
        if piece:
            chunks.append(piece)
        if end >= len(text):
            break
        # Step back by the overlap, but always move forward: when the overlap
        # would land at or before the current start, continue without overlap
        # instead of looping forever.
        next_start = end - overlap
        start = next_start if next_start > start else end
    return chunks


def build_chunks(profile: str) -> list[Chunk]:
    """Read every indexable Markdown file for *profile* and chunk it."""
    base = project_knowledge_dir(profile)
    chunks: list[Chunk] = []
    for path in iter_markdown_files(base):
        raw = path.read_text(encoding="utf-8", errors="replace")
        rel = str(path.relative_to(ROOT))
        digest = hashlib.sha256(raw.encode("utf-8", errors="replace")).hexdigest()
        mtime = path.stat().st_mtime
        for section_index, (heading, section) in enumerate(split_sections(raw)):
            for chunk_index, chunk in enumerate(chunk_text(section)):
                # The id covers position as well as content, so identical text
                # in two places still gets two distinct ids.
                identity = f"{rel}\n{section_index}\n{chunk_index}\n{chunk}"
                chunk_id = hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
                chunks.append(
                    Chunk(
                        chunk_id=chunk_id,
                        path=rel,
                        heading=heading,
                        text=chunk,
                        mtime=mtime,
                        sha256=digest,
                    )
                )
    return chunks


def write_index(profile: str) -> dict[str, Any]:
    """Rebuild the JSONL index and manifest for *profile*.

    Returns the manifest that was written.
    """
    index_dir(profile).mkdir(parents=True, exist_ok=True)
    chunks = build_chunks(profile)
    with index_path(profile).open("w", encoding="utf-8") as handle:
        for chunk in chunks:
            handle.write(json.dumps(asdict(chunk), ensure_ascii=False, sort_keys=True) + "\n")

    manifest = {
        "profile": profile,
        "source": str(project_knowledge_dir(profile).relative_to(ROOT)),
        "canonical": False,
        "derived_from": "project-knowledge",
        "index_type": "lexical-markdown-chunks",
        "created_at": datetime.now(timezone.utc).isoformat(),
        # Counts only files that produced at least one chunk.
        "file_count": len({chunk.path for chunk in chunks}),
        "chunk_count": len(chunks),
        "index_path": str(index_path(profile).relative_to(ROOT)),
    }
    manifest_path(profile).write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    return manifest


def read_index(profile: str) -> list[dict[str, Any]]:
    """Load the index rows for *profile*; return `[]` when no index exists.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped so a damaged index degrades instead of failing.
    """
    path = index_path(profile)
    if not path.is_file():
        return []
    rows: list[dict[str, Any]] = []
    # Iterate the file rather than `str.splitlines()`: the latter also breaks
    # on characters such as U+2028 that `json.dumps(ensure_ascii=False)`
    # writes unescaped, which would cut a row in two.
    with path.open(encoding="utf-8", errors="replace") as handle:
        for line in handle:
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(row, dict):
                rows.append(row)
    return rows


def score_chunk(query: str, query_tokens: set[str], chunk: dict[str, Any]) -> float:
    """Return a lexical relevance score for *chunk*; 0.0 means no match.

    The score is 5.0 per exact occurrence of the query in path + heading +
    text, 1.25 per shared token, plus 1.5 when the query appears in the
    heading and 1.0 when it appears in the path. Matching is case-insensitive.
    """
    needle = query.lower()
    path = str(chunk.get("path") or "")
    heading = str(chunk.get("heading") or "")
    text = str(chunk.get("text") or "")
    haystack = f"{path} {heading} {text}".lower()

    # `str.count("")` is len + 1, so an empty query must be special-cased.
    exact = haystack.count(needle) if needle else 0
    overlap = len(query_tokens & tokens(haystack))
    if exact == 0 and overlap == 0:
        return 0.0

    heading_bonus = 1.5 if needle and needle in heading.lower() else 0.0
    path_bonus = 1.0 if needle and needle in path.lower() else 0.0
    return exact * 5.0 + overlap * 1.25 + heading_bonus + path_bonus


def snippet_for(query: str, text: str, width: int = 520) -> str:
    """Return up to *width* characters of *text* around the first match.

    The window is placed around the first exact occurrence of *query*,
    falling back to the earliest occurrence of any query token and finally to
    the start of *text*. Whitespace in the result is normalized.
    """
    lowered = text.lower()
    index = lowered.find(query.lower()) if query else -1
    if index < 0:
        positions = (lowered.find(term) for term in tokens(query))
        index = min((pos for pos in positions if pos >= 0), default=0)
    start = max(0, index - width // 2)
    end = min(len(text), start + width)
    return normalize_space(text[start:end])


def _load_manifest(profile: str) -> dict[str, Any] | None:
    """Return the parsed manifest, or None if missing, unreadable or not an object."""
    path = manifest_path(profile)
    if not path.is_file():
        return None
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return None
    return data if isinstance(data, dict) else None


def _age_seconds(created_at: Any) -> int | None:
    """Return whole seconds since the ISO timestamp, or None if it is absent or invalid."""
    if not created_at:
        return None
    try:
        created = datetime.fromisoformat(created_at)
    except (TypeError, ValueError):
        return None
    return int(time.time() - created.timestamp())


def search_index(profile: str, query: str, limit: int = 10) -> dict[str, Any]:
    """Search the derived index for *query* and return the best matches.

    Results are ordered by descending score, then path, then chunk id. The
    payload also carries the manifest (empty when it is missing or unreadable).

    Raises:
        SystemExit: if *query* is empty after stripping.
    """
    query = query.strip()
    if not query:
        raise SystemExit("query is required")

    query_tokens = tokens(query)
    scored: list[tuple[float, dict[str, Any]]] = []
    for row in read_index(profile):
        score = score_chunk(query, query_tokens, row)
        if score > 0:
            scored.append((score, row))
    # Coerce to str so rows with a null path or id cannot break the ordering.
    scored.sort(
        key=lambda item: (
            -item[0],
            str(item[1].get("path") or ""),
            str(item[1].get("chunk_id") or ""),
        )
    )

    matches = [
        {
            "score": round(score, 3),
            "path": row.get("path"),
            "heading": row.get("heading"),
            "chunk_id": row.get("chunk_id"),
            "snippet": snippet_for(query, str(row.get("text") or "")),
            "mtime": row.get("mtime"),
            "sha256": row.get("sha256"),
        }
        for score, row in scored[: max(0, limit)]
    ]
    return {
        "profile": profile,
        "query": query,
        "canonical": False,
        "source": "derived-index",
        "manifest": _load_manifest(profile) or {},
        "matches": matches,
    }


def status(profile: str) -> dict[str, Any]:
    """Describe the index for *profile*.

    Returns the manifest extended with `indexed`, `index_bytes` and
    `age_seconds`. A missing or unreadable manifest is reported as not
    indexed; rebuilding the index recreates it.
    """
    path = index_path(profile)
    manifest = _load_manifest(profile)
    if manifest is None:
        return {
            "profile": profile,
            "indexed": False,
            "index_path": str(path.relative_to(ROOT)),
        }
    has_index = path.is_file()
    manifest["indexed"] = has_index
    manifest["index_bytes"] = path.stat().st_size if has_index else 0
    manifest["age_seconds"] = _age_seconds(manifest.get("created_at"))
    return manifest


def main() -> None:
    """Command-line entry point: `build`, `status` or `search`; prints JSON."""
    parser = argparse.ArgumentParser(description=__doc__)
    subparsers = parser.add_subparsers(dest="command", required=True)
    for name in ("build", "status"):
        command = subparsers.add_parser(name)
        command.add_argument("--profile", default=DEFAULT_PROFILE)
    search = subparsers.add_parser("search")
    search.add_argument("query")
    search.add_argument("--profile", default=DEFAULT_PROFILE)
    search.add_argument("--limit", type=int, default=10)
    args = parser.parse_args()

    if args.command == "build":
        payload = write_index(args.profile)
    elif args.command == "search":
        # Keep the result count within 1..50 regardless of what was passed.
        payload = search_index(args.profile, args.query, limit=max(1, min(args.limit, 50)))
    else:
        payload = status(args.profile)
    print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))


if __name__ == "__main__":
    main()