feat: implement local indexer for project-knowledge and add memory hybrid search functionality
This commit is contained in:
258
scripts/aiw/indexer.py
Normal file
258
scripts/aiw/indexer.py
Normal file
@@ -0,0 +1,258 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Dependency-free local indexer for AI Workspace canonical Markdown memory.
|
||||
|
||||
This is intentionally a small lexical/hybrid-ready index. It keeps
|
||||
`project-knowledge/` as the source of truth and writes a derived, disposable
|
||||
JSONL index under `.aiw/indexes/<profile>/`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
# Third ancestor of this file's resolved path; with the file at
# scripts/aiw/indexer.py this is the repository root.
ROOT = Path(__file__).resolve().parents[2]
# Parent directory of the per-profile derived index directories.
INDEX_ROOT = ROOT / ".aiw" / "indexes"
# Profile used by the CLI when --profile is not supplied.
DEFAULT_PROFILE = "fidelity"
# Default maximum size of one chunk, in characters.
MAX_CHARS = 1800
# Default number of characters shared between consecutive chunks.
OVERLAP_CHARS = 180
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class Chunk:
    """One indexed slice of a Markdown file; written as a single JSONL row."""

    # Short hex identifier derived from the path, section/chunk position and text.
    chunk_id: str
    # Source file path, relative to the repository root.
    path: str
    # Title of the Markdown section the text came from ("" before any heading).
    heading: str
    # The chunk's text content.
    text: str
    # Modification time of the source file (st_mtime) when it was indexed.
    mtime: float
    # SHA-256 hex digest of the whole source file's content.
    sha256: str
|
||||
|
||||
|
||||
def project_knowledge_dir(profile: str) -> Path:
    """Return the canonical Markdown directory to index for *profile*.

    A profile-specific ``profiles/<profile>/project-knowledge`` directory is
    preferred when present; otherwise the shared ``project-knowledge``
    directory under the repository root is returned (whether or not it exists).
    """
    candidate = ROOT / "profiles" / profile / "project-knowledge"
    # Require a directory: a stray file with this name must not shadow the
    # shared knowledge base and silently yield an empty index.
    if candidate.is_dir():
        return candidate
    return ROOT / "project-knowledge"
|
||||
|
||||
|
||||
def index_dir(profile: str) -> Path:
    """Return the directory holding the derived index files for *profile*."""
    return INDEX_ROOT.joinpath(profile)
|
||||
|
||||
|
||||
def index_path(profile: str) -> Path:
    """Return the path of the JSONL chunk index for *profile*."""
    directory = index_dir(profile)
    return directory / "project-knowledge.jsonl"
|
||||
|
||||
|
||||
def manifest_path(profile: str) -> Path:
    """Return the path of the JSON manifest describing the index for *profile*."""
    directory = index_dir(profile)
    return directory / "manifest.json"
