Initial working version

This commit is contained in:
Philip Henning 2026-03-08 18:32:25 +01:00
parent 34a0627e76
commit b6886cb34a
61 changed files with 4475 additions and 6 deletions

View file

@ -0,0 +1,3 @@
"""Proxmox VM setup TUI."""
__all__ = []

View file

@ -0,0 +1,4 @@
from .cli import main
if __name__ == "__main__":
raise SystemExit(main())

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

46
src/pve_vm_setup/app.py Normal file
View file

@ -0,0 +1,46 @@
from __future__ import annotations
from textual.app import App, ComposeResult
from textual.containers import Container
from textual.widgets import Footer, Header
from .models.workflow import WorkflowState
from .screens.login import LoginView
from .screens.wizard import WizardView
from .services.base import ProxmoxService
from .services.factory import ProxmoxServiceFactory
from .settings import AppSettings
from .terminal_compat import build_driver_class
class PveVmSetupApp(App[None]):
TITLE = "Proxmox VM Setup"
SUB_TITLE = "Live-access foundation"
def __init__(
self,
settings: AppSettings,
*,
service: ProxmoxService | None = None,
) -> None:
super().__init__(driver_class=build_driver_class())
self.settings = settings
self.workflow = WorkflowState()
self.service = service or ProxmoxServiceFactory.create(settings)
def compose(self) -> ComposeResult:
yield Header()
with Container(id="app-body"):
yield LoginView(self.settings, self.workflow, self.service)
yield Footer()
def on_unmount(self) -> None:
close = getattr(self.service, "close", None)
if callable(close):
close()
async def on_login_view_authenticated(self, _: LoginView.Authenticated) -> None:
self.query_one(LoginView).remove()
wizard = WizardView(self.settings, self.workflow, self.service)
await self.query_one("#app-body", Container).mount(wizard)
wizard.activate()

33
src/pve_vm_setup/cli.py Normal file
View file

@ -0,0 +1,33 @@
from __future__ import annotations
import argparse
import sys
from .app import PveVmSetupApp
from .doctor import run_live_doctor
from .settings import AppSettings
from .terminal_compat import apply_runtime_compatibility
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Proxmox VM setup TUI")
parser.add_argument(
"--doctor-live",
action="store_true",
help="Run live Proxmox connectivity and authentication diagnostics.",
)
return parser
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
apply_runtime_compatibility()
settings = AppSettings.from_env()
if args.doctor_live:
return run_live_doctor(settings, stream=sys.stdout)
app = PveVmSetupApp(settings)
app.run(mouse=False)
return 0

View file

@ -0,0 +1,80 @@
from __future__ import annotations
from typing import TextIO
from .errors import ProxmoxError, SettingsError
from .services.factory import ProxmoxServiceFactory
from .settings import AppSettings
def run_live_doctor(
settings: AppSettings,
*,
stream: TextIO,
service_factory: type[ProxmoxServiceFactory] = ProxmoxServiceFactory,
) -> int:
try:
settings.validate_live_requirements()
settings.safety_policy.validate()
except SettingsError as exc:
stream.write(f"FAIL configuration: {exc}\n")
return 1
stream.write("Target\n")
stream.write(f" host: {settings.sanitized_host}\n")
stream.write(f" api_base: {settings.proxmox_api_base}\n")
stream.write(f" realm: {settings.proxmox_realm}\n")
stream.write(f" verify_tls: {settings.proxmox_verify_tls}\n")
stream.write(f" prevent_create: {settings.safety_policy.prevent_create}\n")
stream.write(f" enable_test_mode: {settings.safety_policy.enable_test_mode}\n")
service = service_factory.create(settings)
try:
stream.write("1. Checking HTTPS reachability...\n")
transport_status = service.check_connectivity()
stream.write(f" OK {transport_status}\n")
stream.write("2. Checking API base path...\n")
release = service.check_api_base()
stream.write(f" OK release={release}\n")
stream.write("3. Loading realms...\n")
realms = service.load_realms()
stream.write(f" OK realms={','.join(realm.name for realm in realms)}\n")
stream.write("4. Attempting login...\n")
session = service.login(
settings.proxmox_user or "",
settings.proxmox_password or "",
settings.proxmox_realm or "",
)
stream.write(f" OK authenticated_as={session.username}\n")
if settings.safety_policy.enable_test_mode:
stream.write("5. Validating test mode create scope...\n")
nodes = {node.name for node in service.load_nodes()}
if settings.safety_policy.test_node not in nodes:
raise SettingsError(
f"Configured test node {settings.safety_policy.test_node!r} was not found."
)
stream.write(f" OK node={settings.safety_policy.test_node}\n")
if settings.safety_policy.test_pool:
pools = {pool.poolid for pool in service.load_pools()}
if settings.safety_policy.test_pool not in pools:
raise SettingsError(
f"Configured test pool {settings.safety_policy.test_pool!r} was not found."
)
stream.write(f" OK pool={settings.safety_policy.test_pool}\n")
stream.write(f" tag={settings.safety_policy.test_tag}\n")
stream.write(f" name_prefix={settings.safety_policy.test_vm_name_prefix}\n")
except (ProxmoxError, SettingsError, ValueError) as exc:
stream.write(f"FAIL {exc}\n")
return 1
finally:
close = getattr(service, "close", None)
if callable(close):
close()
stream.write("Doctor finished successfully.\n")
return 0

320
src/pve_vm_setup/domain.py Normal file
View file

