#!/usr/bin/env -S uv run # /// script # requires-python = ">=3.15" # dependencies = [ # "python-hcl2==4.*", # "requests==2.*", # "yaspin==3.*", # ] # /// # See https://discuss.hashicorp.com/t/luks-encryption-key-on-initial-reboot/45459/2 for reference # https://pve.proxmox.com/pve-docs/api-viewer/index.html#/nodes/{node}/qemu/{vmid}/sendkey from __future__ import annotations import argparse import random import subprocess import sys import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path import hcl2 import requests from yaspin import yaspin def load_hcl(path: Path) -> dict: with path.open("r", encoding="utf-8") as handle: return hcl2.load(handle) def get_variable_default(hcl_data: dict, name: str) -> str | None: for variable_block in hcl_data.get("variable", []): if name in variable_block: return variable_block[name].get("default") return None def main() -> int: parser = argparse.ArgumentParser( description="Unlock LUKS after install via Proxmox API (setup stage)." ) parser.add_argument( "-t", "--template", required=True, help="Path to directory containing variables.pkr.hcl (also passed to mise build).", ) parser.add_argument( "--luks-wait-seconds", type=int, default=45, help="Seconds to wait before sending the LUKS password (default: 45).", ) args = parser.parse_args() script_root = Path(__file__).resolve().parents[1] variables_common_path = script_root / "variables-common.pkr.hcl" credentials_path = script_root / "credentials.auto.pkrvars.hcl" vars_dir = Path(args.template) if not vars_dir.is_absolute(): vars_dir = script_root / vars_dir variables_path = vars_dir / "variables.pkr.hcl" variables_common = load_hcl(variables_common_path) credentials = load_hcl(credentials_path) variables = load_hcl(variables_path) proxmox_api_url = get_variable_default(variables_common, "proxmox_api_url") proxmox_skip_tls_verify = ( get_variable_default(variables_common, "proxmox_skip_tls_verify") or False ) proxmox_node = get_variable_default(variables, "proxmox_node") template_vm_id = get_variable_default(variables, "template_vm_id") _ = proxmox_api_url, proxmox_node, template_vm_id, credentials server_event = threading.Event() class InstallFinishedHandler(BaseHTTPRequestHandler): def do_POST(self) -> None: # noqa: N802 - required by BaseHTTPRequestHandler if self.path != "/install_finished": self.send_response(404) self.end_headers() self.wfile.write(b"Not Found") return _ = self.rfile.read(int(self.headers.get("Content-Length", "0") or "0")) server_event.set() self.send_response(200) self.end_headers() self.wfile.write(b"OK") def log_message(self, format: str, *args: object) -> None: return def find_random_port() -> tuple[int, HTTPServer]: ports = list(range(10000, 11000)) random.SystemRandom().shuffle(ports) for port in ports: try: server = HTTPServer(("0.0.0.0", port), InstallFinishedHandler) except OSError: continue return port, server raise RuntimeError("No free port found in range 10000-10999") port, httpd = find_random_port() def stream_colors(stream: object) -> tuple[str, str]: color = "" reset = "" is_tty = getattr(stream, "isatty", None) if callable(is_tty) and is_tty(): if stream is sys.stderr: color = "\033[31m" else: color = "\033[36m" reset = "\033[0m" return color, reset def log(message: str, stream: object = sys.stdout) -> None: color, reset = stream_colors(stream) stream.write(f"{color}[luks-unlock-wrapper] {message}{reset}\n") stream.flush() def colorize_message(message: str, stream: object = sys.stdout) -> str: color, reset = stream_colors(stream) if not color: return message return f"{color}{message}{reset}" def serve() -> None: httpd.serve_forever() server_thread = threading.Thread(target=serve, daemon=True) server_thread.start() log(f"Listening for POST on /install_finished at port {port}") build_cmd = ["mise", "build", args.template, "-i", str(port)] build_proc = subprocess.Popen( build_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) def relay_stream(stream: object, prefix: str, target: object) -> None: if not stream: return for line in stream: target.write(f"{prefix} {line}") target.flush() stdout_thread = threading.Thread( target=relay_stream, args=(build_proc.stdout, "[packer]", sys.stdout), daemon=True ) stderr_thread = threading.Thread( target=relay_stream, args=(build_proc.stderr, "[packer]", sys.stderr), daemon=True ) stdout_thread.start() stderr_thread.start() notified = False action_started = False def proxmox_request(method: str, path: str, **kwargs) -> requests.Response: if not proxmox_api_url: raise RuntimeError("proxmox_api_url not set") token_id = credentials.get("proxmox_api_token_id") token_secret = credentials.get("proxmox_api_token_secret") if not token_id or not token_secret: raise RuntimeError("Proxmox API token credentials missing") headers = kwargs.pop("headers", {}) headers["Authorization"] = f"PVEAPIToken={token_id}={token_secret}" url = f"{proxmox_api_url.rstrip('/')}/{path.lstrip('/')}" return requests.request( method, url, headers=headers, verify=not proxmox_skip_tls_verify, timeout=30, **kwargs, ) def send_key(key: str) -> None: if not proxmox_node or not template_vm_id: raise RuntimeError("proxmox_node or template_vm_id not set") path = f"/nodes/{proxmox_node}/qemu/{template_vm_id}/sendkey" response = proxmox_request("PUT", path, data={"key": key}) response.raise_for_status() def handle_install_finished() -> None: retry_delay = 1 while True: try: response = proxmox_request("GET", "/version") response.raise_for_status() log("Authenticated to Proxmox VE API.") break except Exception as exc: log( f"Proxmox auth failed: {exc}. Retrying in {retry_delay}s.", stream=sys.stderr, ) time.sleep(retry_delay) retry_delay += 1 spinner = None try: countdown_seconds = max(0, args.luks_wait_seconds) spinner = yaspin( text=colorize_message( f"[luks-unlock-wrapper] {countdown_seconds // 60:02d}:{countdown_seconds % 60:02d}", sys.stdout, ), color="cyan", stream=sys.stdout, ) spinner.start() for remaining in range(countdown_seconds, -1, -1): minutes, seconds = divmod(remaining, 60) countdown = f"{minutes:02d}:{seconds:02d}" spinner.text = colorize_message( f"[luks-unlock-wrapper] {countdown}", sys.stdout ) if remaining: time.sleep(1) for char in "packer": send_key(char) time.sleep(0.1) send_key("ret") spinner.text = colorize_message( "[luks-unlock-wrapper] ✅ Unlocking encrypted disk. Entering LUKS password.", sys.stdout, ) spinner.ok("") except Exception as exc: if spinner: spinner.text = colorize_message( "[luks-unlock-wrapper] 💥 Post-install actions failed.", sys.stdout, ) spinner.fail("") log(f"Post-install actions failed after auth: {exc}", stream=sys.stderr) try: while True: if server_event.is_set() and not notified: log("Installation finished. -- Restarting.") notified = True if server_event.is_set() and not action_started: action_started = True threading.Thread(target=handle_install_finished, daemon=True).start() if build_proc.poll() is not None: break server_event.wait(0.2) finally: httpd.shutdown() httpd.server_close() return build_proc.returncode or 0 if __name__ == "__main__": raise SystemExit(main())