#!/usr/bin/env python3 """AI Workspace local service manager. This is the profile-aware lifecycle layer for local capture/query services. It is intentionally small and dependency-free so it can run before the future desktop UI or MCP server exists. """ from __future__ import annotations import argparse import json import os import signal import socket import subprocess import sys import time import urllib.error import urllib.request from dataclasses import dataclass from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[2] RUNTIME_DIR = ROOT / ".aiw" / "runtime" PID_DIR = RUNTIME_DIR / "pids" LOG_DIR = RUNTIME_DIR / "logs" STATE_DIR = RUNTIME_DIR / "state" DEFAULT_LOG_MAX_BYTES = 5 * 1024 * 1024 DEFAULT_LOG_BACKUPS = 3 DEFAULT_STOP_TIMEOUT_SECONDS = 5.0 @dataclass(frozen=True) class ServiceRef: name: str config: dict[str, Any] def ensure_runtime() -> None: for path in [PID_DIR, LOG_DIR, STATE_DIR]: path.mkdir(parents=True, exist_ok=True) def manifest_path(profile: str) -> Path: return ROOT / "profiles" / profile / "services.json" def load_manifest(profile: str) -> dict[str, Any]: path = manifest_path(profile) if not path.is_file(): raise SystemExit(f"services manifest not found: {path}") return json.loads(path.read_text(encoding="utf-8")) def validate_manifest(manifest: dict[str, Any]) -> list[str]: errors: list[str] = [] services = manifest.get("services") if not isinstance(services, dict): return ["manifest must contain a services object"] for name, config in services.items(): if not isinstance(config, dict): errors.append(f"{name}: service config must be an object") continue command = config.get("command") if config.get("enabled", True) and (not isinstance(command, list) or not command): errors.append(f"{name}: enabled services require a non-empty command list") kind = config.get("kind", "process") if kind not in {"process", "app-launcher", "mcp"}: errors.append(f"{name}: unsupported kind {kind!r}") restart = config.get("restart", "never") if restart not in {"never", "on-failure", "always"}: errors.append(f"{name}: unsupported restart policy {restart!r}") for dependency in config.get("depends_on") or []: if dependency not in services: errors.append(f"{name}: depends on unknown service {dependency!r}") return errors def service_items(manifest: dict[str, Any], include_disabled: bool = False) -> list[ServiceRef]: services = manifest.get("services") or {} refs: list[ServiceRef] = [] for name, config in services.items(): if not include_disabled and not config.get("enabled", True): continue refs.append(ServiceRef(name, config)) return refs def select_services(manifest: dict[str, Any], names: list[str], group: str | None, include_disabled: bool = False) -> list[ServiceRef]: refs = service_items(manifest, include_disabled=include_disabled) by_name = {ref.name: ref for ref in refs} if names: selected: list[ServiceRef] = [] missing: list[str] = [] for name in names: ref = by_name.get(name) if ref is None: missing.append(name) else: selected.append(ref) if missing: raise SystemExit("unknown or disabled service(s): " + ", ".join(missing)) return selected if group: return [ref for ref in refs if group in (ref.config.get("groups") or [])] return refs def pid_path(profile: str, service: str) -> Path: return PID_DIR / profile / f"{service}.pid" def state_path(profile: str, service: str) -> Path: return STATE_DIR / profile / f"{service}.json" def log_path(profile: str, service: str) -> Path: return LOG_DIR / profile / f"{service}.log" def resolve_workspace_path(raw: str) -> Path: path = Path(raw).expanduser() return path if path.is_absolute() else ROOT / path def command_exists(command: str) -> bool: if not command: return False path = Path(command) if path.is_absolute() or "/" in command: resolved = resolve_workspace_path(command) return resolved.exists() and os.access(resolved, os.X_OK) return shutil_which(command) is not None def rotate_log_if_needed(path: Path, max_bytes: int = DEFAULT_LOG_MAX_BYTES, backups: int = DEFAULT_LOG_BACKUPS) -> None: if max_bytes <= 0 or backups <= 0 or not path.exists() or path.stat().st_size < max_bytes: return oldest = path.with_suffix(path.suffix + f".{backups}") oldest.unlink(missing_ok=True) for index in range(backups - 1, 0, -1): src = path.with_suffix(path.suffix + f".{index}") dst = path.with_suffix(path.suffix + f".{index + 1}") if src.exists(): src.replace(dst) path.replace(path.with_suffix(path.suffix + ".1")) def read_pid(profile: str, service: str) -> int | None: path = pid_path(profile, service) if not path.is_file(): return None try: return int(path.read_text(encoding="utf-8").strip()) except ValueError: return None def is_running(pid: int | None) -> bool: if not pid or pid <= 0: return False try: os.kill(pid, 0) except ProcessLookupError: return False except PermissionError: return True try: result = subprocess.run(["ps", "-o", "stat=", "-p", str(pid)], check=False, capture_output=True, text=True) if result.returncode != 0: return False state = result.stdout.strip() if state.startswith("Z"): return False except OSError: pass return True def write_state(profile: str, service: str, state: dict[str, Any]) -> None: path = state_path(profile, service) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(state, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8") def read_state(profile: str, service: str) -> dict[str, Any]: path = state_path(profile, service) if not path.is_file(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError: return {} def health_ok(config: dict[str, Any], timeout: float = 1.0) -> tuple[bool | None, str]: health = config.get("health") or {} kind = health.get("type") if not kind: return None, "no health check" if kind == "tcp": host = str(health.get("host") or "127.0.0.1") port = int(health.get("port") or 0) try: with socket.create_connection((host, port), timeout=timeout): return True, f"tcp {host}:{port} ok" except OSError as error: return False, f"tcp {host}:{port} failed: {error}" if kind == "http": url = str(health.get("url") or "") try: with urllib.request.urlopen(url, timeout=timeout) as response: ok = 200 <= int(response.status) < 400 return ok, f"http {url} status {response.status}" except (urllib.error.URLError, TimeoutError, OSError) as error: return False, f"http {url} failed: {error}" return None, f"unknown health type: {kind}" def wait_for_health(config: dict[str, Any], seconds: float = 8.0) -> tuple[bool | None, str]: deadline = time.time() + seconds last: tuple[bool | None, str] = (None, "no health check") while time.time() <= deadline: last = health_ok(config) if last[0] is True or last[0] is None: return last time.sleep(0.4) return last def start_service(profile: str, ref: ServiceRef, manifest: dict[str, Any], started: set[str]) -> None: if ref.name in started: return for dependency in ref.config.get("depends_on") or []: dep_config = (manifest.get("services") or {}).get(dependency) if not dep_config or not dep_config.get("enabled", True): raise SystemExit(f"{ref.name} depends on missing/disabled service: {dependency}") start_service(profile, ServiceRef(dependency, dep_config), manifest, started) kind = ref.config.get("kind", "process") command = ref.config.get("command") or [] if not command: raise SystemExit(f"{ref.name} has no command") if not command_exists(str(command[0])): raise SystemExit(f"{ref.name} command is not executable or not found: {command[0]}") if kind != "app-launcher": pid = read_pid(profile, ref.name) if is_running(pid): ok, detail = health_ok(ref.config) status = "running" if ok is not False else "running unhealthy" print(f"{ref.name}: {status} ({detail})") started.add(ref.name) return ok, detail = health_ok(ref.config) if ok is True: print(f"{ref.name}: externally running ({detail}); not starting duplicate") started.add(ref.name) return path = log_path(profile, ref.name) path.parent.mkdir(parents=True, exist_ok=True) rotate_log_if_needed(path, int(ref.config.get("log_max_bytes", DEFAULT_LOG_MAX_BYTES)), int(ref.config.get("log_backups", DEFAULT_LOG_BACKUPS))) with path.open("ab") as log_file: log_file.write(f"\n--- start {time.strftime('%Y-%m-%d %H:%M:%S')} ---\n".encode("utf-8")) if kind == "app-launcher": result = subprocess.run(command, cwd=ROOT, stdout=log_file, stderr=subprocess.STDOUT, check=False) write_state(profile, ref.name, {"last_launch_exit": result.returncode, "launched_at": time.time()}) print(f"{ref.name}: launched (exit {result.returncode})") else: process = subprocess.Popen(command, cwd=ROOT, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True) pid_file = pid_path(profile, ref.name) pid_file.parent.mkdir(parents=True, exist_ok=True) pid_file.write_text(str(process.pid) + "\n", encoding="utf-8") ok, detail = wait_for_health(ref.config) state = "started" if ok is not False else "started but health check failed" write_state(profile, ref.name, {"pid": process.pid, "started_at": time.time(), "health": detail}) print(f"{ref.name}: {state} pid={process.pid} ({detail})") started.add(ref.name) def stop_service(profile: str, ref: ServiceRef) -> None: kind = ref.config.get("kind", "process") if kind == "app-launcher": print(f"{ref.name}: launcher service has no managed process") return pid = read_pid(profile, ref.name) if not is_running(pid): ok, detail = health_ok(ref.config) if ok is True: print(f"{ref.name}: externally running ({detail}); no managed pid to stop") return print(f"{ref.name}: not running") pid_path(profile, ref.name).unlink(missing_ok=True) return assert pid is not None try: os.killpg(pid, signal.SIGTERM) except ProcessLookupError: pass except PermissionError: os.kill(pid, signal.SIGTERM) deadline = time.time() + float(ref.config.get("stop_timeout_seconds", DEFAULT_STOP_TIMEOUT_SECONDS)) while time.time() < deadline and is_running(pid): time.sleep(0.2) if is_running(pid): try: os.killpg(pid, signal.SIGKILL) except Exception: os.kill(pid, signal.SIGKILL) print(f"{ref.name}: killed pid={pid}") else: print(f"{ref.name}: stopped") pid_path(profile, ref.name).unlink(missing_ok=True) write_state(profile, ref.name, {"stopped_at": time.time()}) def status_service(profile: str, ref: ServiceRef) -> None: enabled = ref.config.get("enabled", True) kind = ref.config.get("kind", "process") if not enabled: print(f"{ref.name}: disabled") return if kind == "app-launcher": state = read_state(profile, ref.name) launched = state.get("launched_at") suffix = f"last launched {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(launched))}" if launched else "not launched by manager" print(f"{ref.name}: launcher ({suffix})") return pid = read_pid(profile, ref.name) running = is_running(pid) ok, detail = health_ok(ref.config) if running and ok is not False: label = "running" elif running: label = "unhealthy" elif ok is True: label = "externally running" else: label = "stopped" print(f"{ref.name}: {label} pid={pid or '-'} ({detail})") def tail_log(profile: str, service: str, lines: int) -> None: path = log_path(profile, service) if not path.is_file(): print(f"no log file: {path}") return content = path.read_text(encoding="utf-8", errors="replace").splitlines() for line in content[-lines:]: print(line) def run_doctor(profile: str, manifest: dict[str, Any]) -> None: print(f"AI Workspace doctor profile={profile}") print(f"workspace: {ROOT}") print(f"manifest: {manifest_path(profile)}") ensure_runtime() print(f"runtime: {RUNTIME_DIR}") errors = validate_manifest(manifest) if errors: print("manifest: invalid") for error in errors: print(f" ! {error}") else: print("manifest: ok") for ref in service_items(manifest, include_disabled=True): enabled = ref.config.get("enabled", True) command = ref.config.get("command") or [] first = command[0] if command else "" command_ok = command_exists(first) if first else False enabled_text = "enabled" if enabled else "disabled" if not enabled: print(f"- {ref.name}: {enabled_text}; command={'ok' if command_ok else 'missing'}; health skipped") continue ok, detail = health_ok(ref.config) health_text = detail if ok is not None else "no health check" print(f"- {ref.name}: {enabled_text}; command={'ok' if command_ok else 'missing'}; {health_text}") doctor = ref.config.get("doctor") or {} for command_name in doctor.get("required_commands") or []: print(f" required command {command_name}: {'ok' if command_exists(command_name) else 'missing'}") for command_name in doctor.get("optional_commands") or []: print(f" optional command {command_name}: {'ok' if command_exists(command_name) else 'missing'}") for raw_path in doctor.get("required_paths") or []: path = resolve_workspace_path(str(raw_path)) print(f" required path {raw_path}: {'ok' if path.exists() else 'missing'}") for raw_path in doctor.get("optional_paths") or []: path = resolve_workspace_path(str(raw_path)) print(f" optional path {raw_path}: {'ok' if path.exists() else 'missing'}") def shutil_which(command: str) -> str | None: paths = os.environ.get("PATH", "").split(os.pathsep) for directory in paths: candidate = Path(directory) / command if candidate.exists() and os.access(candidate, os.X_OK): return str(candidate) return None def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("action", choices=["start", "stop", "restart", "status", "logs", "doctor"]) parser.add_argument("services", nargs="*", help="Optional service names for start/stop/restart/status/logs.") parser.add_argument("--profile", default=os.getenv("AIW_PROJECT_PROFILE", "fidelity")) parser.add_argument("--group", default="", help="Start/stop/status services in a group, e.g. communication or inbox.") parser.add_argument("--lines", type=int, default=80, help="Number of log lines for logs action.") args = parser.parse_args() ensure_runtime() manifest = load_manifest(args.profile) if args.action == "doctor": run_doctor(args.profile, manifest) return errors = validate_manifest(manifest) if errors: raise SystemExit("invalid services manifest:\n" + "\n".join(f"- {error}" for error in errors)) include_disabled = args.action == "status" refs = select_services(manifest, args.services, args.group or None, include_disabled=include_disabled) if args.action == "start": started: set[str] = set() for ref in refs: start_service(args.profile, ref, manifest, started) elif args.action == "stop": for ref in reversed(refs): stop_service(args.profile, ref) elif args.action == "restart": for ref in reversed(refs): stop_service(args.profile, ref) started = set() for ref in refs: start_service(args.profile, ref, manifest, started) elif args.action == "status": for ref in refs: status_service(args.profile, ref) elif args.action == "logs": if not args.services: raise SystemExit("logs requires at least one service name") for service in args.services: print(f"==> {service} <==") tail_log(args.profile, service, args.lines) if __name__ == "__main__": main()