feat: update iPhone photo inbox scripts with environment file support and refactor batch handling

This commit is contained in:
2026-05-15 08:43:19 -06:00
parent 456a4c3381
commit 8950cfcdf0
6 changed files with 288 additions and 391 deletions

384
scripts/iphone-photo-inbox/receiver.py Executable file → Normal file
View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""Receive JPEG uploads from iPhone Shortcuts into local Mac inboxes."""
"""Receive JPEG uploads into a local Mac photo inbox."""
from __future__ import annotations
@@ -7,7 +7,6 @@ import argparse
import datetime as dt
import json
import os
import shlex
import subprocess
import threading
from dataclasses import dataclass
@@ -18,28 +17,32 @@ from tempfile import NamedTemporaryFile
from urllib.parse import parse_qs, urlparse
WORKSPACE_DIR = Path(__file__).resolve().parents[2]
SCRIPT_DIR = Path(__file__).resolve().parent
COPY_FILES_HELPER = SCRIPT_DIR / "copy_files_to_clipboard.swift"
COPY_FILES_HELPER_BIN = SCRIPT_DIR / "copy_files_to_clipboard"
DEFAULT_OPENCODE_DIR = WORKSPACE_DIR / "ai" / "inbox" / "photos"
DEFAULT_MATTERMOST_DIR = Path.home() / "Pictures" / "iPhone Inbox"
CLIPBOARD_NONE = "none"
CLIPBOARD_IMAGE = "image"
CLIPBOARD_PATH = "path"
CLIPBOARD_TERMINAL_PATH = "terminal-path"
CLIPBOARD_FILE = "file"
CLIPBOARD_FILES = "files"
DEFAULT_ENV_FILE = SCRIPT_DIR / ".env"
DEFAULT_OUTPUT_DIR = Path.home() / "Pictures" / "Photo Inbox"
@dataclass(frozen=True)
class Profile:
name: str
class Config:
output_dir: Path
clipboard: str
notify: bool = True
reveal: bool = False
clipboard: bool
notify: bool
reveal: bool
@dataclass
class Batch:
output_dir: Path
paths: list[Path]
started_at: dt.datetime
updated_at: dt.datetime
timer: threading.Timer | None = None
@property
def count(self) -> int:
return len(self.paths)
def env_flag(name: str, default: bool = False) -> bool:
@@ -49,20 +52,62 @@ def env_flag(name: str, default: bool = False) -> bool:
return value.strip().lower() in {"1", "true", "yes", "on"}
def env_value(name: str) -> str | None:
return os.getenv(name)
def env_flag_value(name: str, default: bool = False) -> bool:
value = env_value(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def env_path(name: str, default: Path) -> Path:
value = os.getenv(name)
value = env_value(name)
return Path(value).expanduser() if value else default
def profile_defaults() -> dict[str, Profile]:
opencode_dir = env_path("IPHONE_PHOTO_OPENCODE_DIR", DEFAULT_OPENCODE_DIR)
mattermost_dir = env_path("IPHONE_PHOTO_MATTERMOST_DIR", DEFAULT_MATTERMOST_DIR)
general_dir = env_path("IPHONE_PHOTO_GENERAL_DIR", DEFAULT_MATTERMOST_DIR)
return {
"opencode": Profile("opencode", opencode_dir, CLIPBOARD_TERMINAL_PATH),
"mattermost": Profile("mattermost", mattermost_dir, CLIPBOARD_FILES),
"general": Profile("general", general_dir, CLIPBOARD_NONE),
}
def env_str(name: str, default: str) -> str:
value = env_value(name)
return value if value is not None else default
def env_int(name: str, default: int) -> int:
return int(env_str(name, str(default)))
def env_float(name: str, default: float) -> float:
return float(env_str(name, str(default)))
def parse_env_line(line: str) -> tuple[str, str] | None:
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
return None
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip()
if not key:
return None
if key.startswith("export "):
key = key.removeprefix("export ").strip()
if len(value) >= 2 and value[0] == value[-1] and value[0] in {"'", '"'}:
value = value[1:-1]
return key, value
def load_env_file(path: Path) -> None:
if not path.exists():
return
for line in path.read_text(encoding="utf-8").splitlines():
parsed = parse_env_line(line)
if parsed is None:
continue
key, value = parsed
os.environ.setdefault(key, value)
def timestamp() -> str:
@@ -82,10 +127,6 @@ def looks_like_jpeg(data: bytes) -> bool:
return data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9")
def normalize_profile(value: str) -> str:
return value.strip().lower() or "opencode"
def run_macos_action(command: list[str]) -> bool:
try:
result = subprocess.run(command, check=False, capture_output=True, text=True)
@@ -98,7 +139,7 @@ def run_macos_action(command: list[str]) -> bool:
print(f"macOS action failed: {error}", flush=True)
return False
if env_flag("IPHONE_PHOTO_DEBUG"):
if env_flag_value("PHOTO_INBOX_DEBUG"):
output = "\n".join(part for part in [result.stdout.strip(), result.stderr.strip()] if part)
if output:
print(f"macOS action output: {output}", flush=True)
@@ -115,25 +156,6 @@ def reveal_in_finder(path: Path) -> bool:
return run_macos_action(["open", "-R", str(path)])
def copy_image_to_clipboard(path: Path) -> bool:
script = f"set the clipboard to (read (POSIX file {json.dumps(str(path))}) as JPEG picture)"
return run_macos_action(["osascript", "-e", script])
def copy_file_to_clipboard(path: Path) -> bool:
return copy_files_to_clipboard([path])
def copy_file_to_clipboard_with_applescript(path: Path) -> bool:
script = f"""
set theFile to POSIX file {json.dumps(str(path))} as alias
tell application "Finder"
set the clipboard to {{theFile}}
end tell
"""
return run_macos_action(["osascript", "-e", script])
def copy_files_to_clipboard(paths: list[Path]) -> bool:
if not paths:
return True
@@ -148,133 +170,54 @@ def copy_files_to_clipboard(paths: list[Path]) -> bool:
print("compiled pasteboard helper is older than source; using Swift script", flush=True)
if helper.exists():
if run_macos_action([str(helper), *[str(path) for path in paths]]):
return True
print("native pasteboard helper failed; falling back to AppleScript", flush=True)
return run_macos_action([str(helper), *[str(path) for path in paths]])
return copy_files_to_clipboard_with_applescript(paths)
def copy_files_to_clipboard_with_applescript(paths: list[Path]) -> bool:
if not paths:
return True
alias_lines = [
f"set end of theFiles to POSIX file {json.dumps(str(path))} as alias"
for path in paths
]
script = "\n".join(
[
"set theFiles to {}",
*alias_lines,
'tell application "Finder"',
" set the clipboard to theFiles",
"end tell",
]
)
return run_macos_action(["osascript", "-e", script])
def copy_text_to_clipboard(text: str) -> bool:
script = f"set the clipboard to {json.dumps(text)}"
return run_macos_action(["osascript", "-e", script])
def terminal_path_reference(path: Path) -> str:
return shlex.quote(str(path))
def paths_reference(paths: list[Path]) -> str:
return "\n".join(str(path) for path in paths)
def terminal_paths_reference(paths: list[Path]) -> str:
return "\n".join(terminal_path_reference(path) for path in paths)
def apply_clipboard_mode(mode: str, paths: list[Path]) -> bool:
if mode == CLIPBOARD_NONE:
return True
if not paths:
return True
latest = paths[-1]
if mode == CLIPBOARD_IMAGE:
return copy_image_to_clipboard(latest)
if mode == CLIPBOARD_PATH:
return copy_text_to_clipboard(paths_reference(paths))
if mode == CLIPBOARD_TERMINAL_PATH:
return copy_text_to_clipboard(terminal_paths_reference(paths))
if mode == CLIPBOARD_FILE:
return copy_file_to_clipboard(latest)
if mode == CLIPBOARD_FILES:
return copy_files_to_clipboard(paths)
print(f"unsupported clipboard mode: {mode}", flush=True)
print(f"native pasteboard helper missing: {COPY_FILES_HELPER_BIN}", flush=True)
return False
@dataclass
class Batch:
profile: Profile
output_dir: Path
paths: list[Path]
started_at: dt.datetime
updated_at: dt.datetime
timer: threading.Timer | None = None
@property
def count(self) -> int:
return len(self.paths)
class BatchManager:
def __init__(self, debounce_seconds: float) -> None:
def __init__(self, debounce_seconds: float, config: Config) -> None:
self.debounce_seconds = debounce_seconds
self.config = config
self.lock = threading.Lock()
self.batches: dict[str, Batch] = {}
self.batch: Batch | None = None
def add(self, profile: Profile, output_dir: Path, path: Path) -> tuple[int, list[Path]]:
def add(self, path: Path) -> tuple[int, list[Path]]:
with self.lock:
now = dt.datetime.now()
batch = self.batches.get(profile.name)
if batch is None:
batch = Batch(profile, output_dir, [], now, now)
self.batches[profile.name] = batch
if self.batch is None:
self.batch = Batch(self.config.output_dir, [], now, now)
batch.profile = profile
batch.output_dir = output_dir
batch.paths.append(path)
batch.updated_at = now
self.batch.paths.append(path)
self.batch.updated_at = now
if batch.timer is not None:
batch.timer.cancel()
batch.timer = threading.Timer(self.debounce_seconds, self.finish, args=(profile.name,))
batch.timer.daemon = True
batch.timer.start()
if self.batch.timer is not None:
self.batch.timer.cancel()
self.batch.timer = threading.Timer(self.debounce_seconds, self.finish)
self.batch.timer.daemon = True
self.batch.timer.start()
return batch.count, list(batch.paths)
return self.batch.count, list(self.batch.paths)
def finish(self, profile_name: str) -> None:
def finish(self) -> None:
with self.lock:
batch = self.batches.pop(profile_name, None)
batch = self.batch
self.batch = None
if batch is None:
return
if batch.profile.notify:
if self.config.notify:
plural = "photo" if batch.count == 1 else "photos"
message = f"{batch.profile.name}: {batch.count} {plural} ready; clipboard={batch.profile.clipboard}"
if notify("iPhone Photo Inbox", message):
print(f"batch notification sent profile={batch.profile.name} count={batch.count}", flush=True)
message = f"{batch.count} {plural} ready; clipboard={'on' if self.config.clipboard else 'off'}"
if notify("Photo Inbox", message):
print(f"batch notification sent count={batch.count}", flush=True)
print(
f"batch finalized profile={batch.profile.name} count={batch.count} "
f"dir={batch.output_dir}",
flush=True,
)
print(f"batch finalized count={batch.count} dir={batch.output_dir}", flush=True)
class UploadHandler(BaseHTTPRequestHandler):
server_version = "iPhonePhotoInbox/2.0"
server_version = "iPhonePhotoInbox/3.0"
def do_GET(self) -> None:
if self.path == "/health":
@@ -295,13 +238,6 @@ class UploadHandler(BaseHTTPRequestHandler):
self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n")
return
profile_name = normalize_profile(query.get("profile", [self.server.default_profile])[0])
profile = self.server.profiles.get(profile_name)
if profile is None:
known = ", ".join(sorted(self.server.profiles))
self.send_text(HTTPStatus.BAD_REQUEST, f"unknown profile: {profile_name}; expected one of {known}\n")
return
content_length = self.headers.get("Content-Length")
if content_length is None:
self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n")
@@ -322,7 +258,7 @@ class UploadHandler(BaseHTTPRequestHandler):
self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n")
return
output_dir = self.server.output_dir_override or profile.output_dir
output_dir = self.server.config.output_dir
output_dir.mkdir(parents=True, exist_ok=True)
target = unique_path(output_dir)
with NamedTemporaryFile(dir=output_dir, delete=False) as tmp:
@@ -330,20 +266,17 @@ class UploadHandler(BaseHTTPRequestHandler):
temp_path = Path(tmp.name)
temp_path.replace(target)
if profile.reveal:
if self.server.config.reveal:
if reveal_in_finder(target):
print("revealed in Finder", flush=True)
batch_count, batch_paths = self.server.batch_manager.add(profile, output_dir, target)
if apply_clipboard_mode(profile.clipboard, batch_paths):
print(
f"clipboard mode applied: {profile.clipboard} "
f"profile={profile.name} count={batch_count}",
flush=True,
)
batch_count, batch_paths = self.server.batch_manager.add(target)
if self.server.config.clipboard:
if copy_files_to_clipboard(batch_paths):
print(f"clipboard updated count={batch_count}", flush=True)
self.send_text(HTTPStatus.CREATED, f"{target}\n")
print(f"saved {target} profile={profile.name} batch_count={batch_count}", flush=True)
print(f"saved {target} batch_count={batch_count}", flush=True)
def log_message(self, format: str, *args: object) -> None:
print(f"{self.address_string()} - {format % args}", flush=True)
@@ -362,112 +295,61 @@ class UploadServer(ThreadingHTTPServer):
self,
server_address: tuple[str, int],
handler_class: type[BaseHTTPRequestHandler],
profiles: dict[str, Profile],
default_profile: str,
output_dir_override: Path | None,
config: Config,
upload_token: str,
max_bytes: int,
debounce_seconds: float,
) -> None:
super().__init__(server_address, handler_class)
self.profiles = profiles
self.default_profile = default_profile
self.output_dir_override = output_dir_override
self.config = config
self.upload_token = upload_token
self.max_bytes = max_bytes
self.batch_manager = BatchManager(debounce_seconds)
def apply_legacy_clipboard_overrides(profile: Profile) -> Profile:
clipboard = profile.clipboard
if env_flag("IPHONE_PHOTO_COPY"):
clipboard = CLIPBOARD_IMAGE
if env_flag("IPHONE_PHOTO_COPY_FILE"):
clipboard = CLIPBOARD_FILE
if env_flag("IPHONE_PHOTO_COPY_PATH"):
clipboard = CLIPBOARD_PATH
if env_flag("IPHONE_PHOTO_COPY_TERMINAL_PATH"):
clipboard = CLIPBOARD_TERMINAL_PATH
if env_flag("IPHONE_PHOTO_COPY_FILES"):
clipboard = CLIPBOARD_FILES
notify_enabled = env_flag("IPHONE_PHOTO_NOTIFY", profile.notify)
reveal_enabled = env_flag("IPHONE_PHOTO_REVEAL", profile.reveal)
return Profile(profile.name, profile.output_dir, clipboard, notify_enabled, reveal_enabled)
self.batch_manager = BatchManager(debounce_seconds, config)
def parse_args() -> argparse.Namespace:
env_file = Path(env_str("PHOTO_INBOX_ENV_FILE", str(DEFAULT_ENV_FILE))).expanduser()
load_env_file(env_file)
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default=os.getenv("IPHONE_PHOTO_HOST", "0.0.0.0"))
parser.add_argument("--port", type=int, default=int(os.getenv("IPHONE_PHOTO_PORT", "8787")))
parser.add_argument("--profile", default=os.getenv("IPHONE_PHOTO_PROFILE", "opencode"))
parser.add_argument("--output-dir", type=Path, default=os.getenv("IPHONE_PHOTO_OUTPUT_DIR"))
parser.add_argument("--token", default=os.getenv("IPHONE_PHOTO_TOKEN", ""))
parser.add_argument("--max-mb", type=int, default=int(os.getenv("IPHONE_PHOTO_MAX_MB", "30")))
parser.add_argument("--debounce-seconds", type=float, default=float(os.getenv("IPHONE_PHOTO_DEBOUNCE_SECONDS", "5")))
parser.add_argument(
"--clipboard",
choices=[CLIPBOARD_NONE, CLIPBOARD_IMAGE, CLIPBOARD_PATH, CLIPBOARD_TERMINAL_PATH, CLIPBOARD_FILE, CLIPBOARD_FILES],
default=os.getenv("IPHONE_PHOTO_CLIPBOARD"),
)
parser.add_argument("--no-notify", action="store_true", default=env_flag("IPHONE_PHOTO_NO_NOTIFY"))
parser.add_argument("--reveal", action="store_true", default=env_flag("IPHONE_PHOTO_REVEAL"))
parser.add_argument("--host", default=env_str("PHOTO_INBOX_HOST", "0.0.0.0"))
parser.add_argument("--port", type=int, default=env_int("PHOTO_INBOX_PORT", 8787))
parser.add_argument("--output-dir", type=Path, default=env_path("PHOTO_INBOX_DIR", DEFAULT_OUTPUT_DIR))
parser.add_argument("--token", default=env_str("PHOTO_INBOX_TOKEN", ""))
parser.add_argument("--max-mb", type=int, default=env_int("PHOTO_INBOX_MAX_MB", 30))
parser.add_argument("--debounce-seconds", type=float, default=env_float("PHOTO_INBOX_DEBOUNCE_SECONDS", 5))
parser.add_argument("--clipboard", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_CLIPBOARD", True))
parser.add_argument("--notify", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_NOTIFY", True))
parser.add_argument("--reveal", action="store_true", default=env_flag_value("PHOTO_INBOX_REVEAL"))
return parser.parse_args()
def build_profiles(args: argparse.Namespace) -> dict[str, Profile]:
profiles = {
name: apply_legacy_clipboard_overrides(profile)
for name, profile in profile_defaults().items()
}
if args.clipboard:
profiles = {
name: Profile(profile.name, profile.output_dir, args.clipboard, profile.notify, profile.reveal)
for name, profile in profiles.items()
}
if args.no_notify:
profiles = {
name: Profile(profile.name, profile.output_dir, profile.clipboard, False, profile.reveal)
for name, profile in profiles.items()
}
if args.reveal:
profiles = {
name: Profile(profile.name, profile.output_dir, profile.clipboard, profile.notify, True)
for name, profile in profiles.items()
}
return profiles
def build_config(args: argparse.Namespace) -> Config:
return Config(
output_dir=args.output_dir.expanduser().resolve(),
clipboard=args.clipboard,
notify=args.notify,
reveal=args.reveal,
)
def main() -> None:
args = parse_args()
profiles = build_profiles(args)
default_profile = normalize_profile(args.profile)
if default_profile not in profiles:
known = ", ".join(sorted(profiles))
raise SystemExit(f"unknown default profile: {default_profile}; expected one of {known}")
output_dir_override = Path(args.output_dir).expanduser().resolve() if args.output_dir else None
config = build_config(args)
server = UploadServer(
(args.host, args.port),
UploadHandler,
profiles,
default_profile,
output_dir_override,
config,
args.token,
args.max_mb * 1024 * 1024,
args.debounce_seconds,
)
print(f"listening on http://{args.host}:{args.port}/upload", flush=True)
print(f"default profile: {default_profile}", flush=True)
print(f"saving to: {config.output_dir}", flush=True)
print(f"debounce seconds: {args.debounce_seconds:g}", flush=True)
for profile in profiles.values():
output_dir = output_dir_override or profile.output_dir
print(
f"profile {profile.name}: dir={output_dir.expanduser().resolve()} "
f"clipboard={profile.clipboard} notify={profile.notify} reveal={profile.reveal}",
flush=True,
)
print(f"clipboard: {config.clipboard}", flush=True)
print(f"notify: {config.notify}", flush=True)
print(f"reveal: {config.reveal}", flush=True)
if not args.token:
print("warning: no token configured; anyone on this network can upload", flush=True)
server.serve_forever()