360 lines
12 KiB
Python
360 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Receive JPEG uploads into a local Mac photo inbox."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
import datetime as dt
import hmac
import json
import os
import subprocess
import threading
from dataclasses import dataclass
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from tempfile import NamedTemporaryFile
from urllib.parse import parse_qs, urlparse
|
|
|
|
|
|
# Directory that contains this script; the helper programs and the default
# .env file are looked up next to it.
SCRIPT_DIR = Path(__file__).resolve().parent

# Swift source of the pasteboard helper and its optional compiled binary.
COPY_FILES_HELPER = SCRIPT_DIR.joinpath("copy_files_to_clipboard.swift")
COPY_FILES_HELPER_BIN = SCRIPT_DIR.joinpath("copy_files_to_clipboard")

# Environment file loaded at startup unless PHOTO_INBOX_ENV_FILE points elsewhere.
DEFAULT_ENV_FILE = SCRIPT_DIR.joinpath(".env")

# Destination for uploads unless --output-dir / PHOTO_INBOX_DIR overrides it.
DEFAULT_OUTPUT_DIR = Path.home().joinpath("Pictures", "Photo Inbox")
|
|
|
|
|
|
@dataclass(frozen=True)
class Config:
    """Immutable runtime settings shared by the server and its handlers."""

    # Directory that receives the uploaded photos.
    output_dir: Path
    # Copy the files of the current batch to the clipboard after each upload.
    clipboard: bool
    # Show a notification when a batch is finalized.
    notify: bool
    # Reveal each saved file in Finder.
    reveal: bool
|
|
|
|
|
|
@dataclass
class Batch:
    """Photos that arrived close together and are tracked as one group."""

    # Directory the photos of this batch were written to.
    output_dir: Path
    # Saved files, in arrival order.
    paths: list[Path]
    # When the batch was created and when a photo was last added to it.
    started_at: dt.datetime
    updated_at: dt.datetime
    # Debounce timer that will finalize the batch, if one is currently armed.
    timer: threading.Timer | None = None

    @property
    def count(self) -> int:
        """Number of photos collected so far."""
        return len(self.paths)
|
|
|
|
|
|
def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean switch from the environment.

    Returns *default* when *name* is unset. Otherwise the value counts as
    true only if, ignoring case and surrounding whitespace, it is one of
    ``1``, ``true``, ``yes`` or ``on``.
    """
    raw = os.getenv(name)
    if raw is not None:
        return raw.strip().lower() in ("1", "true", "yes", "on")
    return default
|
|
|
|
|
|
def env_value(name: str) -> str | None:
    """Return the raw value of environment variable *name*, or None if unset."""
    return os.environ.get(name)
|
|
|
|
|
|
def env_flag_value(name: str, default: bool = False) -> bool:
    """Interpret environment variable *name* as an on/off switch.

    An unset variable yields *default*. A set variable is true only when its
    trimmed, lower-cased value is ``1``, ``true``, ``yes`` or ``on``.
    """
    raw = env_value(name)
    return default if raw is None else raw.strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
|
|
def env_path(name: str, default: Path) -> Path:
    """Return environment variable *name* as a path with ``~`` expanded.

    Falls back to *default* when the variable is unset or empty.
    """
    raw = env_value(name)
    if not raw:
        return default
    return Path(raw).expanduser()
|
|
|
|
|
|
def env_str(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when it is unset.

    An empty string is a set value and is returned unchanged.
    """
    raw = env_value(name)
    if raw is None:
        return default
    return raw
|
|
|
|
|
|
def env_int(name: str, default: int) -> int:
    """Return environment variable *name* parsed as an integer.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset or blank.

    Raises:
        ValueError: The variable is set to something that is not an integer.
            The message names the variable so a bad ``.env`` entry is easy
            to locate.
    """
    raw = env_value(name)
    # Treat a blank entry (e.g. "PHOTO_INBOX_PORT=") like an unset one instead
    # of failing inside int() with an error that does not mention the variable.
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
|
|
|
|
|
|
def env_float(name: str, default: float) -> float:
    """Return environment variable *name* parsed as a float.

    Args:
        name: Environment variable to read.
        default: Value returned (converted to ``float``) when the variable is
            unset or blank.

    Raises:
        ValueError: The variable is set to something that is not a number.
            The message names the variable so a bad ``.env`` entry is easy
            to locate.
    """
    raw = env_value(name)
    # Treat a blank entry like an unset one instead of failing inside float()
    # with an error that does not mention the variable.
    if raw is None or not raw.strip():
        return float(default)
    try:
        return float(raw)
    except ValueError:
        raise ValueError(f"{name} must be a number, got {raw!r}") from None
