diff --git a/.gitignore b/.gitignore index 0794fa2..3d0bf74 100644 --- a/.gitignore +++ b/.gitignore @@ -227,3 +227,220 @@ fabric.properties # Android studio 3.1+ serialized cache file .idea/caches/build_file_checksums.ser + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[codz] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py.cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +# Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +# poetry.lock +# poetry.toml + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. +# https://pdm-project.org/en/latest/usage/project/#working-with-version-control +# pdm.lock +# pdm.toml +.pdm-python +.pdm-build/ + +# pixi +# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control. +# pixi.lock +# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one +# in the .venv directory. It is recommended not to include this directory in version control. +.pixi + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# Redis +*.rdb +*.aof +*.pid + +# RabbitMQ +mnesia/ +rabbitmq/ +rabbitmq-data/ + +# ActiveMQ +activemq-data/ + +# SageMath parsed files +*.sage.py + +# Environments +.env +.envrc +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +# .idea/ + +# Abstra +# Abstra is an AI-powered process automation framework. +# Ignore directories containing user credentials, local state, and settings. +# Learn more at https://abstra.io/docs +.abstra/ + +# Visual Studio Code +# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore +# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore +# and can be added to the global gitignore or merged into this file. However, if you prefer, +# you could uncomment the following to ignore the entire vscode folder +# .vscode/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + +# Marimo +marimo/_static/ +marimo/_lsp/ +__marimo__/ + +# Streamlit +.streamlit/secrets.toml diff --git a/dotfiles/local/bin/agenix-helper b/dotfiles/local/bin/agenix-helper index 079fcb7..11ef82d 100755 --- a/dotfiles/local/bin/agenix-helper +++ b/dotfiles/local/bin/agenix-helper @@ -1,250 +1,251 @@ -#!/usr/bin/env bash -# More safety, by turning some bugs into errors. -# Without `errexit` you don’t need ! and can replace -# ${PIPESTATUS[0]} with a simple $?, but I prefer safety. -set -euf -o pipefail +#!/usr/bin/env -S uv run --script +# /// script +# requires-python = ">=3.11" +# /// -#--------------------------------------------------- -# # {{@@ header() @@}} # # age encryption / decryption helpers # based on https://github.com/ryantm/agenix -# -# For macOS coreutils and gnu-getopt are required -# to run this script. -# brew install coreutils gnu-getopt -# -#--------------------------------------------------- -#TMPPATH="/dev/shm" -TMPPATH="/tmp" +from __future__ import annotations -[[ -d "/opt/homebrew/opt/coreutils/libexec/gnubin" ]] && export PATH="/opt/homebrew/opt/coreutils/libexec/gnubin:${PATH}" -[[ -d "/opt/homebrew/opt/gnu-getopt/bin" ]] && export PATH="/opt/homebrew/opt/gnu-getopt/bin:${PATH}" +import argparse +import re +import subprocess +import sys +from pathlib import Path -update_keys() { - local file="${1}" - local start_marker="${2}" - local end_marker="${3}" - local new_key="${4}" - local list_name="${5}" - local tmp_file=$(mktemp -p ${TMPPATH}) - local content_file=$(mktemp -p ${TMPPATH}) - - local content_array=() - local content_array_unsorted=() - # Get current configured keys and save them to the array "content_array" - mapfile -t content_array_unsorted < <(awk "/${start_marker}/{f=1;next} /${end_marker}/{f=0} f" ${file}) - # Add new key to the array "content_array" - content_array_unsorted+=("${new_key}") - # Sort content alphabetically - IFS=$'\n' content_array=($(sort <<<"${content_array_unsorted[*]}")); unset IFS - - # Remove duplicates from the array - declare -A seen=() - unique_content_array=() - for item in "${content_array[@]}"; do - key="${item%%=*}" # Extract the key part - if [[ -z "${seen[$key]+unset}" ]]; then - unique_content_array+=("${item}") - seen[$key]=1 - fi - done - - # Write the unique contents of the array to a temporary file - printf "%s\n" "${unique_content_array[@]}" > "${content_file}" - - # Process the file to replace the keyword list and the block of text - awk -v start="${start_marker}" -v end="${end_marker}" -v content_file="${content_file}" -v keys="${!seen[*]}" -v list_name="${list_name}" ' - BEGIN { - in_block = 0 - split(keys, key_array, " ") - } - { - if ($0 ~ start) { - print - in_block = 1 - while ((getline line < content_file) > 0) { - print line - } - close(content_file) - next - } - if ($0 ~ end) { - in_block = 0 - print - next - } - if (!in_block) { - if ($0 ~ list_name " = \\[.*\\];") { - # Recreate the list_name list from the keys of unique_content_array - printf " %s = [ ", list_name - sep = "" - for (i in key_array) { - gsub(/^ +/, "", key_array[i]) # Remove leading spaces from keys - printf "%s%s", sep, key_array[i] - sep = " " - } - print " ];" - next - } - print - } - } - ' "${file}" > "${tmp_file}" - - # Move the temporary file to the original file - mv "${tmp_file}" "${file}" - rm "${content_file}" -} - -gen-user-key() { - local keyname="${1}" - local public_key="${2}" - local working_directory="${3:-$(pwd)}" - local begin_marker='#-----BEGIN USER PUBLIC KEYS-----' - local end_marker='#------END USER PUBLIC KEYS------' - local input_file="${working_directory}/secrets/secrets.nix" - local userkey - - if [[ ${public_key} == "EMPTY" ]]; then - echo "generating new keys for host ${keyname}"; - ssh-keygen \ - -t ed25519 \ - -f ~/.ssh/${keyname} \ - -C "agenix@${keyname}" \ - -N '' - - echo "getting user public key for user ${keyname}" - userkey=$(echo -n " ${keyname} = \"$(cat ~/.ssh/${keyname}.pub | awk -F' ' '{ print $1, $2 }')\";") - else - userkey=$(echo -n " ${keyname} = \"$(echo -n "${public_key}" | awk -F' ' '{ print $1, $2 }')\";") - fi - - update_keys "${input_file}" "${begin_marker}" "${end_marker}" "${userkey}" "users" -} - -get-host-key() { - local keyname="${1}" - local target="${2}" - local type="${3:-ssh-ed25519}" - local working_directory="${4:-$(pwd)}" - local begin_marker='#-----BEGIN SYSTEM PUBLIC KEYS-----' - local end_marker='#------END SYSTEM PUBLIC KEYS------' - local input_file="${working_directory}/secrets/secrets.nix" - local hostkey - - echo "getting host public key for host ${keyname}" - hostkey=$(echo -n " ${keyname} = \"$(ssh-keyscan -t ${type} ${target} 2>/dev/null | awk -F' ' '{ print $2, $3 }')\";") - - update_keys "${input_file}" "${begin_marker}" "${end_marker}" "${hostkey}" "systems" -} - -help() { - echo "Usage: $(basename ${0}) < gen-user-key [argument ...] | get-host-key [argument ...] >" - echo "" - echo "Options:" - echo " gen-user-key generates a new ssh-ed25519 keypair and adds the public key to secrets.nix" - echo "" - echo " -k, --public-key provide a public key, instead of generiting a new keypair (format: \"ssh-ed25519 AAAAC3N...\")" - echo " -n, --name keyname, usually the hostname (e.g. )" - echo " -p, --path path to the root directory for the nixOS configuration files, defaults to \`pwd\`" - echo "" - echo "" - echo " get-host-key get a ssh host public key via ssh-keyscan and adds it to secrets.nix" - echo "" - echo " -t, --target hostname, fqdn or IP from whom the host key is requested" - echo " -n, --name keyname, usually the hostname (e.g. )" - echo " -p, --path path to the root directory for the nixOS configuration files, defaults to \`pwd\`" - echo " --type type of the key which is requested via ssh-keyscan, defaults to \`ssh-ed25519\`" -} +USER_BEGIN_MARKER = "#-----BEGIN USER PUBLIC KEYS-----" +USER_END_MARKER = "#------END USER PUBLIC KEYS------" +SYSTEM_BEGIN_MARKER = "#-----BEGIN SYSTEM PUBLIC KEYS-----" +SYSTEM_END_MARKER = "#------END SYSTEM PUBLIC KEYS------" -# -allow a command to fail with !’s side effect on errexit -# -use return value from ${PIPESTATUS[0]}, because ! hosed $? -! getopt --test > /dev/null -if [[ ${PIPESTATUS[0]} -ne 4 ]]; then - echo 'I’m sorry, `getopt --test` failed in this environment.' - exit 1 -fi +def normalize_public_key(raw_key: str) -> str: + parts = raw_key.strip().split() + if len(parts) < 2: + raise ValueError( + 'Public key must contain at least " " (e.g. "ssh-ed25519 AAAAC3N...").' + ) + return f"{parts[0]} {parts[1]}" -# option --output/-o requires 1 argument -OPTIONS=hk:n:p:t: -LONGOPTS=help,name:,path:,public-key:,target:,type: -# -regarding ! and PIPESTATUS see above -# -temporarily store output to be able to check for errors -# -activate quoting/enhanced mode (e.g. by writing out “--options”) -# -pass arguments only via -- "$@" to separate them correctly -! PARSED=$(getopt --options=${OPTIONS} --longoptions=${LONGOPTS} --name "$(basename ${0})" -- "${@:--h}") -if [[ ${PIPESTATUS[0]} -ne 0 ]]; then - # e.g. return value is 1 - # then getopt has complained about wrong arguments to stdout - exit 2 -fi -# read getopt’s output this way to handle the quoting right: -eval set -- "${PARSED}" +def find_marker(lines: list[str], marker: str) -> int: + for index, line in enumerate(lines): + if marker in line: + return index + raise ValueError(f'Marker "{marker}" was not found.') -# now enjoy the options in order and nicely split until we see -- -while true; do - case "${1}" in - -h|--help) - shift - help - exit - ;; - -k|--public-key) - public_key="${2}" - shift 2 - ;; - -n|--name) - name="${2}" - shift 2 - ;; - -p|--path) - path="${2}" - shift 2 - ;; - -t|--target) - target="${2}" - shift 2 - ;; - --type) - type="${2}" - shift 2 - ;; - --) - shift - break - ;; - *) - echo "This option (${1}) does not exist. Exiting." - exit 3 - ;; - esac -done -# handle non-option arguments -if [[ ${#} -eq 1 ]]; then - while true; do - case "${1}" in - gen-user-key) - gen-user-key "${name:?Error, missing option \"-n\"}" "${public_key:-"EMPTY"}" "${path:-}" - shift - exit - ;; - get-host-key) - get-host-key "${name:?Error, missing option \"-n\"}" "${target:?Error, missing option \"-t\"}" "${type:-}" "${path:-}" - shift - exit - ;; - *) - echo "Wrong sub command, use -h to print the help." - exit 4 - ;; - esac - done -else - echo "No sub command provided, use -h to print the help." -fi +def update_keys( + file_path: Path, + start_marker: str, + end_marker: str, + new_key_line: str, + list_name: str, +) -> None: + lines = file_path.read_text(encoding="utf-8").splitlines(keepends=True) + + start_index = find_marker(lines, start_marker) + end_index = find_marker(lines[start_index + 1 :], end_marker) + start_index + 1 + + existing_lines = [line.rstrip("\n") for line in lines[start_index + 1 : end_index]] + merged = sorted(existing_lines + [new_key_line]) + + unique_lines: list[str] = [] + ordered_keys: list[str] = [] + seen_keys: set[str] = set() + for entry in merged: + key_name = entry.split("=", 1)[0].strip() + if key_name not in seen_keys: + unique_lines.append(entry) + ordered_keys.append(key_name) + seen_keys.add(key_name) + + output_lines: list[str] = [] + in_block = False + list_pattern = re.compile(rf"^\s*{re.escape(list_name)}\s*=\s*\[.*\];\s*$") + for index, line in enumerate(lines): + if index == start_index: + output_lines.append(line) + for entry in unique_lines: + output_lines.append(f"{entry}\n") + in_block = True + continue + + if in_block: + if index == end_index: + output_lines.append(line) + in_block = False + continue + + if list_pattern.match(line): + indent = re.match(r"^\s*", line).group(0) + key_list = " ".join(ordered_keys) + output_lines.append(f"{indent}{list_name} = [ {key_list} ];\n") + else: + output_lines.append(line) + + file_path.write_text("".join(output_lines), encoding="utf-8") + + +def parse_ssh_keyscan_output(raw_output: str, requested_types: str) -> str: + preferred_types = [item.strip() for item in requested_types.split(",") if item.strip()] + type_rank = {key_type: rank for rank, key_type in enumerate(preferred_types)} + + candidates: list[tuple[int, str, str]] = [] + for index, raw_line in enumerate(raw_output.splitlines()): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + + fields = line.split() + if len(fields) < 3: + continue + + key_type, key_material = fields[1], fields[2] + candidates.append((index, key_type, key_material)) + + if not candidates: + raise ValueError("No valid SSH key lines found in ssh-keyscan output.") + + # Prefer modern Ed25519 keys over RSA whenever both are available. + if not preferred_types or "ssh-ed25519" in preferred_types: + ed25519_candidate = next( + ((key_type, key_material) for _, key_type, key_material in candidates if key_type == "ssh-ed25519"), + None, + ) + if ed25519_candidate is not None: + key_type, key_material = ed25519_candidate + return f"{key_type} {key_material}" + + if preferred_types: + ranked_candidates = [ + (type_rank[key_type], output_order, key_type, key_material) + for output_order, key_type, key_material in candidates + if key_type in type_rank + ] + if ranked_candidates: + _, _, key_type, key_material = min(ranked_candidates) + return f"{key_type} {key_material}" + + _, key_type, key_material = candidates[0] + return f"{key_type} {key_material}" + + +def gen_user_key(name: str, public_key: str | None, working_directory: Path) -> None: + input_file = working_directory / "secrets" / "secrets.nix" + + if public_key is None: + print(f"generating new keys for host {name}") + private_key_file = Path.home() / ".ssh" / name + subprocess.run( + [ + "ssh-keygen", + "-t", + "ed25519", + "-f", + str(private_key_file), + "-C", + f"agenix@{name}", + "-N", + "", + ], + check=True, + ) + + print(f"getting user public key for user {name}") + user_public_key = normalize_public_key(private_key_file.with_suffix(".pub").read_text(encoding="utf-8")) + else: + user_public_key = normalize_public_key(public_key) + + user_key_line = f' {name} = "{user_public_key}";' + update_keys(input_file, USER_BEGIN_MARKER, USER_END_MARKER, user_key_line, "users") + + +def get_host_key(name: str, target: str, key_type: str, working_directory: Path) -> None: + input_file = working_directory / "secrets" / "secrets.nix" + + print(f"getting host public key for host {name}") + keyscan = subprocess.run( + ["ssh-keyscan", "-t", key_type, target], + check=True, + text=True, + capture_output=True, + ) + parsed_key = parse_ssh_keyscan_output(keyscan.stdout, key_type) + host_key_line = f' {name} = "{parsed_key}";' + + update_keys(input_file, SYSTEM_BEGIN_MARKER, SYSTEM_END_MARKER, host_key_line, "systems") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog=Path(sys.argv[0]).name, + usage="%(prog)s < gen-user-key [argument ...] | get-host-key [argument ...] >", + description="age encryption / decryption helpers (based on https://github.com/ryantm/agenix)", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser.add_argument( + "subcommand", + choices=["gen-user-key", "get-host-key"], + help=( + "gen-user-key: generate or add a user key to secrets.nix\n" + "get-host-key: fetch a host key via ssh-keyscan and add it to secrets.nix" + ), + ) + parser.add_argument( + "-k", + "--public-key", + help='provide a public key instead of generating one (format: "ssh-ed25519 AAAAC3N...")', + ) + parser.add_argument("-n", "--name", help="key name, usually the hostname") + parser.add_argument( + "-p", + "--path", + help="path to the root directory for nixOS configuration files (defaults to current directory)", + ) + parser.add_argument("-t", "--target", help="hostname/FQDN/IP to query via ssh-keyscan") + parser.add_argument( + "--type", + default="ssh-ed25519", + help='ssh-keyscan key type(s), defaults to "ssh-ed25519"', + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + effective_argv = sys.argv[1:] if argv is None else argv + if not effective_argv: + parser.print_help() + return 0 + + args = parser.parse_args(effective_argv) + working_directory = Path(args.path).expanduser().resolve() if args.path else Path.cwd() + + if args.subcommand == "gen-user-key": + if not args.name: + parser.error('Error, missing option "-n/--name"') + gen_user_key(args.name, args.public_key, working_directory) + return 0 + + if args.subcommand == "get-host-key": + if not args.name: + parser.error('Error, missing option "-n/--name"') + if not args.target: + parser.error('Error, missing option "-t/--target"') + get_host_key(args.name, args.target, args.type, working_directory) + return 0 + + parser.error("Wrong sub command, use -h to print the help.") + return 4 + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except (subprocess.CalledProcessError, ValueError, OSError) as exc: + print(str(exc), file=sys.stderr) + raise SystemExit(1)