@ -0,0 +1,320 @@
from __future__ import annotations
import re
from dataclasses import replace
from .errors import SettingsError
from .models.workflow import DiskConfig, ReferenceData, VmConfig
from .settings import AppSettings
_NIXOS_ISO_PATTERN = re.compile(
r"nixos-minimal-(?P<year>\d{2})[.-](?P<month>\d{2})\.[A-Za-z0-9]+-[A-Za-z0-9_]+-linux\.iso$"
)
def select_latest_nixos_iso(isos: list[str]) -> str | None:
candidates: list[tuple[int, int, str]] = []
for iso in isos:
match = _NIXOS_ISO_PATTERN.search(iso)
if match:
candidates.append((int(match.group("year")), int(match.group("month")), iso))
if not candidates:
return None
return max(candidates)[2]
def build_startup_value(order: str, up: str, down: str) -> str:
parts: list[str] = []
if order.strip():
parts.append(f"order={order.strip()}")
if up.strip():
parts.append(f"up={up.strip()}")
if down.strip():
parts.append(f"down={down.strip()}")
return ",".join(parts)
def effective_vm_config(config: VmConfig, settings: AppSettings) -> VmConfig:
result = replace(config)
result.general = replace(config.general)
result.general.name = settings.safety_policy.effective_vm_name(config.general.name.strip())
if not settings.safety_policy.enable_test_mode:
result.general.tags = [tag for tag in config.general.tags if tag]
return result
tags = [tag for tag in config.general.tags if tag]
if settings.safety_policy.test_tag not in tags:
tags.append(settings.safety_policy.test_tag)
result.general.tags = sorted(dict.fromkeys(tags))
return result
def validate_step(
step: str,
config: VmConfig,
settings: AppSettings,
references: ReferenceData,
) -> list[str]:
errors: list[str] = []
if step == "general":
if not config.general.node:
errors.append("Node is required.")
if config.general.vmid < 100:
errors.append("VM ID must be at least 100.")
if not config.general.name.strip():
errors.append("Name is required.")
for label, value in [
("Startup order", config.general.startup_order),
("Startup delay", config.general.startup_delay),
("Shutdown timeout", config.general.shutdown_timeout),
]:
if value.strip() and not value.strip().isdigit():
errors.append(f"{label} must be an integer.")
if settings.safety_policy.enable_test_mode and settings.safety_policy.test_node:
if config.general.node != settings.safety_policy.test_node:
errors.append(
f"Live create mode is restricted to node {settings.safety_policy.test_node}."
)
if settings.safety_policy.enable_test_mode and settings.safety_policy.test_pool:
if config.general.pool != settings.safety_policy.test_pool:
errors.append(
f"Live create mode is restricted to pool {settings.safety_policy.test_pool}."
)
if step == "os":
if config.os.media_choice == "iso":
if not config.os.storage:
errors.append("ISO storage is required.")
if not config.os.iso:
errors.append("ISO selection is required.")
if config.os.media_choice == "physical" and not config.os.physical_drive_path.strip():
errors.append("Physical disc drive path is required.")
if step == "system":
if config.system.add_efi_disk and not config.system.efi_storage:
errors.append("EFI storage is required when EFI disk is enabled.")
if step == "disks":
slots: set[str] = set()
for disk in config.disks:
if disk.slot_name in slots:
errors.append(f"Duplicate disk slot {disk.slot_name}.")
slots.add(disk.slot_name)
if disk.size_gib <= 0:
errors.append(f"Disk {disk.slot_name} size must be greater than zero.")
if not disk.storage:
errors.append(f"Disk {disk.slot_name} storage is required.")
if step == "cpu":
if config.cpu.cores <= 0:
errors.append("CPU cores must be greater than zero.")
if config.cpu.sockets <= 0:
errors.append("CPU sockets must be greater than zero.")
if step == "memory":
if config.memory.memory_mib <= 0:
errors.append("Memory must be greater than zero.")
if config.memory.ballooning:
if config.memory.min_memory_mib <= 0:
errors.append("Min memory must be greater than zero when ballooning is enabled.")
if config.memory.min_memory_mib > config.memory.memory_mib:
errors.append("Min memory cannot exceed memory size.")
if step == "network" and not config.network.no_network_device:
if not config.network.bridge:
errors.append("Bridge is required unless networking is disabled.")
for label, value in [
("VLAN tag", config.network.vlan_tag),
("MTU", config.network.mtu),
("Multiqueue", config.network.multiqueue),
]:
if value.strip() and not value.strip().isdigit():
errors.append(f"{label} must be an integer.")
if step == "confirm" and not settings.safety_policy.allow_create:
errors.append("Set PROXMOX_PREVENT_CREATE=false to enable VM creation.")
return errors
def validate_all_steps(
config: VmConfig, settings: AppSettings, references: ReferenceData
) -> list[str]:
all_errors: list[str] = []
for step in ["general", "os", "system", "disks", "cpu", "memory", "network", "confirm"]:
all_errors.extend(validate_step(step, config, settings, references))
return all_errors
def _bool_int(value: bool) -> int:
return 1 if value else 0
def build_disk_value(disk: DiskConfig) -> str:
options = [
f"{disk.storage}:{disk.size_gib}",
f"format={disk.format}",
f"cache={disk.cache}",
f"discard={'on' if disk.discard else 'ignore'}",
f"iothread={_bool_int(disk.io_thread)}",
f"ssd={_bool_int(disk.ssd_emulation)}",
f"backup={_bool_int(disk.backup)}",
f"replicate={_bool_int(not disk.skip_replication)}",
f"aio={disk.async_io}",
]
return ",".join(options)
def build_network_value(config: VmConfig) -> str | None:
if config.network.no_network_device:
return None
parts: list[str] = []
if config.network.mac_address.strip():
parts.append(f"{config.network.model}={config.network.mac_address.strip()}")
else:
parts.append(f"model={config.network.model}")
parts.append(f"bridge={config.network.bridge}")
parts.append(f"firewall={_bool_int(config.network.firewall)}")
parts.append(f"link_down={_bool_int(config.network.disconnected)}")
if config.network.vlan_tag.strip():
parts.append(f"tag={int(config.network.vlan_tag)}")
if config.network.mtu.strip():
parts.append(f"mtu={int(config.network.mtu)}")
if config.network.rate_limit.strip():
parts.append(f"rate={config.network.rate_limit.strip()}")
if config.network.multiqueue.strip():
parts.append(f"queues={int(config.network.multiqueue)}")
return ",".join(parts)
def build_media_value(config: VmConfig) -> str | None:
if config.os.media_choice == "none":
return None
if config.os.media_choice == "iso" and config.os.iso:
return f"{config.os.iso},media=cdrom"
if config.os.media_choice == "physical":
return f"{config.os.physical_drive_path.strip()},media=cdrom"
return None
def build_create_payload(config: VmConfig, settings: AppSettings) -> dict[str, str | int]:
if not settings.safety_policy.allow_create:
raise SettingsError("PROXMOX_PREVENT_CREATE=false is required before creating VMs.")
effective = effective_vm_config(config, settings)
payload: dict[str, str | int] = {
"vmid": effective.general.vmid,
"name": effective.general.name,
"ostype": effective.os.guest_version,
"bios": effective.system.bios,
"machine": effective.system.machine,
"scsihw": effective.system.scsi_controller,
"agent": _bool_int(effective.system.qemu_agent),
"cores": effective.cpu.cores,
"sockets": effective.cpu.sockets,
"cpu": effective.cpu.cpu_type,
"memory": effective.memory.memory_mib,
"balloon": effective.memory.min_memory_mib if effective.memory.ballooning else 0,
"allow-ksm": _bool_int(effective.memory.allow_ksm),
"onboot": _bool_int(effective.general.onboot),
"tags": ";".join(effective.general.tags),
}
if effective.general.pool:
payload["pool"] = effective.general.pool
startup = build_startup_value(
effective.general.startup_order,
effective.general.startup_delay,
effective.general.shutdown_timeout,
)
if startup:
payload["startup"] = startup
if effective.system.graphic_card != "default":
payload["vga"] = effective.system.graphic_card
if effective.system.add_efi_disk:
payload["efidisk0"] = (
f"{effective.system.efi_storage}:1,efitype=4m,"
f"pre-enrolled-keys={_bool_int(effective.system.pre_enrolled_keys)}"
)
if effective.system.tpm_enabled:
payload["tpmstate0"] = f"{effective.system.efi_storage}:4,version=v2.0"
media = build_media_value(effective)
if media:
payload["ide2"] = media
network = build_network_value(effective)
if network:
payload["net0"] = network
for disk in effective.disks:
payload[disk.slot_name] = build_disk_value(disk)
return payload
def build_confirmation_text(config: VmConfig, settings: AppSettings) -> str:
effective = (
effective_vm_config(config, settings) if settings.safety_policy.allow_create else config
)
startup = build_startup_value(
effective.general.startup_order,
effective.general.startup_delay,
effective.general.shutdown_timeout,
)
system_line = (
f"System: machine={effective.system.machine}, "
f"bios={effective.system.bios}, scsi={effective.system.scsi_controller}"
)
efi_line = (
"EFI disk: "
f"{'enabled' if effective.system.add_efi_disk else 'disabled'} "
f"({effective.system.efi_storage or '-'})"
)
cpu_line = (
f"CPU: {effective.cpu.sockets} socket(s), "
f"{effective.cpu.cores} core(s), type={effective.cpu.cpu_type}"
)
memory_line = (
f"Memory: {effective.memory.memory_mib}MiB / balloon minimum "
f"{effective.memory.min_memory_mib if effective.memory.ballooning else 0}MiB"
)
lines = [
f"Node: {effective.general.node}",
f"VM ID: {effective.general.vmid}",
f"Name: {effective.general.name}",
f"Pool: {effective.general.pool or '-'}",
f"Tags: {', '.join(effective.general.tags) or '-'}",
f"HA: {'enabled' if effective.general.ha_enabled else 'disabled'}",
f"On boot: {'enabled' if effective.general.onboot else 'disabled'}",
f"Startup: {startup or '-'}",
"",
f"Media: {effective.os.media_choice}",
f"ISO storage: {effective.os.storage or '-'}",
f"ISO: {effective.os.iso or '-'}",
f"Guest: {effective.os.guest_type} / {effective.os.guest_version}",
"",
system_line,
efi_line,
f"TPM: {'enabled' if effective.system.tpm_enabled else 'disabled'}",
f"Qemu agent: {'enabled' if effective.system.qemu_agent else 'disabled'}",
"",
"Disks:",
]
for disk in effective.disks:
lines.append(
f" - {disk.slot_name}: {disk.storage} {disk.size_gib}GiB "
f"cache={disk.cache} discard={'on' if disk.discard else 'ignore'}"
)
lines.extend(
[
"",
cpu_line,
memory_line,
"",
(
"Network: disabled"
if effective.network.no_network_device
else f"Network: {effective.network.model} on {effective.network.bridge}"
),
]
)
return "\n".join(lines)

