#!/usr/bin/env python3 """Receive JPEG uploads into a local Mac photo inbox.""" from __future__ import annotations import argparse import datetime as dt import json import os import subprocess import threading from dataclasses import dataclass from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from tempfile import NamedTemporaryFile from urllib.parse import parse_qs, urlparse SCRIPT_DIR = Path(__file__).resolve().parent COPY_FILES_HELPER = SCRIPT_DIR / "copy_files_to_clipboard.swift" COPY_FILES_HELPER_BIN = SCRIPT_DIR / "copy_files_to_clipboard" DEFAULT_ENV_FILE = SCRIPT_DIR / ".env" DEFAULT_OUTPUT_DIR = Path.home() / "Pictures" / "Photo Inbox" @dataclass(frozen=True) class Config: output_dir: Path clipboard: bool notify: bool reveal: bool @dataclass class Batch: output_dir: Path paths: list[Path] started_at: dt.datetime updated_at: dt.datetime timer: threading.Timer | None = None @property def count(self) -> int: return len(self.paths) def env_flag(name: str, default: bool = False) -> bool: value = os.getenv(name) if value is None: return default return value.strip().lower() in {"1", "true", "yes", "on"} def env_value(name: str) -> str | None: return os.getenv(name) def env_flag_value(name: str, default: bool = False) -> bool: value = env_value(name) if value is None: return default return value.strip().lower() in {"1", "true", "yes", "on"} def env_path(name: str, default: Path) -> Path: value = env_value(name) return Path(value).expanduser() if value else default def env_str(name: str, default: str) -> str: value = env_value(name) return value if value is not None else default def env_int(name: str, default: int) -> int: return int(env_str(name, str(default))) def env_float(name: str, default: float) -> float: return float(env_str(name, str(default))) def parse_env_line(line: str) -> tuple[str, str] | None: stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: return None key, value = stripped.split("=", 1) key = key.strip() value = value.strip() if not key: return None if key.startswith("export "): key = key.removeprefix("export ").strip() if len(value) >= 2 and value[0] == value[-1] and value[0] in {"'", '"'}: value = value[1:-1] return key, value def load_env_file(path: Path) -> None: if not path.exists(): return for line in path.read_text(encoding="utf-8").splitlines(): parsed = parse_env_line(line) if parsed is None: continue key, value = parsed os.environ.setdefault(key, value) def timestamp() -> str: return dt.datetime.now().strftime("%Y%m%d-%H%M%S-%f") def unique_path(output_dir: Path) -> Path: candidate = output_dir / f"iphone-{timestamp()}.jpg" counter = 1 while candidate.exists(): candidate = output_dir / f"iphone-{timestamp()}-{counter}.jpg" counter += 1 return candidate def looks_like_jpeg(data: bytes) -> bool: return data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9") def run_macos_action(command: list[str]) -> bool: try: result = subprocess.run(command, check=False, capture_output=True, text=True) except OSError as error: print(f"macOS action failed: {error}", flush=True) return False if result.returncode != 0: error = result.stderr.strip() or result.stdout.strip() or f"exit {result.returncode}" print(f"macOS action failed: {error}", flush=True) return False if env_flag_value("PHOTO_INBOX_DEBUG"): output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part) if output: print(f"macOS action output: {output}", flush=True) return True def notify(title: str, message: str) -> bool: script = f"display notification {json.dumps(message)} with title {json.dumps(title)}" return run_macos_action(["osascript", "-e", script]) def reveal_in_finder(path: Path) -> bool: return run_macos_action(["open", "-R", str(path)]) def copy_files_to_clipboard(paths: list[Path]) -> bool: if not paths: return True helper = COPY_FILES_HELPER if COPY_FILES_HELPER_BIN.exists(): source_mtime = COPY_FILES_HELPER.stat().st_mtime if COPY_FILES_HELPER.exists() else 0 helper_mtime = COPY_FILES_HELPER_BIN.stat().st_mtime if helper_mtime >= source_mtime: helper = COPY_FILES_HELPER_BIN else: print("compiled pasteboard helper is older than source; using Swift script", flush=True) if helper.exists(): return run_macos_action([str(helper), *[str(path) for path in paths]]) print(f"native pasteboard helper missing: {COPY_FILES_HELPER_BIN}", flush=True) return False class BatchManager: def __init__(self, debounce_seconds: float, config: Config) -> None: self.debounce_seconds = debounce_seconds self.config = config self.lock = threading.Lock() self.batch: Batch | None = None def add(self, path: Path) -> tuple[int, list[Path]]: with self.lock: now = dt.datetime.now() if self.batch is None: self.batch = Batch(self.config.output_dir, [], now, now) self.batch.paths.append(path) self.batch.updated_at = now if self.batch.timer is not None: self.batch.timer.cancel() self.batch.timer = threading.Timer(self.debounce_seconds, self.finish) self.batch.timer.daemon = True self.batch.timer.start() return self.batch.count, list(self.batch.paths) def finish(self) -> None: with self.lock: batch = self.batch self.batch = None if batch is None: return if self.config.notify: plural = "photo" if batch.count == 1 else "photos" message = f"{batch.count} {plural} ready; clipboard={'on' if self.config.clipboard else 'off'}" if notify("Photo Inbox", message): print(f"batch notification sent count={batch.count}", flush=True) print(f"batch finalized count={batch.count} dir={batch.output_dir}", flush=True) class UploadHandler(BaseHTTPRequestHandler): server_version = "iPhonePhotoInbox/3.0" def do_GET(self) -> None: if self.path == "/health": self.send_text(HTTPStatus.OK, "ok\n") return self.send_text(HTTPStatus.NOT_FOUND, "not found\n") def do_POST(self) -> None: parsed = urlparse(self.path) if parsed.path != "/upload": self.send_text(HTTPStatus.NOT_FOUND, "not found\n") return query = parse_qs(parsed.query) expected_token = self.server.upload_token supplied_token = query.get("token", [""])[0] if expected_token and supplied_token != expected_token: self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n") return content_length = self.headers.get("Content-Length") if content_length is None: self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n") return try: size = int(content_length) except ValueError: self.send_text(HTTPStatus.BAD_REQUEST, "invalid content length\n") return if size <= 0 or size > self.server.max_bytes: self.send_text(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "invalid upload size\n") return data = self.rfile.read(size) if not looks_like_jpeg(data): self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n") return output_dir = self.server.config.output_dir output_dir.mkdir(parents=True, exist_ok=True) target = unique_path(output_dir) with NamedTemporaryFile(dir=output_dir, delete=False) as tmp: tmp.write(data) temp_path = Path(tmp.name) temp_path.replace(target) if self.server.config.reveal: if reveal_in_finder(target): print("revealed in Finder", flush=True) batch_count, batch_paths = self.server.batch_manager.add(target) if self.server.config.clipboard: if copy_files_to_clipboard(batch_paths): print(f"clipboard updated count={batch_count}", flush=True) self.send_text(HTTPStatus.CREATED, f"{target}\n") print(f"saved {target} batch_count={batch_count}", flush=True) def log_message(self, format: str, *args: object) -> None: print(f"{self.address_string()} - {format % args}", flush=True) def send_text(self, status: HTTPStatus, body: str) -> None: encoded = body.encode("utf-8") self.send_response(status) self.send_header("Content-Type", "text/plain; charset=utf-8") self.send_header("Content-Length", str(len(encoded))) self.end_headers() self.wfile.write(encoded) class UploadServer(ThreadingHTTPServer): def __init__( self, server_address: tuple[str, int], handler_class: type[BaseHTTPRequestHandler], config: Config, upload_token: str, max_bytes: int, debounce_seconds: float, ) -> None: super().__init__(server_address, handler_class) self.config = config self.upload_token = upload_token self.max_bytes = max_bytes self.batch_manager = BatchManager(debounce_seconds, config) def parse_args() -> argparse.Namespace: env_file = Path(env_str("PHOTO_INBOX_ENV_FILE", str(DEFAULT_ENV_FILE))).expanduser() load_env_file(env_file) parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--host", default=env_str("PHOTO_INBOX_HOST", "0.0.0.0")) parser.add_argument("--port", type=int, default=env_int("PHOTO_INBOX_PORT", 8787)) parser.add_argument("--output-dir", type=Path, default=env_path("PHOTO_INBOX_DIR", DEFAULT_OUTPUT_DIR)) parser.add_argument("--token", default=env_str("PHOTO_INBOX_TOKEN", "")) parser.add_argument("--max-mb", type=int, default=env_int("PHOTO_INBOX_MAX_MB", 30)) parser.add_argument("--debounce-seconds", type=float, default=env_float("PHOTO_INBOX_DEBOUNCE_SECONDS", 5)) parser.add_argument("--clipboard", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_CLIPBOARD", True)) parser.add_argument("--notify", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_NOTIFY", True)) parser.add_argument("--reveal", action="store_true", default=env_flag_value("PHOTO_INBOX_REVEAL")) return parser.parse_args() def build_config(args: argparse.Namespace) -> Config: return Config( output_dir=args.output_dir.expanduser().resolve(), clipboard=args.clipboard, notify=args.notify, reveal=args.reveal, ) def main() -> None: args = parse_args() config = build_config(args) server = UploadServer( (args.host, args.port), UploadHandler, config, args.token, args.max_mb * 1024 * 1024, args.debounce_seconds, ) print(f"listening on http://{args.host}:{args.port}/upload", flush=True) print(f"saving to: {config.output_dir}", flush=True) print(f"debounce seconds: {args.debounce_seconds:g}", flush=True) print(f"clipboard: {config.clipboard}", flush=True) print(f"notify: {config.notify}", flush=True) print(f"reveal: {config.reveal}", flush=True) if not args.token: print("warning: no token configured; anyone on this network can upload", flush=True) server.serve_forever() if __name__ == "__main__": main()