packer/_scripts/unlock-luks-after-install.py
phg eded7180dc Refactor Debian 13 Trixie Packer templates for LUKS support
- Removed obsolete variable files: variables-common.pkr.hcl and variables.pkr.hcl.
- Updated debian-trixie.pkr.hcl to include local values for LUKS configuration.
- Modified boot command to include LUKS arguments based on the enable_luks variable.
- Enhanced initial-setup.sh to support LUKS detection and resizing.
- Replaced preseed.cfg with preseed.cfg.pkrtpl for dynamic LUKS configuration.
- Added enable_luks variable to control LUKS encryption during image build.
- Introduced luks.pkrvars.hcl for LUKS-specific variable settings.
- Updated mise.toml to support new variable file argument for Packer builds.
2026-05-11 19:13:11 +02:00

301 lines
10 KiB
Python
Executable file

#!/usr/bin/env -S uv run
# /// script
# requires-python = ">=3.15"
# dependencies = [
# "python-hcl2==4.*",
# "requests==2.*",
# "yaspin==3.*",
# ]
# ///
# See https://discuss.hashicorp.com/t/luks-encryption-key-on-initial-reboot/45459/2 for reference
# https://pve.proxmox.com/pve-docs/api-viewer/index.html#/nodes/{node}/qemu/{vmid}/sendkey
from __future__ import annotations
import argparse
import random
import subprocess
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
import hcl2
import requests
from yaspin import yaspin
def load_hcl(path: Path) -> dict:
with path.open("r", encoding="utf-8") as handle:
return hcl2.load(handle)
def resolve_input_path(path: str, script_root: Path) -> Path:
resolved = Path(path)
if not resolved.is_absolute():
resolved = script_root / resolved
return resolved
def merge_values(*hcl_data_items: dict) -> dict:
values = {}
for hcl_data in hcl_data_items:
for variable_block in hcl_data.get("variable", []):
for name, body in variable_block.items():
if isinstance(body, dict) and "default" in body:
values[name] = body["default"]
for name, value in hcl_data.items():
if name != "variable":
values[name] = value
return values
def main() -> int:
parser = argparse.ArgumentParser(
description="Unlock LUKS after install via Proxmox API (setup stage)."
)
parser.add_argument(
"-t",
"--template",
required=True,
help="Path to directory containing variables.pkr.hcl (also passed to mise build).",
)
parser.add_argument(
"--luks-wait-seconds",
type=int,
default=45,
help="Seconds to wait before sending the LUKS password (default: 45).",
)
parser.add_argument(
"--var-file",
action="append",
default=[],
help="Path to an HCL var-file passed through to the Packer build.",
)
args = parser.parse_args()
script_root = Path(__file__).resolve().parents[1]
variables_common_path = script_root / "variables-common.pkr.hcl"
credentials_path = script_root / "credentials.auto.pkrvars.hcl"
vars_dir = resolve_input_path(args.template, script_root)
variables_path = vars_dir / "variables.pkr.hcl"
var_file_paths = [resolve_input_path(var_file, script_root) for var_file in args.var_file]
variables_common = load_hcl(variables_common_path)
credentials = load_hcl(credentials_path)
variables = load_hcl(variables_path)
var_files = [load_hcl(var_file_path) for var_file_path in var_file_paths]
values = merge_values(variables_common, variables, credentials, *var_files)
proxmox_api_url = values.get("proxmox_api_url")
proxmox_skip_tls_verify = values.get("proxmox_skip_tls_verify", False)
default_luks_passphrase = values.get("default_luks_passphrase")
proxmox_node = values.get("proxmox_node")
template_vm_id = values.get("template_vm_id")
_ = (
proxmox_api_url,
proxmox_node,
template_vm_id,
credentials,
)
server_event = threading.Event()
class InstallFinishedHandler(BaseHTTPRequestHandler):
def do_POST(self) -> None: # noqa: N802 - required by BaseHTTPRequestHandler
if self.path != "/install_finished":
self.send_response(404)
self.end_headers()
self.wfile.write(b"Not Found")
return
_ = self.rfile.read(int(self.headers.get("Content-Length", "0") or "0"))
server_event.set()
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
def log_message(self, format: str, *args: object) -> None:
return
def find_random_port() -> tuple[int, HTTPServer]:
ports = list(range(10000, 11000))
random.SystemRandom().shuffle(ports)
for port in ports:
try:
server = HTTPServer(("0.0.0.0", port), InstallFinishedHandler)
except OSError:
continue
return port, server
raise RuntimeError("No free port found in range 10000-10999")
port, httpd = find_random_port()
def stream_colors(stream: object) -> tuple[str, str]:
color = ""
reset = ""
is_tty = getattr(stream, "isatty", None)
if callable(is_tty) and is_tty():
if stream is sys.stderr:
color = "\033[31m"
else:
color = "\033[36m"
reset = "\033[0m"
return color, reset
def log(message: str, stream: object = sys.stdout) -> None:
color, reset = stream_colors(stream)
stream.write(f"{color}[luks-unlock-wrapper] {message}{reset}\n")
stream.flush()
def colorize_message(message: str, stream: object = sys.stdout) -> str:
color, reset = stream_colors(stream)
if not color:
return message
return f"{color}{message}{reset}"
def serve() -> None:
httpd.serve_forever()
server_thread = threading.Thread(target=serve, daemon=True)
server_thread.start()
log(f"Listening for POST on /install_finished at port {port}")
build_cmd = ["mise", "run", "build", args.template]
for var_file_path in var_file_paths:
build_cmd.extend(["--var-file", str(var_file_path)])
build_cmd.extend(["-i", str(port)])
build_proc = subprocess.Popen(
build_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
def relay_stream(stream: object, prefix: str, target: object) -> None:
if not stream:
return
for line in stream:
target.write(f"{prefix} {line}")
target.flush()
stdout_thread = threading.Thread(
target=relay_stream, args=(build_proc.stdout, "[packer]", sys.stdout), daemon=True
)
stderr_thread = threading.Thread(
target=relay_stream, args=(build_proc.stderr, "[packer]", sys.stderr), daemon=True
)
stdout_thread.start()
stderr_thread.start()
notified = False
action_started = False
def proxmox_request(method: str, path: str, **kwargs) -> requests.Response:
if not proxmox_api_url:
raise RuntimeError("proxmox_api_url not set")
token_id = credentials.get("proxmox_api_token_id")
token_secret = credentials.get("proxmox_api_token_secret")
if not token_id or not token_secret:
raise RuntimeError("Proxmox API token credentials missing")
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"PVEAPIToken={token_id}={token_secret}"
url = f"{proxmox_api_url.rstrip('/')}/{path.lstrip('/')}"
return requests.request(
method,
url,
headers=headers,
verify=not proxmox_skip_tls_verify,
timeout=30,
**kwargs,
)
def send_key(key: str) -> None:
if not proxmox_node or not template_vm_id:
raise RuntimeError("proxmox_node or template_vm_id not set")
path = f"/nodes/{proxmox_node}/qemu/{template_vm_id}/sendkey"
response = proxmox_request("PUT", path, data={"key": key})
response.raise_for_status()
def handle_install_finished() -> None:
retry_delay = 1
while True:
try:
response = proxmox_request("GET", "/version")
response.raise_for_status()
log("Authenticated to Proxmox VE API.")
break
except Exception as exc:
log(
f"Proxmox auth failed: {exc}. Retrying in {retry_delay}s.",
stream=sys.stderr,
)
time.sleep(retry_delay)
retry_delay += 1
spinner = None
try:
countdown_seconds = max(0, args.luks_wait_seconds)
spinner = yaspin(
text=colorize_message(
f"[luks-unlock-wrapper] {countdown_seconds // 60:02d}:{countdown_seconds % 60:02d}",
sys.stdout,
),
color="cyan",
stream=sys.stdout,
)
spinner.start()
for remaining in range(countdown_seconds, -1, -1):
minutes, seconds = divmod(remaining, 60)
countdown = f"{minutes:02d}:{seconds:02d}"
spinner.text = colorize_message(
f"[luks-unlock-wrapper] {countdown}", sys.stdout
)
if remaining:
time.sleep(1)
if not default_luks_passphrase:
raise RuntimeError("default_luks_passphrase not set")
for char in default_luks_passphrase:
send_key(char)
time.sleep(0.1)
send_key("ret")
spinner.text = colorize_message(
"[luks-unlock-wrapper] ✔ Unlocking encrypted disk. Entering LUKS password.",
sys.stdout,
)
spinner.ok("")
except Exception as exc:
if spinner:
spinner.text = colorize_message(
"[luks-unlock-wrapper] ✗ Post-install actions failed.",
sys.stdout,
)
spinner.fail("")
log(f"Post-install actions failed after auth: {exc}", stream=sys.stderr)
try:
while True:
if server_event.is_set() and not notified:
log("Installation finished. -- Restarting.")
notified = True
if server_event.is_set() and not action_started:
action_started = True
threading.Thread(target=handle_install_finished, daemon=True).start()
if build_proc.poll() is not None:
break
server_event.wait(0.2)
finally:
httpd.shutdown()
httpd.server_close()
return build_proc.returncode or 0
if __name__ == "__main__":
raise SystemExit(main())