339 lines
12 KiB
Python
Executable File
339 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Receive JPEG uploads from iPhone Shortcuts into local Mac inboxes."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from tempfile import NamedTemporaryFile
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
|
|
WORKSPACE_DIR = Path(__file__).resolve().parents[2]
|
|
DEFAULT_OPENCODE_DIR = WORKSPACE_DIR / "ai" / "inbox" / "photos"
|
|
DEFAULT_MATTERMOST_DIR = Path.home() / "Pictures" / "iPhone Inbox"
|
|
|
|
CLIPBOARD_NONE = "none"
|
|
CLIPBOARD_IMAGE = "image"
|
|
CLIPBOARD_PATH = "path"
|
|
CLIPBOARD_TERMINAL_PATH = "terminal-path"
|
|
CLIPBOARD_FILE = "file"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Profile:
|
|
name: str
|
|
output_dir: Path
|
|
clipboard: str
|
|
notify: bool = True
|
|
reveal: bool = False
|
|
|
|
|
|
def env_flag(name: str, default: bool = False) -> bool:
|
|
value = os.getenv(name)
|
|
if value is None:
|
|
return default
|
|
return value.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def env_path(name: str, default: Path) -> Path:
|
|
value = os.getenv(name)
|
|
return Path(value).expanduser() if value else default
|
|
|
|
|
|
def profile_defaults() -> dict[str, Profile]:
|
|
opencode_dir = env_path("IPHONE_PHOTO_OPENCODE_DIR", DEFAULT_OPENCODE_DIR)
|
|
mattermost_dir = env_path("IPHONE_PHOTO_MATTERMOST_DIR", DEFAULT_MATTERMOST_DIR)
|
|
general_dir = env_path("IPHONE_PHOTO_GENERAL_DIR", DEFAULT_MATTERMOST_DIR)
|
|
return {
|
|
"opencode": Profile("opencode", opencode_dir, CLIPBOARD_TERMINAL_PATH),
|
|
"mattermost": Profile("mattermost", mattermost_dir, CLIPBOARD_IMAGE),
|
|
"general": Profile("general", general_dir, CLIPBOARD_NONE),
|
|
}
|
|
|
|
|
|
def timestamp() -> str:
|
|
return dt.datetime.now().strftime("%Y%m%d-%H%M%S-%f")
|
|
|
|
|
|
def unique_path(output_dir: Path) -> Path:
|
|
candidate = output_dir / f"iphone-{timestamp()}.jpg"
|
|
counter = 1
|
|
while candidate.exists():
|
|
candidate = output_dir / f"iphone-{timestamp()}-{counter}.jpg"
|
|
counter += 1
|
|
return candidate
|
|
|
|
|
|
def looks_like_jpeg(data: bytes) -> bool:
|
|
return data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9")
|
|
|
|
|
|
def normalize_profile(value: str) -> str:
|
|
return value.strip().lower() or "opencode"
|
|
|
|
|
|
def run_macos_action(command: list[str]) -> bool:
|
|
try:
|
|
result = subprocess.run(command, check=False, capture_output=True, text=True)
|
|
except OSError as error:
|
|
print(f"macOS action failed: {error}", flush=True)
|
|
return False
|
|
|
|
if result.returncode != 0:
|
|
error = result.stderr.strip() or result.stdout.strip() or f"exit {result.returncode}"
|
|
print(f"macOS action failed: {error}", flush=True)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def notify(title: str, message: str) -> bool:
|
|
script = f"display notification {json.dumps(message)} with title {json.dumps(title)}"
|
|
return run_macos_action(["osascript", "-e", script])
|
|
|
|
|
|
def reveal_in_finder(path: Path) -> bool:
|
|
return run_macos_action(["open", "-R", str(path)])
|
|
|
|
|
|
def copy_image_to_clipboard(path: Path) -> bool:
|
|
script = f"set the clipboard to (read (POSIX file {json.dumps(str(path))}) as JPEG picture)"
|
|
return run_macos_action(["osascript", "-e", script])
|
|
|
|
|
|
def copy_file_to_clipboard(path: Path) -> bool:
|
|
script = f"""
|
|
set theFile to POSIX file {json.dumps(str(path))} as alias
|
|
tell application "Finder"
|
|
set the clipboard to {{theFile}}
|
|
end tell
|
|
"""
|
|
return run_macos_action(["osascript", "-e", script])
|
|
|
|
|
|
def copy_text_to_clipboard(text: str) -> bool:
|
|
script = f"set the clipboard to {json.dumps(text)}"
|
|
return run_macos_action(["osascript", "-e", script])
|
|
|
|
|
|
def terminal_path_reference(path: Path) -> str:
|
|
return shlex.quote(str(path))
|
|
|
|
|
|
def apply_clipboard_mode(mode: str, path: Path) -> bool:
|
|
if mode == CLIPBOARD_NONE:
|
|
return True
|
|
if mode == CLIPBOARD_IMAGE:
|
|
return copy_image_to_clipboard(path)
|
|
if mode == CLIPBOARD_PATH:
|
|
return copy_text_to_clipboard(str(path))
|
|
if mode == CLIPBOARD_TERMINAL_PATH:
|
|
return copy_text_to_clipboard(terminal_path_reference(path))
|
|
if mode == CLIPBOARD_FILE:
|
|
return copy_file_to_clipboard(path)
|
|
print(f"unsupported clipboard mode: {mode}", flush=True)
|
|
return False
|
|
|
|
|
|
class UploadHandler(BaseHTTPRequestHandler):
|
|
server_version = "iPhonePhotoInbox/2.0"
|
|
|
|
def do_GET(self) -> None:
|
|
if self.path == "/health":
|
|
self.send_text(HTTPStatus.OK, "ok\n")
|
|
return
|
|
self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
|
|
|
|
def do_POST(self) -> None:
|
|
parsed = urlparse(self.path)
|
|
if parsed.path != "/upload":
|
|
self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
|
|
return
|
|
|
|
query = parse_qs(parsed.query)
|
|
expected_token = self.server.upload_token
|
|
supplied_token = query.get("token", [""])[0]
|
|
if expected_token and supplied_token != expected_token:
|
|
self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n")
|
|
return
|
|
|
|
profile_name = normalize_profile(query.get("profile", [self.server.default_profile])[0])
|
|
profile = self.server.profiles.get(profile_name)
|
|
if profile is None:
|
|
known = ", ".join(sorted(self.server.profiles))
|
|
self.send_text(HTTPStatus.BAD_REQUEST, f"unknown profile: {profile_name}; expected one of {known}\n")
|
|
return
|
|
|
|
content_length = self.headers.get("Content-Length")
|
|
if content_length is None:
|
|
self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n")
|
|
return
|
|
|
|
try:
|
|
size = int(content_length)
|
|
except ValueError:
|
|
self.send_text(HTTPStatus.BAD_REQUEST, "invalid content length\n")
|
|
return
|
|
|
|
if size <= 0 or size > self.server.max_bytes:
|
|
self.send_text(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "invalid upload size\n")
|
|
return
|
|
|
|
data = self.rfile.read(size)
|
|
if not looks_like_jpeg(data):
|
|
self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n")
|
|
return
|
|
|
|
output_dir = self.server.output_dir_override or profile.output_dir
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
target = unique_path(output_dir)
|
|
with NamedTemporaryFile(dir=output_dir, delete=False) as tmp:
|
|
tmp.write(data)
|
|
temp_path = Path(tmp.name)
|
|
temp_path.replace(target)
|
|
|
|
if profile.notify:
|
|
if notify("iPhone Photo Inbox", f"{profile.name}: {target.name}"):
|
|
print("notification sent", flush=True)
|
|
if profile.reveal:
|
|
if reveal_in_finder(target):
|
|
print("revealed in Finder", flush=True)
|
|
if apply_clipboard_mode(profile.clipboard, target):
|
|
print(f"clipboard mode applied: {profile.clipboard}", flush=True)
|
|
|
|
self.send_text(HTTPStatus.CREATED, f"{target}\n")
|
|
print(f"saved {target} profile={profile.name}", flush=True)
|
|
|
|
def log_message(self, format: str, *args: object) -> None:
|
|
print(f"{self.address_string()} - {format % args}", flush=True)
|
|
|
|
def send_text(self, status: HTTPStatus, body: str) -> None:
|
|
encoded = body.encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", "text/plain; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(encoded)))
|
|
self.end_headers()
|
|
self.wfile.write(encoded)
|
|
|
|
|
|
class UploadServer(ThreadingHTTPServer):
|
|
def __init__(
|
|
self,
|
|
server_address: tuple[str, int],
|
|
handler_class: type[BaseHTTPRequestHandler],
|
|
profiles: dict[str, Profile],
|
|
default_profile: str,
|
|
output_dir_override: Path | None,
|
|
upload_token: str,
|
|
max_bytes: int,
|
|
) -> None:
|
|
super().__init__(server_address, handler_class)
|
|
self.profiles = profiles
|
|
self.default_profile = default_profile
|
|
self.output_dir_override = output_dir_override
|
|
self.upload_token = upload_token
|
|
self.max_bytes = max_bytes
|
|
|
|
|
|
def apply_legacy_clipboard_overrides(profile: Profile) -> Profile:
|
|
clipboard = profile.clipboard
|
|
if env_flag("IPHONE_PHOTO_COPY"):
|
|
clipboard = CLIPBOARD_IMAGE
|
|
if env_flag("IPHONE_PHOTO_COPY_FILE"):
|
|
clipboard = CLIPBOARD_FILE
|
|
if env_flag("IPHONE_PHOTO_COPY_PATH"):
|
|
clipboard = CLIPBOARD_PATH
|
|
if env_flag("IPHONE_PHOTO_COPY_TERMINAL_PATH"):
|
|
clipboard = CLIPBOARD_TERMINAL_PATH
|
|
|
|
notify_enabled = env_flag("IPHONE_PHOTO_NOTIFY", profile.notify)
|
|
reveal_enabled = env_flag("IPHONE_PHOTO_REVEAL", profile.reveal)
|
|
return Profile(profile.name, profile.output_dir, clipboard, notify_enabled, reveal_enabled)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--host", default=os.getenv("IPHONE_PHOTO_HOST", "0.0.0.0"))
|
|
parser.add_argument("--port", type=int, default=int(os.getenv("IPHONE_PHOTO_PORT", "8787")))
|
|
parser.add_argument("--profile", default=os.getenv("IPHONE_PHOTO_PROFILE", "opencode"))
|
|
parser.add_argument("--output-dir", type=Path, default=os.getenv("IPHONE_PHOTO_OUTPUT_DIR"))
|
|
parser.add_argument("--token", default=os.getenv("IPHONE_PHOTO_TOKEN", ""))
|
|
parser.add_argument("--max-mb", type=int, default=int(os.getenv("IPHONE_PHOTO_MAX_MB", "30")))
|
|
parser.add_argument(
|
|
"--clipboard",
|
|
choices=[CLIPBOARD_NONE, CLIPBOARD_IMAGE, CLIPBOARD_PATH, CLIPBOARD_TERMINAL_PATH, CLIPBOARD_FILE],
|
|
default=os.getenv("IPHONE_PHOTO_CLIPBOARD"),
|
|
)
|
|
parser.add_argument("--no-notify", action="store_true", default=env_flag("IPHONE_PHOTO_NO_NOTIFY"))
|
|
parser.add_argument("--reveal", action="store_true", default=env_flag("IPHONE_PHOTO_REVEAL"))
|
|
return parser.parse_args()
|
|
|
|
|
|
def build_profiles(args: argparse.Namespace) -> dict[str, Profile]:
|
|
profiles = {
|
|
name: apply_legacy_clipboard_overrides(profile)
|
|
for name, profile in profile_defaults().items()
|
|
}
|
|
|
|
if args.clipboard:
|
|
profiles = {
|
|
name: Profile(profile.name, profile.output_dir, args.clipboard, profile.notify, profile.reveal)
|
|
for name, profile in profiles.items()
|
|
}
|
|
if args.no_notify:
|
|
profiles = {
|
|
name: Profile(profile.name, profile.output_dir, profile.clipboard, False, profile.reveal)
|
|
for name, profile in profiles.items()
|
|
}
|
|
if args.reveal:
|
|
profiles = {
|
|
name: Profile(profile.name, profile.output_dir, profile.clipboard, profile.notify, True)
|
|
for name, profile in profiles.items()
|
|
}
|
|
return profiles
|
|
|
|
|
|
def main() -> None:
|
|
args = parse_args()
|
|
profiles = build_profiles(args)
|
|
default_profile = normalize_profile(args.profile)
|
|
if default_profile not in profiles:
|
|
known = ", ".join(sorted(profiles))
|
|
raise SystemExit(f"unknown default profile: {default_profile}; expected one of {known}")
|
|
|
|
output_dir_override = Path(args.output_dir).expanduser().resolve() if args.output_dir else None
|
|
server = UploadServer(
|
|
(args.host, args.port),
|
|
UploadHandler,
|
|
profiles,
|
|
default_profile,
|
|
output_dir_override,
|
|
args.token,
|
|
args.max_mb * 1024 * 1024,
|
|
)
|
|
print(f"listening on http://{args.host}:{args.port}/upload", flush=True)
|
|
print(f"default profile: {default_profile}", flush=True)
|
|
for profile in profiles.values():
|
|
output_dir = output_dir_override or profile.output_dir
|
|
print(
|
|
f"profile {profile.name}: dir={output_dir.expanduser().resolve()} "
|
|
f"clipboard={profile.clipboard} notify={profile.notify} reveal={profile.reveal}",
|
|
flush=True,
|
|
)
|
|
if not args.token:
|
|
print("warning: no token configured; anyone on this network can upload", flush=True)
|
|
server.serve_forever()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|