|
||||
|
||||
|
||||
def normalize_space(text: str) -> str:
    """Collapse each whitespace run in *text* to one space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
|
||||
|
||||
|
||||
def tokens(text: str) -> set[str]:
    """Return the distinct lowercase search terms found in *text*.

    A term starts with an ASCII letter or digit and continues with letters,
    digits, ``_`` or ``-``; single-character terms are never produced.
    """
    lowered = text.lower()
    # The pattern already demands at least two characters per match, so no
    # separate length filter is required.
    return set(re.findall(r"[a-z0-9][a-z0-9_-]{1,}", lowered))
|
||||
|
||||
|
||||
def iter_markdown_files(base: Path) -> list[Path]:
    """Return every Markdown file under *base*, sorted, excluding templates.

    Args:
        base: Directory searched recursively for ``*.md`` entries.

    Returns:
        Sorted paths of regular files whose first path component below
        *base* is not ``09-templates``.
    """
    return [
        path
        for path in sorted(base.rglob("*.md"))
        # rglob also yields directories whose name ends in ".md"; only
        # regular files can be read by the indexer.
        if path.is_file()
        # Compare path components rather than a "/"-joined string so the
        # exclusion also holds where the path separator is "\".
        and path.relative_to(base).parts[0] != "09-templates"
    ]
|
||||
|
||||
|
||||
def heading_for_line(line: str, current: str) -> str:
    """Return the heading title carried by *line*, or *current* if it has none.

    A line counts as a heading only when, after trimming surrounding
    whitespace, it starts with one to six ``#`` characters that are followed
    by whitespace or the end of the line (an ATX heading). Lines such as
    ``#hashtag`` or ``#!/bin/sh`` are therefore not headings.

    Args:
        line: One line of Markdown text.
        current: The heading in effect before this line.

    Returns:
        The trimmed heading title, or *current* when the line is not a
        heading or its title is empty.
    """
    stripped = line.strip()
    remainder = stripped.lstrip("#")
    level = len(stripped) - len(remainder)
    if not 1 <= level <= 6:
        return current
    # The opening hashes must be separated from the title by whitespace.
    if remainder and not remainder[0].isspace():
        return current
    return remainder.strip() or current
|
||||
|
||||
|
||||
def split_sections(text: str) -> list[tuple[str, str]]:
    """Split Markdown *text* into ``(heading, body)`` sections.

    Text before the first heading belongs to a section whose heading is
    ``""``. Every heading line with a non-empty title opens a new section,
    including one that repeats the previous title, and the heading line itself
    is kept as the first line of the section body. Lines inside fenced code
    blocks are never treated as headings, so ``# comment`` lines in code
    samples stay with the surrounding section.

    Returns:
        The sections in document order, omitting any whose body is empty
        after trimming.
    """
    sections: list[tuple[str, list[str]]] = [("", [])]
    open_fence = ""  # "```" or "~~~" while inside a fenced block, else ""
    for line in text.splitlines():
        stripped = line.strip()
        marker = stripped[:3]
        # A backtick fence line may not contain further backticks after the
        # opening run; that shape is an inline code span, not a fence.
        is_fence = marker in ("```", "~~~") and not (
            marker == "```" and "`" in stripped.lstrip("`")
        )
        if is_fence:
            if not open_fence:
                open_fence = marker
            elif marker == open_fence:
                open_fence = ""
            sections[-1][1].append(line)
            continue
        # Passing "" as the current heading makes the helper return "" for
        # anything that is not a titled heading line.
        title = "" if open_fence else heading_for_line(line, "")
        if title:
            sections.append((title, [line]))
        else:
            sections[-1][1].append(line)
    result: list[tuple[str, str]] = []
    for heading, lines in sections:
        body = "\n".join(lines).strip()
        if body:
            result.append((heading, body))
    return result
|
||||
|
||||
|
||||
def chunk_text(section_text: str, max_chars: int = MAX_CHARS, overlap_chars: int = OVERLAP_CHARS) -> list[str]:
    """Split *section_text* into overlapping chunks of at most *max_chars*.

    Text that already fits is returned as a single chunk. Longer text is cut
    preferably just after the last paragraph break or sentence end in the
    second half of each window, and each following chunk starts
    *overlap_chars* characters before the previous cut.

    Args:
        section_text: Text to split; surrounding whitespace is ignored.
        max_chars: Maximum chunk length in characters; must be at least 1.
        overlap_chars: Characters repeated between consecutive chunks; must
            not be negative.

    Returns:
        The non-empty chunks in order; an empty list for blank input.

    Raises:
        ValueError: If *max_chars* is below 1 or *overlap_chars* is negative.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be at least 1")
    if overlap_chars < 0:
        raise ValueError("overlap_chars must not be negative")
    text = section_text.strip()
    if len(text) <= max_chars:
        return [text] if text else []
    chunks: list[str] = []
    start = 0
    while start < len(text):
        end = min(len(text), start + max_chars)
        if end < len(text):
            boundary = max(text.rfind("\n\n", start, end), text.rfind(". ", start, end))
            # Only accept a natural boundary in the second half of the window
            # so chunks do not become needlessly small.
            if boundary > start + max_chars // 2:
                end = boundary + 1
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= len(text):
            break
        # Always advance by at least one character; otherwise an overlap as
        # large as the chunk itself would repeat the same window forever.
        start = max(start + 1, end - overlap_chars)
    return chunks
|
||||
|
||||
|
||||
def build_chunks(profile: str) -> list[Chunk]:
    """Read every indexable Markdown file for *profile* and return its chunks.

    Each file is split into heading sections and then into size-bounded
    chunks. Every chunk records the file's repository-relative path, its
    modification time and the SHA-256 of the file's bytes.
    """
    base = project_knowledge_dir(profile)
    chunks: list[Chunk] = []
    for path in iter_markdown_files(base):
        data = path.read_bytes()
        # Hash the bytes on disk so the digest matches the actual file even
        # when it has CRLF line endings or invalid UTF-8 sequences.
        digest = hashlib.sha256(data).hexdigest()
        raw = data.decode("utf-8", errors="replace")
        # POSIX-style separators keep paths and chunk ids identical across
        # operating systems.
        rel = path.relative_to(ROOT).as_posix()
        mtime = path.stat().st_mtime
        for section_index, (heading, section) in enumerate(split_sections(raw)):
            for chunk_index, chunk in enumerate(chunk_text(section)):
                identity = f"{rel}\n{section_index}\n{chunk_index}\n{chunk}"
                chunk_digest = hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
                chunks.append(
                    Chunk(
                        chunk_id=chunk_digest,
                        path=rel,
                        heading=heading,
                        text=chunk,
                        mtime=mtime,
                        sha256=digest,
                    )
                )
    return chunks
