Files
fidelity-ai-workspace/scripts/iphone-photo-inbox/receiver.py

339 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""Receive JPEG uploads from iPhone Shortcuts into local Mac inboxes."""
from __future__ import annotations
import argparse
import datetime as dt
import json
import os
import shlex
import subprocess
from dataclasses import dataclass
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from tempfile import NamedTemporaryFile
from urllib.parse import parse_qs, urlparse
WORKSPACE_DIR = Path(__file__).resolve().parents[2]
DEFAULT_OPENCODE_DIR = WORKSPACE_DIR / "ai" / "inbox" / "photos"
DEFAULT_MATTERMOST_DIR = Path.home() / "Pictures" / "iPhone Inbox"
CLIPBOARD_NONE = "none"
CLIPBOARD_IMAGE = "image"
CLIPBOARD_PATH = "path"
CLIPBOARD_TERMINAL_PATH = "terminal-path"
CLIPBOARD_FILE = "file"
@dataclass(frozen=True)
class Profile:
name: str
output_dir: Path
clipboard: str
notify: bool = True
reveal: bool = False
def env_flag(name: str, default: bool = False) -> bool:
value = os.getenv(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def env_path(name: str, default: Path) -> Path:
value = os.getenv(name)
return Path(value).expanduser() if value else default
def profile_defaults() -> dict[str, Profile]:
opencode_dir = env_path("IPHONE_PHOTO_OPENCODE_DIR", DEFAULT_OPENCODE_DIR)
mattermost_dir = env_path("IPHONE_PHOTO_MATTERMOST_DIR", DEFAULT_MATTERMOST_DIR)
general_dir = env_path("IPHONE_PHOTO_GENERAL_DIR", DEFAULT_MATTERMOST_DIR)
return {
"opencode": Profile("opencode", opencode_dir, CLIPBOARD_TERMINAL_PATH),
"mattermost": Profile("mattermost", mattermost_dir, CLIPBOARD_IMAGE),
"general": Profile("general", general_dir, CLIPBOARD_NONE),
}
def timestamp() -> str:
return dt.datetime.now().strftime("%Y%m%d-%H%M%S-%f")
def unique_path(output_dir: Path) -> Path:
candidate = output_dir / f"iphone-{timestamp()}.jpg"
counter = 1
while candidate.exists():
candidate = output_dir / f"iphone-{timestamp()}-{counter}.jpg"
counter += 1
return candidate
def looks_like_jpeg(data: bytes) -> bool:
return data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9")
def normalize_profile(value: str) -> str:
return value.strip().lower() or "opencode"
def run_macos_action(command: list[str]) -> bool:
try:
result = subprocess.run(command, check=False, capture_output=True, text=True)
except OSError as error:
print(f"macOS action failed: {error}", flush=True)
return False
if result.returncode != 0:
error = result.stderr.strip() or result.stdout.strip() or f"exit {result.returncode}"
print(f"macOS action failed: {error}", flush=True)
return False
return True
def notify(title: str, message: str) -> bool:
script = f"display notification {json.dumps(message)} with title {json.dumps(title)}"
return run_macos_action(["osascript", "-e", script])
def reveal_in_finder(path: Path) -> bool:
return run_macos_action(["open", "-R", str(path)])
def copy_image_to_clipboard(path: Path) -> bool:
script = f"set the clipboard to (read (POSIX file {json.dumps(str(path))}) as JPEG picture)"
return run_macos_action(["osascript", "-e", script])
def copy_file_to_clipboard(path: Path) -> bool:
script = f"""
set theFile to POSIX file {json.dumps(str(path))} as alias
tell application "Finder"
set the clipboard to {{theFile}}
end tell
"""
return run_macos_action(["osascript", "-e", script])
def copy_text_to_clipboard(text: str) -> bool:
script = f"set the clipboard to {json.dumps(text)}"
return run_macos_action(["osascript", "-e", script])
def terminal_path_reference(path: Path) -> str:
return shlex.quote(str(path))
def apply_clipboard_mode(mode: str, path: Path) -> bool:
if mode == CLIPBOARD_NONE:
return True
if mode == CLIPBOARD_IMAGE:
return copy_image_to_clipboard(path)
if mode == CLIPBOARD_PATH:
return copy_text_to_clipboard(str(path))
if mode == CLIPBOARD_TERMINAL_PATH:
return copy_text_to_clipboard(terminal_path_reference(path))
if mode == CLIPBOARD_FILE:
return copy_file_to_clipboard(path)
print(f"unsupported clipboard mode: {mode}", flush=True)
return False
class UploadHandler(BaseHTTPRequestHandler):
server_version = "iPhonePhotoInbox/2.0"
def do_GET(self) -> None:
if self.path == "/health":
self.send_text(HTTPStatus.OK, "ok\n")
return
self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
def do_POST(self) -> None:
parsed = urlparse(self.path)
if parsed.path != "/upload":
self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
return
query = parse_qs(parsed.query)
expected_token = self.server.upload_token
supplied_token = query.get("token", [""])[0]
if expected_token and supplied_token != expected_token:
self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n")
return
profile_name = normalize_profile(query.get("profile", [self.server.default_profile])[0])
profile = self.server.profiles.get(profile_name)
if profile is None:
known = ", ".join(sorted(self.server.profiles))
self.send_text(HTTPStatus.BAD_REQUEST, f"unknown profile: {profile_name}; expected one of {known}\n")
return
content_length = self.headers.get("Content-Length")
if content_length is None:
self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n")
return
try:
size = int(content_length)
except ValueError:
self.send_text(HTTPStatus.BAD_REQUEST, "invalid content length\n")
return
if size <= 0 or size > self.server.max_bytes:
self.send_text(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "invalid upload size\n")
return
data = self.rfile.read(size)
if not looks_like_jpeg(data):
self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n")
return
output_dir = self.server.output_dir_override or profile.output_dir
output_dir.mkdir(parents=True, exist_ok=True)
target = unique_path(output_dir)
with NamedTemporaryFile(dir=output_dir, delete=False) as tmp:
tmp.write(data)
temp_path = Path(tmp.name)
temp_path.replace(target)
if profile.notify:
if notify("iPhone Photo Inbox", f"{profile.name}: {target.name}"):
print("notification sent", flush=True)
if profile.reveal:
if reveal_in_finder(target):
print("revealed in Finder", flush=True)
if apply_clipboard_mode(profile.clipboard, target):
print(f"clipboard mode applied: {profile.clipboard}", flush=True)
self.send_text(HTTPStatus.CREATED, f"{target}\n")
print(f"saved {target} profile={profile.name}", flush=True)
def log_message(self, format: str, *args: object) -> None:
print(f"{self.address_string()} - {format % args}", flush=True)
def send_text(self, status: HTTPStatus, body: str) -> None:
encoded = body.encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
self.wfile.write(encoded)
class UploadServer(ThreadingHTTPServer):
def __init__(
self,
server_address: tuple[str, int],
handler_class: type[BaseHTTPRequestHandler],
profiles: dict[str, Profile],
default_profile: str,
output_dir_override: Path | None,
upload_token: str,
max_bytes: int,
) -> None:
super().__init__(server_address, handler_class)
self.profiles = profiles
self.default_profile = default_profile
self.output_dir_override = output_dir_override
self.upload_token = upload_token
self.max_bytes = max_bytes
def apply_legacy_clipboard_overrides(profile: Profile) -> Profile:
clipboard = profile.clipboard
if env_flag("IPHONE_PHOTO_COPY"):
clipboard = CLIPBOARD_IMAGE
if env_flag("IPHONE_PHOTO_COPY_FILE"):
clipboard = CLIPBOARD_FILE
if env_flag("IPHONE_PHOTO_COPY_PATH"):
clipboard = CLIPBOARD_PATH
if env_flag("IPHONE_PHOTO_COPY_TERMINAL_PATH"):
clipboard = CLIPBOARD_TERMINAL_PATH
notify_enabled = env_flag("IPHONE_PHOTO_NOTIFY", profile.notify)
reveal_enabled = env_flag("IPHONE_PHOTO_REVEAL", profile.reveal)
return Profile(profile.name, profile.output_dir, clipboard, notify_enabled, reveal_enabled)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--host", default=os.getenv("IPHONE_PHOTO_HOST", "0.0.0.0"))
parser.add_argument("--port", type=int, default=int(os.getenv("IPHONE_PHOTO_PORT", "8787")))
parser.add_argument("--profile", default=os.getenv("IPHONE_PHOTO_PROFILE", "opencode"))
parser.add_argument("--output-dir", type=Path, default=os.getenv("IPHONE_PHOTO_OUTPUT_DIR"))
parser.add_argument("--token", default=os.getenv("IPHONE_PHOTO_TOKEN", ""))
parser.add_argument("--max-mb", type=int, default=int(os.getenv("IPHONE_PHOTO_MAX_MB", "30")))
parser.add_argument(
"--clipboard",
choices=[CLIPBOARD_NONE, CLIPBOARD_IMAGE, CLIPBOARD_PATH, CLIPBOARD_TERMINAL_PATH, CLIPBOARD_FILE],
default=os.getenv("IPHONE_PHOTO_CLIPBOARD"),
)
parser.add_argument("--no-notify", action="store_true", default=env_flag("IPHONE_PHOTO_NO_NOTIFY"))
parser.add_argument("--reveal", action="store_true", default=env_flag("IPHONE_PHOTO_REVEAL"))
return parser.parse_args()
def build_profiles(args: argparse.Namespace) -> dict[str, Profile]:
profiles = {
name: apply_legacy_clipboard_overrides(profile)
for name, profile in profile_defaults().items()
}
if args.clipboard:
profiles = {
name: Profile(profile.name, profile.output_dir, args.clipboard, profile.notify, profile.reveal)
for name, profile in profiles.items()
}
if args.no_notify:
profiles = {
name: Profile(profile.name, profile.output_dir, profile.clipboard, False, profile.reveal)
for name, profile in profiles.items()
}
if args.reveal:
profiles = {
name: Profile(profile.name, profile.output_dir, profile.clipboard, profile.notify, True)
for name, profile in profiles.items()
}
return profiles
def main() -> None:
args = parse_args()
profiles = build_profiles(args)
default_profile = normalize_profile(args.profile)
if default_profile not in profiles:
known = ", ".join(sorted(profiles))
raise SystemExit(f"unknown default profile: {default_profile}; expected one of {known}")
output_dir_override = Path(args.output_dir).expanduser().resolve() if args.output_dir else None
server = UploadServer(
(args.host, args.port),
UploadHandler,
profiles,
default_profile,
output_dir_override,
args.token,
args.max_mb * 1024 * 1024,
)
print(f"listening on http://{args.host}:{args.port}/upload", flush=True)
print(f"default profile: {default_profile}", flush=True)
for profile in profiles.values():
output_dir = output_dir_override or profile.output_dir
print(
f"profile {profile.name}: dir={output_dir.expanduser().resolve()} "
f"clipboard={profile.clipboard} notify={profile.notify} reveal={profile.reveal}",
flush=True,
)
if not args.token:
print("warning: no token configured; anyone on this network can upload", flush=True)
server.serve_forever()
if __name__ == "__main__":
main()