View file

@ -0,0 +1,48 @@
class AppError(Exception):
"""Base application error."""
class SettingsError(AppError):
"""Configuration is missing or invalid."""
class ProxmoxError(AppError):
"""Base error raised while talking to Proxmox."""
class ProxmoxTransportError(ProxmoxError):
"""Transport-level failure while talking to Proxmox."""
class ProxmoxConnectError(ProxmoxTransportError):
"""DNS or TCP connection failure."""
class ProxmoxTlsError(ProxmoxTransportError):
"""TLS handshake or certificate verification failure."""
class ProxmoxAuthError(ProxmoxError):
"""Authentication failure."""
class ProxmoxApiError(ProxmoxError):
"""Unexpected HTTP response from the API."""
def __init__(self, message: str, status_code: int | None = None) -> None:
super().__init__(message)
self.status_code = status_code
class ProxmoxUnexpectedResponseError(ProxmoxError):
"""The API returned an unexpected payload shape."""
class ProxmoxPostCreateError(ProxmoxError):
"""A follow-up step failed after the VM already existed."""
def __init__(self, node: str, vmid: int, step: str, message: str) -> None:
super().__init__(message)
self.node = node
self.vmid = vmid
self.step = step

View file

@ -0,0 +1 @@
"""Application models."""

View file

