Files

360 lines
12 KiB
Python

#!/usr/bin/env python3
"""Receive JPEG uploads into a local Mac photo inbox."""
from __future__ import annotations
import argparse
import datetime as dt
import hmac
import json
import os
import subprocess
import threading
from dataclasses import dataclass
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from tempfile import NamedTemporaryFile
from urllib.parse import parse_qs, urlparse
# Directory holding this script; the helper programs and the default env file
# are looked up next to it.
SCRIPT_DIR = Path(__file__).resolve().parent

# Swift source of the pasteboard helper and its compiled counterpart.
COPY_FILES_HELPER = SCRIPT_DIR.joinpath("copy_files_to_clipboard.swift")
COPY_FILES_HELPER_BIN = SCRIPT_DIR.joinpath("copy_files_to_clipboard")

# Env file read at startup unless PHOTO_INBOX_ENV_FILE points elsewhere.
DEFAULT_ENV_FILE = SCRIPT_DIR.joinpath(".env")

# Where uploads are stored when no directory is configured.
DEFAULT_OUTPUT_DIR = Path.home().joinpath("Pictures", "Photo Inbox")
@dataclass(frozen=True)
class Config:
    """Immutable runtime settings shared by the server and its helpers."""

    # Directory uploaded photos are written to.
    output_dir: Path
    # Copy the files of the current batch to the clipboard after each upload.
    clipboard: bool
    # Show a notification when a batch is finalized.
    notify: bool
    # Reveal each saved file in Finder.
    reveal: bool
@dataclass
class Batch:
    """A group of uploads collected during one debounce window."""

    # Directory the batch's files live in.
    output_dir: Path
    # Saved files, in arrival order.
    paths: list[Path]
    # When the first file of the batch arrived.
    started_at: dt.datetime
    # When the most recent file arrived.
    updated_at: dt.datetime
    # Pending debounce timer, if one has been scheduled.
    timer: threading.Timer | None = None

    @property
    def count(self) -> int:
        """Number of files collected so far."""
        collected = self.paths
        return len(collected)
def env_flag(name: str, default: bool = False) -> bool:
    """Interpret environment variable *name* as a boolean switch.

    Returns *default* when the variable is unset. Otherwise the value counts
    as true only if, ignoring case and surrounding whitespace, it is one of
    ``1``, ``true``, ``yes`` or ``on``.
    """
    raw = os.environ.get(name)
    if raw is not None:
        return raw.strip().lower() in ("1", "true", "yes", "on")
    return default
def env_value(name: str) -> str | None:
    """Return the raw text of environment variable *name*, or None if unset."""
    return os.environ.get(name)
def env_flag_value(name: str, default: bool = False) -> bool:
    """Read environment variable *name* as a boolean switch.

    Returns *default* when the variable is unset. A set value is true only
    when it is ``1``, ``true``, ``yes`` or ``on``, compared case-insensitively
    after trimming whitespace.
    """
    raw = env_value(name)
    if raw is not None:
        return raw.strip().lower() in ("1", "true", "yes", "on")
    return default
def env_path(name: str, default: Path) -> Path:
    """Return environment variable *name* as a path with ``~`` expanded.

    An unset or empty variable yields *default* unchanged.
    """
    raw = env_value(name)
    if not raw:
        return default
    return Path(raw).expanduser()
