210 lines
7.1 KiB
Python
Executable File
210 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Receive JPEG uploads from iPhone Shortcuts into the workspace inbox."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from tempfile import NamedTemporaryFile
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
|
|
DEFAULT_OUTPUT_DIR = Path(__file__).resolve().parents[2] / "ai" / "inbox" / "photos"
|
|
|
|
|
|
def timestamp() -> str:
|
|
return dt.datetime.now().strftime("%Y%m%d-%H%M%S-%f")
|
|
|
|
|
|
def unique_path(output_dir: Path) -> Path:
|
|
candidate = output_dir / f"iphone-{timestamp()}.jpg"
|
|
counter = 1
|
|
while candidate.exists():
|
|
candidate = output_dir / f"iphone-{timestamp()}-{counter}.jpg"
|
|
counter += 1
|
|
return candidate
|
|
|
|
|
|
def looks_like_jpeg(data: bytes) -> bool:
|
|
return data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9")
|
|
|
|
|
|
def env_flag(name: str) -> bool:
|
|
return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def run_macos_action(command: list[str]) -> bool:
|
|
try:
|
|
result = subprocess.run(command, check=False, capture_output=True, text=True)
|
|
except OSError as error:
|
|
print(f"macOS action failed: {error}", flush=True)
|
|
return False
|
|
|
|
if result.returncode != 0:
|
|
error = result.stderr.strip() or result.stdout.strip() or f"exit {result.returncode}"
|
|
print(f"macOS action failed: {error}", flush=True)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def notify(title: str, message: str) -> bool:
|
|
script = f"display notification {json.dumps(message)} with title {json.dumps(title)}"
|
|
return run_macos_action(["osascript", "-e", script])
|
|
|
|
|
|
def reveal_in_finder(path: Path) -> bool:
|
|
return run_macos_action(["open", "-R", str(path)])
|
|
|
|
|
|
def copy_file_to_clipboard(path: Path) -> bool:
|
|
script = f"""
|
|
set theFile to POSIX file {json.dumps(str(path))} as alias
|
|
tell application "Finder"
|
|
set the clipboard to {{theFile}}
|
|
end tell
|
|
"""
|
|
return run_macos_action(["osascript", "-e", script])
|
|
|
|
|
|
class UploadHandler(BaseHTTPRequestHandler):
|
|
server_version = "iPhonePhotoInbox/1.0"
|
|
|
|
def do_GET(self) -> None:
|
|
if self.path == "/health":
|
|
self.send_text(HTTPStatus.OK, "ok\n")
|
|
return
|
|
self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
|
|
|
|
def do_POST(self) -> None:
|
|
parsed = urlparse(self.path)
|
|
if parsed.path != "/upload":
|
|
self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
|
|
return
|
|
|
|
expected_token = self.server.upload_token
|
|
supplied_token = parse_qs(parsed.query).get("token", [""])[0]
|
|
if expected_token and supplied_token != expected_token:
|
|
self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n")
|
|
return
|
|
|
|
content_length = self.headers.get("Content-Length")
|
|
if content_length is None:
|
|
self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n")
|
|
return
|
|
|
|
try:
|
|
size = int(content_length)
|
|
except ValueError:
|
|
self.send_text(HTTPStatus.BAD_REQUEST, "invalid content length\n")
|
|
return
|
|
|
|
if size <= 0 or size > self.server.max_bytes:
|
|
self.send_text(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "invalid upload size\n")
|
|
return
|
|
|
|
data = self.rfile.read(size)
|
|
if not looks_like_jpeg(data):
|
|
self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n")
|
|
return
|
|
|
|
self.server.output_dir.mkdir(parents=True, exist_ok=True)
|
|
target = unique_path(self.server.output_dir)
|
|
with NamedTemporaryFile(dir=self.server.output_dir, delete=False) as tmp:
|
|
tmp.write(data)
|
|
temp_path = Path(tmp.name)
|
|
temp_path.replace(target)
|
|
|
|
if self.server.notify_on_upload:
|
|
if notify("iPhone Photo Inbox", target.name):
|
|
print("notification sent", flush=True)
|
|
if self.server.reveal_on_upload:
|
|
if reveal_in_finder(target):
|
|
print("revealed in Finder", flush=True)
|
|
if self.server.copy_on_upload:
|
|
if copy_file_to_clipboard(target):
|
|
print("copied file to clipboard", flush=True)
|
|
|
|
self.send_text(HTTPStatus.CREATED, f"{target}\n")
|
|
print(f"saved {target}", flush=True)
|
|
|
|
def log_message(self, format: str, *args: object) -> None:
|
|
print(f"{self.address_string()} - {format % args}", flush=True)
|
|
|
|
def send_text(self, status: HTTPStatus, body: str) -> None:
|
|
encoded = body.encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", "text/plain; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(encoded)))
|
|
self.end_headers()
|
|
self.wfile.write(encoded)
|
|
|
|
|
|
class UploadServer(ThreadingHTTPServer):
|
|
def __init__(
|
|
self,
|
|
server_address: tuple[str, int],
|
|
handler_class: type[BaseHTTPRequestHandler],
|
|
output_dir: Path,
|
|
upload_token: str,
|
|
max_bytes: int,
|
|
notify_on_upload: bool,
|
|
reveal_on_upload: bool,
|
|
copy_on_upload: bool,
|
|
) -> None:
|
|
super().__init__(server_address, handler_class)
|
|
self.output_dir = output_dir
|
|
self.upload_token = upload_token
|
|
self.max_bytes = max_bytes
|
|
self.notify_on_upload = notify_on_upload
|
|
self.reveal_on_upload = reveal_on_upload
|
|
self.copy_on_upload = copy_on_upload
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--host", default=os.getenv("IPHONE_PHOTO_HOST", "0.0.0.0"))
|
|
parser.add_argument("--port", type=int, default=int(os.getenv("IPHONE_PHOTO_PORT", "8787")))
|
|
parser.add_argument("--output-dir", type=Path, default=Path(os.getenv("IPHONE_PHOTO_OUTPUT_DIR", DEFAULT_OUTPUT_DIR)))
|
|
parser.add_argument("--token", default=os.getenv("IPHONE_PHOTO_TOKEN", ""))
|
|
parser.add_argument("--max-mb", type=int, default=int(os.getenv("IPHONE_PHOTO_MAX_MB", "30")))
|
|
parser.add_argument("--notify", action="store_true", default=env_flag("IPHONE_PHOTO_NOTIFY"))
|
|
parser.add_argument("--reveal", action="store_true", default=env_flag("IPHONE_PHOTO_REVEAL"))
|
|
parser.add_argument("--copy", action="store_true", default=env_flag("IPHONE_PHOTO_COPY"))
|
|
return parser.parse_args()
|
|
|
|
|
|
def main() -> None:
|
|
args = parse_args()
|
|
server = UploadServer(
|
|
(args.host, args.port),
|
|
UploadHandler,
|
|
args.output_dir.expanduser().resolve(),
|
|
args.token,
|
|
args.max_mb * 1024 * 1024,
|
|
args.notify,
|
|
args.reveal,
|
|
args.copy,
|
|
)
|
|
print(f"listening on http://{args.host}:{args.port}/upload", flush=True)
|
|
print(f"saving JPEGs to {server.output_dir}", flush=True)
|
|
if args.notify:
|
|
print("notifications enabled", flush=True)
|
|
if args.reveal:
|
|
print("Finder reveal enabled", flush=True)
|
|
if args.copy:
|
|
print("clipboard copy enabled", flush=True)
|
|
if not args.token:
|
|
print("warning: no token configured; anyone on this network can upload", flush=True)
|
|
server.serve_forever()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|