@ -0,0 +1,157 @@
from __future__ import annotations
from dataclasses import dataclass, field
WIZARD_STEPS = [
"general",
"os",
"system",
"disks",
"cpu",
"memory",
"network",
"confirm",
]
@dataclass
class AuthenticationState:
username: str | None = None
realm: str | None = None
authenticated: bool = False
@dataclass
class GeneralConfig:
node: str = ""
vmid: int = 101
name: str = ""
pool: str = ""
tags: list[str] = field(default_factory=list)
ha_enabled: bool = True
onboot: bool = False
startup_order: str = ""
startup_delay: str = ""
shutdown_timeout: str = ""
@dataclass
class OsConfig:
media_choice: str = "iso"
storage: str = ""
iso: str = ""
physical_drive_path: str = "/dev/sr0"
guest_type: str = "linux"
guest_version: str = "l26"
@dataclass
class SystemConfig:
graphic_card: str = "default"
machine: str = "q35"
bios: str = "ovmf"
add_efi_disk: bool = True
efi_storage: str = "ceph-pool"
pre_enrolled_keys: bool = False
scsi_controller: str = "virtio-scsi-single"
qemu_agent: bool = True
tpm_enabled: bool = False
@dataclass
class DiskConfig:
bus: str = "scsi"
device: int = 0
storage: str = "ceph-pool"
size_gib: int = 32
format: str = "raw"
cache: str = "none"
discard: bool = False
io_thread: bool = True
ssd_emulation: bool = True
backup: bool = True
skip_replication: bool = False
async_io: str = "io_uring"
@property
def slot_name(self) -> str:
return f"{self.bus}{self.device}"
@dataclass
class CpuConfig:
cores: int = 2
sockets: int = 1
cpu_type: str = "host"
@dataclass
class MemoryConfig:
memory_mib: int = 2048
min_memory_mib: int = 2048
ballooning: bool = True
allow_ksm: bool = True
@dataclass
class NetworkConfig:
no_network_device: bool = False
bridge: str = "vmbr9"
vlan_tag: str = ""
model: str = "virtio"
mac_address: str = ""
firewall: bool = True
disconnected: bool = False
mtu: str = ""
rate_limit: str = ""
multiqueue: str = ""
@dataclass
class VmConfig:
general: GeneralConfig = field(default_factory=GeneralConfig)
os: OsConfig = field(default_factory=OsConfig)
system: SystemConfig = field(default_factory=SystemConfig)
disks: list[DiskConfig] = field(default_factory=lambda: [DiskConfig()])
cpu: CpuConfig = field(default_factory=CpuConfig)
memory: MemoryConfig = field(default_factory=MemoryConfig)
network: NetworkConfig = field(default_factory=NetworkConfig)
@dataclass
class ReferenceData:
nodes: list[str] = field(default_factory=list)
pools: list[str] = field(default_factory=list)
existing_tags: list[str] = field(default_factory=list)
bridges: list[str] = field(default_factory=list)
iso_storages: list[str] = field(default_factory=list)
disk_storages: list[str] = field(default_factory=list)
all_storages: list[str] = field(default_factory=list)
isos: list[str] = field(default_factory=list)
@dataclass
class SubmissionState:
phase: str = "idle"
message: str = ""
node: str | None = None
vmid: int | None = None
partial_success: bool = False
@dataclass
class WorkflowState:
current_step_index: int = 0
available_realms: list[str] = field(default_factory=list)
authentication: AuthenticationState = field(default_factory=AuthenticationState)
config: VmConfig = field(default_factory=VmConfig)
reference_data: ReferenceData = field(default_factory=ReferenceData)
submission: SubmissionState = field(default_factory=SubmissionState)
@property
def current_step(self) -> str:
return WIZARD_STEPS[self.current_step_index]
@property
def step_title(self) -> str:
return self.current_step.replace("_", " ").title()

View file

@ -0,0 +1 @@
"""Textual screens."""

View file

@ -0,0 +1,159 @@
from __future__ import annotations
from textual import on
from textual.containers import Vertical
from textual.message import Message
from textual.widgets import Button, Input, Select, Static
from ..errors import ProxmoxError
from ..models.workflow import WorkflowState
from ..services.base import ProxmoxService, Realm
from ..settings import AppSettings
class LoginView(Vertical):
class Authenticated(Message):
def __init__(self, username: str, realm: str) -> None:
self.username = username
self.realm = realm
super().__init__()
DEFAULT_CSS = """
LoginView {
width: 1fr;
height: 1fr;
padding: 1 2;
align-horizontal: center;
}
#login-card {
width: 80;
max-width: 100%;
border: round $accent;
padding: 1 2;
}
#title {
text-style: bold;
margin-bottom: 1;
}
Input, Select, Button {
margin-top: 1;
}
#status {
margin-top: 1;
color: $text-muted;
}
"""
def __init__(
self,
settings: AppSettings,
workflow: WorkflowState,
service: ProxmoxService,
) -> None:
super().__init__()
self._settings = settings
self._workflow = workflow
self._service = service
def compose(self):
with Vertical(id="login-card"):
yield Static("Proxmox Login", id="title")
yield Static(
f"Mode: {self._service.mode} on {self._settings.sanitized_host}",
id="mode",
)
yield Input(
value=self._settings.proxmox_user or "",
placeholder="Username",
id="username",
)
yield Input(
value=self._settings.proxmox_password or "",
password=True,
placeholder="Password",
id="password",
)
yield Select[str](options=[], prompt="Realm", id="realm")
yield Button("Connect", id="connect", variant="primary")
yield Static("Loading realms...", id="status")
def on_mount(self) -> None:
username_input = self.query_one("#username", Input)
self.call_after_refresh(self.app.set_focus, username_input)
self.run_worker(self._load_realms, thread=True, exclusive=True)
def _load_realms(self) -> None:
try:
realms = self._service.load_realms()
except Exception as exc:
self.app.call_from_thread(self._show_status, f"Failed to load realms: {exc}")
return
self.app.call_from_thread(self._set_realms, realms)
def _set_realms(self, realms: list[Realm]) -> None:
self._workflow.available_realms = [realm.name for realm in realms]
options = [(realm.title, realm.name) for realm in realms]
select = self.query_one("#realm", Select)
select.set_options(options)
preferred_realm = self._settings.proxmox_realm
if preferred_realm and preferred_realm in self._workflow.available_realms:
select.value = preferred_realm
elif realms:
default_realm = next((realm.name for realm in realms if realm.default), realms[0].name)
select.value = default_realm
self._show_status(f"Loaded {len(realms)} realm(s).")
def _show_status(self, message: str) -> None:
self.query_one("#status", Static).update(message)
@on(Button.Pressed, "#connect")
def on_connect_pressed(self) -> None:
self._submit()
@on(Input.Submitted, "#username")
@on(Input.Submitted, "#password")
def on_input_submitted(self) -> None:
self._submit()
@on(Select.Changed, "#realm")
def on_realm_changed(self) -> None:
# Keep the form keyboard friendly once realms have loaded.
if self.app.focused is None:
username_input = self.query_one("#username", Input)
self.call_after_refresh(self.app.set_focus, username_input)
def _submit(self) -> None:
username = self.query_one("#username", Input).value.strip()
password = self.query_one("#password", Input).value
realm = self.query_one("#realm", Select).value
if not username or not password or not isinstance(realm, str):
self._show_status("Username, password, and realm are required.")
return
self._show_status("Authenticating...")
self.run_worker(
lambda: self._authenticate(username=username, password=password, realm=realm),
thread=True,
exclusive=True,
)
def _authenticate(self, *, username: str, password: str, realm: str) -> None:
try:
session = self._service.login(username, password, realm)
except (ProxmoxError, ValueError) as exc:
self.app.call_from_thread(self._show_status, f"Authentication failed: {exc}")
return
self.app.call_from_thread(self._mark_authenticated, session.username, realm)
def _mark_authenticated(self, username: str, realm: str) -> None:
self._workflow.authentication.username = username
self._workflow.authentication.realm = realm
self._workflow.authentication.authenticated = True
self._show_status(f"Authenticated as {username}.")
self.post_message(self.Authenticated(username, realm))

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1 @@
"""Service layer for Proxmox access."""

View file

