Files

541 lines
20 KiB
Python

#!/usr/bin/env python3
"""AI Workspace local service manager.
This is the profile-aware lifecycle layer for local capture/query services. It is
intentionally small and dependency-free so it can run before the future desktop
UI or MCP server exists.
"""
from __future__ import annotations
import argparse
import json
import os
import signal
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Workspace root: two directory levels above the directory that holds this file.
ROOT = Path(__file__).resolve().parents[2]
# Runtime artefacts live under <workspace>/.aiw/runtime, split by purpose.
RUNTIME_DIR = ROOT / ".aiw" / "runtime"
PID_DIR = RUNTIME_DIR / "pids"
LOG_DIR = RUNTIME_DIR / "logs"
STATE_DIR = RUNTIME_DIR / "state"
# Fallbacks used when a service config omits log_max_bytes, log_backups or
# stop_timeout_seconds.
DEFAULT_LOG_MAX_BYTES = 5 * 1024 * 1024
DEFAULT_LOG_BACKUPS = 3
DEFAULT_STOP_TIMEOUT_SECONDS = 5.0
# Directories appended to PATH by service_env() when not already present.
DEFAULT_SERVICE_PATHS = [
    "/opt/homebrew/bin",
    "/opt/homebrew/sbin",
    "/usr/local/bin",
    "/usr/local/sbin",
    "/usr/bin",
    "/bin",
    "/usr/sbin",
    "/sbin",
]
@dataclass(frozen=True)
class ServiceRef:
    """A service name paired with its configuration object from the manifest."""

    # Key of the service inside the manifest's "services" object.
    name: str
    # The service's configuration mapping (command, kind, health, ...).
    config: dict[str, Any]
def ensure_runtime() -> None:
    """Create the pid, log and state runtime directories when they are missing."""
    for directory in (PID_DIR, LOG_DIR, STATE_DIR):
        directory.mkdir(parents=True, exist_ok=True)
def manifest_path(profile: str) -> Path:
    """Return the path of the ``services.json`` manifest for *profile*."""
    return ROOT.joinpath("profiles", profile, "services.json")
def load_manifest(profile: str) -> dict[str, Any]:
    """Load and parse the services manifest for *profile*.

    Returns:
        The parsed manifest, always a JSON object (``dict``).

    Raises:
        SystemExit: if the manifest file does not exist, cannot be decoded as
            UTF-8 JSON, or its top-level value is not a JSON object.
    """
    path = manifest_path(profile)
    if not path.is_file():
        raise SystemExit(f"services manifest not found: {path}")
    try:
        manifest = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as error:
        # Report a broken manifest the same way as a missing one instead of
        # letting a traceback escape to the user.
        raise SystemExit(f"services manifest is not valid JSON: {path}: {error}") from error
    if not isinstance(manifest, dict):
        # Every caller uses manifest.get(...), so anything else is unusable.
        raise SystemExit(f"services manifest must be a JSON object: {path}")
    return manifest
def _dependency_cycles(graph: dict[str, list[str]]) -> list[list[str]]:
    """Return one representative path (first node repeated at the end) per cycle in *graph*."""
    finished: set[str] = set()
    cycles: list[list[str]] = []

    def visit(node: str, trail: list[str]) -> None:
        if node in finished:
            return
        if node in trail:
            cycles.append([*trail[trail.index(node):], node])
            return
        trail.append(node)
        for dependency in graph.get(node, []):
            visit(dependency, trail)
        trail.pop()
        finished.add(node)

    for node in graph:
        visit(node, [])
    return cycles


