#!/usr/bin/env python3
"""AI Workspace local service manager.

This is the profile-aware lifecycle layer for local capture/query services. It is
intentionally small and dependency-free so it can run before the future desktop
UI or MCP server exists.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Workspace root: two directory levels above the directory that contains this file.
ROOT = Path(__file__).resolve().parents[2]
# All manager-owned runtime artifacts live under <ROOT>/.aiw/runtime.
RUNTIME_DIR = ROOT / ".aiw" / "runtime"
# Per-profile pid files, service logs and JSON state snapshots.
PID_DIR = RUNTIME_DIR / "pids"
LOG_DIR = RUNTIME_DIR / "logs"
STATE_DIR = RUNTIME_DIR / "state"
# Log rotation defaults; a service may override them with "log_max_bytes" / "log_backups".
DEFAULT_LOG_MAX_BYTES = 5 * 1024 * 1024
DEFAULT_LOG_BACKUPS = 3
# Grace period between SIGTERM and SIGKILL; overridable with "stop_timeout_seconds".
DEFAULT_STOP_TIMEOUT_SECONDS = 5.0
# Directories appended to PATH by service_env() when they are not already present.
DEFAULT_SERVICE_PATHS = [
    "/opt/homebrew/bin",
    "/opt/homebrew/sbin",
    "/usr/local/bin",
    "/usr/local/sbin",
    "/usr/bin",
    "/bin",
    "/usr/sbin",
    "/sbin",
]
|
|
|
|
|
|
@dataclass(frozen=True)
class ServiceRef:
    """A service name paired with its configuration mapping from the manifest."""

    # Key of the service inside the manifest's "services" object.
    name: str
    # Raw per-service configuration; read with .get() for keys such as
    # "command", "kind", "enabled", "depends_on" and "health".
    config: dict[str, Any]
|
|
|
|
|
|
def ensure_runtime() -> None:
    """Create the pid, log and state runtime directories when they are missing."""
    PID_DIR.mkdir(parents=True, exist_ok=True)
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def manifest_path(profile: str) -> Path:
    """Return the path of ``services.json`` for *profile* under the workspace root."""
    profile_dir = ROOT / "profiles" / profile
    return profile_dir / "services.json"
|
|
|
|
|
|
def load_manifest(profile: str) -> dict[str, Any]:
    """Load and parse the services manifest for *profile*.

    Args:
        profile: Profile name; selects ``profiles/<profile>/services.json``.

    Returns:
        The parsed manifest as a dictionary.

    Raises:
        SystemExit: If the manifest file is missing, cannot be read or decoded
            as JSON, or its top-level value is not a JSON object.
    """
    path = manifest_path(profile)
    if not path.is_file():
        raise SystemExit(f"services manifest not found: {path}")
    try:
        manifest = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as error:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        raise SystemExit(f"services manifest is not readable JSON: {path}: {error}") from error
    if not isinstance(manifest, dict):
        # Every consumer calls manifest.get(...); fail here with a clear message.
        raise SystemExit(f"services manifest must be a JSON object: {path}")
    return manifest
|
|
|
|
|
|
def _dependency_cycle_errors(graph: dict[str, list[str]]) -> list[str]:
    """Return one error message per dependency cycle reachable in *graph*."""
    errors: list[str] = []
    finished: set[str] = set()

    def visit(node: str, trail: list[str]) -> None:
        if node in finished or node not in graph:
            return
        if node in trail:
            loop = trail[trail.index(node):] + [node]
            errors.append(f"{loop[0]}: dependency cycle {' -> '.join(loop)}")
            return
        trail.append(node)
        for dependency in graph[node]:
            visit(dependency, trail)
        trail.pop()
        finished.add(node)

    for name in graph:
        visit(name, [])
    return errors


def validate_manifest(manifest: dict[str, Any]) -> list[str]:
    """Check the structure of a services manifest.

    Args:
        manifest: Parsed manifest; expected to hold a ``services`` object that
            maps service names to configuration objects.

    Returns:
        A list of human-readable problems, empty when the manifest is valid.
        Per-service problems come first in manifest order, followed by any
        dependency-cycle errors.
    """
    services = manifest.get("services")
    if not isinstance(services, dict):
        return ["manifest must contain a services object"]

    errors: list[str] = []
    graph: dict[str, list[str]] = {}
    for name, config in services.items():
        if not isinstance(config, dict):
            errors.append(f"{name}: service config must be an object")
            continue
        command = config.get("command")
        if config.get("enabled", True) and (not isinstance(command, list) or not command):
            errors.append(f"{name}: enabled services require a non-empty command list")
        # Tuple membership compares with ==, so an unhashable value (e.g. a
        # list) is reported as unsupported instead of raising TypeError.
        kind = config.get("kind", "process")
        if kind not in ("process", "app-launcher", "mcp"):
            errors.append(f"{name}: unsupported kind {kind!r}")
        restart = config.get("restart", "never")
        if restart not in ("never", "on-failure", "always"):
            errors.append(f"{name}: unsupported restart policy {restart!r}")
        depends_on = config.get("depends_on") or []
        if not isinstance(depends_on, list):
            # A bare string would otherwise be iterated character by character.
            errors.append(f"{name}: depends_on must be a list of service names")
            continue
        known: list[str] = []
        for dependency in depends_on:
            if isinstance(dependency, str) and dependency in services:
                known.append(dependency)
            else:
                errors.append(f"{name}: depends on unknown service {dependency!r}")
        graph[name] = known

    # Starting services recurses through depends_on, so a cycle must be
    # rejected up front rather than discovered as unbounded recursion.
    errors.extend(_dependency_cycle_errors(graph))
    return errors
|
|
|
|
|
|
def service_items(manifest: dict[str, Any], include_disabled: bool = False) -> list[ServiceRef]:
    """List the services declared in *manifest* as ``ServiceRef`` objects.

    Malformed entries (a non-object ``services`` value or a non-object service
    config) are skipped instead of raising, so that callers running on an
    unvalidated manifest, such as the doctor report, can still describe the
    well-formed services.

    Args:
        manifest: Parsed services manifest.
        include_disabled: When true, also return services whose ``enabled``
            flag is falsy.

    Returns:
        Service references in manifest order.
    """
    services = manifest.get("services")
    if not isinstance(services, dict):
        return []
    return [
        ServiceRef(name, config)
        for name, config in services.items()
        if isinstance(config, dict) and (include_disabled or config.get("enabled", True))
    ]
|
|
|
|
|
|
def select_services(manifest: dict[str, Any], names: list[str], group: str | None, include_disabled: bool = False) -> list[ServiceRef]:
    """Pick the services an action should operate on.

    Explicit *names* win over *group*; with neither, every candidate service is
    returned. Raises ``SystemExit`` when a requested name is not among the
    candidates (unknown, or disabled while disabled services are excluded).
    """
    candidates = service_items(manifest, include_disabled=include_disabled)
    if not names:
        if not group:
            return candidates
        return [ref for ref in candidates if group in (ref.config.get("groups") or [])]

    lookup = {ref.name: ref for ref in candidates}
    unknown = [name for name in names if name not in lookup]
    if unknown:
        raise SystemExit("unknown or disabled service(s): " + ", ".join(unknown))
    return [lookup[name] for name in names]
|
|
|
|
|
|
def pid_path(profile: str, service: str) -> Path:
    """Return the pid-file path for *service* within *profile*."""
    profile_dir = PID_DIR / profile
    return profile_dir / f"{service}.pid"
|
|
|
|
|
|
def state_path(profile: str, service: str) -> Path:
    """Return the JSON state-file path for *service* within *profile*."""
    profile_dir = STATE_DIR / profile
    return profile_dir / f"{service}.json"
|
|
|
|
|
|
def log_path(profile: str, service: str) -> Path:
    """Return the log-file path for *service* within *profile*."""
    profile_dir = LOG_DIR / profile
    return profile_dir / f"{service}.log"
|
|
|
|
|
|
def resolve_workspace_path(raw: str) -> Path:
    """Expand ``~`` in *raw* and anchor a relative result at the workspace root."""
    expanded = Path(raw).expanduser()
    if expanded.is_absolute():
        return expanded
    return ROOT / expanded
|
|
|
|
|
|
def service_env(profile: str | None = None) -> dict[str, str]:
    """Build the environment handed to launched services.

    Starts from a copy of the current process environment, appends the
    well-known tool directories from ``DEFAULT_SERVICE_PATHS`` to ``PATH``
    (keeping existing entries first and dropping duplicates and empty entries),
    and fills in ``AIW_WORKSPACE_ROOT`` and, when *profile* is given,
    ``AIW_PROJECT_PROFILE`` unless they are already set. This lets services
    started from GUI contexts with a minimal PATH still find Homebrew-style
    tools.
    """
    env = dict(os.environ)
    search_path = [entry for entry in env.get("PATH", "").split(os.pathsep) if entry]
    search_path.extend(DEFAULT_SERVICE_PATHS)
    # dict.fromkeys keeps the first occurrence of each entry in order.
    env["PATH"] = os.pathsep.join(dict.fromkeys(search_path))
    if "AIW_WORKSPACE_ROOT" not in env:
        env["AIW_WORKSPACE_ROOT"] = str(ROOT)
    if profile and "AIW_PROJECT_PROFILE" not in env:
        env["AIW_PROJECT_PROFILE"] = profile
    return env
|
|
|
|
|
|
def command_exists(command: str, env: dict[str, str] | None = None) -> bool:
    """Report whether *command* names something that can be executed.

    Args:
        command: A bare command name (looked up on ``PATH``) or a path. Any
            value containing ``/`` is treated as a path and resolved against
            the workspace root when relative.
        env: Environment whose ``PATH`` is searched for bare names.

    Returns:
        True when an executable regular file was found, False otherwise
        (including for an empty *command*).
    """
    if not command:
        return False
    if "/" in command or Path(command).is_absolute():
        resolved = resolve_workspace_path(command)
        # is_file() rather than exists(): directories carry the execute bit
        # too and must not be reported as runnable commands.
        return resolved.is_file() and os.access(resolved, os.X_OK)
    return shutil_which(command, env=env) is not None
|
|
|
|
|
|
def rotate_log_if_needed(path: Path, max_bytes: int = DEFAULT_LOG_MAX_BYTES, backups: int = DEFAULT_LOG_BACKUPS) -> None:
    """Rotate *path* into numbered backups once it reaches *max_bytes*.

    Backups are named ``<path>.1`` (newest) through ``<path>.<backups>``
    (oldest); the oldest is deleted to make room. Nothing happens when rotation
    is disabled (non-positive *max_bytes* or *backups*), the file is missing,
    or it is still smaller than *max_bytes*.
    """
    rotation_enabled = max_bytes > 0 and backups > 0
    if not rotation_enabled or not path.exists() or path.stat().st_size < max_bytes:
        return

    def numbered(index: int) -> Path:
        return path.with_suffix(f"{path.suffix}.{index}")

    numbered(backups).unlink(missing_ok=True)
    # Shift from the oldest slot down so no backup overwrites a newer one.
    for index in reversed(range(1, backups)):
        older = numbered(index)
        if older.exists():
            older.replace(numbered(index + 1))
    path.replace(numbered(1))
|
|
|
|
|
|
def read_pid(profile: str, service: str) -> int | None:
    """Return the pid recorded for *service*, or None when absent or unparsable."""
    pid_file = pid_path(profile, service)
    if not pid_file.is_file():
        return None
    try:
        recorded = pid_file.read_text(encoding="utf-8")
        return int(recorded.strip())
    except ValueError:
        return None
|
|
|
|
|
|
def is_running(pid: int | None) -> bool:
    """Report whether *pid* refers to a live, non-zombie process.

    Signal 0 probes for existence. A permission error counts as running. When
    the process exists, ``ps`` is consulted: a non-zero exit status or a state
    starting with ``Z`` means not running. If ``ps`` cannot be executed, the
    signal probe's answer stands.
    """
    if pid is None or pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True

    try:
        probe = subprocess.run(["ps", "-o", "stat=", "-p", str(pid)], check=False, capture_output=True, text=True)
    except OSError:
        return True
    return probe.returncode == 0 and not probe.stdout.strip().startswith("Z")
|
|
|
|
|
|
def write_state(profile: str, service: str, state: dict[str, Any]) -> None:
    """Persist *state* as pretty-printed, key-sorted JSON for *service*."""
    target = state_path(profile, service)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(state, ensure_ascii=False, indent=2, sort_keys=True)
    target.write_text(payload + "\n", encoding="utf-8")
|
|
|
|
|
|
def read_state(profile: str, service: str) -> dict[str, Any]:
    """Load the persisted state for *service*.

    Args:
        profile: Profile the service belongs to.
        service: Service name.

    Returns:
        The stored state dictionary, or an empty dict when the file is missing,
        is not valid UTF-8 JSON, or does not contain a JSON object.
    """
    path = state_path(profile, service)
    if not path.is_file():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError alike.
        return {}
    # Callers use .get() on the result, so anything but an object is unusable.
    return payload if isinstance(payload, dict) else {}
|
|
|
|
|
|
def health_ok(config: dict[str, Any], timeout: float = 1.0) -> tuple[bool | None, str]:
    """Run the health probe configured for a service.

    Args:
        config: Service configuration; the optional ``health`` object selects a
            ``tcp`` probe (``host``, ``port``) or an ``http`` probe (``url``).
        timeout: Per-probe timeout in seconds.

    Returns:
        ``(ok, detail)`` where *ok* is True when the probe succeeded, False
        when it failed or is configured with unusable values, and None when no
        usable probe type is configured. *detail* is a human-readable summary.
        Probe failures are reported through the return value, never raised.
    """
    # Local import: only the exception type is needed, and only on this path.
    import http.client

    health = config.get("health") or {}
    if not isinstance(health, dict):
        return None, f"invalid health config: {health!r}"
    kind = health.get("type")
    if not kind:
        return None, "no health check"

    if kind == "tcp":
        host = str(health.get("host") or "127.0.0.1")
        raw_port = health.get("port")
        try:
            port = int(raw_port or 0)
        except (TypeError, ValueError):
            return False, f"tcp {host}: invalid port {raw_port!r}"
        if not 0 < port <= 65535:
            return False, f"tcp {host}: invalid port {raw_port!r}"
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True, f"tcp {host}:{port} ok"
        except OSError as error:
            return False, f"tcp {host}:{port} failed: {error}"

    if kind == "http":
        url = str(health.get("url") or "")
        try:
            with urllib.request.urlopen(url, timeout=timeout) as response:
                ok = 200 <= int(response.status) < 400
                return ok, f"http {url} status {response.status}"
        except (OSError, ValueError, http.client.HTTPException) as error:
            # OSError covers URLError/HTTPError and timeouts; ValueError is
            # raised for empty or malformed URLs; HTTPException is raised when
            # the peer answers with something that is not HTTP.
            return False, f"http {url} failed: {error}"

    return None, f"unknown health type: {kind}"
|
|
|
|
|
|
def service_status(profile: str, ref: ServiceRef) -> dict[str, Any]:
    """Collect a machine-readable status snapshot for one service.

    The ``status`` label is one of ``disabled``, ``launcher``, ``running``,
    ``unhealthy``, ``externally running`` (healthy but no managed pid alive) or
    ``stopped``. The pid file is only consulted for enabled services that are
    not app launchers; the health probe is skipped for disabled services.
    """
    config = ref.config
    enabled = config.get("enabled", True)
    kind = config.get("kind", "process")
    command = config.get("command") or []

    pid = None
    if enabled and kind != "app-launcher":
        pid = read_pid(profile, ref.name)
    running = is_running(pid)

    if enabled:
        ok, detail = health_ok(config)
    else:
        ok, detail = None, "health skipped"

    if not enabled:
        label = "disabled"
    elif kind == "app-launcher":
        label = "launcher"
    elif running:
        label = "unhealthy" if ok is False else "running"
    elif ok is True:
        label = "externally running"
    else:
        label = "stopped"

    snapshot: dict[str, Any] = {
        "name": ref.name,
        "enabled": enabled,
        "kind": kind,
        "status": label,
        "pid": pid,
        "command": command,
        "health": {"ok": ok, "detail": detail},
    }
    snapshot["state"] = read_state(profile, ref.name)
    return snapshot
|
|
|
|
|
|
def wait_for_health(config: dict[str, Any], seconds: float = 8.0) -> tuple[bool | None, str]:
    """Poll the service's health probe until it stops failing or time runs out.

    Args:
        config: Service configuration passed through to ``health_ok``.
        seconds: Maximum time to keep polling.

    Returns:
        The last ``(ok, detail)`` result. Polling ends as soon as the probe
        reports True or None (no usable health check). The probe always runs at
        least once, even when *seconds* is zero or negative.
    """
    # monotonic() is immune to wall-clock adjustments (NTP, manual changes)
    # that could otherwise stretch or cut short the wait.
    deadline = time.monotonic() + seconds
    while True:
        result = health_ok(config)
        if result[0] is not False:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return result
        time.sleep(min(0.4, remaining))
|
|
|
|
|
|
def start_service(
    profile: str,
    ref: ServiceRef,
    manifest: dict[str, Any],
    started: set[str],
    _in_progress: tuple[str, ...] = (),
) -> None:
    """Start *ref* after starting everything it depends on.

    Args:
        profile: Profile the service belongs to; selects pid/log/state paths.
        ref: Service to start.
        manifest: Full manifest, used to look up dependency configurations.
        started: Names already handled during this invocation; updated in
            place so shared dependencies are started only once.
        _in_progress: Internal. Chain of services currently being started,
            used to detect dependency cycles. Callers should not pass it.

    Raises:
        SystemExit: On a dependency cycle, a missing or disabled dependency,
            a missing or non-executable command, or when the operating system
            refuses to launch the command.
    """
    if ref.name in started:
        return
    if ref.name in _in_progress:
        # Without this guard a cycle recurses until RecursionError.
        chain = " -> ".join([*_in_progress, ref.name])
        raise SystemExit(f"{ref.name} is part of a dependency cycle: {chain}")

    services = manifest.get("services") or {}
    for dependency in ref.config.get("depends_on") or []:
        dep_config = services.get(dependency)
        if not dep_config or not dep_config.get("enabled", True):
            raise SystemExit(f"{ref.name} depends on missing/disabled service: {dependency}")
        start_service(profile, ServiceRef(dependency, dep_config), manifest, started, (*_in_progress, ref.name))

    kind = ref.config.get("kind", "process")
    command = ref.config.get("command") or []
    env = service_env(profile)
    if not command:
        raise SystemExit(f"{ref.name} has no command")
    if not command_exists(str(command[0]), env=env):
        raise SystemExit(f"{ref.name} command is not executable or not found: {command[0]}")

    if kind != "app-launcher":
        # Avoid duplicates: skip when our managed pid is alive, or when the
        # health probe shows the service is already up outside this manager.
        pid = read_pid(profile, ref.name)
        if is_running(pid):
            ok, detail = health_ok(ref.config)
            status = "running" if ok is not False else "running unhealthy"
            print(f"{ref.name}: {status} ({detail})")
            started.add(ref.name)
            return
        ok, detail = health_ok(ref.config)
        if ok is True:
            print(f"{ref.name}: externally running ({detail}); not starting duplicate")
            started.add(ref.name)
            return

    path = log_path(profile, ref.name)
    path.parent.mkdir(parents=True, exist_ok=True)
    rotate_log_if_needed(path, int(ref.config.get("log_max_bytes", DEFAULT_LOG_MAX_BYTES)), int(ref.config.get("log_backups", DEFAULT_LOG_BACKUPS)))
    with path.open("ab") as log_file:
        log_file.write(f"\n--- start {time.strftime('%Y-%m-%d %H:%M:%S')} ---\n".encode("utf-8"))
        # The child writes straight to the file descriptor; flush our buffered
        # banner first so it lands before the child's output, not after it.
        log_file.flush()
        if kind == "app-launcher":
            try:
                result = subprocess.run(command, cwd=ROOT, env=env, stdout=log_file, stderr=subprocess.STDOUT, check=False)
            except OSError as error:
                raise SystemExit(f"{ref.name}: failed to launch {command[0]}: {error}") from error
            write_state(profile, ref.name, {"last_launch_exit": result.returncode, "launched_at": time.time()})
            print(f"{ref.name}: launched (exit {result.returncode})")
        else:
            try:
                # start_new_session gives the service its own process group,
                # which stop_service relies on to signal the whole group.
                process = subprocess.Popen(command, cwd=ROOT, env=env, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True)
            except OSError as error:
                raise SystemExit(f"{ref.name}: failed to launch {command[0]}: {error}") from error
            pid_file = pid_path(profile, ref.name)
            pid_file.parent.mkdir(parents=True, exist_ok=True)
            pid_file.write_text(str(process.pid) + "\n", encoding="utf-8")
            ok, detail = wait_for_health(ref.config)
            state = "started" if ok is not False else "started but health check failed"
            write_state(profile, ref.name, {"pid": process.pid, "started_at": time.time(), "health": detail})
            print(f"{ref.name}: {state} pid={process.pid} ({detail})")
    started.add(ref.name)
|
|
|
|
|
|
def _signal_service(pid: int, signum: int) -> None:
    """Send *signum* to the process group led by *pid*, else to *pid* alone.

    A process that has already exited is not an error. ``PermissionError``
    from the single-process fallback propagates to the caller.
    """
    try:
        os.killpg(pid, signum)
    except (ProcessLookupError, PermissionError):
        # No such group, or not allowed to signal every member: fall back to
        # the recorded pid itself.
        try:
            os.kill(pid, signum)
        except ProcessLookupError:
            pass  # The process exited in the meantime.


def stop_service(profile: str, ref: ServiceRef) -> None:
    """Stop the managed process of *ref*, escalating from SIGTERM to SIGKILL.

    App launchers have no managed process and are only reported. When no
    managed pid is alive, the stale pid file is removed unless the health
    probe shows the service running outside this manager. Otherwise SIGTERM is
    sent, the process gets ``stop_timeout_seconds`` (default
    ``DEFAULT_STOP_TIMEOUT_SECONDS``) to exit, and SIGKILL follows if it is
    still alive. The pid file is then removed and a ``stopped_at`` state is
    written.

    Raises:
        PermissionError: If the current user may not signal the process.
    """
    kind = ref.config.get("kind", "process")
    if kind == "app-launcher":
        print(f"{ref.name}: launcher service has no managed process")
        return

    pid = read_pid(profile, ref.name)
    if pid is None or not is_running(pid):
        ok, detail = health_ok(ref.config)
        if ok is True:
            print(f"{ref.name}: externally running ({detail}); no managed pid to stop")
            return
        print(f"{ref.name}: not running")
        pid_path(profile, ref.name).unlink(missing_ok=True)
        return

    _signal_service(pid, signal.SIGTERM)
    # Monotonic clock: the grace period must not depend on wall-clock changes.
    deadline = time.monotonic() + float(ref.config.get("stop_timeout_seconds", DEFAULT_STOP_TIMEOUT_SECONDS))
    while time.monotonic() < deadline and is_running(pid):
        time.sleep(0.2)

    if is_running(pid):
        _signal_service(pid, signal.SIGKILL)
        print(f"{ref.name}: killed pid={pid}")
    else:
        print(f"{ref.name}: stopped")
    pid_path(profile, ref.name).unlink(missing_ok=True)
    write_state(profile, ref.name, {"stopped_at": time.time()})
|
|
|
|
|
|
def status_service(profile: str, ref: ServiceRef) -> None:
    """Print a one-line human-readable status for *ref*."""
    report = service_status(profile, ref)
    if not report["enabled"]:
        print(f"{ref.name}: disabled")
        return

    if report["kind"] != "app-launcher":
        print(f"{ref.name}: {report['status']} pid={report['pid'] or '-'} ({report['health']['detail']})")
        return

    # Launchers have no pid; show when the manager last ran them instead.
    launched = report["state"].get("launched_at")
    if launched:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(launched))
        suffix = f"last launched {stamp}"
    else:
        suffix = "not launched by manager"
    print(f"{ref.name}: launcher ({suffix})")
|
|
|
|
|
|
def status_report(profile: str, refs: list[ServiceRef]) -> dict[str, Any]:
    """Build the JSON-serialisable live status document for *refs*."""
    report: dict[str, Any] = {
        "profile": profile,
        "workspace": str(ROOT),
        "runtime": str(RUNTIME_DIR),
    }
    report["services"] = [service_status(profile, ref) for ref in refs]
    return report
|
|
|
|
|
|
def tail_log(profile: str, service: str, lines: int) -> None:
    """Print the last *lines* lines of the log of *service*.

    Args:
        profile: Profile the service belongs to.
        service: Service name.
        lines: Number of trailing lines to print; zero or a negative value
            prints nothing.

    Prints ``no log file: <path>`` when the log does not exist.
    """
    # Local import keeps this block self-contained.
    from collections import deque

    path = log_path(profile, service)
    if not path.is_file():
        print(f"no log file: {path}")
        return
    if lines <= 0:
        # A slice such as content[-0:] would return the whole file.
        return
    # Stream through a bounded deque so a large log is never held in memory.
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        for line in deque(handle, maxlen=lines):
            print(line.rstrip("\n"))
|
|
|
|
|
|
def doctor_report(profile: str, manifest: dict[str, Any]) -> dict[str, Any]:
    """Assemble the diagnostic report for every service, disabled ones included.

    Each service entry is its ``service_status`` snapshot extended with
    ``command_ok`` (whether the first command element is executable) and
    ``checks`` (results for the commands and paths listed in its ``doctor``
    section). Manifest validation errors are reported alongside.
    """
    errors = validate_manifest(manifest)
    env = service_env(profile)
    command_checks = (("required_command", "required_commands"), ("optional_command", "optional_commands"))
    path_checks = (("required_path", "required_paths"), ("optional_path", "optional_paths"))

    service_reports = []
    for ref in service_items(manifest, include_disabled=True):
        doctor = ref.config.get("doctor") or {}
        checks = [
            {"type": check_type, "name": command_name, "ok": command_exists(command_name, env=env)}
            for check_type, key in command_checks
            for command_name in doctor.get(key) or []
        ]
        checks.extend(
            {"type": check_type, "name": str(raw_path), "ok": resolve_workspace_path(str(raw_path)).exists()}
            for check_type, key in path_checks
            for raw_path in doctor.get(key) or []
        )

        entry = service_status(profile, ref)
        command = ref.config.get("command") or []
        first = command[0] if command else ""
        entry["command_ok"] = bool(first) and command_exists(first, env=env)
        entry["checks"] = checks
        service_reports.append(entry)

    return {
        "profile": profile,
        "workspace": str(ROOT),
        "manifest": str(manifest_path(profile)),
        "runtime": str(RUNTIME_DIR),
        "manifest_ok": not errors,
        "manifest_errors": errors,
        "services": service_reports,
    }
|
|
|
|
|
|
def run_doctor(profile: str, manifest: dict[str, Any], json_output: bool = False) -> None:
    """Print the doctor report, as JSON when *json_output* is set, else as text."""
    report = doctor_report(profile, manifest)
    if json_output:
        print(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True))
        return

    print(f"AI Workspace doctor profile={profile}")
    print(f"workspace: {ROOT}")
    print(f"manifest: {manifest_path(profile)}")
    ensure_runtime()
    print(f"runtime: {RUNTIME_DIR}")

    problems = report["manifest_errors"]
    if not problems:
        print("manifest: ok")
    else:
        print("manifest: invalid")
        for problem in problems:
            print(f" ! {problem}")

    for service in report["services"]:
        name = service["name"]
        command_text = "ok" if service["command_ok"] else "missing"
        if not service["enabled"]:
            print(f"- {name}: disabled; command={command_text}; health skipped")
            continue
        health = service["health"]
        health_text = "no health check" if health["ok"] is None else health["detail"]
        print(f"- {name}: enabled; command={command_text}; {health_text}")
        for check in service["checks"]:
            label = check["type"].replace("_", " ")
            outcome = "ok" if check["ok"] else "missing"
            print(f" {label} {check['name']}: {outcome}")
|
|
|
|
|
|
def shutil_which(command: str, env: dict[str, str] | None = None) -> str | None:
    """Find *command* in the ``PATH`` of *env*.

    Args:
        command: Bare command name to look up.
        env: Environment mapping whose ``PATH`` is searched. Only ``None``
            falls back to ``os.environ``; an explicitly passed mapping is
            always honoured, even when empty.

    Returns:
        The path of the first executable regular file named *command*, or None
        when there is none.
    """
    source = os.environ if env is None else env
    for directory in source.get("PATH", "").split(os.pathsep):
        candidate = Path(directory) / command
        # is_file() excludes directories, which also pass the X_OK check.
        if candidate.is_file() and os.access(candidate, os.X_OK):
            return str(candidate)
    return None
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and run the requested lifecycle action."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("action", choices=["start", "stop", "restart", "status", "logs", "doctor"])
    parser.add_argument("services", nargs="*", help="Optional service names for start/stop/restart/status/logs.")
    parser.add_argument("--profile", default=os.getenv("AIW_PROJECT_PROFILE", "fidelity"))
    parser.add_argument("--group", default="", help="Start/stop/status services in a group, e.g. communication or inbox.")
    parser.add_argument("--lines", type=int, default=80, help="Number of log lines for logs action.")
    parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON for supported actions such as doctor.")
    args = parser.parse_args()
    action = args.action
    profile = args.profile

    ensure_runtime()
    manifest = load_manifest(profile)

    # The doctor runs on unvalidated manifests so it can report their errors.
    if action == "doctor":
        run_doctor(profile, manifest, json_output=args.json)
        return

    problems = validate_manifest(manifest)
    if problems:
        raise SystemExit("invalid services manifest:\n" + "\n".join(f"- {error}" for error in problems))

    # Only the status action lists disabled services.
    refs = select_services(manifest, args.services, args.group or None, include_disabled=(action == "status"))

    if action in ("stop", "restart"):
        # Stop in reverse order of the selection.
        for ref in reversed(refs):
            stop_service(profile, ref)
    if action in ("start", "restart"):
        started: set[str] = set()
        for ref in refs:
            start_service(profile, ref, manifest, started)
    if action == "status":
        if args.json:
            print(json.dumps(status_report(profile, refs), ensure_ascii=False, indent=2, sort_keys=True))
        else:
            for ref in refs:
                status_service(profile, ref)
    if action == "logs":
        if not args.services:
            raise SystemExit("logs requires at least one service name")
        for service in args.services:
            print(f"==> {service} <==")
            tail_log(profile, service, args.lines)


if __name__ == "__main__":
    main()
|