Files
fidelity-ai-workspace/scripts/aiw/indexer.py

262 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""Dependency-free local indexer for AI Workspace canonical Markdown memory.
This is intentionally a small lexical/hybrid-ready index. It keeps
`project-knowledge/` as the source of truth and writes a derived, disposable
JSONL index under `.aiw/indexes/<profile>/`.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import re
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Two directory levels above the directory that contains this file.
# NOTE(review): presumably the workspace root - confirm against the repo layout.
ROOT = Path(__file__).resolve().parents[2]
# Profile used by the CLI subcommands when --profile is not given.
DEFAULT_PROFILE = "fidelity"
# Default upper bound, in characters, for one chunk produced by chunk_text().
MAX_CHARS = 1800
# Default number of characters chunk_text() steps back so neighbouring chunks overlap.
OVERLAP_CHARS = 180
# Put this script's own directory first on sys.path so the import below is
# looked up there before anywhere else. This must stay ahead of the import.
sys.path.insert(0, str(Path(__file__).resolve().parent))
# NOTE(review): the standard library also ships a module named `profile`
# (the profiler). This import presumably targets a sibling profile.py; if the
# stdlib module is already cached in sys.modules it would win - confirm.
import profile as aiw_profile  # noqa: E402
@dataclass(frozen=True)
class Chunk:
    """One indexed slice of a Markdown file.

    Instances are created by ``build_chunks`` and written one per line by
    ``write_index`` through ``__dict__``, so the field names below are also
    the JSON keys of each row in the index file.
    """

    # First 16 hex characters of a SHA-256 over the path, section index,
    # chunk index and chunk text (see build_chunks).
    chunk_id: str
    # Source file path as returned by rel().
    path: str
    # Heading of the section the chunk came from; "" for text before any heading.
    heading: str
    # The chunk's text.
    text: str
    # st_mtime of the source file at build time.
    mtime: float
    # Hex SHA-256 of the source file's decoded text re-encoded as UTF-8.
    sha256: str
def project_knowledge_dir(profile: str) -> Path:
    """Return the knowledge directory for *profile*.

    Thin wrapper: the lookup is delegated to ``aiw_profile.knowledge_dir``
    with the module-level ``ROOT`` passed as ``root``.
    """
    knowledge = aiw_profile.knowledge_dir(profile, root=ROOT)
    return knowledge
def index_dir(profile: str) -> Path:
    """Return the directory that holds the derived index for *profile*.

    Thin wrapper: the lookup is delegated to ``aiw_profile.index_dir`` with
    the module-level ``ROOT`` passed as ``root``.
    """
    derived = aiw_profile.index_dir(profile, root=ROOT)
    return derived
def rel(path: Path) -> str:
    """Return *path* as rendered by ``aiw_profile.relative_to_root``.

    The module-level ``ROOT`` is passed as ``root``; the exact string format
    is decided by that helper.
    """
    rendered = aiw_profile.relative_to_root(path, root=ROOT)
    return rendered
def index_path(profile: str) -> Path:
    """Return the path of the JSONL chunk index inside ``index_dir(profile)``."""
    return index_dir(profile).joinpath("project-knowledge.jsonl")
def manifest_path(profile: str) -> Path:
    """Return the path of the manifest file inside ``index_dir(profile)``."""
    return index_dir(profile).joinpath("manifest.json")