def validate_manifest(manifest: dict[str, Any]) -> list[str]:
    """Check the structure of a services manifest.

    Verifies that every service config is an object, that enabled services
    have a non-empty command list, that ``kind`` and ``restart`` hold supported
    values, that ``depends_on`` is a list naming known services, and that the
    enabled services do not depend on each other in a cycle.

    Returns:
        Human-readable error strings; an empty list means the manifest is valid.
    """
    errors: list[str] = []
    services = manifest.get("services")
    if not isinstance(services, dict):
        return ["manifest must contain a services object"]
    graph: dict[str, list[str]] = {}
    for name, config in services.items():
        if not isinstance(config, dict):
            errors.append(f"{name}: service config must be an object")
            continue
        enabled = config.get("enabled", True)
        command = config.get("command")
        if enabled and (not isinstance(command, list) or not command):
            errors.append(f"{name}: enabled services require a non-empty command list")
        # Tuples rather than sets: membership must not raise on unhashable
        # values such as a list given by mistake.
        kind = config.get("kind", "process")
        if kind not in ("process", "app-launcher", "mcp"):
            errors.append(f"{name}: unsupported kind {kind!r}")
        restart = config.get("restart", "never")
        if restart not in ("never", "on-failure", "always"):
            errors.append(f"{name}: unsupported restart policy {restart!r}")
        depends_on = config.get("depends_on") or []
        if not isinstance(depends_on, (list, tuple)):
            # A bare string would otherwise be iterated character by character.
            errors.append(f"{name}: depends_on must be a list of service names")
            continue
        known: list[str] = []
        for dependency in depends_on:
            if not isinstance(dependency, str) or dependency not in services:
                errors.append(f"{name}: depends on unknown service {dependency!r}")
            else:
                known.append(dependency)
        if enabled:
            # Only enabled services are ever started recursively, so only they
            # can drive the dependency walk into an endless loop.
            graph[name] = known
    for cycle in _dependency_cycles(graph):
        errors.append(f"{cycle[0]}: dependency cycle {' -> '.join(cycle)}")
    return errors
def service_items(manifest: dict[str, Any], include_disabled: bool = False) -> list[ServiceRef]:
    """Return the manifest's services as ``ServiceRef`` objects in manifest order.

    Disabled services (``"enabled": false``) are left out unless
    *include_disabled* is true. Entries whose config is not an object, or a
    ``services`` value that is not an object, are skipped rather than raising:
    the doctor action walks manifests that have not passed validation, and
    ``validate_manifest`` is what reports those problems.
    """
    services = manifest.get("services")
    if not isinstance(services, dict):
        return []
    refs: list[ServiceRef] = []
    for name, config in services.items():
        if not isinstance(config, dict):
            continue
        if not include_disabled and not config.get("enabled", True):
            continue
        refs.append(ServiceRef(name, config))
    return refs
def select_services(manifest: dict[str, Any], names: list[str], group: str | None, include_disabled: bool = False) -> list[ServiceRef]:
    """Pick the services an action applies to.

    Explicit *names* take precedence and are returned in the order given;
    otherwise services listing *group* in their ``groups`` are returned;
    otherwise every candidate service is returned.

    Raises:
        SystemExit: if any requested name is not among the candidate services.
    """
    candidates = service_items(manifest, include_disabled=include_disabled)
    if names:
        index = {ref.name: ref for ref in candidates}
        unknown = [name for name in names if name not in index]
        if unknown:
            raise SystemExit("unknown or disabled service(s): " + ", ".join(unknown))
        return [index[name] for name in names]
    if not group:
        return candidates
    return [ref for ref in candidates if group in (ref.config.get("groups") or [])]
def pid_path(profile: str, service: str) -> Path:
    """Return the pid-file path for *service* within *profile*."""
    return PID_DIR.joinpath(profile, f"{service}.pid")
def state_path(profile: str, service: str) -> Path:
    """Return the JSON state-file path for *service* within *profile*."""
    return STATE_DIR.joinpath(profile, f"{service}.json")
def log_path(profile: str, service: str) -> Path:
    """Return the log-file path for *service* within *profile*."""
    return LOG_DIR.joinpath(profile, f"{service}.log")
def resolve_workspace_path(raw: str) -> Path:
    """Expand ``~`` in *raw* and anchor a relative result at the workspace root."""
    expanded = Path(raw).expanduser()
    if expanded.is_absolute():
        return expanded
    return ROOT / expanded
def service_env(profile: str | None = None) -> dict[str, str]:
    """Build the environment handed to launched services.

    Starts from a copy of the current environment and appends the default
    service directories to PATH (existing entries keep their position,
    duplicates and empty entries are dropped). This matters because macOS login
    items do not reliably inherit the interactive shell PATH, while
    Homebrew-installed tools such as mitmdump usually live in
    /opt/homebrew/bin or /usr/local/bin.

    AIW_WORKSPACE_ROOT and, when *profile* is given, AIW_PROJECT_PROFILE are
    filled in only if they are not already set.
    """
    env = dict(os.environ)
    current = filter(None, env.get("PATH", "").split(os.pathsep))
    # dict.fromkeys de-duplicates while keeping first-seen order.
    env["PATH"] = os.pathsep.join(dict.fromkeys([*current, *DEFAULT_SERVICE_PATHS]))
    env.setdefault("AIW_WORKSPACE_ROOT", str(ROOT))
    if profile:
        env.setdefault("AIW_PROJECT_PROFILE", profile)
    return env