@ -0,0 +1,90 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Protocol
if TYPE_CHECKING:
from ..models.workflow import VmConfig
@dataclass(frozen=True)
class Realm:
name: str
title: str
default: bool = False
@dataclass(frozen=True)
class AuthenticatedSession:
username: str
ticket: str
csrf_token: str | None = None
@dataclass(frozen=True)
class Node:
name: str
status: str | None = None
@dataclass(frozen=True)
class Pool:
poolid: str
comment: str | None = None
@dataclass(frozen=True)
class Storage:
storage: str
node: str
content: tuple[str, ...]
@dataclass(frozen=True)
class Bridge:
iface: str
active: bool = True
@dataclass(frozen=True)
class IsoImage:
volid: str
storage: str
node: str
@dataclass(frozen=True)
class VmCreationResult:
node: str
vmid: int
name: str
serial_console_configured: bool = True
ha_configured: bool = False
class ProxmoxService(Protocol):
mode: str
def check_connectivity(self) -> str: ...
def check_api_base(self) -> str: ...
def load_realms(self) -> list[Realm]: ...
def login(self, username: str, password: str, realm: str) -> AuthenticatedSession: ...
def load_nodes(self) -> list[Node]: ...
def load_next_vmid(self) -> int: ...
def load_pools(self) -> list[Pool]: ...
def load_existing_tags(self) -> list[str]: ...
def load_bridges(self, node: str) -> list[Bridge]: ...
def load_storages(self, node: str) -> list[Storage]: ...
def load_isos(self, node: str, storage: str) -> list[IsoImage]: ...
def create_vm(self, config: VmConfig, start_after_create: bool = False) -> VmCreationResult: ...

View file

@ -0,0 +1,12 @@
from ..settings import AppSettings
from .base import ProxmoxService
from .fake import FakeProxmoxService
from .proxmox import LiveProxmoxService, ProxmoxApiClient
class ProxmoxServiceFactory:
@staticmethod
def create(settings: AppSettings) -> ProxmoxService:
if settings.is_live_configured:
return LiveProxmoxService(ProxmoxApiClient(settings))
return FakeProxmoxService()

View file

@ -0,0 +1,82 @@
from ..models.workflow import VmConfig
from .base import (
AuthenticatedSession,
Bridge,
IsoImage,
Node,
Pool,
ProxmoxService,
Realm,
Storage,
VmCreationResult,
)
class FakeProxmoxService(ProxmoxService):
mode = "fake"
def __init__(self) -> None:
self.created_vms: list[VmCreationResult] = []
self.start_after_create_requests: list[bool] = []
def check_connectivity(self) -> str:
return "fake transport reachable"
def check_api_base(self) -> str:
return "fake-api-base"
def load_realms(self) -> list[Realm]:
return [
Realm(name="pam", title="Linux PAM standard authentication", default=True),
Realm(name="pve", title="Proxmox VE authentication server"),
]
def login(self, username: str, password: str, realm: str) -> AuthenticatedSession:
if not username or not password:
raise ValueError("Username and password are required.")
return AuthenticatedSession(username=f"{username}@{realm}", ticket="fake-ticket")
def load_nodes(self) -> list[Node]:
return [Node(name="fake-node-01", status="online")]
def load_next_vmid(self) -> int:
return 123
def load_pools(self) -> list[Pool]:
return [Pool(poolid="lab"), Pool(poolid="sandbox")]
def load_existing_tags(self) -> list[str]:
return ["codex-e2e", "linux", "test"]
def load_bridges(self, node: str) -> list[Bridge]:
return [Bridge(iface="vmbr9"), Bridge(iface="vmbr0")]
def load_storages(self, node: str) -> list[Storage]:
return [
Storage(storage="cephfs", node=node, content=("iso", "backup")),
Storage(storage="ceph-pool", node=node, content=("images",)),
]
def load_isos(self, node: str, storage: str) -> list[IsoImage]:
return [
IsoImage(
volid=f"{storage}:iso/nixos-minimal-24-11.1234abcd-x86_64-linux.iso",
storage=storage,
node=node,
)
]
def create_vm(self, config: VmConfig, start_after_create: bool = False) -> VmCreationResult:
name = config.general.name
if not name.startswith("codex-e2e-"):
name = f"codex-e2e-{name}"
self.start_after_create_requests.append(start_after_create)
result = VmCreationResult(
node=config.general.node,
vmid=config.general.vmid,
name=name,
serial_console_configured=True,
ha_configured=config.general.ha_enabled,
)
self.created_vms.append(result)
return result

View file

