packer/_scripts/unlock-luks-after-install.py

269 lines
9.1 KiB
Python
Executable file

#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.15"
# dependencies = [
# "python-hcl2==4.*",
# "requests==2.*",
# ]
# ///
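# Background on entering the LUKS passphrase on the first reboot after install:
# https://discuss.hashicorp.com/t/luks-encryption-key-on-initial-reboot/45459/2
# Proxmox VE API endpoint used to send the key presses:
# https://pve.proxmox.com/pve-docs/api-viewer/index.html#/nodes/{node}/qemu/{vmid}/sendkey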
from __future__ import annotations
import argparse
import random
import subprocess
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
import hcl2
import requests
def load_hcl(path: Path) -> dict:
    """Read the UTF-8 encoded HCL file at *path* and return what ``hcl2`` parses."""
    handle = path.open("r", encoding="utf-8")
    with handle:
        parsed = hcl2.load(handle)
    return parsed
def get_variable_default(hcl_data: dict, name: str) -> str | None:
    """Return the ``default`` of the first ``variable`` block declaring *name*.

    ``hcl_data["variable"]`` is scanned in order; each entry is expected to be
    a mapping from variable name to that variable's attributes. The value is
    handed back exactly as stored. ``None`` is returned when no block declares
    *name*, or when the matching block has no ``default`` attribute.
    """
    declared_blocks = hcl_data.get("variable", [])
    for block in declared_blocks:
        if name not in block:
            continue
        attributes = block[name]
        return attributes.get("default")
    return None
def main() -> int:
    """Run ``mise build`` for a template and type the LUKS passphrase after install.

    Workflow:

    1. Parse the command line and read the Proxmox settings from the HCL files
       under the directory one level above this script's directory.
    2. Start an HTTP server on a random free port in 10000-10999 and pass that
       port to ``mise build <template> -i <port>``.
    3. Relay the build's stdout/stderr, prefixing each line with ``[packer]``.
    4. When a ``POST /install_finished`` arrives, a background thread
       authenticates against the Proxmox API, counts down
       ``--luks-wait-seconds`` and then sends the passphrase to the VM one key
       at a time through the ``sendkey`` endpoint.

    Returns:
        The exit status of the build process (``0`` if it reports none).
    """
    parser = argparse.ArgumentParser(
        description="Unlock LUKS after install via Proxmox API (setup stage)."
    )
    parser.add_argument(
        "-t",
        "--template",
        required=True,
        help="Path to directory containing variables.pkr.hcl (also passed to mise build).",
    )
    parser.add_argument(
        "--luks-wait-seconds",
        type=int,
        default=45,
        help="Seconds to wait before sending the LUKS password (default: 45).",
    )
    args = parser.parse_args()

    # parents[1] is the directory above the one that contains this script.
    script_root = Path(__file__).resolve().parents[1]
    vars_dir = Path(args.template)
    if not vars_dir.is_absolute():
        vars_dir = script_root / vars_dir

    variables_common = load_hcl(script_root / "variables-common.pkr.hcl")
    credentials = load_hcl(script_root / "credentials.auto.pkrvars.hcl")
    variables = load_hcl(vars_dir / "variables.pkr.hcl")

    proxmox_api_url = get_variable_default(variables_common, "proxmox_api_url")
    proxmox_skip_tls_verify = (
        get_variable_default(variables_common, "proxmox_skip_tls_verify") or False
    )
    proxmox_node = get_variable_default(variables, "proxmox_node")
    template_vm_id = get_variable_default(variables, "template_vm_id")

    # Set by the HTTP handler once the install-finished notification arrives.
    server_event = threading.Event()

    class InstallFinishedHandler(BaseHTTPRequestHandler):
        """Accept ``POST /install_finished`` and flag ``server_event``."""

        def do_POST(self) -> None:  # noqa: N802 - required by BaseHTTPRequestHandler
            if self.path != "/install_finished":
                self.send_response(404)
                self.end_headers()
                self.wfile.write(b"Not Found")
                return
            # Drain the request body; its content is not used. A malformed
            # Content-Length is treated as "no body" instead of raising inside
            # the handler, and a negative one must never reach read(), where
            # it would mean "read until EOF" and block the server.
            try:
                body_length = int(self.headers.get("Content-Length") or 0)
            except ValueError:
                body_length = 0
            if body_length > 0:
                self.rfile.read(body_length)
            server_event.set()
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")

        def log_message(self, format: str, *args: object) -> None:  # noqa: A002
            # Silence the default per-request logging of BaseHTTPRequestHandler.
            return

    def find_random_port() -> tuple[int, HTTPServer]:
        """Bind an ``HTTPServer`` to the first free port of a shuffled 10000-10999."""
        ports = list(range(10000, 11000))
        random.SystemRandom().shuffle(ports)
        for port in ports:
            try:
                server = HTTPServer(("0.0.0.0", port), InstallFinishedHandler)
            except OSError:
                # Port unavailable; try the next candidate.
                continue
            return port, server
        raise RuntimeError("No free port found in range 10000-10999")

    port, httpd = find_random_port()

    def stream_colors(stream: object) -> tuple[str, str]:
        """Return ``(color, reset)`` ANSI codes; both empty unless *stream* is a TTY."""
        is_tty = getattr(stream, "isatty", None)
        if not (callable(is_tty) and is_tty()):
            return "", ""
        # Red for stderr, cyan for everything else.
        color = "\033[31m" if stream is sys.stderr else "\033[36m"
        return color, "\033[0m"

    def log(message: str, stream: object = sys.stdout) -> None:
        """Write one prefixed (and, on a TTY, colored) line to *stream*."""
        color, reset = stream_colors(stream)
        stream.write(f"{color}[luks-unlock-wrapper] {message}{reset}\n")
        stream.flush()

    def write_status(message: str, stream: object = sys.stdout, *, newline: bool) -> None:
        """Write a status line; on a TTY the current line is cleared and rewritten."""
        color, reset = stream_colors(stream)
        is_tty = getattr(stream, "isatty", None)
        prefix = f"{color}[luks-unlock-wrapper] "
        suffix = f"{reset}\n" if newline else reset
        if callable(is_tty) and is_tty():
            # "\r" + "ESC[2K": return to column 0 and erase the whole line.
            stream.write(f"\r\033[2K{prefix}{message}{suffix}")
        else:
            stream.write(f"{prefix}{message}{suffix}")
        stream.flush()

    def relay_stream(stream: object, prefix: str, target: object) -> None:
        """Copy *stream* to *target* line by line, prepending *prefix*, until EOF."""
        if not stream:
            return
        for line in stream:
            target.write(f"{prefix} {line}")
            target.flush()

    def proxmox_request(method: str, path: str, **kwargs) -> requests.Response:
        """Send an API-token authenticated request to the Proxmox API.

        Raises:
            RuntimeError: if the API URL or the token credentials are missing.
        """
        if not proxmox_api_url:
            raise RuntimeError("proxmox_api_url not set")
        token_id = credentials.get("proxmox_api_token_id")
        token_secret = credentials.get("proxmox_api_token_secret")
        if not token_id or not token_secret:
            raise RuntimeError("Proxmox API token credentials missing")
        # Copy so a caller-supplied headers dict is never mutated.
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"PVEAPIToken={token_id}={token_secret}"
        url = f"{proxmox_api_url.rstrip('/')}/{path.lstrip('/')}"
        return requests.request(
            method,
            url,
            headers=headers,
            verify=not proxmox_skip_tls_verify,
            timeout=30,
            **kwargs,
        )

    def send_key(key: str) -> None:
        """Send a single key press to the template VM via the ``sendkey`` endpoint."""
        if not proxmox_node or not template_vm_id:
            raise RuntimeError("proxmox_node or template_vm_id not set")
        path = f"/nodes/{proxmox_node}/qemu/{template_vm_id}/sendkey"
        response = proxmox_request("PUT", path, data={"key": key})
        response.raise_for_status()

    def handle_install_finished() -> None:
        """Authenticate, count down, then type the LUKS passphrase into the VM."""
        retry_delay = 1
        while True:
            try:
                response = proxmox_request("GET", "/version")
                response.raise_for_status()
            except RuntimeError as exc:
                # Missing URL/credentials cannot fix themselves; retrying
                # would loop forever, so give up on the unlock instead.
                log(f"Proxmox configuration error: {exc}", stream=sys.stderr)
                return
            except Exception as exc:
                log(
                    f"Proxmox auth failed: {exc}. Retrying in {retry_delay}s.",
                    stream=sys.stderr,
                )
                time.sleep(retry_delay)
                retry_delay += 1
            else:
                log("Authenticated to Proxmox VE API.")
                break

        try:
            countdown_seconds = max(0, args.luks_wait_seconds)
            # Braille spinner: all 8 dots set except one, and the missing dot
            # rotates clockwise around the cell. Dot n is bit (n - 1) of the
            # offset from U+2800.
            rotation = [1, 4, 5, 6, 8, 7, 3, 2]
            spinner = [chr(0x2800 + (0xFF ^ (1 << (dot - 1)))) for dot in rotation]
            for remaining in range(countdown_seconds, -1, -1):
                minutes, seconds = divmod(remaining, 60)
                frame = spinner[(countdown_seconds - remaining) % len(spinner)]
                write_status(f"{frame} {minutes:02d}:{seconds:02d}", newline=False)
                if remaining:
                    time.sleep(1)
            write_status(f"{spinner[0]} 00:00", newline=True)

            log("Unlocking encrypted disk. Entering LUKS password.")
            # The passphrase is typed one key press at a time, then confirmed.
            for char in "packer":
                send_key(char)
                time.sleep(0.1)
            send_key("ret")
        except Exception as exc:
            log(f"Post-install actions failed after auth: {exc}", stream=sys.stderr)

    server_thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    server_thread.start()
    log(f"Listening for POST on /install_finished at port {port}")

    build_cmd = ["mise", "build", args.template, "-i", str(port)]
    try:
        build_proc = subprocess.Popen(
            build_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
        )
        relay_threads = [
            threading.Thread(
                target=relay_stream,
                args=(build_proc.stdout, "[packer]", sys.stdout),
                daemon=True,
            ),
            threading.Thread(
                target=relay_stream,
                args=(build_proc.stderr, "[packer]", sys.stderr),
                daemon=True,
            ),
        ]
        for relay in relay_threads:
            relay.start()

        unlock_started = False
        while build_proc.poll() is None:
            if unlock_started:
                # The event stays set once fired, so Event.wait() would return
                # immediately and spin; block on the child process instead.
                try:
                    build_proc.wait(timeout=0.2)
                except subprocess.TimeoutExpired:
                    pass
                continue
            if server_event.wait(0.2):
                log("Installation finished.\nRestarting.")
                unlock_started = True
                threading.Thread(target=handle_install_finished, daemon=True).start()

        # Let the relays drain what the build wrote just before exiting; the
        # timeout keeps a pipe held open by a grandchild from hanging us.
        for relay in relay_threads:
            relay.join(timeout=5)
    finally:
        httpd.shutdown()
        httpd.server_close()
    return build_proc.returncode or 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())