|
||||
|
||||
|
||||
def write_index(profile: str) -> dict[str, Any]:
    """Rebuild the JSONL index and manifest for *profile*.

    Writes one JSON object per chunk to the index file, then writes a
    manifest describing the build, and returns that manifest.
    """
    target_dir = index_dir(profile)
    target_dir.mkdir(parents=True, exist_ok=True)
    chunks = build_chunks(profile)

    jsonl_file = index_path(profile)
    with jsonl_file.open("w", encoding="utf-8") as handle:
        for chunk in chunks:
            row = json.dumps(vars(chunk), ensure_ascii=False, sort_keys=True)
            handle.write(row + "\n")

    # Only files that produced at least one chunk are counted.
    indexed_files = {chunk.path for chunk in chunks}
    source_dir = project_knowledge_dir(profile)
    manifest = {
        "profile": profile,
        "source": str(source_dir.relative_to(ROOT)),
        "canonical": False,
        "derived_from": "project-knowledge",
        "index_type": "lexical-markdown-chunks",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "file_count": len(indexed_files),
        "chunk_count": len(chunks),
        "index_path": str(index_path(profile).relative_to(ROOT)),
    }
    manifest_text = json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True)
    manifest_path(profile).write_text(manifest_text + "\n", encoding="utf-8")
    return manifest
|
||||
|
||||
|
||||
def read_index(profile: str) -> list[dict[str, Any]]:
    """Load the chunk rows of the JSONL index for *profile*.

    Returns:
        One dict per valid JSON-object line, in file order; an empty list
        when the index file does not exist. Blank lines, lines that are not
        valid JSON and JSON values that are not objects are skipped.
    """
    path = index_path(profile)
    if not path.is_file():
        return []
    rows: list[dict[str, Any]] = []
    # Iterate the file handle rather than calling str.splitlines(): the
    # latter also breaks on U+2028, U+2029 and U+0085, which the writer emits
    # unescaped (ensure_ascii=False) and which would cut a record in two.
    with path.open(encoding="utf-8", errors="replace") as handle:
        for line in handle:
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                # Best-effort read of a disposable index: skip damaged rows.
                continue
            if isinstance(row, dict):
                rows.append(row)
    return rows
|
||||
|
||||
|
||||
def score_chunk(query: str, query_tokens: set[str], chunk: dict[str, Any]) -> float:
    """Return a lexical relevance score for *chunk* against *query*.

    The score adds 5.0 per case-insensitive occurrence of the whole query in
    the chunk's path, heading and text, 1.25 per query token present in them,
    1.5 when the heading contains the query and 1.0 when the path does.

    Args:
        query: The raw search phrase.
        query_tokens: Tokens of *query*.
        chunk: An index row with optional ``path``, ``heading`` and ``text``.

    Returns:
        0.0 when neither the phrase nor any token matches, otherwise the
        weighted sum described above.
    """
    needle = query.lower()
    text = str(chunk.get("text") or "")
    haystack = f"{chunk.get('path', '')} {chunk.get('heading', '')} {text}".lower()
    # An empty needle "occurs" len(haystack) + 1 times and is contained in
    # every string, so it must not count as a phrase match or earn a bonus.
    exact = haystack.count(needle) if needle else 0
    overlap = len(query_tokens & tokens(haystack))
    if exact == 0 and overlap == 0:
        return 0.0
    heading = str(chunk.get("heading") or "").lower()
    path = str(chunk.get("path") or "").lower()
    heading_bonus = 1.5 if needle and needle in heading else 0.0
    path_bonus = 1.0 if needle and needle in path else 0.0
    return exact * 5.0 + overlap * 1.25 + heading_bonus + path_bonus