def command_exists(command: str, env: dict[str, str] | None = None) -> bool:
    """Report whether *command* names an executable file.

    A command containing a path separator (or an absolute path) is resolved
    with ``resolve_workspace_path``; a bare name is looked up on the PATH of
    *env* via ``shutil_which``. An empty command never exists.
    """
    if not command:
        return False
    if Path(command).is_absolute() or "/" in command:
        resolved = resolve_workspace_path(command)
        # is_file(), not exists(): directories normally carry the execute bit
        # and would otherwise be reported as runnable commands.
        return resolved.is_file() and os.access(resolved, os.X_OK)
    return shutil_which(command, env=env) is not None
def rotate_log_if_needed(path: Path, max_bytes: int = DEFAULT_LOG_MAX_BYTES, backups: int = DEFAULT_LOG_BACKUPS) -> None:
    """Rotate *path* to numbered backups once it has reached *max_bytes*.

    The current log becomes ``<path>.1``, older backups shift up by one and
    the backup numbered *backups* is discarded. Nothing happens when either
    limit is non-positive, the file is missing, or it is still below the limit.
    """
    rotation_disabled = max_bytes <= 0 or backups <= 0
    if rotation_disabled or not path.exists() or path.stat().st_size < max_bytes:
        return

    def backup(number: int) -> Path:
        return path.with_suffix(f"{path.suffix}.{number}")

    backup(backups).unlink(missing_ok=True)
    # Shift from the oldest down so no backup overwrites one not yet moved.
    for number in reversed(range(1, backups)):
        older = backup(number)
        if older.exists():
            older.replace(backup(number + 1))
    path.replace(backup(1))
def read_pid(profile: str, service: str) -> int | None:
    """Return the pid recorded for *service*, or ``None`` if absent or unparsable."""
    pid_file = pid_path(profile, service)
    if pid_file.is_file():
        try:
            text = pid_file.read_text(encoding="utf-8")
            return int(text.strip())
        except ValueError:
            pass
    return None
def is_running(pid: int | None) -> bool:
    """Report whether *pid* refers to a live, non-zombie process.

    Signal 0 probes for existence; a permission error still means the process
    exists. ``ps`` is then consulted so that a zombie (state starting with
    ``Z``) or a pid ``ps`` cannot find counts as not running. If ``ps`` itself
    cannot be executed the process is assumed to be running.
    """
    if not pid or pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    try:
        ps = subprocess.run(["ps", "-o", "stat=", "-p", str(pid)], check=False, capture_output=True, text=True)
    except OSError:
        return True
    if ps.returncode != 0:
        return False
    return not ps.stdout.strip().startswith("Z")
def write_state(profile: str, service: str, state: dict[str, Any]) -> None:
    """Persist *state* for *service* as key-sorted, indented JSON, replacing any previous state."""
    target = state_path(profile, service)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(state, ensure_ascii=False, indent=2, sort_keys=True)
    target.write_text(f"{payload}\n", encoding="utf-8")
def read_state(profile: str, service: str) -> dict[str, Any]:
    """Return the persisted state for *service*, or ``{}`` when none is usable.

    A missing file, undecodable or invalid JSON, and JSON whose top-level
    value is not an object all yield an empty dict, so callers can always use
    ``.get`` on the result.
    """
    path = state_path(profile, service)
    if not path.is_file():
        return {}
    try:
        state = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return {}
    # Valid JSON is not necessarily an object (e.g. a truncated or hand-edited
    # file holding a list or number); treat that as "no state" as well.
    return state if isinstance(state, dict) else {}