@ -0,0 +1,399 @@
from __future__ import annotations
import time
from collections.abc import Callable
import httpx
from ..domain import build_create_payload
from ..errors import (
ProxmoxApiError,
ProxmoxAuthError,
ProxmoxConnectError,
ProxmoxError,
ProxmoxPostCreateError,
ProxmoxTlsError,
ProxmoxTransportError,
ProxmoxUnexpectedResponseError,
)
from ..models.workflow import VmConfig
from ..settings import AppSettings
from .base import (
AuthenticatedSession,
Bridge,
IsoImage,
Node,
Pool,
ProxmoxService,
Realm,
Storage,
VmCreationResult,
)
def _looks_like_tls_error(message: str) -> bool:
upper = message.upper()
indicators = ("SSL", "TLS", "CERTIFICATE", "WRONG_VERSION", "EOF")
return any(token in upper for token in indicators)
class ProxmoxApiClient:
def __init__(
self,
settings: AppSettings,
*,
transport: httpx.BaseTransport | None = None,
client_factory: Callable[..., httpx.Client] | None = None,
) -> None:
settings.validate_live_requirements()
self._settings = settings
factory = client_factory or httpx.Client
self._client = factory(
base_url=settings.api_url,
verify=settings.proxmox_verify_tls,
timeout=settings.request_timeout_seconds,
follow_redirects=True,
transport=transport,
)
self._ticket: str | None = None
self._csrf_token: str | None = None
def close(self) -> None:
self._client.close()
def probe_transport(self) -> str:
try:
response = self._client.get(self._settings.proxmox_url or "/")
return f"HTTP {response.status_code}"
except httpx.ConnectError as exc:
raise ProxmoxConnectError("Unable to connect to the Proxmox host.") from exc
except httpx.TransportError as exc:
message = str(exc)
if _looks_like_tls_error(message):
raise ProxmoxTlsError("TLS handshake or verification failed.") from exc
raise ProxmoxTransportError("Transport error while probing Proxmox.") from exc
def check_api_base(self) -> str:
payload = self._request_json("GET", "/access/domains")
if not isinstance(payload, list):
raise ProxmoxUnexpectedResponseError("API base check did not return a realms list.")
return "access/domains"
def login(self, username: str, password: str, realm: str) -> AuthenticatedSession:
full_username = username if "@" in username else f"{username}@{realm}"
payload = self._request_json(
"POST",
"/access/ticket",
data={"username": full_username, "password": password},
)
ticket = payload.get("ticket")
csrf_token = payload.get("CSRFPreventionToken")
if not isinstance(ticket, str) or not ticket:
raise ProxmoxUnexpectedResponseError("Login response did not include a ticket.")
self._ticket = ticket
self._csrf_token = csrf_token if isinstance(csrf_token, str) else None
self._client.cookies.set("PVEAuthCookie", ticket)
return AuthenticatedSession(
username=full_username,
ticket=ticket,
csrf_token=self._csrf_token,
)
def load_realms(self) -> list[Realm]:
payload = self._request_json("GET", "/access/domains")
realms: list[Realm] = []
if not isinstance(payload, list):
raise ProxmoxUnexpectedResponseError("Realms payload was not a list.")
for item in payload:
if not isinstance(item, dict):
continue
realm = item.get("realm")
title = item.get("comment") or item.get("commentary") or realm
if isinstance(realm, str) and isinstance(title, str):
realms.append(
Realm(
name=realm,
title=title,
default=bool(item.get("default")),
)
)
return realms
def load_nodes(self) -> list[Node]:
payload = self._request_json("GET", "/nodes", requires_auth=True)
if not isinstance(payload, list):
raise ProxmoxUnexpectedResponseError("Nodes payload was not a list.")
return [
Node(name=item["node"], status=item.get("status"))
for item in payload
if isinstance(item, dict) and isinstance(item.get("node"), str)
]
def load_next_vmid(self) -> int:
payload = self._request_json("GET", "/cluster/nextid", requires_auth=True)
if isinstance(payload, int):
return payload
if isinstance(payload, str) and payload.isdigit():
return int(payload)
raise ProxmoxUnexpectedResponseError("Next VM ID payload was not an integer.")
def load_pools(self) -> list[Pool]:
payload = self._request_json("GET", "/pools", requires_auth=True)
if not isinstance(payload, list):
raise ProxmoxUnexpectedResponseError("Pools payload was not a list.")
return [
Pool(poolid=item["poolid"], comment=item.get("comment"))
for item in payload
if isinstance(item, dict) and isinstance(item.get("poolid"), str)
]
def load_existing_tags(self) -> list[str]:
payload = self._request_json(
"GET",
"/cluster/resources",
params={"type": "vm"},
requires_auth=True,
)
if not isinstance(payload, list):
raise ProxmoxUnexpectedResponseError("Cluster resource payload was not a list.")
tags: set[str] = set()
for item in payload:
if not isinstance(item, dict):
continue
raw_tags = item.get("tags")
if not isinstance(raw_tags, str):
continue
for tag in raw_tags.replace(",", ";").split(";"):
normalized = tag.strip()
if normalized:
tags.add(normalized)
return sorted(tags)
def load_bridges(self, node: str) -> list[Bridge]:
payload = self._request_json("GET", f"/nodes/{node}/network", requires_auth=True)
if not isinstance(payload, list):
raise ProxmoxUnexpectedResponseError("Network payload was not a list.")
bridges = [
Bridge(iface=item["iface"], active=bool(item.get("active", True)))
for item in payload
if isinstance(item, dict)
and item.get("type") == "bridge"
and isinstance(item.get("iface"), str)
]
return sorted(bridges, key=lambda bridge: bridge.iface)
def load_storages(self, node: str) -> list[Storage]:
payload = self._request_json("GET", f"/nodes/{node}/storage", requires_auth=True)
if not isinstance(payload, list):
raise ProxmoxUnexpectedResponseError("Storage payload was not a list.")
storages: list[Storage] = []
for item in payload:
if not isinstance(item, dict):
continue
storage = item.get("storage")
content = tuple(
part.strip()
for part in str(item.get("content", "")).split(",")
if part and isinstance(part, str)
)
if isinstance(storage, str):
storages.append(Storage(storage=storage, node=node, content=content))
return storages
def load_isos(self, node: str, storage: str) -> list[IsoImage]:
payload = self._request_json(
"GET",
f"/nodes/{node}/storage/{storage}/content",
params={"content": "iso"},
requires_auth=True,
)
if not isinstance(payload, list):
raise ProxmoxUnexpectedResponseError("ISO payload was not a list.")
return [
IsoImage(volid=item["volid"], storage=storage, node=node)
for item in payload
if isinstance(item, dict) and isinstance(item.get("volid"), str)
]
def create_vm(self, config: VmConfig, start_after_create: bool = False) -> VmCreationResult:
payload = build_create_payload(config, self._settings)
node = config.general.node
vmid = config.general.vmid
name = payload["name"]
upid = self._request_json(
"POST",
f"/nodes/{node}/qemu",
data={key: str(value) for key, value in payload.items()},
requires_auth=True,
)
if isinstance(upid, str) and upid.startswith("UPID:"):
self._wait_for_task(node, upid)
try:
serial_result = self._request_json(
"PUT",
f"/nodes/{node}/qemu/{vmid}/config",
data={"serial0": "socket"},
requires_auth=True,
)
if isinstance(serial_result, str) and serial_result.startswith("UPID:"):
self._wait_for_task(node, serial_result)
except ProxmoxError as exc:
raise ProxmoxPostCreateError(
node,
vmid,
"serial-console",
f"VM was created but serial console configuration failed: {exc}",
) from exc
if config.general.ha_enabled:
try:
ha_result = self._request_json(
"POST",
"/cluster/ha/resources",
data={
"sid": f"vm:{vmid}",
"state": "started" if start_after_create else "stopped",
},
requires_auth=True,
)
if isinstance(ha_result, str) and ha_result.startswith("UPID:"):
self._wait_for_task(node, ha_result)
except ProxmoxError as exc:
raise ProxmoxPostCreateError(
node,
vmid,
"high-availability",
f"VM was created but HA configuration failed: {exc}",
) from exc
elif start_after_create:
try:
start_result = self._request_json(
"POST",
f"/nodes/{node}/qemu/{vmid}/status/start",
requires_auth=True,
)
if isinstance(start_result, str) and start_result.startswith("UPID:"):
self._wait_for_task(node, start_result)
except ProxmoxError as exc:
raise ProxmoxPostCreateError(
node,
vmid,
"start",
f"VM was created but automatic start failed: {exc}",
) from exc
return VmCreationResult(
node=node,
vmid=vmid,
name=str(name),
serial_console_configured=True,
ha_configured=config.general.ha_enabled,
)
def _wait_for_task(self, node: str, upid: str) -> None:
deadline = time.time() + self._settings.request_timeout_seconds
while time.time() < deadline:
payload = self._request_json(
"GET",
f"/nodes/{node}/tasks/{upid}/status",
requires_auth=True,
)
if not isinstance(payload, dict):
raise ProxmoxUnexpectedResponseError("Task status payload was not an object.")
if payload.get("status") == "stopped":
if payload.get("exitstatus") != "OK":
raise ProxmoxApiError(f"Task {upid} failed with {payload.get('exitstatus')}.")
return
time.sleep(0.5)
raise ProxmoxTransportError(f"Timed out while waiting for task {upid}.")
def _request_json(
self,
method: str,
path: str,
*,
requires_auth: bool = False,
**kwargs: object,
) -> object:
headers: dict[str, str] = {}
if requires_auth:
if not self._ticket:
raise ProxmoxAuthError("Not authenticated with Proxmox.")
if method.upper() not in {"GET", "HEAD", "OPTIONS"} and self._csrf_token:
headers["CSRFPreventionToken"] = self._csrf_token
try:
response = self._client.request(method, path, headers=headers, **kwargs)
response.raise_for_status()
except httpx.ConnectError as exc:
raise ProxmoxConnectError("Unable to connect to the Proxmox API.") from exc
except httpx.TransportError as exc:
message = str(exc)
if _looks_like_tls_error(message):
raise ProxmoxTlsError("TLS handshake or verification failed.") from exc
raise ProxmoxTransportError("Transport error while calling the Proxmox API.") from exc
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
if status_code in {401, 403}:
raise ProxmoxAuthError("Authentication was rejected by Proxmox.") from exc
raise ProxmoxApiError(
f"Proxmox API returned HTTP {status_code}.",
status_code=status_code,
) from exc
try:
payload = response.json()
except ValueError as exc:
raise ProxmoxUnexpectedResponseError("Expected a JSON response from Proxmox.") from exc
if not isinstance(payload, dict) or "data" not in payload:
raise ProxmoxUnexpectedResponseError("Expected a top-level data field in the response.")
return payload["data"]
class LiveProxmoxService(ProxmoxService):
mode = "live"
def __init__(self, client: ProxmoxApiClient) -> None:
self._client = client
def check_connectivity(self) -> str:
return self._client.probe_transport()
def check_api_base(self) -> str:
return self._client.check_api_base()
def load_realms(self) -> list[Realm]:
return self._client.load_realms()
def login(self, username: str, password: str, realm: str) -> AuthenticatedSession:
return self._client.login(username, password, realm)
def load_nodes(self) -> list[Node]:
return self._client.load_nodes()
def load_next_vmid(self) -> int:
return self._client.load_next_vmid()
def load_pools(self) -> list[Pool]:
return self._client.load_pools()
def load_existing_tags(self) -> list[str]:
return self._client.load_existing_tags()
def load_bridges(self, node: str) -> list[Bridge]:
return self._client.load_bridges(node)
def load_storages(self, node: str) -> list[Storage]:
return self._client.load_storages(node)
def load_isos(self, node: str, storage: str) -> list[IsoImage]:
return self._client.load_isos(node, storage)
def create_vm(self, config: VmConfig, start_after_create: bool = False) -> VmCreationResult:
return self._client.create_vm(config, start_after_create=start_after_create)
def close(self) -> None:
self._client.close()

