142 lines
4.8 KiB
Python
Executable File
142 lines
4.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Receive JPEG uploads from iPhone Shortcuts into the workspace inbox."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import os
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from tempfile import NamedTemporaryFile
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
|
|
# Default inbox location: "ai/inbox/photos" under the directory found by
# resolving this file's path and climbing three levels (its directory, then
# two more parents).
DEFAULT_OUTPUT_DIR = Path(__file__).resolve().parents[2].joinpath("ai", "inbox", "photos")
|
|
|
|
|
|
def timestamp() -> str:
    """Return the current naive local time as ``YYYYMMDD-HHMMSS-ffffff``.

    The trailing field is the six-digit microsecond component.
    """
    return f"{dt.datetime.now():%Y%m%d-%H%M%S-%f}"
|
|
|
|
|
|
def unique_path(output_dir: Path) -> Path:
    """Pick a ``iphone-<timestamp>.jpg`` path in *output_dir* that does not exist.

    The first candidate carries only a timestamp. While the candidate is
    already present on disk, a fresh timestamp is taken and an increasing
    numeric suffix (``-1``, ``-2``, ...) is appended.

    Only existence is checked; no file is created here.
    """
    path = output_dir.joinpath(f"iphone-{timestamp()}.jpg")
    attempt = 0
    while path.exists():
        attempt += 1
        path = output_dir.joinpath(f"iphone-{timestamp()}-{attempt}.jpg")
    return path
|
|
|
|
|
|
def looks_like_jpeg(data: bytes) -> bool:
    """Return True when *data* is framed by the JPEG start and end markers.

    This is a cheap signature check on the first and last two bytes only;
    the payload in between is not inspected.
    """
    start_marker, end_marker = b"\xff\xd8", b"\xff\xd9"
    if not data.startswith(start_marker):
        return False
    return data.endswith(end_marker)
|
|
|
|
|
|
class UploadHandler(BaseHTTPRequestHandler):
    """Serve ``GET /health`` and accept JPEG bodies on ``POST /upload``.

    The handler reads ``upload_token``, ``max_bytes`` and ``output_dir``
    from ``self.server``, so it must be paired with a server object that
    provides those attributes.
    """

    server_version = "iPhonePhotoInbox/1.0"

    def do_GET(self) -> None:
        """Reply ``ok`` on the health endpoint and 404 on anything else."""
        # Match on the path component only, so a probe that appends a query
        # string (``/health?ts=1``) is routed the same way do_POST routes
        # ``/upload?token=...``.
        if urlparse(self.path).path == "/health":
            self.send_text(HTTPStatus.OK, "ok\n")
            return
        self.send_text(HTTPStatus.NOT_FOUND, "not found\n")

    def do_POST(self) -> None:
        """Validate an upload request and store its JPEG body.

        Responses:
            404 - path is not ``/upload``
            401 - the server has a token and the ``token`` query value differs
            411 - no ``Content-Length`` header
            400 - ``Content-Length`` is not an integer, or is zero/negative
            413 - ``Content-Length`` exceeds the server's ``max_bytes``
            415 - body fails the ``looks_like_jpeg`` check
            500 - the file could not be written to the output directory
            201 - saved; the body is the path of the stored file
        """
        import hmac  # local import: only this method needs it

        parsed = urlparse(self.path)
        if parsed.path != "/upload":
            self.send_text(HTTPStatus.NOT_FOUND, "not found\n")
            return

        expected_token = self.server.upload_token
        supplied_token = parse_qs(parsed.query).get("token", [""])[0]
        # An empty configured token disables the check. Otherwise compare in
        # constant time; bytes are used because compare_digest rejects
        # non-ASCII str, and "surrogatepass" keeps encoding from raising.
        if expected_token and not hmac.compare_digest(
            supplied_token.encode("utf-8", "surrogatepass"),
            expected_token.encode("utf-8", "surrogatepass"),
        ):
            self.send_text(HTTPStatus.UNAUTHORIZED, "bad token\n")
            return

        content_length = self.headers.get("Content-Length")
        if content_length is None:
            self.send_text(HTTPStatus.LENGTH_REQUIRED, "missing content length\n")
            return

        try:
            size = int(content_length)
        except ValueError:
            self.send_text(HTTPStatus.BAD_REQUEST, "invalid content length\n")
            return

        # An empty or negative length is a malformed request, not an
        # oversized one, so it gets 400 rather than 413.
        if size <= 0:
            self.send_text(HTTPStatus.BAD_REQUEST, "invalid upload size\n")
            return
        if size > self.server.max_bytes:
            self.send_text(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "invalid upload size\n")
            return

        data = self.rfile.read(size)
        if not looks_like_jpeg(data):
            self.send_text(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "expected jpeg\n")
            return

        output_dir = self.server.output_dir
        temp_path = None
        try:
            output_dir.mkdir(parents=True, exist_ok=True)
            target = unique_path(output_dir)
            # Write to a temp file inside the destination directory and then
            # rename, so a partially written upload never appears under its
            # final name and the rename never crosses filesystems.
            with NamedTemporaryFile(dir=output_dir, delete=False) as tmp:
                temp_path = Path(tmp.name)
                tmp.write(data)
            temp_path.replace(target)
        except OSError as exc:
            # delete=False means nobody else removes the temp file; clean it
            # up here so failed uploads do not accumulate in the inbox.
            if temp_path is not None:
                try:
                    temp_path.unlink()
                except OSError:
                    pass
            print(f"failed to save upload: {exc}", flush=True)
            self.send_text(HTTPStatus.INTERNAL_SERVER_ERROR, "could not save upload\n")
            return

        self.send_text(HTTPStatus.CREATED, f"{target}\n")
        print(f"saved {target}", flush=True)

    def log_message(self, format: str, *args: object) -> None:
        """Write the request log line to stdout, prefixed with the client address.

        The parameter name ``format`` mirrors the base-class signature.
        """
        print(f"{self.address_string()} - {format % args}", flush=True)

    def send_text(self, status: HTTPStatus, body: str) -> None:
        """Send *body* as a complete UTF-8 ``text/plain`` response with *status*."""
        encoded = body.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.wfile.write(encoded)