def health_ok(config: dict[str, Any], timeout: float = 1.0) -> tuple[bool | None, str]:
    """Run the health check declared in a service config.

    Supports ``{"type": "tcp", "host": ..., "port": ...}`` (a successful
    connection is healthy; host defaults to 127.0.0.1) and
    ``{"type": "http", "url": ...}`` (a 2xx/3xx response is healthy).

    Returns:
        ``(ok, detail)`` where ``ok`` is ``True``/``False`` for a check that
        ran, or ``None`` when no check is configured or its type is unknown.
        Misconfigured checks (bad port, invalid URL) and protocol errors are
        reported as ``False`` rather than raised.
    """
    from http.client import HTTPException

    health = config.get("health") or {}
    kind = health.get("type")
    if not kind:
        return None, "no health check"
    if kind == "tcp":
        host = str(health.get("host") or "127.0.0.1")
        raw_port = health.get("port") or 0
        try:
            port = int(raw_port)
        except (TypeError, ValueError) as error:
            return False, f"tcp {host}:{raw_port} failed: {error}"
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True, f"tcp {host}:{port} ok"
        except (OSError, OverflowError) as error:
            # OverflowError: port outside the range the socket layer accepts.
            return False, f"tcp {host}:{port} failed: {error}"
    if kind == "http":
        url = str(health.get("url") or "")
        try:
            with urllib.request.urlopen(url, timeout=timeout) as response:
                ok = 200 <= int(response.status) < 400
                return ok, f"http {url} status {response.status}"
        except (urllib.error.URLError, TimeoutError, OSError, HTTPException, ValueError) as error:
            # ValueError: empty or malformed URL. HTTPException: the peer
            # answered with something that is not HTTP (e.g. a raw TCP service).
            return False, f"http {url} failed: {error}"
    return None, f"unknown health type: {kind}"
def service_status(profile: str, ref: ServiceRef) -> dict[str, Any]:
    """Collect the live status of one service as a JSON-serialisable dict.

    The ``status`` label is one of ``disabled``, ``launcher``, ``running``,
    ``unhealthy`` (managed process alive but health check failing),
    ``externally running`` (no managed process, health check passing) or
    ``stopped``. The pid file is only consulted for enabled, non-launcher
    services, and the health check is skipped for disabled ones.
    """
    config = ref.config
    enabled = config.get("enabled", True)
    kind = config.get("kind", "process")
    is_launcher = kind == "app-launcher"

    pid = None
    if enabled and not is_launcher:
        pid = read_pid(profile, ref.name)
    running = is_running(pid)

    if enabled:
        ok, detail = health_ok(config)
    else:
        ok, detail = None, "health skipped"

    if not enabled:
        label = "disabled"
    elif is_launcher:
        label = "launcher"
    elif running:
        label = "unhealthy" if ok is False else "running"
    else:
        label = "externally running" if ok is True else "stopped"

    return {
        "name": ref.name,
        "enabled": enabled,
        "kind": kind,
        "status": label,
        "pid": pid,
        "command": config.get("command") or [],
        "health": {"ok": ok, "detail": detail},
        "state": read_state(profile, ref.name),
    }
def wait_for_health(config: dict[str, Any], seconds: float = 8.0) -> tuple[bool | None, str]:
    """Poll the service health check until it stops failing or *seconds* elapse.

    The check is always run at least once, even for a non-positive *seconds*,
    so the result reflects a real probe. Polling stops as soon as the check
    passes or reports that there is nothing to check (``ok is None``).

    Returns:
        The ``(ok, detail)`` result of the last probe.
    """
    # Monotonic clock: a wall-clock adjustment must not stretch or cut the wait.
    deadline = time.monotonic() + seconds
    while True:
        last = health_ok(config)
        if last[0] is not False:
            return last
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return last
        time.sleep(min(0.4, remaining))