View file

@ -0,0 +1,179 @@
from __future__ import annotations
import os
from collections.abc import Mapping
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlparse
from dotenv import dotenv_values
from .errors import SettingsError
def _parse_bool(value: str | None, *, default: bool) -> bool:
if value is None or value == "":
return default
normalized = value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
raise SettingsError(f"Invalid boolean value: {value!r}")
def _parse_int(value: str | None, *, default: int) -> int:
if value is None or value == "":
return default
try:
return int(value)
except ValueError as exc:
raise SettingsError(f"Invalid integer value: {value!r}") from exc
@dataclass(frozen=True)
class LiveSafetyPolicy:
prevent_create: bool
enable_test_mode: bool
test_node: str | None
test_pool: str | None
test_tag: str
test_vm_name_prefix: str
keep_failed_vm: bool
def validate(self) -> None:
if self.enable_test_mode:
if not self.test_node:
raise SettingsError(
"PROXMOX_TEST_NODE is required when PROXMOX_ENABLE_TEST_MODE=true."
)
if not self.test_pool:
raise SettingsError(
"PROXMOX_TEST_POOL is required when PROXMOX_ENABLE_TEST_MODE=true."
)
if not self.test_tag:
raise SettingsError(
"PROXMOX_TEST_TAG is required when PROXMOX_ENABLE_TEST_MODE=true."
)
if not self.test_vm_name_prefix:
raise SettingsError(
"PROXMOX_TEST_VM_NAME_PREFIX is required when "
"PROXMOX_ENABLE_TEST_MODE=true."
)
@property
def allow_create(self) -> bool:
return not self.prevent_create
def effective_vm_name(self, requested_name: str) -> str:
if not self.enable_test_mode:
return requested_name
if requested_name.startswith(self.test_vm_name_prefix):
return requested_name
return f"{self.test_vm_name_prefix}{requested_name}"
@dataclass(frozen=True)
class AppSettings:
proxmox_url: str | None
proxmox_api_base: str
proxmox_user: str | None
proxmox_password: str | None
proxmox_realm: str | None
proxmox_verify_tls: bool
request_timeout_seconds: int
safety_policy: LiveSafetyPolicy
@classmethod
def from_env(
cls,
env: Mapping[str, str] | None = None,
*,
load_dotenv_file: bool = True,
dotenv_path: str | Path = ".env",
) -> AppSettings:
raw: dict[str, str] = {}
if load_dotenv_file:
raw.update(
{
key: value
for key, value in dotenv_values(dotenv_path).items()
if value is not None
}
)
raw.update(os.environ if env is None else env)
api_base = raw.get("PROXMOX_API_BASE", "/api2/json").strip() or "/api2/json"
if not api_base.startswith("/"):
api_base = f"/{api_base}"
safety_policy = LiveSafetyPolicy(
prevent_create=_parse_bool(raw.get("PROXMOX_PREVENT_CREATE"), default=False),
enable_test_mode=_parse_bool(raw.get("PROXMOX_ENABLE_TEST_MODE"), default=False),
test_node=raw.get("PROXMOX_TEST_NODE") or None,
test_pool=raw.get("PROXMOX_TEST_POOL") or None,
test_tag=raw.get("PROXMOX_TEST_TAG", "codex-e2e").strip() or "codex-e2e",
test_vm_name_prefix=raw.get("PROXMOX_TEST_VM_NAME_PREFIX", "codex-e2e-").strip()
or "codex-e2e-",
keep_failed_vm=_parse_bool(raw.get("PROXMOX_KEEP_FAILED_VM"), default=True),
)
safety_policy.validate()
proxmox_url = (raw.get("PROXMOX_URL") or "").strip() or None
if proxmox_url is not None:
proxmox_url = proxmox_url.rstrip("/")
return cls(
proxmox_url=proxmox_url,
proxmox_api_base=api_base,
proxmox_user=(raw.get("PROXMOX_USER") or "").strip() or None,
proxmox_password=raw.get("PROXMOX_PASSWORD") or None,
proxmox_realm=(raw.get("PROXMOX_REALM") or "").strip() or None,
proxmox_verify_tls=_parse_bool(raw.get("PROXMOX_VERIFY_TLS"), default=False),
request_timeout_seconds=_parse_int(
raw.get("PROXMOX_REQUEST_TIMEOUT_SECONDS"), default=15
),
safety_policy=safety_policy,
)
@property
def is_live_configured(self) -> bool:
return bool(self.proxmox_url and self.proxmox_user and self.proxmox_password)
@property
def effective_username(self) -> str | None:
if not self.proxmox_user or not self.proxmox_realm:
return None
if "@" in self.proxmox_user:
return self.proxmox_user
return f"{self.proxmox_user}@{self.proxmox_realm}"
@property
def sanitized_host(self) -> str:
if not self.proxmox_url:
return "not-configured"
parsed = urlparse(self.proxmox_url)
host = parsed.hostname or parsed.netloc or self.proxmox_url
if parsed.port:
return f"{host}:{parsed.port}"
return host
@property
def api_url(self) -> str:
if not self.proxmox_url:
raise SettingsError("PROXMOX_URL is required for live Proxmox access.")
return f"{self.proxmox_url}{self.proxmox_api_base}"
def validate_live_requirements(self) -> None:
missing: list[str] = []
if not self.proxmox_url:
missing.append("PROXMOX_URL")
if not self.proxmox_user:
missing.append("PROXMOX_USER")
if not self.proxmox_password:
missing.append("PROXMOX_PASSWORD")
if not self.proxmox_realm:
missing.append("PROXMOX_REALM")
if missing:
joined = ", ".join(missing)
raise SettingsError(f"Missing live Proxmox configuration: {joined}.")

