#!/usr/bin/env -S uv run # /// script # requires-python = ">=3.15" # dependencies = [ # "python-hcl2==4.*", # "requests==2.*", # ] # /// # See https://discuss.hashicorp.com/t/luks-encryption-key-on-initial-reboot/45459/2 for reference # https://pve.proxmox.com/pve-docs/api-viewer/index.html#/nodes/{node}/qemu/{vmid}/sendkey from __future__ import annotations import argparse import random import subprocess import sys import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path import hcl2 import requests def load_hcl(path: Path) -> dict: with path.open("r", encoding="utf-8") as handle: return hcl2.load(handle) def get_variable_default(hcl_data: dict, name: str) -> str | None: for variable_block in hcl_data.get("variable", []): if name in variable_block: return variable_block[name].get("default") return None def main() -> int: parser = argparse.ArgumentParser( description="Unlock LUKS after install via Proxmox API (setup stage)." ) parser.add_argument( "-t", "--template", required=True, help="Path to directory containing variables.pkr.hcl (also passed to mise build).", ) parser.add_argument( "--luks-wait-seconds", type=int, default=45, help="Seconds to wait before sending the LUKS password (default: 45).", ) args = parser.parse_args() script_root = Path(__file__).resolve().parents[1] variables_common_path = script_root / "variables-common.pkr.hcl" credentials_path = script_root / "credentials.auto.pkrvars.hcl" vars_dir = Path(args.template) if not vars_dir.is_absolute(): vars_dir = script_root / vars_dir variables_path = vars_dir / "variables.pkr.hcl" variables_common = load_hcl(variables_common_path) credentials = load_hcl(credentials_path) variables = load_hcl(variables_path) proxmox_api_url = get_variable_default(variables_common, "proxmox_api_url") proxmox_skip_tls_verify = ( get_variable_default(variables_common, "proxmox_skip_tls_verify") or False ) proxmox_node = get_variable_default(variables, "proxmox_node") template_vm_id = get_variable_default(variables, "template_vm_id") _ = proxmox_api_url, proxmox_node, template_vm_id, credentials server_event = threading.Event() class InstallFinishedHandler(BaseHTTPRequestHandler): def do_POST(self) -> None: # noqa: N802 - required by BaseHTTPRequestHandler if self.path != "/install_finished": self.send_response(404) self.end_headers() self.wfile.write(b"Not Found") return _ = self.rfile.read(int(self.headers.get("Content-Length", "0") or "0")) server_event.set() self.send_response(200) self.end_headers() self.wfile.write(b"OK") def log_message(self, format: str, *args: object) -> None: return def find_random_port() -> tuple[int, HTTPServer]: ports = list(range(10000, 11000)) random.SystemRandom().shuffle(ports) for port in ports: try: server = HTTPServer(("0.0.0.0", port), InstallFinishedHandler) except OSError: continue return port, server raise RuntimeError("No free port found in range 10000-10999") port, httpd = find_random_port() def stream_colors(stream: object) -> tuple[str, str]: color = "" reset = "" is_tty = getattr(stream, "isatty", None) if callable(is_tty) and is_tty(): if stream is sys.stderr: color = "\033[31m" else: color = "\033[36m" reset = "\033[0m" return color, reset def log(message: str, stream: object = sys.stdout) -> None: color, reset = stream_colors(stream) stream.write(f"{color}[luks-unlock-wrapper] {message}{reset}\n") stream.flush() def write_status(message: str, stream: object = sys.stdout, *, newline: bool) -> None: color, reset = stream_colors(stream) is_tty = getattr(stream, "isatty", None) prefix = f"{color}[luks-unlock-wrapper] " suffix = f"{reset}\n" if newline else reset if callable(is_tty) and is_tty(): stream.write(f"\r\033[2K{prefix}{message}{suffix}") else: stream.write(f"{prefix}{message}{suffix}") stream.flush() def serve() -> None: httpd.serve_forever() server_thread = threading.Thread(target=serve, daemon=True) server_thread.start() log(f"Listening for POST on /install_finished at port {port}") build_cmd = ["mise", "build", args.template, "-i", str(port)] build_proc = subprocess.Popen( build_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) def relay_stream(stream: object, prefix: str, target: object) -> None: if not stream: return for line in stream: target.write(f"{prefix} {line}") target.flush() stdout_thread = threading.Thread( target=relay_stream, args=(build_proc.stdout, "[packer]", sys.stdout), daemon=True ) stderr_thread = threading.Thread( target=relay_stream, args=(build_proc.stderr, "[packer]", sys.stderr), daemon=True ) stdout_thread.start() stderr_thread.start() notified = False action_started = False def proxmox_request(method: str, path: str, **kwargs) -> requests.Response: if not proxmox_api_url: raise RuntimeError("proxmox_api_url not set") token_id = credentials.get("proxmox_api_token_id") token_secret = credentials.get("proxmox_api_token_secret") if not token_id or not token_secret: raise RuntimeError("Proxmox API token credentials missing") headers = kwargs.pop("headers", {}) headers["Authorization"] = f"PVEAPIToken={token_id}={token_secret}" url = f"{proxmox_api_url.rstrip('/')}/{path.lstrip('/')}" return requests.request( method, url, headers=headers, verify=not proxmox_skip_tls_verify, timeout=30, **kwargs, ) def send_key(key: str) -> None: if not proxmox_node or not template_vm_id: raise RuntimeError("proxmox_node or template_vm_id not set") path = f"/nodes/{proxmox_node}/qemu/{template_vm_id}/sendkey" response = proxmox_request("PUT", path, data={"key": key}) response.raise_for_status() def handle_install_finished() -> None: retry_delay = 1 while True: try: response = proxmox_request("GET", "/version") response.raise_for_status() log("Authenticated to Proxmox VE API.") break except Exception as exc: log( f"Proxmox auth failed: {exc}. Retrying in {retry_delay}s.", stream=sys.stderr, ) time.sleep(retry_delay) retry_delay += 1 try: countdown_seconds = max(0, args.luks_wait_seconds) # Braille spinner: 8 dots, one missing dot rotates clockwise. full_mask = 0xFF # dots 1-8 dot_bits = { 1: 0x01, 2: 0x02, 3: 0x04, 4: 0x08, 5: 0x10, 6: 0x20, 7: 0x40, 8: 0x80, } rotation = [1, 4, 5, 6, 8, 7, 3, 2] # clockwise around the cell spinner = [chr(0x2800 + (full_mask - dot_bits[dot])) for dot in rotation] for remaining in range(countdown_seconds, -1, -1): minutes, seconds = divmod(remaining, 60) countdown = f"{minutes:02d}:{seconds:02d}" frame = spinner[(countdown_seconds - remaining) % len(spinner)] write_status(f"{frame} {countdown}", newline=False) if remaining: time.sleep(1) write_status(f"{spinner[0]} 00:00", newline=True) for char in "packer": send_key(char) time.sleep(0.1) send_key("ret") log("Unlocking encrypted disk. Entering LUKS password.") except Exception as exc: log(f"Post-install actions failed after auth: {exc}", stream=sys.stderr) try: while True: if server_event.is_set() and not notified: log("Installation finished.\nRestarting.") notified = True if server_event.is_set() and not action_started: action_started = True threading.Thread(target=handle_install_finished, daemon=True).start() if build_proc.poll() is not None: break server_event.wait(0.2) finally: httpd.shutdown() httpd.server_close() return build_proc.returncode or 0 if __name__ == "__main__": raise SystemExit(main())