def start_service(
    profile: str,
    ref: ServiceRef,
    manifest: dict[str, Any],
    started: set[str],
    _chain: tuple[str, ...] = (),
) -> None:
    """Start *ref*, first starting the services it depends on.

    Services already in *started* are skipped; every service handled here is
    added to it. A managed process that is still alive, or a service whose
    health check already passes without a managed pid, is reported and left
    alone. ``app-launcher`` services are run to completion and only their exit
    code is recorded; other kinds are spawned in a new session with output
    appended to the service log, their pid is written to the pid file and the
    health check is polled before the state file is written.

    *_chain* is internal: it carries the names currently being started up the
    recursion so that cyclic ``depends_on`` declarations are reported instead
    of recursing without end.

    Raises:
        SystemExit: on a dependency cycle, a missing or disabled dependency,
            an empty command, or a command that is not an executable.
    """
    if ref.name in started:
        return
    if ref.name in _chain:
        loop = " -> ".join([*_chain[_chain.index(ref.name):], ref.name])
        raise SystemExit(f"{ref.name} is part of a dependency cycle: {loop}")
    for dependency in ref.config.get("depends_on") or []:
        dep_config = (manifest.get("services") or {}).get(dependency)
        if not dep_config or not dep_config.get("enabled", True):
            raise SystemExit(f"{ref.name} depends on missing/disabled service: {dependency}")
        start_service(profile, ServiceRef(dependency, dep_config), manifest, started, (*_chain, ref.name))
    kind = ref.config.get("kind", "process")
    command = ref.config.get("command") or []
    env = service_env(profile)
    if not command:
        raise SystemExit(f"{ref.name} has no command")
    if not command_exists(str(command[0]), env=env):
        raise SystemExit(f"{ref.name} command is not executable or not found: {command[0]}")
    if kind != "app-launcher":
        pid = read_pid(profile, ref.name)
        if is_running(pid):
            ok, detail = health_ok(ref.config)
            status = "running" if ok is not False else "running unhealthy"
            print(f"{ref.name}: {status} ({detail})")
            started.add(ref.name)
            return
        ok, detail = health_ok(ref.config)
        if ok is True:
            # Something outside the manager already serves this endpoint.
            print(f"{ref.name}: externally running ({detail}); not starting duplicate")
            started.add(ref.name)
            return
    path = log_path(profile, ref.name)
    path.parent.mkdir(parents=True, exist_ok=True)
    rotate_log_if_needed(path, int(ref.config.get("log_max_bytes", DEFAULT_LOG_MAX_BYTES)), int(ref.config.get("log_backups", DEFAULT_LOG_BACKUPS)))
    with path.open("ab") as log_file:
        log_file.write(f"\n--- start {time.strftime('%Y-%m-%d %H:%M:%S')} ---\n".encode("utf-8"))
        # The child writes straight to the inherited descriptor, bypassing this
        # buffered writer; flush now so the header precedes the child's output.
        log_file.flush()
        if kind == "app-launcher":
            result = subprocess.run(command, cwd=ROOT, env=env, stdout=log_file, stderr=subprocess.STDOUT, check=False)
            write_state(profile, ref.name, {"last_launch_exit": result.returncode, "launched_at": time.time()})
            print(f"{ref.name}: launched (exit {result.returncode})")
        else:
            # A new session makes the child its own process-group leader, which
            # is what stop_service relies on when it signals the group.
            process = subprocess.Popen(command, cwd=ROOT, env=env, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True)
            pid_file = pid_path(profile, ref.name)
            pid_file.parent.mkdir(parents=True, exist_ok=True)
            pid_file.write_text(str(process.pid) + "\n", encoding="utf-8")
            ok, detail = wait_for_health(ref.config)
            state = "started" if ok is not False else "started but health check failed"
            write_state(profile, ref.name, {"pid": process.pid, "started_at": time.time(), "health": detail})
            print(f"{ref.name}: {state} pid={process.pid} ({detail})")
    started.add(ref.name)
def _signal_service(pid: int, sig: int) -> None:
    """Send *sig* to the process group led by *pid*, else to the process itself.

    A process that has already exited is not treated as an error.
    """
    try:
        os.killpg(pid, sig)
        return
    except (ProcessLookupError, PermissionError):
        # No such group (the pid may not be a group leader) or the group may
        # not be signalled as a whole: fall back to the single process.
        pass
    try:
        os.kill(pid, sig)
    except ProcessLookupError:
        pass