View file

@ -0,0 +1,140 @@
from __future__ import annotations
import asyncio
import os
import signal
import sys
import termios
import tty
from threading import Thread
from textual import events
from textual.driver import Driver
from textual.geometry import Size
from textual.messages import TerminalSupportInBandWindowResize
def build_driver_class() -> type[Driver] | None:
"""Return an opt-in compatibility driver for problematic terminals.
Textual's stock driver is the default because it is the best-tested path.
The compatibility driver remains available behind an env flag for targeted
debugging only.
"""
if os.getenv("YOUR_APP_ENABLE_COMPAT_DRIVER", "").lower() not in {"1", "true", "yes"}:
return None
if sys.platform.startswith("win"):
return None
from textual.drivers._writer_thread import WriterThread
from textual.drivers.linux_driver import LinuxDriver
class CompatLinuxDriver(LinuxDriver):
"""Terminal driver with advanced terminal features disabled.
This avoids terminal-specific issues around Kitty keyboard mode,
mouse tracking, sync mode probing, and bracketed paste.
"""
def start_application_mode(self) -> None:
def _stop_again(*_) -> None:
os.kill(os.getpid(), signal.SIGSTOP)
if os.isatty(self.fileno):
signal.signal(signal.SIGTTOU, _stop_again)
signal.signal(signal.SIGTTIN, _stop_again)
try:
termios.tcsetattr(
self.fileno, termios.TCSANOW, termios.tcgetattr(self.fileno)
)
except termios.error:
return
finally:
signal.signal(signal.SIGTTOU, signal.SIG_DFL)
signal.signal(signal.SIGTTIN, signal.SIG_DFL)
loop = asyncio.get_running_loop()
def send_size_event() -> None:
width, height = self._get_terminal_size()
textual_size = Size(width, height)
event = events.Resize(textual_size, textual_size)
asyncio.run_coroutine_threadsafe(self._app._post_message(event), loop=loop)
self._writer_thread = WriterThread(self._file)
self._writer_thread.start()
def on_terminal_resize(signum, stack) -> None:
if not self._in_band_window_resize:
send_size_event()
signal.signal(signal.SIGWINCH, on_terminal_resize)
self.write("\x1b[?1049h")
try:
self.attrs_before = termios.tcgetattr(self.fileno)
except termios.error:
self.attrs_before = None
try:
newattr = termios.tcgetattr(self.fileno)
except termios.error:
pass
else:
newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])
newattr[tty.CC][termios.VMIN] = 1
try:
termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)
except termios.error:
pass
self.write("\x1b[?25l")
self.flush()
self._key_thread = Thread(target=self._run_input_thread)
send_size_event()
self._key_thread.start()
self._disable_line_wrap()
if self._must_signal_resume:
self._must_signal_resume = False
asyncio.run_coroutine_threadsafe(
self._app._post_message(self.SignalResume()),
loop=loop,
)
def stop_application_mode(self) -> None:
self._enable_line_wrap()
self.disable_input()
if self.attrs_before is not None:
try:
termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
except termios.error:
pass
self.write("\x1b[?1049l")
self.write("\x1b[?25h")
self.flush()
def _request_terminal_sync_mode_support(self) -> None:
return
def _disable_in_band_window_resize(self) -> None:
self._in_band_window_resize = False
async def _on_terminal_supports_in_band_window_resize(
self, message: TerminalSupportInBandWindowResize
) -> None:
self._in_band_window_resize = False
return CompatLinuxDriver
def apply_runtime_compatibility() -> None:
os.environ.setdefault("TEXTUAL_ALLOW_SIGNALS", "1")
signal.signal(signal.SIGINT, signal.default_int_handler)

View file

@ -0,0 +1 @@
"""Reusable Textual widgets."""