def normalize_space(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def tokens(text: str) -> set[str]:
    """Return the distinct lowercase search terms found in *text*.

    A term starts with an ASCII letter or digit and continues with letters,
    digits, ``_`` or ``-``. The pattern itself requires at least two
    characters, so no separate length filter is needed.
    """
    return set(re.findall(r"[a-z0-9][a-z0-9_-]{1,}", text.lower()))
def iter_markdown_files(base: Path) -> list[Path]:
    """Return every Markdown file under *base*, sorted, minus templates.

    Args:
        base: Directory that is searched recursively for ``*.md`` entries.

    Returns:
        Sorted paths of regular files. Anything below the top-level
        ``09-templates/`` directory is left out.
    """
    files: list[Path] = []
    for path in sorted(base.rglob("*.md")):
        # rglob matches on name only; a directory called "x.md" is not a document.
        if not path.is_file():
            continue
        # Compare the POSIX form so the exclusion also works where the native
        # separator is a backslash.
        relative = path.relative_to(base).as_posix()
        if relative.startswith("09-templates/"):
            continue
        files.append(path)
    return files
def heading_for_line(line: str, current: str) -> str:
    """Return the heading title carried by *line*, or *current* if it has none.

    A line counts as a heading when, after trimming, it starts with ``#``.
    The title is what remains once the leading ``#`` run and surrounding
    whitespace are removed; an empty title falls back to *current*.
    """
    stripped = line.strip()
    if not stripped.startswith("#"):
        return current
    title = stripped.lstrip("#").strip()
    return title or current


def split_sections(text: str) -> list[tuple[str, str]]:
    """Split Markdown *text* into ``(heading, body)`` sections.

    A new section starts at each heading line whose title differs from the
    current heading; the heading line itself is the first line of the body.
    Text before the first heading is returned under the heading ``""``.
    Sections whose body is empty after trimming are dropped.

    Lines inside fenced code blocks (``` or ~~~) are never treated as
    headings, so shell or Python comments in examples do not open sections.
    """
    sections: list[tuple[str, list[str]]] = [("", [])]
    current_heading = ""
    open_fence = ""  # the opening fence marker while inside a fenced block
    for line in text.splitlines():
        stripped = line.strip()
        if open_fence:
            # A closing fence repeats the opener's character, is at least as
            # long, and carries no other text.
            if stripped.startswith(open_fence) and not stripped.strip(open_fence[0]):
                open_fence = ""
            sections[-1][1].append(line)
            continue
        if stripped.startswith(("```", "~~~")):
            marker = stripped[0]
            run = len(stripped) - len(stripped.lstrip(marker))
            # A backtick fence's info string may not contain backticks, so a
            # line such as "```x```" is inline code rather than an opener.
            if marker == "~" or "`" not in stripped[run:]:
                open_fence = marker * run
                sections[-1][1].append(line)
                continue
        new_heading = heading_for_line(line, current_heading)
        if stripped.startswith("#") and new_heading != current_heading:
            current_heading = new_heading
            sections.append((current_heading, [line]))
        else:
            sections[-1][1].append(line)
    result: list[tuple[str, str]] = []
    for heading, lines in sections:
        body = "\n".join(lines).strip()
        if body:
            result.append((heading, body))
    return result
def chunk_text(section_text: str, max_chars: int = MAX_CHARS, overlap_chars: int = OVERLAP_CHARS) -> list[str]:
    """Split *section_text* into overlapping chunks of at most *max_chars*.

    Text that already fits is returned as a single chunk. Longer text is cut
    window by window; a window that is not the last one ends at the latest
    blank line (``"\\n\\n"``) or sentence end (``". "``) found in its second
    half, when there is one. Each following window starts *overlap_chars*
    before the previous cut.

    Args:
        section_text: Text to split; surrounding whitespace is ignored.
        max_chars: Maximum length of one chunk; must be positive.
        overlap_chars: Characters shared by neighbouring chunks. Negative
            values are treated as 0.

    Returns:
        Non-empty, trimmed chunks in document order; ``[]`` for blank input.

    Raises:
        ValueError: If *max_chars* is not positive and there is text to split.
    """
    text = section_text.strip()
    if not text:
        return []
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    if len(text) <= max_chars:
        return [text]
    # A negative overlap would jump forward and silently skip text.
    overlap = max(0, overlap_chars)
    total = len(text)
    chunks: list[str] = []
    start = 0
    while start < total:
        end = min(total, start + max_chars)
        if end < total:
            boundary = max(text.rfind("\n\n", start, end), text.rfind(". ", start, end))
            # Only accept a boundary in the second half so chunks stay reasonably full.
            if boundary > start + max_chars // 2:
                end = boundary + 1
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= total:
            break
        # Always move forward: an overlap as large as the chunk itself would
        # otherwise pin `start` in place and loop forever.
        start = max(start + 1, end - overlap)
    return chunks
def build_chunks(profile: str) -> list[Chunk]:
    """Read every Markdown file of *profile* and return its chunks in order.

    Files come from ``iter_markdown_files`` over the profile's knowledge
    directory. Each file is split with ``split_sections`` and each section
    with ``chunk_text``.
    """
    collected: list[Chunk] = []
    for source in iter_markdown_files(project_knowledge_dir(profile)):
        raw = source.read_text(encoding="utf-8", errors="replace")
        rel_path = rel(source)
        # NOTE(review): this digest covers the decoded text re-encoded as
        # UTF-8, not the raw file bytes - confirm that is what consumers expect.
        file_digest = hashlib.sha256(raw.encode("utf-8", errors="replace")).hexdigest()
        modified = source.stat().st_mtime
        for section_index, (heading, section) in enumerate(split_sections(raw)):
            for chunk_index, chunk in enumerate(chunk_text(section)):
                # The id mixes location and content, so it changes when either does.
                identity = f"{rel_path}\n{section_index}\n{chunk_index}\n{chunk}"
                short_id = hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
                collected.append(
                    Chunk(
                        chunk_id=short_id,
                        path=rel_path,
                        heading=heading,
                        text=chunk,
                        mtime=modified,
                        sha256=file_digest,
                    )
                )
    return collected
def write_index(profile: str) -> dict[str, Any]:
    """Rebuild the JSONL index and manifest for *profile*; return the manifest.

    The index directory is created if missing. The index file holds one JSON
    object per chunk, and the manifest records counts, paths and the UTC
    creation time.
    """
    index_dir(profile).mkdir(parents=True, exist_ok=True)
    chunks = build_chunks(profile)
    with index_path(profile).open("w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(vars(chunk), ensure_ascii=False, sort_keys=True) + "\n"
            for chunk in chunks
        )
    # Only files that produced at least one chunk are counted.
    indexed_files = {chunk.path for chunk in chunks}
    manifest: dict[str, Any] = {}
    manifest["profile"] = profile
    manifest["source"] = rel(project_knowledge_dir(profile))
    manifest["canonical"] = False
    manifest["derived_from"] = "project-knowledge"
    manifest["index_type"] = "lexical-markdown-chunks"
    manifest["created_at"] = datetime.now(timezone.utc).isoformat()
    manifest["file_count"] = len(indexed_files)
    manifest["chunk_count"] = len(chunks)
    manifest["index_path"] = rel(index_path(profile))
    serialized = json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True)
    manifest_path(profile).write_text(serialized + "\n", encoding="utf-8")
    return manifest
