Add fully automated Debian 13 LUKS encrypted Template build
This commit is contained in:
parent
40a0623ad0
commit
f76fd4a95a
5 changed files with 215 additions and 3 deletions
188
_scripts/unlock-luks-after-install.py
Executable file
188
_scripts/unlock-luks-after-install.py
Executable file
|
|
@ -0,0 +1,188 @@
|
|||
#!/usr/bin/env -S uv run
|
||||
# /// script
|
||||
# requires-python = ">=3.15"
|
||||
# dependencies = [
|
||||
# "python-hcl2==4.*",
|
||||
# "requests==2.*",
|
||||
# ]
|
||||
# ///
|
||||
|
||||
# See https://discuss.hashicorp.com/t/luks-encryption-key-on-initial-reboot/45459/2 for reference
|
||||
# https://pve.proxmox.com/pve-docs/api-viewer/index.html#/nodes/{node}/qemu/{vmid}/sendkey
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import random
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from pathlib import Path
|
||||
|
||||
import hcl2
|
||||
import requests
|
||||
|
||||
|
||||
def load_hcl(path: Path) -> dict:
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
return hcl2.load(handle)
|
||||
|
||||
|
||||
def get_variable_default(hcl_data: dict, name: str) -> str | None:
|
||||
for variable_block in hcl_data.get("variable", []):
|
||||
if name in variable_block:
|
||||
return variable_block[name].get("default")
|
||||
return None
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Unlock LUKS after install via Proxmox API (setup stage)."
|
||||
)
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--template",
|
||||
required=True,
|
||||
help="Path to directory containing variables.pkr.hcl (also passed to mise build).",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
script_root = Path(__file__).resolve().parents[1]
|
||||
variables_common_path = script_root / "variables-common.pkr.hcl"
|
||||
credentials_path = script_root / "debian/13-trixie/credentials.auto.pkrvars.hcl"
|
||||
vars_dir = Path(args.template)
|
||||
if not vars_dir.is_absolute():
|
||||
vars_dir = script_root / vars_dir
|
||||
variables_path = vars_dir / "variables.pkr.hcl"
|
||||
|
||||
variables_common = load_hcl(variables_common_path)
|
||||
credentials = load_hcl(credentials_path)
|
||||
variables = load_hcl(variables_path)
|
||||
|
||||
proxmox_api_url = get_variable_default(variables_common, "proxmox_api_url")
|
||||
proxmox_skip_tls_verify = (
|
||||
get_variable_default(variables_common, "proxmox_skip_tls_verify") or False
|
||||
)
|
||||
proxmox_node = get_variable_default(variables, "proxmox_node")
|
||||
template_vm_id = get_variable_default(variables, "template_vm_id")
|
||||
|
||||
_ = proxmox_api_url, proxmox_node, template_vm_id, credentials
|
||||
|
||||
server_event = threading.Event()
|
||||
|
||||
class InstallFinishedHandler(BaseHTTPRequestHandler):
|
||||
def do_POST(self) -> None: # noqa: N802 - required by BaseHTTPRequestHandler
|
||||
if self.path != "/install_finished":
|
||||
self.send_response(404)
|
||||
self.end_headers()
|
||||
self.wfile.write(b"Not Found")
|
||||
return
|
||||
|
||||
_ = self.rfile.read(int(self.headers.get("Content-Length", "0") or "0"))
|
||||
server_event.set()
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
self.wfile.write(b"OK")
|
||||
|
||||
def log_message(self, format: str, *args: object) -> None:
|
||||
return
|
||||
|
||||
def find_random_port() -> tuple[int, HTTPServer]:
|
||||
ports = list(range(10000, 11000))
|
||||
random.SystemRandom().shuffle(ports)
|
||||
for port in ports:
|
||||
try:
|
||||
server = HTTPServer(("0.0.0.0", port), InstallFinishedHandler)
|
||||
except OSError:
|
||||
continue
|
||||
return port, server
|
||||
raise RuntimeError("No free port found in range 10000-10999")
|
||||
|
||||
port, httpd = find_random_port()
|
||||
|
||||
def serve() -> None:
|
||||
httpd.serve_forever()
|
||||
|
||||
server_thread = threading.Thread(target=serve, daemon=True)
|
||||
server_thread.start()
|
||||
|
||||
print(f"Listening for POST on /install_finished at port {port}")
|
||||
|
||||
build_cmd = ["mise", "build", args.template, "-i", str(port)]
|
||||
build_proc = subprocess.Popen(build_cmd)
|
||||
|
||||
notified = False
|
||||
action_started = False
|
||||
|
||||
def proxmox_request(method: str, path: str, **kwargs) -> requests.Response:
|
||||
if not proxmox_api_url:
|
||||
raise RuntimeError("proxmox_api_url not set")
|
||||
token_id = credentials.get("proxmox_api_token_id")
|
||||
token_secret = credentials.get("proxmox_api_token_secret")
|
||||
if not token_id or not token_secret:
|
||||
raise RuntimeError("Proxmox API token credentials missing")
|
||||
|
||||
headers = kwargs.pop("headers", {})
|
||||
headers["Authorization"] = f"PVEAPIToken={token_id}={token_secret}"
|
||||
url = f"{proxmox_api_url.rstrip('/')}/{path.lstrip('/')}"
|
||||
return requests.request(
|
||||
method,
|
||||
url,
|
||||
headers=headers,
|
||||
verify=not proxmox_skip_tls_verify,
|
||||
timeout=30,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def send_key(key: str) -> None:
|
||||
if not proxmox_node or not template_vm_id:
|
||||
raise RuntimeError("proxmox_node or template_vm_id not set")
|
||||
path = f"/nodes/{proxmox_node}/qemu/{template_vm_id}/sendkey"
|
||||
response = proxmox_request("PUT", path, data={"key": key})
|
||||
response.raise_for_status()
|
||||
|
||||
def handle_install_finished() -> None:
|
||||
retry_delay = 1
|
||||
while True:
|
||||
try:
|
||||
response = proxmox_request("GET", "/version")
|
||||
response.raise_for_status()
|
||||
print("Authenticated to Proxmox VE API.")
|
||||
break
|
||||
except Exception as exc:
|
||||
print(f"Proxmox auth failed: {exc}. Retrying in {retry_delay}s.")
|
||||
time.sleep(retry_delay)
|
||||
retry_delay += 1
|
||||
|
||||
try:
|
||||
print("Waiting 45 seconds before sending LUKS password.")
|
||||
time.sleep(45)
|
||||
for char in "packer":
|
||||
send_key(char)
|
||||
time.sleep(0.1)
|
||||
send_key("ret")
|
||||
print("Sent LUKS password and Enter.")
|
||||
except Exception as exc:
|
||||
print(f"Post-install actions failed after auth: {exc}")
|
||||
|
||||
try:
|
||||
while True:
|
||||
if server_event.is_set() and not notified:
|
||||
print("Installation finished.\nRestarting.")
|
||||
notified = True
|
||||
if server_event.is_set() and not action_started:
|
||||
action_started = True
|
||||
threading.Thread(target=handle_install_finished, daemon=True).start()
|
||||
if build_proc.poll() is not None:
|
||||
break
|
||||
server_event.wait(0.2)
|
||||
finally:
|
||||
httpd.shutdown()
|
||||
httpd.server_close()
|
||||
|
||||
return build_proc.returncode or 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Loading…
Add table
Add a link
Reference in a new issue