def stop_service(profile: str, ref: ServiceRef) -> None:
    """Stop the managed process of *ref* and clear its pid file.

    Sends SIGTERM, waits up to ``stop_timeout_seconds`` from the service config
    (falling back to the module default) for the process to exit, then sends
    SIGKILL if it is still alive. ``app-launcher`` services have nothing to
    stop. When no managed process is running, a passing health check is
    reported as an externally running service and nothing is changed;
    otherwise the stale pid file is removed.
    """
    kind = ref.config.get("kind", "process")
    if kind == "app-launcher":
        print(f"{ref.name}: launcher service has no managed process")
        return
    pid = read_pid(profile, ref.name)
    if pid is None or not is_running(pid):
        ok, detail = health_ok(ref.config)
        if ok is True:
            print(f"{ref.name}: externally running ({detail}); no managed pid to stop")
            return
        print(f"{ref.name}: not running")
        pid_path(profile, ref.name).unlink(missing_ok=True)
        return
    _signal_service(pid, signal.SIGTERM)
    timeout = float(ref.config.get("stop_timeout_seconds", DEFAULT_STOP_TIMEOUT_SECONDS))
    # Monotonic clock: wall-clock adjustments must not distort the grace period.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline and is_running(pid):
        time.sleep(0.2)
    if is_running(pid):
        _signal_service(pid, signal.SIGKILL)
        print(f"{ref.name}: killed pid={pid}")
    else:
        print(f"{ref.name}: stopped")
    pid_path(profile, ref.name).unlink(missing_ok=True)
    write_state(profile, ref.name, {"stopped_at": time.time()})
def status_service(profile: str, ref: ServiceRef) -> None:
    """Print a one-line, human-readable status for *ref*."""
    info = service_status(profile, ref)
    name = ref.name
    if not info["enabled"]:
        print(f"{name}: disabled")
    elif info["kind"] == "app-launcher":
        # Launchers have no pid; show when the manager last ran them instead.
        launched_at = info["state"].get("launched_at")
        if launched_at:
            stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(launched_at))
            suffix = f"last launched {stamp}"
        else:
            suffix = "not launched by manager"
        print(f"{name}: launcher ({suffix})")
    else:
        pid_text = info["pid"] or "-"
        print(f"{name}: {info['status']} pid={pid_text} ({info['health']['detail']})")
def status_report(profile: str, refs: list[ServiceRef]) -> dict[str, Any]:
    """Build the machine-readable status report for *refs*.

    The result names the profile, workspace and runtime directories and holds
    one ``service_status`` entry per service, in the order given.
    """
    services = [service_status(profile, ref) for ref in refs]
    report: dict[str, Any] = {"profile": profile}
    report["workspace"] = str(ROOT)
    report["runtime"] = str(RUNTIME_DIR)
    report["services"] = services
    return report
def tail_log(profile: str, service: str, lines: int) -> None:
    """Print the last *lines* lines of the log of *service*.

    Prints a notice when the log file does not exist. A non-positive *lines*
    prints nothing. Undecodable bytes are replaced rather than raising.
    """
    path = log_path(profile, service)
    if not path.is_file():
        print(f"no log file: {path}")
        return
    if lines <= 0:
        # content[-0:] would be the whole file and a negative count would drop
        # leading lines instead of selecting trailing ones.
        return
    content = path.read_text(encoding="utf-8", errors="replace").splitlines()
    for line in content[-lines:]:
        print(line)
def doctor_report(profile: str, manifest: dict[str, Any]) -> dict[str, Any]:
    """Assemble the diagnostic report for every service, enabled or not.

    Each service entry is its ``service_status`` extended with ``command_ok``
    (whether the first command element is an executable) and ``checks``, the
    results of the ``doctor`` section's required/optional commands and paths.
    Manifest validation errors are included alongside.
    """
    errors = validate_manifest(manifest)
    env = service_env(profile)
    command_checks = (("required_command", "required_commands"), ("optional_command", "optional_commands"))
    path_checks = (("required_path", "required_paths"), ("optional_path", "optional_paths"))

    service_reports = []
    for ref in service_items(manifest, include_disabled=True):
        doctor = ref.config.get("doctor") or {}
        checks = [
            {"type": check_type, "name": command_name, "ok": command_exists(command_name, env=env)}
            for check_type, key in command_checks
            for command_name in doctor.get(key) or []
        ]
        checks.extend(
            {"type": check_type, "name": str(raw_path), "ok": resolve_workspace_path(str(raw_path)).exists()}
            for check_type, key in path_checks
            for raw_path in doctor.get(key) or []
        )
        command = ref.config.get("command") or []
        first = command[0] if command else ""
        entry = service_status(profile, ref)
        entry["command_ok"] = command_exists(first, env=env) if first else False
        entry["checks"] = checks
        service_reports.append(entry)

    return {
        "profile": profile,
        "workspace": str(ROOT),
        "manifest": str(manifest_path(profile)),
        "runtime": str(RUNTIME_DIR),
        "manifest_ok": not errors,
        "manifest_errors": errors,
        "services": service_reports,
    }