def read_index(profile: str) -> list[dict[str, Any]]:
    """Load the index rows for *profile*, skipping anything unusable.

    Reading is best-effort: a missing index yields ``[]``, and blank lines,
    lines that are not valid JSON, and JSON values that are not objects are
    skipped.

    Returns:
        The parsed rows, in file order.
    """
    path = index_path(profile)
    if not path.is_file():
        return []
    rows: list[dict[str, Any]] = []
    # Iterate the file handle rather than str.splitlines(): the latter also
    # breaks on U+0085/U+2028/U+2029, which json.dumps(ensure_ascii=False)
    # writes unescaped, and it needs the whole file in memory.
    with path.open(encoding="utf-8", errors="replace") as handle:
        for line in handle:
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Callers use dict methods on every row, so drop scalars and lists.
            if isinstance(row, dict):
                rows.append(row)
    return rows
def score_chunk(query: str, query_tokens: set[str], chunk: dict[str, Any]) -> float:
    """Score one index row against a query; 0.0 means no match.

    The score is 5.0 per exact (case-insensitive) occurrence of *query* in
    the row's path, heading and text, plus 1.25 per shared token, plus 1.5
    if the query appears in the heading and 1.0 if it appears in the path.

    Args:
        query: The raw query string. An empty query never counts as an exact
            match.
        query_tokens: Tokens of the query, as produced by ``tokens``.
        chunk: An index row; missing or ``None`` fields are treated as "".
    """
    needle = query.lower()
    # Normalise all three fields the same way so a None value cannot leak
    # into the haystack as the literal text "none".
    path = str(chunk.get("path") or "")
    heading = str(chunk.get("heading") or "")
    text = str(chunk.get("text") or "")
    haystack = f"{path} {heading} {text}".lower()
    # str.count("") returns len + 1, which would make every row a top hit.
    exact = haystack.count(needle) if needle else 0
    overlap = len(query_tokens & tokens(haystack))
    if exact == 0 and overlap == 0:
        return 0.0
    heading_bonus = 1.5 if needle and needle in heading.lower() else 0.0
    path_bonus = 1.0 if needle and needle in path.lower() else 0.0
    return exact * 5.0 + overlap * 1.25 + heading_bonus + path_bonus