|
|
|
|
|
|
def parse_env_line(line: str) -> tuple[str, str] | None:
    """Parse one ``KEY=VALUE`` line of an env file.

    Returns None for blank lines, ``#`` comments, lines without ``=`` and
    lines whose key is empty. A leading ``export `` on the key is dropped,
    and one pair of matching single or double quotes around the value is
    removed.
    """
    text = line.strip()
    if not text or text.startswith("#"):
        return None

    name, separator, raw = text.partition("=")
    if not separator:
        return None

    name = name.strip()
    if not name:
        return None
    if name.startswith("export "):
        name = name[len("export "):].strip()

    raw = raw.strip()
    is_quoted = len(raw) >= 2 and raw[0] in ("'", '"') and raw[-1] == raw[0]
    if is_quoted:
        raw = raw[1:-1]
    return name, raw
|
|
|
|
|
|
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from *path* into ``os.environ``.

    A missing file is ignored. Variables that already exist in the
    environment keep their value; the file only supplies missing ones.
    """
    if not path.exists():
        return

    content = path.read_text(encoding="utf-8")
    pairs = (parse_env_line(raw_line) for raw_line in content.splitlines())
    for pair in pairs:
        if pair is not None:
            # setdefault: never override what the process environment provides.
            os.environ.setdefault(*pair)
|
|
|
|
|
|
def timestamp() -> str:
    """Return the current local time as ``YYYYMMDD-HHMMSS-ffffff``."""
    now = dt.datetime.now()
    return now.strftime("%Y%m%d-%H%M%S-%f")
|
|
|
|
|
|
def unique_path(output_dir: Path) -> Path:
    """Return a not-yet-existing ``iphone-<timestamp>.jpg`` path in *output_dir*.

    If the plain timestamped name is taken, a fresh timestamp plus an
    increasing counter suffix is tried until an unused name is found.
    """
    candidate = output_dir / f"iphone-{timestamp()}.jpg"
    if not candidate.exists():
        return candidate

    counter = 1
    while True:
        candidate = output_dir / f"iphone-{timestamp()}-{counter}.jpg"
        if not candidate.exists():
            return candidate
        counter += 1
|
|
|
|
|
|
def looks_like_jpeg(data: bytes) -> bool:
    """Return True when *data* starts with ``FF D8`` and ends with ``FF D9``.

    Only the first and last two bytes are inspected; the content in between
    is not validated.
    """
    leading, trailing = b"\xff\xd8", b"\xff\xd9"
    return data[:2] == leading and data[-2:] == trailing
|
|
|
|
|
|
def run_macos_action(command: list[str]) -> bool:
    """Run *command* and report whether it succeeded.

    Returns False, after printing the reason, when the program cannot be
    started or exits with a non-zero status. On success returns True; when
    PHOTO_INBOX_DEBUG is enabled any captured output is printed first.
    """
    try:
        completed = subprocess.run(command, check=False, capture_output=True, text=True)
    except OSError as error:
        print(f"macOS action failed: {error}", flush=True)
        return False

    stdout = completed.stdout.strip()
    stderr = completed.stderr.strip()

    if completed.returncode != 0:
        # Prefer stderr, then stdout, then the bare exit status.
        reason = stderr or stdout or f"exit {completed.returncode}"
        print(f"macOS action failed: {reason}", flush=True)
        return False

    if env_flag_value("PHOTO_INBOX_DEBUG"):
        output = "\n".join(filter(None, (stdout, stderr)))
        if output:
            print(f"macOS action output: {output}", flush=True)

    return True
|
|
|
|
|
|
def notify(title: str, message: str) -> bool:
    """Show a macOS notification through ``osascript``.

    Args:
        title: Notification title.
        message: Notification body text.

    Returns:
        True when the ``osascript`` command succeeded, False otherwise.
    """
    # json.dumps produces a double-quoted literal whose quote/backslash
    # escaping also works as an AppleScript string. ensure_ascii=False keeps
    # non-ASCII text as-is: the default would emit \uXXXX sequences, which are
    # a JSON escape and not an AppleScript one.
    quoted_message = json.dumps(message, ensure_ascii=False)
    quoted_title = json.dumps(title, ensure_ascii=False)
    script = f"display notification {quoted_message} with title {quoted_title}"
    return run_macos_action(["osascript", "-e", script])
|
|
|
|
|
|
def reveal_in_finder(path: Path) -> bool:
    """Run ``open -R`` on *path*; return True when the command succeeded."""
    command = ["open", "-R", str(path)]
    return run_macos_action(command)
|
|
|
|
|
|
def copy_files_to_clipboard(paths: list[Path]) -> bool:
    """Hand *paths* to the pasteboard helper program.

    The compiled helper is used when it exists and is at least as new as the
    Swift source; otherwise the Swift script itself is run. Returns True for
    an empty list or a successful helper run, False when the helper fails or
    no helper is available.
    """
    if not paths:
        return True

    chosen = COPY_FILES_HELPER
    if COPY_FILES_HELPER_BIN.exists():
        binary_mtime = COPY_FILES_HELPER_BIN.stat().st_mtime
        # A missing source counts as infinitely old, so the binary wins.
        script_mtime = 0
        if COPY_FILES_HELPER.exists():
            script_mtime = COPY_FILES_HELPER.stat().st_mtime

        if binary_mtime < script_mtime:
            print("compiled pasteboard helper is older than source; using Swift script", flush=True)
        else:
            chosen = COPY_FILES_HELPER_BIN

    if not chosen.exists():
        print(f"native pasteboard helper missing: {COPY_FILES_HELPER_BIN}", flush=True)
        return False

    command = [str(chosen)] + [str(item) for item in paths]
    return run_macos_action(command)
|
|
|
|
|
|
class BatchManager:
    """Group uploads that arrive close together into one batch.

    Every added photo (re)arms a timer of ``debounce_seconds``; when the timer
    fires without having been cancelled by another upload, the batch is
    finalized.
    """

    def __init__(self, debounce_seconds: float, config: Config) -> None:
        self.config = config
        self.debounce_seconds = debounce_seconds
        # Guards self.batch, which add() and the timer-driven finish() both touch.
        self.lock = threading.Lock()
        self.batch: Batch | None = None

    def add(self, path: Path) -> tuple[int, list[Path]]:
        """Append *path* to the open batch, starting one if necessary.

        Restarts the debounce timer and returns the batch size together with
        a copy of the paths collected so far.
        """
        with self.lock:
            moment = dt.datetime.now()
            batch = self.batch
            if batch is None:
                batch = Batch(self.config.output_dir, [], moment, moment)
                self.batch = batch

            batch.paths.append(path)
            batch.updated_at = moment

            # Restart the countdown: drop the pending timer, arm a new one.
            pending = batch.timer
            if pending is not None:
                pending.cancel()
            timer = threading.Timer(self.debounce_seconds, self.finish)
            timer.daemon = True
            batch.timer = timer
            timer.start()

            return batch.count, list(batch.paths)

    def finish(self) -> None:
        """Close the open batch, if any, and report it.

        Sends a notification when enabled in the config and always prints a
        summary line for a closed batch.
        """
        with self.lock:
            batch, self.batch = self.batch, None
        if batch is None:
            return

        if self.config.notify:
            noun = "photo" if batch.count == 1 else "photos"
            clipboard_state = "on" if self.config.clipboard else "off"
            message = f"{batch.count} {noun} ready; clipboard={clipboard_state}"
            if notify("Photo Inbox", message):
                print(f"batch notification sent count={batch.count}", flush=True)

        print(f"batch finalized count={batch.count} dir={batch.output_dir}", flush=True)
|
|
|
|
|
|
class UploadHandler(BaseHTTPRequestHandler):
    """Serve ``GET /health`` and accept JPEG bodies on ``POST /upload``.

    The handler reads ``upload_token``, ``max_bytes``, ``config`` and
    ``batch_manager`` from ``self.server``.
    """

    server_version = "iPhonePhotoInbox/3.0"

    def do_GET(self) -> None:
        """Answer the ``/health`` probe with ``ok``; any other path is a 404."""
        if self.path == "/health":
            self.send_text(HTTPStatus.OK, "ok\n")
            return
        self.send_text(HTTPStatus.NOT_FOUND, "not found\n")

    def do_POST(self) -> None:
        """Validate and store one uploaded JPEG.

        Responses: 404 for any path other than ``/upload``; 401 when a token
        is configured and the ``token`` query parameter does not match; 411
        without ``Content-Length``; 400 for a malformed ``Content-Length`` or
        a body shorter than announced; 413 for a size outside
        ``1..max_bytes``; 415 when the body does not look like a JPEG; 500
        when the file cannot be written; 201 with the saved path on success.
        """
        parsed = urlparse(self.path)
        if parsed.path != "/upload":
            self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
            return

        query = parse_qs(parsed.query)
        expected_token = self.server.upload_token
        supplied_token = query.get("token", [""])[0]
        if expected_token and not self._token_matches(supplied_token, expected_token):
            self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n")
            return

        content_length = self.headers.get("Content-Length")
        if content_length is None:
            self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n")
            return

        try:
            size = int(content_length)
        except ValueError:
            self.send_text(HTTPStatus.BAD_REQUEST, "invalid content length\n")
            return

        # NOTE: a zero or negative length is reported with the same 413 as an
        # oversized upload; the status is kept unchanged for existing clients.
        if size <= 0 or size > self.server.max_bytes:
            self.send_text(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "invalid upload size\n")
            return

        data = self.rfile.read(size)
        if len(data) != size:
            # The client stopped sending before the announced length arrived.
            self.send_text(HTTPStatus.BAD_REQUEST, "incomplete upload\n")
            return
        if not looks_like_jpeg(data):
            self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n")
            return

        try:
            target = self._store_upload(data)
        except OSError as error:
            print(f"failed to save upload: {error}", flush=True)
            self.send_text(HTTPStatus.INTERNAL_SERVER_ERROR, "could not save upload\n")
            return

        if self.server.config.reveal:
            if reveal_in_finder(target):
                print("revealed in Finder", flush=True)

        batch_count, batch_paths = self.server.batch_manager.add(target)
        if self.server.config.clipboard:
            if copy_files_to_clipboard(batch_paths):
                print(f"clipboard updated count={batch_count}", flush=True)

        self.send_text(HTTPStatus.CREATED, f"{target}\n")
        print(f"saved {target} batch_count={batch_count}", flush=True)

    @staticmethod
    def _token_matches(supplied: str, expected: str) -> bool:
        """Compare two tokens without leaking the mismatch position via timing."""
        # Compare as bytes: compare_digest rejects str arguments that contain
        # non-ASCII characters, and the supplied token is untrusted input.
        return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))

    def _store_upload(self, data: bytes) -> Path:
        """Write *data* to a new file in the output directory and return its path.

        The bytes go to a temporary file in the same directory first and are
        then renamed onto the final name. If writing or renaming fails, the
        temporary file is removed and the OSError is re-raised.
        """
        output_dir = self.server.config.output_dir
        output_dir.mkdir(parents=True, exist_ok=True)
        target = unique_path(output_dir)

        tmp = NamedTemporaryFile(dir=output_dir, delete=False)
        temp_path = Path(tmp.name)
        try:
            with tmp:
                tmp.write(data)
            temp_path.replace(target)
        except OSError:
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                # Best-effort cleanup; the original failure is the one to report.
                pass
            raise
        return target

    def log_message(self, format: str, *args: object) -> None:
        """Print request log lines to stdout, prefixed with the client address."""
        print(f"{self.address_string()} - {format % args}", flush=True)

    def send_text(self, status: HTTPStatus, body: str) -> None:
        """Send *body* as a complete UTF-8 ``text/plain`` response with *status*."""
        encoded = body.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.wfile.write(encoded)
|
|
|
|
|
|
class UploadServer(ThreadingHTTPServer):
    """Threaded HTTP server that carries the state request handlers read.

    Besides the listening socket it holds the runtime config, the upload
    token, the maximum accepted body size in bytes and the batch manager.
    """

    def __init__(
        self,
        server_address: tuple[str, int],
        handler_class: type[BaseHTTPRequestHandler],
        config: Config,
        upload_token: str,
        max_bytes: int,
        debounce_seconds: float,
    ) -> None:
        # Bind the socket first; the attributes below are only read per request.
        super().__init__(server_address, handler_class)
        self.upload_token = upload_token
        self.max_bytes = max_bytes
        self.config = config
        self.batch_manager = BatchManager(debounce_seconds, config)
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options, using environment variables as defaults.

    The env file (PHOTO_INBOX_ENV_FILE, or ``.env`` next to the script) is
    loaded first so that its values can supply option defaults. Explicit
    command-line options always take precedence over the environment.

    Args:
        argv: Argument list to parse. None (the default) parses
            ``sys.argv[1:]``.

    Returns:
        The parsed options namespace.
    """
    env_file = Path(env_str("PHOTO_INBOX_ENV_FILE", str(DEFAULT_ENV_FILE))).expanduser()
    load_env_file(env_file)

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--host", default=env_str("PHOTO_INBOX_HOST", "0.0.0.0"))
    parser.add_argument("--port", type=int, default=env_int("PHOTO_INBOX_PORT", 8787))
    parser.add_argument("--output-dir", type=Path, default=env_path("PHOTO_INBOX_DIR", DEFAULT_OUTPUT_DIR))
    parser.add_argument("--token", default=env_str("PHOTO_INBOX_TOKEN", ""))
    parser.add_argument("--max-mb", type=int, default=env_int("PHOTO_INBOX_MAX_MB", 30))
    parser.add_argument("--debounce-seconds", type=float, default=env_float("PHOTO_INBOX_DEBOUNCE_SECONDS", 5))
    parser.add_argument("--clipboard", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_CLIPBOARD", True))
    parser.add_argument("--notify", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_NOTIFY", True))
    # BooleanOptionalAction (like --clipboard/--notify) also provides
    # --no-reveal, so an environment-enabled reveal can be switched off on
    # the command line; plain --reveal keeps working as before.
    parser.add_argument("--reveal", action=argparse.BooleanOptionalAction, default=env_flag_value("PHOTO_INBOX_REVEAL"))
    return parser.parse_args(argv)
|
|
|
|
|
|
def build_config(args: argparse.Namespace) -> Config:
    """Build the immutable runtime config from parsed command-line options.

    The output directory is expanded (``~``) and resolved to an absolute
    path; the three feature switches are copied over unchanged.
    """
    resolved_dir = args.output_dir.expanduser().resolve()
    settings = {
        "output_dir": resolved_dir,
        "clipboard": args.clipboard,
        "notify": args.notify,
        "reveal": args.reveal,
    }
    return Config(**settings)
|
|
|
|
|
|
def main() -> None:
    """Start the upload server and serve requests until interrupted.

    Prints the effective settings at startup. Ctrl-C stops the server
    without a traceback, and the listening socket is closed on every exit
    path.
    """
    args = parse_args()
    config = build_config(args)
    server = UploadServer(
        (args.host, args.port),
        UploadHandler,
        config,
        args.token,
        args.max_mb * 1024 * 1024,
        args.debounce_seconds,
    )
    print(f"listening on http://{args.host}:{args.port}/upload", flush=True)
    print(f"saving to: {config.output_dir}", flush=True)
    print(f"debounce seconds: {args.debounce_seconds:g}", flush=True)
    print(f"clipboard: {config.clipboard}", flush=True)
    print(f"notify: {config.notify}", flush=True)
    print(f"reveal: {config.reveal}", flush=True)
    if not args.token:
        print("warning: no token configured; anyone on this network can upload", flush=True)

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("shutting down", flush=True)
    finally:
        # Release the listening socket whether we stop normally or on error.
        server.server_close()
|
|
|
|
|
|
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|