def run_doctor(profile: str, manifest: dict[str, Any], json_output: bool = False) -> None:
    """Print the doctor report, as JSON when *json_output* is set, else as text.

    The text form lists workspace locations, manifest validity and, per
    service, command availability, health detail and the configured checks.
    """
    report = doctor_report(profile, manifest)
    if json_output:
        print(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True))
        return

    print(f"AI Workspace doctor profile={profile}")
    print(f"workspace: {ROOT}")
    print(f"manifest: {manifest_path(profile)}")
    ensure_runtime()
    print(f"runtime: {RUNTIME_DIR}")

    problems = report["manifest_errors"]
    if not problems:
        print("manifest: ok")
    else:
        print("manifest: invalid")
        for problem in problems:
            print(f" ! {problem}")

    for service in report["services"]:
        command_text = "ok" if service["command_ok"] else "missing"
        if not service["enabled"]:
            print(f"- {service['name']}: disabled; command={command_text}; health skipped")
            continue
        health = service["health"]
        health_text = "no health check" if health["ok"] is None else health["detail"]
        print(f"- {service['name']}: enabled; command={command_text}; {health_text}")
        for check in service["checks"]:
            label = check["type"].replace("_", " ")
            outcome = "ok" if check["ok"] else "missing"
            print(f" {label} {check['name']}: {outcome}")
def shutil_which(command: str, env: dict[str, str] | None = None) -> str | None:
    """Locate *command* on the PATH of *env*.

    Args:
        command: Bare command name to look up.
        env: Environment whose ``PATH`` is searched; the process environment
            is used only when this is ``None`` (an empty mapping is honoured
            as "no PATH").

    Returns:
        The path of the first executable regular file found, or ``None``.
    """
    source = os.environ if env is None else env
    for directory in source.get("PATH", "").split(os.pathsep):
        candidate = Path(directory) / command
        # is_file(): a directory with the same name carries the execute bit
        # too, but cannot be run.
        if candidate.is_file() and os.access(candidate, os.X_OK):
            return str(candidate)
    return None
def main() -> None:
    """Parse the command line and run the requested action.

    ``doctor`` runs before manifest validation so it can report an invalid
    manifest; every other action aborts with ``SystemExit`` when validation
    fails. Disabled services are only selectable for ``status``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("action", choices=["start", "stop", "restart", "status", "logs", "doctor"])
    parser.add_argument("services", nargs="*", help="Optional service names for start/stop/restart/status/logs.")
    parser.add_argument("--profile", default=os.getenv("AIW_PROJECT_PROFILE", "fidelity"))
    parser.add_argument("--group", default="", help="Start/stop/status services in a group, e.g. communication or inbox.")
    parser.add_argument("--lines", type=int, default=80, help="Number of log lines for logs action.")
    parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON for supported actions such as doctor.")
    args = parser.parse_args()
    action, profile = args.action, args.profile

    ensure_runtime()
    manifest = load_manifest(profile)
    if action == "doctor":
        run_doctor(profile, manifest, json_output=args.json)
        return

    problems = validate_manifest(manifest)
    if problems:
        raise SystemExit("invalid services manifest:\n" + "\n".join(f"- {error}" for error in problems))

    refs = select_services(manifest, args.services, args.group or None, include_disabled=(action == "status"))

    # "restart" is a stop pass followed by a start pass; stopping walks the
    # selection in reverse order.
    if action in ("stop", "restart"):
        for ref in reversed(refs):
            stop_service(profile, ref)
    if action in ("start", "restart"):
        started: set[str] = set()
        for ref in refs:
            start_service(profile, ref, manifest, started)
    if action == "status":
        if args.json:
            print(json.dumps(status_report(profile, refs), ensure_ascii=False, indent=2, sort_keys=True))
            return
        for ref in refs:
            status_service(profile, ref)
    if action == "logs":
        if not args.services:
            raise SystemExit("logs requires at least one service name")
        for service in args.services:
            print(f"==> {service} <==")
            tail_log(profile, service, args.lines)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()