|
|
|
|
|
|
class UploadServer(ThreadingHTTPServer):
    """Threading HTTP server that carries the upload settings for its handlers.

    Request handlers reach these values through ``self.server``.
    """

    # Directory where accepted uploads are stored.
    output_dir: Path
    # Token value kept for the handler's upload check.
    upload_token: str
    # Upper bound, in bytes, kept for the handler's size check.
    max_bytes: int

    def __init__(
        self,
        server_address: tuple[str, int],
        handler_class: type[BaseHTTPRequestHandler],
        output_dir: Path,
        upload_token: str,
        max_bytes: int,
    ) -> None:
        """Initialise the base server, then record the upload settings."""
        super().__init__(server_address, handler_class)
        self.output_dir = output_dir
        self.upload_token = upload_token
        self.max_bytes = max_bytes
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse command-line options, defaulting each one from the environment.

    Every flag falls back to an ``IPHONE_PHOTO_*`` environment variable and
    then to a built-in default.

    Returns:
        Namespace with ``host`` (str), ``port`` (int), ``output_dir`` (Path),
        ``token`` (str) and ``max_mb`` (int).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--host",
        default=os.getenv("IPHONE_PHOTO_HOST", "0.0.0.0"),
        help="address to bind (env: IPHONE_PHOTO_HOST)",
    )
    # Environment values are passed to argparse as raw strings. argparse
    # applies ``type`` to a string default only when the flag is absent, so a
    # malformed variable becomes a normal usage error instead of a traceback
    # raised while building the parser, and an explicit flag still wins.
    parser.add_argument(
        "--port",
        type=int,
        default=os.getenv("IPHONE_PHOTO_PORT", "8787"),
        help="TCP port to listen on (env: IPHONE_PHOTO_PORT)",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=os.getenv("IPHONE_PHOTO_OUTPUT_DIR", DEFAULT_OUTPUT_DIR),
        help="directory that receives uploads (env: IPHONE_PHOTO_OUTPUT_DIR)",
    )
    parser.add_argument(
        "--token",
        default=os.getenv("IPHONE_PHOTO_TOKEN", ""),
        help="upload token; empty means no token (env: IPHONE_PHOTO_TOKEN)",
    )
    parser.add_argument(
        "--max-mb",
        type=int,
        default=os.getenv("IPHONE_PHOTO_MAX_MB", "30"),
        help="upload size limit in MiB (env: IPHONE_PHOTO_MAX_MB)",
    )
    return parser.parse_args()
|
|
|
|
|
|
def main() -> None:
    """Build the upload server from the parsed options and serve until interrupted."""
    args = parse_args()
    server = UploadServer(
        (args.host, args.port),
        UploadHandler,
        args.output_dir.expanduser().resolve(),
        args.token,
        args.max_mb * 1024 * 1024,  # MiB -> bytes
    )
    # The context manager closes the listening socket on every exit path.
    with server:
        print(f"listening on http://{args.host}:{args.port}/upload", flush=True)
        print(f"saving JPEGs to {server.output_dir}", flush=True)
        if not args.token:
            print("warning: no token configured; anyone on this network can upload", flush=True)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop; exit without a traceback.
            print("shutting down", flush=True)


if __name__ == "__main__":
    main()
|