#!/usr/bin/env python3 """Receive JPEG uploads from iPhone Shortcuts into local Mac inboxes.""" from __future__ import annotations import argparse import datetime as dt import json import os import shlex import subprocess import threading from dataclasses import dataclass from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from tempfile import NamedTemporaryFile from urllib.parse import parse_qs, urlparse WORKSPACE_DIR = Path(__file__).resolve().parents[2] SCRIPT_DIR = Path(__file__).resolve().parent COPY_FILES_HELPER = SCRIPT_DIR / "copy_files_to_clipboard.swift" COPY_FILES_HELPER_BIN = SCRIPT_DIR / "copy_files_to_clipboard" DEFAULT_OPENCODE_DIR = WORKSPACE_DIR / "ai" / "inbox" / "photos" DEFAULT_MATTERMOST_DIR = Path.home() / "Pictures" / "iPhone Inbox" CLIPBOARD_NONE = "none" CLIPBOARD_IMAGE = "image" CLIPBOARD_PATH = "path" CLIPBOARD_TERMINAL_PATH = "terminal-path" CLIPBOARD_FILE = "file" CLIPBOARD_FILES = "files" @dataclass(frozen=True) class Profile: name: str output_dir: Path clipboard: str notify: bool = True reveal: bool = False def env_flag(name: str, default: bool = False) -> bool: value = os.getenv(name) if value is None: return default return value.strip().lower() in {"1", "true", "yes", "on"} def env_path(name: str, default: Path) -> Path: value = os.getenv(name) return Path(value).expanduser() if value else default def profile_defaults() -> dict[str, Profile]: opencode_dir = env_path("IPHONE_PHOTO_OPENCODE_DIR", DEFAULT_OPENCODE_DIR) mattermost_dir = env_path("IPHONE_PHOTO_MATTERMOST_DIR", DEFAULT_MATTERMOST_DIR) general_dir = env_path("IPHONE_PHOTO_GENERAL_DIR", DEFAULT_MATTERMOST_DIR) return { "opencode": Profile("opencode", opencode_dir, CLIPBOARD_TERMINAL_PATH), "mattermost": Profile("mattermost", mattermost_dir, CLIPBOARD_FILES), "general": Profile("general", general_dir, CLIPBOARD_NONE), } def timestamp() -> str: return dt.datetime.now().strftime("%Y%m%d-%H%M%S-%f") def unique_path(output_dir: Path) -> Path: candidate = output_dir / f"iphone-{timestamp()}.jpg" counter = 1 while candidate.exists(): candidate = output_dir / f"iphone-{timestamp()}-{counter}.jpg" counter += 1 return candidate def looks_like_jpeg(data: bytes) -> bool: return data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9") def normalize_profile(value: str) -> str: return value.strip().lower() or "opencode" def run_macos_action(command: list[str]) -> bool: try: result = subprocess.run(command, check=False, capture_output=True, text=True) except OSError as error: print(f"macOS action failed: {error}", flush=True) return False if result.returncode != 0: error = result.stderr.strip() or result.stdout.strip() or f"exit {result.returncode}" print(f"macOS action failed: {error}", flush=True) return False if env_flag("IPHONE_PHOTO_DEBUG"): output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part) if output: print(f"macOS action output: {output}", flush=True) return True def notify(title: str, message: str) -> bool: script = f"display notification {json.dumps(message)} with title {json.dumps(title)}" return run_macos_action(["osascript", "-e", script]) def reveal_in_finder(path: Path) -> bool: return run_macos_action(["open", "-R", str(path)]) def copy_image_to_clipboard(path: Path) -> bool: script = f"set the clipboard to (read (POSIX file {json.dumps(str(path))}) as JPEG picture)" return run_macos_action(["osascript", "-e", script]) def copy_file_to_clipboard(path: Path) -> bool: return copy_files_to_clipboard([path]) def copy_file_to_clipboard_with_applescript(path: Path) -> bool: script = f""" set theFile to POSIX file {json.dumps(str(path))} as alias tell application "Finder" set the clipboard to {{theFile}} end tell """ return run_macos_action(["osascript", "-e", script]) def copy_files_to_clipboard(paths: list[Path]) -> bool: if not paths: return True helper = COPY_FILES_HELPER if COPY_FILES_HELPER_BIN.exists(): source_mtime = COPY_FILES_HELPER.stat().st_mtime if COPY_FILES_HELPER.exists() else 0 helper_mtime = COPY_FILES_HELPER_BIN.stat().st_mtime if helper_mtime >= source_mtime: helper = COPY_FILES_HELPER_BIN else: print("compiled pasteboard helper is older than source; using Swift script", flush=True) if helper.exists(): if run_macos_action([str(helper), *[str(path) for path in paths]]): return True print("native pasteboard helper failed; falling back to AppleScript", flush=True) return copy_files_to_clipboard_with_applescript(paths) def copy_files_to_clipboard_with_applescript(paths: list[Path]) -> bool: if not paths: return True alias_lines = [ f"set end of theFiles to POSIX file {json.dumps(str(path))} as alias" for path in paths ] script = "\n".join( [ "set theFiles to {}", *alias_lines, 'tell application "Finder"', " set the clipboard to theFiles", "end tell", ] ) return run_macos_action(["osascript", "-e", script]) def copy_text_to_clipboard(text: str) -> bool: script = f"set the clipboard to {json.dumps(text)}" return run_macos_action(["osascript", "-e", script]) def terminal_path_reference(path: Path) -> str: return shlex.quote(str(path)) def paths_reference(paths: list[Path]) -> str: return "\n".join(str(path) for path in paths) def terminal_paths_reference(paths: list[Path]) -> str: return "\n".join(terminal_path_reference(path) for path in paths) def apply_clipboard_mode(mode: str, paths: list[Path]) -> bool: if mode == CLIPBOARD_NONE: return True if not paths: return True latest = paths[-1] if mode == CLIPBOARD_IMAGE: return copy_image_to_clipboard(latest) if mode == CLIPBOARD_PATH: return copy_text_to_clipboard(paths_reference(paths)) if mode == CLIPBOARD_TERMINAL_PATH: return copy_text_to_clipboard(terminal_paths_reference(paths)) if mode == CLIPBOARD_FILE: return copy_file_to_clipboard(latest) if mode == CLIPBOARD_FILES: return copy_files_to_clipboard(paths) print(f"unsupported clipboard mode: {mode}", flush=True) return False @dataclass class Batch: profile: Profile output_dir: Path paths: list[Path] started_at: dt.datetime updated_at: dt.datetime timer: threading.Timer | None = None @property def count(self) -> int: return len(self.paths) class BatchManager: def __init__(self, debounce_seconds: float) -> None: self.debounce_seconds = debounce_seconds self.lock = threading.Lock() self.batches: dict[str, Batch] = {} def add(self, profile: Profile, output_dir: Path, path: Path) -> tuple[int, list[Path]]: with self.lock: now = dt.datetime.now() batch = self.batches.get(profile.name) if batch is None: batch = Batch(profile, output_dir, [], now, now) self.batches[profile.name] = batch batch.profile = profile batch.output_dir = output_dir batch.paths.append(path) batch.updated_at = now if batch.timer is not None: batch.timer.cancel() batch.timer = threading.Timer(self.debounce_seconds, self.finish, args=(profile.name,)) batch.timer.daemon = True batch.timer.start() return batch.count, list(batch.paths) def finish(self, profile_name: str) -> None: with self.lock: batch = self.batches.pop(profile_name, None) if batch is None: return if batch.profile.notify: plural = "photo" if batch.count == 1 else "photos" message = f"{batch.profile.name}: {batch.count} {plural} ready; clipboard={batch.profile.clipboard}" if notify("iPhone Photo Inbox", message): print(f"batch notification sent profile={batch.profile.name} count={batch.count}", flush=True) print( f"batch finalized profile={batch.profile.name} count={batch.count} " f"dir={batch.output_dir}", flush=True, ) class UploadHandler(BaseHTTPRequestHandler): server_version = "iPhonePhotoInbox/2.0" def do_GET(self) -> None: if self.path == "/health": self.send_text(HTTPStatus.OK, "ok\n") return self.send_text(HTTPStatus.NOT_FOUND, "not found\n") def do_POST(self) -> None: parsed = urlparse(self.path) if parsed.path != "/upload": self.send_text(HTTPStatus.NOT_FOUND, "not found\n") return query = parse_qs(parsed.query) expected_token = self.server.upload_token supplied_token = query.get("token", [""])[0] if expected_token and supplied_token != expected_token: self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n") return profile_name = normalize_profile(query.get("profile", [self.server.default_profile])[0]) profile = self.server.profiles.get(profile_name) if profile is None: known = ", ".join(sorted(self.server.profiles)) self.send_text(HTTPStatus.BAD_REQUEST, f"unknown profile: {profile_name}; expected one of {known}\n") return content_length = self.headers.get("Content-Length") if content_length is None: self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n") return try: size = int(content_length) except ValueError: self.send_text(HTTPStatus.BAD_REQUEST, "invalid content length\n") return if size <= 0 or size > self.server.max_bytes: self.send_text(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "invalid upload size\n") return data = self.rfile.read(size) if not looks_like_jpeg(data): self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n") return output_dir = self.server.output_dir_override or profile.output_dir output_dir.mkdir(parents=True, exist_ok=True) target = unique_path(output_dir) with NamedTemporaryFile(dir=output_dir, delete=False) as tmp: tmp.write(data) temp_path = Path(tmp.name) temp_path.replace(target) if profile.reveal: if reveal_in_finder(target): print("revealed in Finder", flush=True) batch_count, batch_paths = self.server.batch_manager.add(profile, output_dir, target) if apply_clipboard_mode(profile.clipboard, batch_paths): print( f"clipboard mode applied: {profile.clipboard} " f"profile={profile.name} count={batch_count}", flush=True, ) self.send_text(HTTPStatus.CREATED, f"{target}\n") print(f"saved {target} profile={profile.name} batch_count={batch_count}", flush=True) def log_message(self, format: str, *args: object) -> None: print(f"{self.address_string()} - {format % args}", flush=True) def send_text(self, status: HTTPStatus, body: str) -> None: encoded = body.encode("utf-8") self.send_response(status) self.send_header("Content-Type", "text/plain; charset=utf-8") self.send_header("Content-Length", str(len(encoded))) self.end_headers() self.wfile.write(encoded) class UploadServer(ThreadingHTTPServer): def __init__( self, server_address: tuple[str, int], handler_class: type[BaseHTTPRequestHandler], profiles: dict[str, Profile], default_profile: str, output_dir_override: Path | None, upload_token: str, max_bytes: int, debounce_seconds: float, ) -> None: super().__init__(server_address, handler_class) self.profiles = profiles self.default_profile = default_profile self.output_dir_override = output_dir_override self.upload_token = upload_token self.max_bytes = max_bytes self.batch_manager = BatchManager(debounce_seconds) def apply_legacy_clipboard_overrides(profile: Profile) -> Profile: clipboard = profile.clipboard if env_flag("IPHONE_PHOTO_COPY"): clipboard = CLIPBOARD_IMAGE if env_flag("IPHONE_PHOTO_COPY_FILE"): clipboard = CLIPBOARD_FILE if env_flag("IPHONE_PHOTO_COPY_PATH"): clipboard = CLIPBOARD_PATH if env_flag("IPHONE_PHOTO_COPY_TERMINAL_PATH"): clipboard = CLIPBOARD_TERMINAL_PATH if env_flag("IPHONE_PHOTO_COPY_FILES"): clipboard = CLIPBOARD_FILES notify_enabled = env_flag("IPHONE_PHOTO_NOTIFY", profile.notify) reveal_enabled = env_flag("IPHONE_PHOTO_REVEAL", profile.reveal) return Profile(profile.name, profile.output_dir, clipboard, notify_enabled, reveal_enabled) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--host", default=os.getenv("IPHONE_PHOTO_HOST", "0.0.0.0")) parser.add_argument("--port", type=int, default=int(os.getenv("IPHONE_PHOTO_PORT", "8787"))) parser.add_argument("--profile", default=os.getenv("IPHONE_PHOTO_PROFILE", "opencode")) parser.add_argument("--output-dir", type=Path, default=os.getenv("IPHONE_PHOTO_OUTPUT_DIR")) parser.add_argument("--token", default=os.getenv("IPHONE_PHOTO_TOKEN", "")) parser.add_argument("--max-mb", type=int, default=int(os.getenv("IPHONE_PHOTO_MAX_MB", "30"))) parser.add_argument("--debounce-seconds", type=float, default=float(os.getenv("IPHONE_PHOTO_DEBOUNCE_SECONDS", "5"))) parser.add_argument( "--clipboard", choices=[CLIPBOARD_NONE, CLIPBOARD_IMAGE, CLIPBOARD_PATH, CLIPBOARD_TERMINAL_PATH, CLIPBOARD_FILE, CLIPBOARD_FILES], default=os.getenv("IPHONE_PHOTO_CLIPBOARD"), ) parser.add_argument("--no-notify", action="store_true", default=env_flag("IPHONE_PHOTO_NO_NOTIFY")) parser.add_argument("--reveal", action="store_true", default=env_flag("IPHONE_PHOTO_REVEAL")) return parser.parse_args() def build_profiles(args: argparse.Namespace) -> dict[str, Profile]: profiles = { name: apply_legacy_clipboard_overrides(profile) for name, profile in profile_defaults().items() } if args.clipboard: profiles = { name: Profile(profile.name, profile.output_dir, args.clipboard, profile.notify, profile.reveal) for name, profile in profiles.items() } if args.no_notify: profiles = { name: Profile(profile.name, profile.output_dir, profile.clipboard, False, profile.reveal) for name, profile in profiles.items() } if args.reveal: profiles = { name: Profile(profile.name, profile.output_dir, profile.clipboard, profile.notify, True) for name, profile in profiles.items() } return profiles def main() -> None: args = parse_args() profiles = build_profiles(args) default_profile = normalize_profile(args.profile) if default_profile not in profiles: known = ", ".join(sorted(profiles)) raise SystemExit(f"unknown default profile: {default_profile}; expected one of {known}") output_dir_override = Path(args.output_dir).expanduser().resolve() if args.output_dir else None server = UploadServer( (args.host, args.port), UploadHandler, profiles, default_profile, output_dir_override, args.token, args.max_mb * 1024 * 1024, args.debounce_seconds, ) print(f"listening on http://{args.host}:{args.port}/upload", flush=True) print(f"default profile: {default_profile}", flush=True) print(f"debounce seconds: {args.debounce_seconds:g}", flush=True) for profile in profiles.values(): output_dir = output_dir_override or profile.output_dir print( f"profile {profile.name}: dir={output_dir.expanduser().resolve()} " f"clipboard={profile.clipboard} notify={profile.notify} reveal={profile.reveal}", flush=True, ) if not args.token: print("warning: no token configured; anyone on this network can upload", flush=True) server.serve_forever() if __name__ == "__main__": main()