def snippet_for(query: str, text: str, width: int = 520) -> str:
    """Return up to *width* characters of *text* around the first hit.

    The window is centred on the first case-insensitive occurrence of the
    whole *query*; failing that, on the earliest occurrence of any query
    token; failing that, it starts at the beginning. Whitespace in the
    result is normalised.
    """
    haystack = text.lower()
    position = haystack.find(query.lower()) if query else -1
    if position < 0:
        hits = []
        for term in tokens(query):
            at = haystack.find(term)
            if at >= 0:
                hits.append(at)
        position = min(hits) if hits else 0
    begin = max(0, position - width // 2)
    finish = min(len(text), begin + width)
    return normalize_space(text[begin:finish])
def search_index(profile: str, query: str, limit: int = 10) -> dict[str, Any]:
    """Search the derived index of *profile* and return ranked matches.

    Rows are ordered by descending score, then by path, then by chunk id,
    and the first *limit* are reported. The result also carries the
    manifest, or ``{}`` when no manifest file exists.

    Raises:
        SystemExit: If *query* is empty after trimming.
    """
    query = query.strip()
    if not query:
        raise SystemExit("query is required")

    def rank_key(entry: tuple[float, dict[str, Any]]) -> tuple[float, Any, Any]:
        relevance, row = entry
        return (-relevance, row.get("path", ""), row.get("chunk_id", ""))

    rows = read_index(profile)
    wanted = tokens(query)
    ranked: list[tuple[float, dict[str, Any]]] = []
    for row in rows:
        relevance = score_chunk(query, wanted, row)
        if relevance > 0:
            ranked.append((relevance, row))
    ranked.sort(key=rank_key)

    matches = [
        {
            "score": round(relevance, 3),
            "path": row.get("path"),
            "heading": row.get("heading"),
            "chunk_id": row.get("chunk_id"),
            "snippet": snippet_for(query, str(row.get("text") or "")),
            "mtime": row.get("mtime"),
            "sha256": row.get("sha256"),
        }
        for relevance, row in ranked[:limit]
    ]

    if manifest_path(profile).is_file():
        manifest = json.loads(manifest_path(profile).read_text(encoding="utf-8"))
    else:
        manifest = {}
    return {
        "profile": profile,
        "query": query,
        "canonical": False,
        "source": "derived-index",
        "manifest": manifest,
        "matches": matches,
    }
def status(profile: str) -> dict[str, Any]:
    """Describe the current index of *profile*.

    Without a manifest file the result only says the profile is not indexed.
    Otherwise the stored manifest is returned with three extra keys:
    ``indexed``, ``index_bytes`` and ``age_seconds`` (``None`` when the
    manifest has no ``created_at``).
    """
    manifest_file = manifest_path(profile)
    if not manifest_file.is_file():
        return {
            "profile": profile,
            "indexed": False,
            "index_path": rel(index_path(profile)),
        }

    manifest = json.loads(manifest_file.read_text(encoding="utf-8"))
    index_file = index_path(profile)
    manifest["indexed"] = index_file.is_file()
    if index_file.is_file():
        manifest["index_bytes"] = index_file.stat().st_size
    else:
        manifest["index_bytes"] = 0
    if manifest.get("created_at"):
        created = datetime.fromisoformat(manifest["created_at"])
        manifest["age_seconds"] = int(time.time() - created.timestamp())
    else:
        manifest["age_seconds"] = None
    return manifest
def main() -> None:
    """Run the command-line interface: ``build``, ``status`` or ``search``.

    Every subcommand accepts ``--profile``. ``search`` also takes the query
    and ``--limit``, which is clamped to the range 1..50. The resulting
    payload is printed as indented, key-sorted JSON.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    subparsers = parser.add_subparsers(dest="command", required=True)
    for name in ("build", "status"):
        subparsers.add_parser(name).add_argument("--profile", default=DEFAULT_PROFILE)
    search_parser = subparsers.add_parser("search")
    search_parser.add_argument("query")
    search_parser.add_argument("--profile", default=DEFAULT_PROFILE)
    search_parser.add_argument("--limit", type=int, default=10)
    args = parser.parse_args()

    if args.command == "search":
        bounded_limit = min(max(args.limit, 1), 50)
        payload = search_index(args.profile, args.query, limit=bounded_limit)
    elif args.command == "build":
        payload = write_index(args.profile)
    else:
        payload = status(args.profile)
    print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))


if __name__ == "__main__":
    main()