def env_str(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when it is unset.

    An empty value counts as set and is returned as-is.
    """
    raw = env_value(name)
    if raw is None:
        return default
    return raw
def env_int(name: str, default: int) -> int:
    """Return environment variable *name* parsed as an integer.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset or blank.

    Returns:
        The parsed integer, or *default*.

    Raises:
        ValueError: If the variable holds non-blank text that is not an
            integer. The message names the offending variable.
    """
    raw = os.getenv(name)
    # A blank value (for example a bare ``KEY=`` line in the env file) means
    # "not configured", not a parse error.
    if raw is None or not raw.strip():
        return int(default)
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
def env_float(name: str, default: float) -> float:
    """Return environment variable *name* parsed as a float.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset or blank.

    Returns:
        The parsed number, or *default* converted to ``float``.

    Raises:
        ValueError: If the variable holds non-blank text that is not a
            number. The message names the offending variable.
    """
    raw = os.getenv(name)
    # A blank value (for example a bare ``KEY=`` line in the env file) means
    # "not configured", not a parse error.
    if raw is None or not raw.strip():
        return float(default)
    try:
        return float(raw)
    except ValueError:
        raise ValueError(f"{name} must be a number, got {raw!r}") from None
def parse_env_line(line: str) -> tuple[str, str] | None:
    """Split one env-file line into a ``(key, value)`` pair.

    Returns None for blank lines, ``#`` comments, lines without ``=`` and
    lines whose key is empty. A leading ``export `` on the key is dropped,
    and one pair of matching single or double quotes around the value is
    removed.
    """
    text = line.strip()
    if text == "" or text[0] == "#":
        return None

    name, separator, raw_value = text.partition("=")
    if separator == "":
        return None

    name = name.strip()
    if name == "":
        return None
    if name.startswith("export "):
        name = name[len("export "):].strip()

    raw_value = raw_value.strip()
    is_quoted = (
        len(raw_value) >= 2
        and raw_value[0] in ("'", '"')
        and raw_value[-1] == raw_value[0]
    )
    if is_quoted:
        raw_value = raw_value[1:-1]
    return name, raw_value
def load_env_file(path: Path) -> None:
    """Populate ``os.environ`` from the env file at *path*.

    A missing file is silently ignored. Variables already present in the
    environment keep their value; the file only fills in the gaps.
    """
    if path.exists():
        content = path.read_text(encoding="utf-8")
        for raw_line in content.splitlines():
            entry = parse_env_line(raw_line)
            if entry is not None:
                name, value = entry
                os.environ.setdefault(name, value)
def timestamp() -> str:
    """Return the current local time as ``YYYYMMDD-HHMMSS-microseconds``."""
    return f"{dt.datetime.now():%Y%m%d-%H%M%S-%f}"
def unique_path(output_dir: Path) -> Path:
    """Pick a not-yet-existing ``iphone-<timestamp>.jpg`` path in *output_dir*.

    If the first choice is taken, a fresh timestamp plus an increasing
    counter suffix is tried until a free name is found. Nothing is created
    on disk.
    """
    path = output_dir / f"iphone-{timestamp()}.jpg"
    attempt = 0
    while path.exists():
        attempt += 1
        path = output_dir / f"iphone-{timestamp()}-{attempt}.jpg"
    return path
def looks_like_jpeg(data: bytes) -> bool:
    """Report whether *data* is framed by the JPEG start and end markers.

    Only the first two bytes (``FF D8``) and the last two bytes (``FF D9``)
    are inspected; the payload in between is not validated.
    """
    has_start_marker = data[:2] == b"\xff\xd8"
    has_end_marker = data[-2:] == b"\xff\xd9"
    return has_start_marker and has_end_marker
def run_macos_action(command: list[str]) -> bool:
    """Run *command* and report whether it succeeded.

    Output is captured rather than shown. A failure to launch or a non-zero
    exit status is printed and yields False. On success, the captured output
    is echoed only when the PHOTO_INBOX_DEBUG flag is enabled.
    """
    try:
        completed = subprocess.run(command, check=False, capture_output=True, text=True)
    except OSError as error:
        print(f"macOS action failed: {error}", flush=True)
        return False

    stdout = completed.stdout.strip()
    stderr = completed.stderr.strip()

    if completed.returncode != 0:
        # Prefer stderr, then stdout, then the bare exit status.
        reason = stderr or stdout or f"exit {completed.returncode}"
        print(f"macOS action failed: {reason}", flush=True)
        return False

    if env_flag_value("PHOTO_INBOX_DEBUG"):
        output = "\n".join(filter(None, (stdout, stderr)))
        if output:
            print(f"macOS action output: {output}", flush=True)
    return True
def notify(title: str, message: str) -> bool:
    """Show a macOS notification through ``osascript``.

    Args:
        title: Heading of the notification.
        message: Body text of the notification.

    Returns:
        True when the ``osascript`` call succeeded, False otherwise.
    """
    # json.dumps doubles as the AppleScript quoting routine: it wraps the text
    # in double quotes and backslash-escapes quotes and backslashes.
    # ensure_ascii=False keeps non-ASCII characters literal; the default would
    # turn them into \uXXXX escapes, which AppleScript string literals do not
    # accept.
    # NOTE(review): control characters other than newline/tab/return are still
    # emitted as JSON escapes - confirm callers never pass them.
    quoted_message = json.dumps(message, ensure_ascii=False)
    quoted_title = json.dumps(title, ensure_ascii=False)
    script = f"display notification {quoted_message} with title {quoted_title}"
    return run_macos_action(["osascript", "-e", script])
def reveal_in_finder(path: Path) -> bool:
    """Run ``open -R`` on *path*; return True when the command succeeded."""
    command = ["open", "-R", str(path)]
    return run_macos_action(command)
def copy_files_to_clipboard(paths: list[Path]) -> bool:
    """Hand *paths* to the pasteboard helper program.

    The compiled helper is used when it exists and is at least as new as the
    Swift source (or the source is absent); otherwise the Swift script itself
    is run. An empty list is a no-op that counts as success.

    Returns:
        True on success, False when no helper exists or the helper failed.
    """
    if not paths:
        return True

    chosen = COPY_FILES_HELPER
    if COPY_FILES_HELPER_BIN.exists():
        if COPY_FILES_HELPER.exists():
            source_mtime = COPY_FILES_HELPER.stat().st_mtime
        else:
            source_mtime = 0
        binary_is_current = COPY_FILES_HELPER_BIN.stat().st_mtime >= source_mtime
        if binary_is_current:
            chosen = COPY_FILES_HELPER_BIN
        else:
            print("compiled pasteboard helper is older than source; using Swift script", flush=True)

    if not chosen.exists():
        print(f"native pasteboard helper missing: {COPY_FILES_HELPER_BIN}", flush=True)
        return False
    return run_macos_action([str(chosen)] + [str(item) for item in paths])
class BatchManager:
    """Group uploads that arrive close together into one batch.

    Every added file restarts a debounce timer; when the timer fires, the
    batch is finalized: optionally announced with a notification, then logged.
    """

    def __init__(self, debounce_seconds: float, config: Config) -> None:
        """Store the debounce interval and settings; start with no batch."""
        self.debounce_seconds = debounce_seconds
        self.config = config
        self.lock = threading.Lock()
        self.batch: Batch | None = None

    def add(self, path: Path) -> tuple[int, list[Path]]:
        """Append *path* to the open batch and restart the debounce timer.

        A new batch is opened when none is active.

        Returns:
            The batch size after adding, and a copy of the batch's paths.
        """
        with self.lock:
            moment = dt.datetime.now()
            current = self.batch
            if current is None:
                current = Batch(self.config.output_dir, [], moment, moment)
                self.batch = current
            current.paths.append(path)
            current.updated_at = moment

            # Restart the countdown.
            # NOTE(review): cancel() has no effect on a timer whose callback
            # is already running, so a finish() blocked on the lock will
            # still finalize this batch right after this call returns.
            pending = current.timer
            if pending is not None:
                pending.cancel()
            countdown = threading.Timer(self.debounce_seconds, self.finish)
            countdown.daemon = True
            current.timer = countdown
            countdown.start()

            return current.count, list(current.paths)

    def finish(self) -> None:
        """Close the open batch, if any, and announce it."""
        with self.lock:
            batch, self.batch = self.batch, None
        if batch is None:
            return

        # Notification and logging happen outside the lock.
        if self.config.notify:
            noun = "photo" if batch.count == 1 else "photos"
            clipboard_state = "on" if self.config.clipboard else "off"
            message = f"{batch.count} {noun} ready; clipboard={clipboard_state}"
            if notify("Photo Inbox", message):
                print(f"batch notification sent count={batch.count}", flush=True)
        print(f"batch finalized count={batch.count} dir={batch.output_dir}", flush=True)
class UploadHandler(BaseHTTPRequestHandler):
    """Serve ``GET /health`` and accept JPEG bodies on ``POST /upload``.

    The handler reads ``upload_token``, ``max_bytes``, ``config`` and
    ``batch_manager`` from ``self.server``.
    """

    server_version = "iPhonePhotoInbox/3.0"

    def do_GET(self) -> None:
        """Answer ``/health`` with ``ok``; any other path gets 404."""
        # Match on the path component only, as do_POST does, so a health
        # probe carrying a query string is still recognized.
        if urlparse(self.path).path == "/health":
            self.send_text(HTTPStatus.OK, "ok\n")
            return
        self.send_text(HTTPStatus.NOT_FOUND, "not found\n")

    def do_POST(self) -> None:
        """Validate an upload, store it, and run the configured actions.

        Responses: 404 for a wrong path, 401 for a bad token, 411 for a
        missing Content-Length, 400 for an unparsable or non-positive one,
        413 when it exceeds the limit, 415 when the body is not a JPEG,
        500 when the file cannot be written, and 201 with the saved path on
        success.
        """
        parsed = urlparse(self.path)
        if parsed.path != "/upload":
            self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
            return

        expected_token = self.server.upload_token
        supplied_token = parse_qs(parsed.query).get("token", [""])[0]
        # Constant-time comparison so response timing does not reveal how
        # much of the token matched. Bytes are compared because
        # compare_digest rejects non-ASCII str arguments.
        if expected_token and not hmac.compare_digest(
            supplied_token.encode("utf-8"), expected_token.encode("utf-8")
        ):
            self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n")
            return

        content_length = self.headers.get("Content-Length")
        if content_length is None:
            self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n")
            return
        try:
            size = int(content_length)
        except ValueError:
            self.send_text(HTTPStatus.BAD_REQUEST, "invalid content length\n")
            return
        if size <= 0:
            # An empty or negative length is a malformed request, not an
            # oversized one.
            self.send_text(HTTPStatus.BAD_REQUEST, "invalid upload size\n")
            return
        if size > self.server.max_bytes:
            self.send_text(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "invalid upload size\n")
            return

        data = self.rfile.read(size)
        if not looks_like_jpeg(data):
            self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n")
            return

        try:
            target = self._store_upload(data)
        except OSError as error:
            print(f"failed to store upload: {error}", flush=True)
            self.send_text(HTTPStatus.INTERNAL_SERVER_ERROR, "could not save upload\n")
            return

        if self.server.config.reveal:
            if reveal_in_finder(target):
                print("revealed in Finder", flush=True)
        batch_count, batch_paths = self.server.batch_manager.add(target)
        if self.server.config.clipboard:
            if copy_files_to_clipboard(batch_paths):
                print(f"clipboard updated count={batch_count}", flush=True)
        self.send_text(HTTPStatus.CREATED, f"{target}\n")
        print(f"saved {target} batch_count={batch_count}", flush=True)

    def _store_upload(self, data: bytes) -> Path:
        """Write *data* into the output directory and return the final path.

        The bytes go to a temporary file in the same directory, which is
        then renamed onto its final name. If anything fails, the temporary
        file is removed before the OSError propagates.
        """
        output_dir = self.server.config.output_dir
        output_dir.mkdir(parents=True, exist_ok=True)
        target = unique_path(output_dir)
        temp_path: Path | None = None
        try:
            with NamedTemporaryFile(dir=output_dir, delete=False) as tmp:
                temp_path = Path(tmp.name)
                tmp.write(data)
            temp_path.replace(target)
        except OSError:
            if temp_path is not None:
                temp_path.unlink(missing_ok=True)
            raise
        return target

    def log_message(self, format: str, *args: object) -> None:
        """Print request log lines to stdout, flushed immediately."""
        # NOTE(review): the logged request line includes the query string,
        # and therefore the upload token - confirm that is acceptable.
        print(f"{self.address_string()} - {format % args}", flush=True)

    def send_text(self, status: HTTPStatus, body: str) -> None:
        """Send *body* as a complete UTF-8 ``text/plain`` response."""
        encoded = body.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.wfile.write(encoded)
class UploadServer(ThreadingHTTPServer):
    """Threaded HTTP server carrying the settings shared by all requests.

    Handlers reach the configuration, the upload token, the size limit and
    the batch manager through ``self.server``.
    """

    def __init__(
        self,
        server_address: tuple[str, int],
        handler_class: type[BaseHTTPRequestHandler],
        config: Config,
        upload_token: str,
        max_bytes: int,
        debounce_seconds: float,
    ) -> None:
        """Set up the listening server, then attach the shared state."""
        super().__init__(server_address, handler_class)

        # Request validation settings.
        self.upload_token = upload_token
        self.max_bytes = max_bytes

        # Post-upload behavior.
        self.config = config
        self.batch_manager = BatchManager(debounce_seconds, config)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options, using environment variables as defaults.

    The env file (PHOTO_INBOX_ENV_FILE, or the default next to the script) is
    loaded first so that its values can serve as option defaults. Explicit
    command-line options override them.

    Args:
        argv: Arguments to parse. None (the default) means ``sys.argv[1:]``.

    Returns:
        The populated argparse namespace.
    """
    env_file = Path(env_str("PHOTO_INBOX_ENV_FILE", str(DEFAULT_ENV_FILE))).expanduser()
    load_env_file(env_file)
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--host", default=env_str("PHOTO_INBOX_HOST", "0.0.0.0"))
    parser.add_argument("--port", type=int, default=env_int("PHOTO_INBOX_PORT", 8787))
    parser.add_argument("--output-dir", type=Path, default=env_path("PHOTO_INBOX_DIR", DEFAULT_OUTPUT_DIR))
    parser.add_argument("--token", default=env_str("PHOTO_INBOX_TOKEN", ""))
    parser.add_argument("--max-mb", type=int, default=env_int("PHOTO_INBOX_MAX_MB", 30))
    parser.add_argument("--debounce-seconds", type=float, default=env_float("PHOTO_INBOX_DEBOUNCE_SECONDS", 5))
    parser.add_argument("--clipboard", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_CLIPBOARD", True))
    parser.add_argument("--notify", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_NOTIFY", True))
    # BooleanOptionalAction (rather than store_true) adds --no-reveal, so a
    # PHOTO_INBOX_REVEAL=1 default can be switched off from the command line,
    # matching --clipboard and --notify.
    parser.add_argument("--reveal", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_REVEAL"))
    return parser.parse_args(argv)
def build_config(args: argparse.Namespace) -> Config:
    """Turn parsed arguments into a Config.

    The output directory has ``~`` expanded and is resolved to an absolute
    path; the three switches are copied over unchanged.
    """
    resolved_dir = args.output_dir.expanduser().resolve()
    switches = {
        "clipboard": args.clipboard,
        "notify": args.notify,
        "reveal": args.reveal,
    }
    return Config(output_dir=resolved_dir, **switches)
def main() -> None:
    """Start the upload server and serve until interrupted.

    Prints the effective settings at startup. Ctrl-C stops the server
    without a traceback, and the listening socket is closed on the way out.
    """
    args = parse_args()
    config = build_config(args)
    server = UploadServer(
        (args.host, args.port),
        UploadHandler,
        config,
        args.token,
        args.max_mb * 1024 * 1024,
        args.debounce_seconds,
    )
    # Leaving the ``with`` block closes the server socket, whether the loop
    # ended through Ctrl-C or through an error.
    with server:
        print(f"listening on http://{args.host}:{args.port}/upload", flush=True)
        print(f"saving to: {config.output_dir}", flush=True)
        print(f"debounce seconds: {args.debounce_seconds:g}", flush=True)
        print(f"clipboard: {config.clipboard}", flush=True)
        print(f"notify: {config.notify}", flush=True)
        print(f"reveal: {config.reveal}", flush=True)
        if not args.token:
            print("warning: no token configured; anyone on this network can upload", flush=True)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("shutting down", flush=True)
# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()