|
||||
|
||||
|
||||
def snippet_for(query: str, text: str, width: int = 520) -> str:
    """Return a whitespace-normalized excerpt of *text* around the best match.

    The excerpt is centred on the first occurrence of the whole query; if the
    query does not occur, on the earliest occurrence of any query token; and
    otherwise it starts at the beginning of the text.
    """
    haystack = text.lower()
    position = haystack.find(query.lower()) if query else -1
    if position < 0:
        hits = [pos for pos in map(haystack.find, tokens(query)) if pos >= 0]
        position = min(hits, default=0)
    begin = max(0, position - width // 2)
    finish = min(len(text), begin + width)
    return normalize_space(text[begin:finish])
|
||||
|
||||
|
||||
def search_index(profile: str, query: str, limit: int = 10) -> dict[str, Any]:
    """Search the derived index for *profile* and return ranked matches.

    Args:
        profile: Profile whose index is searched.
        query: Search phrase; surrounding whitespace is ignored.
        limit: Maximum number of matches to return; values below zero are
            treated as zero.

    Returns:
        A dict with the profile, the query, the index manifest (``{}`` when it
        is missing or unreadable) and ``matches`` ordered by descending score,
        then path, then chunk id.

    Raises:
        SystemExit: If *query* is empty after trimming.
    """
    query = query.strip()
    if not query:
        raise SystemExit("query is required")
    query_tokens = tokens(query)
    scored: list[tuple[float, dict[str, Any]]] = []
    for row in read_index(profile):
        # A valid JSON line need not be an object; only objects can be scored.
        if not isinstance(row, dict):
            continue
        score = score_chunk(query, query_tokens, row)
        if score > 0:
            scored.append((score, row))
    # Coerce the tie-break keys to str so rows with missing or non-string
    # values cannot make the sort raise TypeError.
    scored.sort(
        key=lambda item: (
            -item[0],
            str(item[1].get("path") or ""),
            str(item[1].get("chunk_id") or ""),
        )
    )
    # A negative limit would otherwise slice results off the end of the list.
    top = scored[: max(0, limit)]
    matches = [
        {
            "score": round(score, 3),
            "path": row.get("path"),
            "heading": row.get("heading"),
            "chunk_id": row.get("chunk_id"),
            "snippet": snippet_for(query, str(row.get("text") or "")),
            "mtime": row.get("mtime"),
            "sha256": row.get("sha256"),
        }
        for score, row in top
    ]
    manifest: Any = {}
    manifest_file = manifest_path(profile)
    if manifest_file.is_file():
        try:
            manifest = json.loads(manifest_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # The manifest is informational only; a damaged one must not
            # prevent returning search results.
            manifest = {}
    return {"profile": profile, "query": query, "canonical": False, "source": "derived-index", "manifest": manifest, "matches": matches}
|
||||
|
||||
|
||||
def status(profile: str) -> dict[str, Any]:
    """Report whether an index exists for *profile*, with its size and age.

    Returns:
        When no manifest exists: ``profile``, ``indexed`` (False) and
        ``index_path``. When the manifest cannot be read or is not a JSON
        object: the same keys plus ``error``. Otherwise the manifest contents
        extended with ``indexed``, ``index_bytes`` and ``age_seconds``
        (``None`` when ``created_at`` is missing or not a valid ISO timestamp).
    """
    manifest_file = manifest_path(profile)
    path = index_path(profile)
    not_indexed = {"profile": profile, "indexed": False, "index_path": str(path.relative_to(ROOT))}
    if not manifest_file.is_file():
        return not_indexed
    try:
        manifest = json.loads(manifest_file.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # Report a damaged manifest instead of crashing the status command.
        return {**not_indexed, "error": f"manifest could not be read: {exc}"}
    if not isinstance(manifest, dict):
        return {**not_indexed, "error": "manifest is not a JSON object"}

    indexed = path.is_file()
    try:
        index_bytes = path.stat().st_size if indexed else 0
    except OSError:
        # The index vanished between the check and the stat call.
        indexed, index_bytes = False, 0
    manifest["indexed"] = indexed
    manifest["index_bytes"] = index_bytes

    age_seconds = None
    created_at = manifest.get("created_at")
    if created_at:
        try:
            age_seconds = int(time.time() - datetime.fromisoformat(created_at).timestamp())
        except (TypeError, ValueError):
            age_seconds = None
    manifest["age_seconds"] = age_seconds
    return manifest
|
||||
|
||||
|
||||
def main() -> None:
    """Parse the command line, run build/status/search, and print JSON."""
    parser = argparse.ArgumentParser(description=__doc__)
    subparsers = parser.add_subparsers(dest="command", required=True)

    # "build" and "status" take only the profile option.
    build_parser = subparsers.add_parser("build")
    build_parser.add_argument("--profile", default=DEFAULT_PROFILE)
    status_parser = subparsers.add_parser("status")
    status_parser.add_argument("--profile", default=DEFAULT_PROFILE)

    search_parser = subparsers.add_parser("search")
    search_parser.add_argument("query")
    search_parser.add_argument("--profile", default=DEFAULT_PROFILE)
    search_parser.add_argument("--limit", type=int, default=10)

    args = parser.parse_args()
    if args.command == "search":
        # Keep the requested result count within 1..50.
        bounded_limit = min(args.limit, 50)
        if bounded_limit < 1:
            bounded_limit = 1
        payload = search_index(args.profile, args.query, limit=bounded_limit)
    elif args.command == "build":
        payload = write_index(args.profile)
    else:
        payload = status(args.profile)

    rendered = json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)
    print(rendered)
|
||||
|
||||
|
||||
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user