545 lines
20 KiB
Python
545 lines
20 KiB
Python
"""
|
|
Tests for the import/export functionality.
|
|
|
|
This module contains comprehensive tests for the ImportExportService class
|
|
and all supported file formats.
|
|
"""
|
|
|
|
import pytest
|
|
import json
|
|
import csv
|
|
import tempfile
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from src.hosts.core.import_export import (
|
|
ImportExportService, ImportResult, ExportFormat, ImportFormat
|
|
)
|
|
from src.hosts.core.models import HostEntry, HostsFile
|
|
|
|
class TestImportExportService:
    """Test ImportExportService class.

    The tests are grouped into four sections: export, import, utility
    (format detection and export-path validation) and integration
    (export followed by import of the same file).
    """

    @pytest.fixture
    def service(self):
        """Create ImportExportService instance."""
        return ImportExportService()

    @pytest.fixture
    def sample_hosts_file(self):
        """Create sample HostsFile for testing.

        The file holds four entries: two plain active entries, one inactive
        DNS-only entry (empty IP, ``dns_name`` set) and one active entry
        carrying DNS resolution data.
        """
        entries = [
            HostEntry("127.0.0.1", ["localhost"], "Local host", True),
            HostEntry("192.168.1.1", ["router.local"], "Home router", True),
            # Built with a placeholder IP that is blanked out below.
            # NOTE(review): presumably HostEntry rejects an empty IP at
            # construction time -- confirm against the model.
            HostEntry("1.1.1.1", ["dns-only.com"], "DNS only entry", False),  # Temp IP
            HostEntry("10.0.0.1", ["test.example.com"], "Test server", True),
        ]

        # Convert to DNS entry and set DNS data for some entries
        entries[2].ip_address = ""  # Remove IP after creation
        entries[2].dns_name = "dns-only.com"
        entries[3].resolved_ip = "10.0.0.1"
        entries[3].last_resolved = datetime(2024, 1, 15, 12, 0, 0)
        entries[3].dns_resolution_status = "IP_MATCH"

        hosts_file = HostsFile()
        hosts_file.entries = entries
        return hosts_file

    @pytest.fixture
    def temp_dir(self):
        """Create temporary directory for test files.

        Yields a ``Path``; the directory is removed when the test finishes.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            yield Path(tmpdir)

    def test_service_initialization(self, service):
        """Test service initialization."""
        assert len(service.supported_export_formats) == 3
        assert len(service.supported_import_formats) == 3
        assert ExportFormat.HOSTS in service.supported_export_formats
        assert ExportFormat.JSON in service.supported_export_formats
        assert ExportFormat.CSV in service.supported_export_formats

    def test_get_supported_formats(self, service):
        """Test getting supported formats."""
        export_formats = service.get_supported_export_formats()
        import_formats = service.get_supported_import_formats()

        assert len(export_formats) == 3
        assert len(import_formats) == 3
        assert ExportFormat.HOSTS in export_formats
        assert ImportFormat.JSON in import_formats

    # Export Tests

    def test_export_hosts_format(self, service, sample_hosts_file, temp_dir):
        """Test exporting to hosts format."""
        export_path = temp_dir / "test_hosts.txt"

        result = service.export_hosts_format(sample_hosts_file, export_path)

        assert result.success is True
        assert result.entries_exported == 4
        assert len(result.errors) == 0
        assert result.format == ExportFormat.HOSTS
        assert export_path.exists()

        # Verify content
        content = export_path.read_text()
        assert "127.0.0.1" in content
        assert "localhost" in content
        assert "router.local" in content

    def test_export_json_format(self, service, sample_hosts_file, temp_dir):
        """Test exporting to JSON format."""
        export_path = temp_dir / "test_export.json"

        result = service.export_json_format(sample_hosts_file, export_path)

        assert result.success is True
        assert result.entries_exported == 4
        assert len(result.errors) == 0
        assert result.format == ExportFormat.JSON
        assert export_path.exists()

        # Verify JSON structure
        with open(export_path, 'r') as f:
            data = json.load(f)

        assert "metadata" in data
        assert "entries" in data
        assert data["metadata"]["total_entries"] == 4
        assert len(data["entries"]) == 4

        # Check first entry
        first_entry = data["entries"][0]
        assert first_entry["ip_address"] == "127.0.0.1"
        assert first_entry["hostnames"] == ["localhost"]
        assert first_entry["is_active"] is True

        # Check DNS entry (the first exported entry with a truthy dns_name)
        dns_entry = next((e for e in data["entries"] if e.get("dns_name")), None)
        assert dns_entry is not None
        assert dns_entry["dns_name"] == "dns-only.com"

    def test_export_csv_format(self, service, sample_hosts_file, temp_dir):
        """Test exporting to CSV format."""
        export_path = temp_dir / "test_export.csv"

        result = service.export_csv_format(sample_hosts_file, export_path)

        assert result.success is True
        assert result.entries_exported == 4
        assert len(result.errors) == 0
        assert result.format == ExportFormat.CSV
        assert export_path.exists()

        # Verify CSV structure.  newline='' lets the csv module do its own
        # line-ending handling, as the csv documentation requires.
        with open(export_path, 'r', newline='') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        assert len(rows) == 4

        # Check header (fieldnames stay available after the file is closed
        # because the rows were already consumed above)
        expected_fields = [
            'ip_address', 'hostnames', 'comment', 'is_active',
            'dns_name', 'resolved_ip', 'last_resolved', 'dns_resolution_status'
        ]
        assert reader.fieldnames == expected_fields

        # Check first row; CSV values are always read back as strings
        first_row = rows[0]
        assert first_row["ip_address"] == "127.0.0.1"
        assert first_row["hostnames"] == "localhost"
        assert first_row["is_active"] == "True"

    def test_export_invalid_path(self, service, sample_hosts_file):
        """Test export with invalid path."""
        invalid_path = Path("/invalid/path/test.json")

        result = service.export_json_format(sample_hosts_file, invalid_path)

        assert result.success is False
        assert result.entries_exported == 0
        assert len(result.errors) > 0
        assert "Failed to export JSON format" in result.errors[0]

    # Import Tests

    def test_import_hosts_format(self, service, temp_dir):
        """Test importing from hosts format."""
        # Create test hosts file: two active entries plus one commented-out
        # entry.  Lines start at column 0, as in a real hosts file.
        hosts_content = (
            "# Test hosts file\n"
            "127.0.0.1 localhost\n"
            "192.168.1.1 router.local # Home router\n"
            "# 10.0.0.1 disabled.com # Disabled entry\n"
        )
        hosts_path = temp_dir / "test_hosts.txt"
        hosts_path.write_text(hosts_content)

        result = service.import_hosts_format(hosts_path)

        # ">= 2" leaves room for the commented-out line being imported too.
        assert result.success is True
        assert result.total_processed >= 2
        assert result.successfully_imported >= 2
        assert len(result.errors) == 0

        # Check imported entries
        assert len(result.entries) >= 2
        localhost_entry = next((e for e in result.entries if "localhost" in e.hostnames), None)
        assert localhost_entry is not None
        assert localhost_entry.ip_address == "127.0.0.1"
        assert localhost_entry.is_active is True

    def test_import_json_format(self, service, temp_dir):
        """Test importing from JSON format."""
        # Create test JSON file
        json_data = {
            "metadata": {
                "exported_at": "2024-01-15T12:00:00",
                "total_entries": 3,
                "version": "1.0"
            },
            "entries": [
                {
                    "ip_address": "127.0.0.1",
                    "hostnames": ["localhost"],
                    "comment": "Local host",
                    "is_active": True
                },
                {
                    "ip_address": "",
                    "hostnames": ["dns-only.com"],
                    "comment": "DNS only",
                    "is_active": False,
                    "dns_name": "dns-only.com"
                },
                {
                    "ip_address": "10.0.0.1",
                    "hostnames": ["test.com"],
                    "comment": "Test",
                    "is_active": True,
                    "resolved_ip": "10.0.0.1",
                    "last_resolved": "2024-01-15T12:00:00",
                    "dns_resolution_status": "IP_MATCH"
                }
            ]
        }

        json_path = temp_dir / "test_import.json"
        with open(json_path, 'w') as f:
            json.dump(json_data, f)

        result = service.import_json_format(json_path)

        assert result.success is True
        assert result.total_processed == 3
        assert result.successfully_imported == 3
        assert len(result.errors) == 0
        assert len(result.entries) == 3

        # Check DNS entry
        dns_entry = next((e for e in result.entries if e.dns_name), None)
        assert dns_entry is not None
        assert dns_entry.dns_name == "dns-only.com"
        assert dns_entry.ip_address == ""

        # Check resolved entry
        resolved_entry = next((e for e in result.entries if e.resolved_ip), None)
        assert resolved_entry is not None
        assert resolved_entry.resolved_ip == "10.0.0.1"
        assert resolved_entry.dns_resolution_status == "IP_MATCH"

    def test_import_csv_format(self, service, temp_dir):
        """Test importing from CSV format."""
        # Create test CSV file: header plus three data rows
        csv_path = temp_dir / "test_import.csv"
        with open(csv_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                'ip_address', 'hostnames', 'comment', 'is_active',
                'dns_name', 'resolved_ip', 'last_resolved', 'dns_resolution_status'
            ])
            writer.writerow([
                '127.0.0.1', 'localhost', 'Local host', 'true',
                '', '', '', ''
            ])
            writer.writerow([
                '', 'dns-only.com', 'DNS only', 'false',
                'dns-only.com', '', '', ''
            ])
            # Two hostnames in one cell, separated by a single space
            writer.writerow([
                '10.0.0.1', 'test.com example.com', 'Test server', 'true',
                '', '10.0.0.1', '2024-01-15T12:00:00', 'IP_MATCH'
            ])

        result = service.import_csv_format(csv_path)

        assert result.success is True
        assert result.total_processed == 3
        assert result.successfully_imported == 3
        assert len(result.errors) == 0
        assert len(result.entries) == 3

        # Check multiple hostnames entry
        multi_hostname_entry = next((e for e in result.entries if "test.com" in e.hostnames), None)
        assert multi_hostname_entry is not None
        assert "example.com" in multi_hostname_entry.hostnames
        assert len(multi_hostname_entry.hostnames) == 2

    def test_import_json_invalid_format(self, service, temp_dir):
        """Test importing invalid JSON format."""
        # Create invalid JSON file: well-formed JSON without an "entries" key
        invalid_json = {"invalid": "format", "no_entries": True}
        json_path = temp_dir / "invalid.json"
        with open(json_path, 'w') as f:
            json.dump(invalid_json, f)

        result = service.import_json_format(json_path)

        assert result.success is False
        assert result.total_processed == 0
        assert result.successfully_imported == 0
        assert len(result.errors) > 0
        assert "missing 'entries' field" in result.errors[0]

    def test_import_json_malformed(self, service, temp_dir):
        """Test importing malformed JSON."""
        json_path = temp_dir / "malformed.json"
        json_path.write_text("{invalid json content")

        result = service.import_json_format(json_path)

        assert result.success is False
        assert result.total_processed == 0
        assert result.successfully_imported == 0
        assert len(result.errors) > 0
        assert "Invalid JSON file" in result.errors[0]

    def test_import_csv_missing_required_columns(self, service, temp_dir):
        """Test importing CSV with missing required columns."""
        csv_path = temp_dir / "missing_columns.csv"
        with open(csv_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['ip_address', 'comment'])  # Missing 'hostnames'
            writer.writerow(['127.0.0.1', 'test'])

        result = service.import_csv_format(csv_path)

        assert result.success is False
        assert result.total_processed == 0
        assert result.successfully_imported == 0
        assert len(result.errors) > 0
        assert "Missing required columns" in result.errors[0]

    def test_import_json_with_warnings(self, service, temp_dir):
        """Test importing JSON with warnings (invalid dates)."""
        json_data = {
            "entries": [
                {
                    "ip_address": "127.0.0.1",
                    "hostnames": ["localhost"],
                    "comment": "Test",
                    "is_active": True,
                    "last_resolved": "invalid-date-format"
                }
            ]
        }

        json_path = temp_dir / "warnings.json"
        with open(json_path, 'w') as f:
            json.dump(json_data, f)

        result = service.import_json_format(json_path)

        # The entry is still imported; the bad date only yields a warning.
        assert result.success is True
        assert result.total_processed == 1
        assert result.successfully_imported == 1
        assert len(result.warnings) > 0
        assert "Invalid last_resolved date format" in result.warnings[0]

    def test_import_nonexistent_file(self, service):
        """Test importing non-existent file."""
        nonexistent_path = Path("/nonexistent/file.json")

        result = service.import_json_format(nonexistent_path)

        assert result.success is False
        assert result.total_processed == 0
        assert result.successfully_imported == 0
        assert len(result.errors) > 0

    # Utility Tests

    def test_detect_file_format_by_extension(self, service, temp_dir):
        """Test file format detection by extension."""
        json_file = temp_dir / "test.json"
        csv_file = temp_dir / "test.csv"
        hosts_file = temp_dir / "hosts"
        txt_file = temp_dir / "test.txt"

        # Create empty files
        for f in [json_file, csv_file, hosts_file, txt_file]:
            f.touch()

        assert service.detect_file_format(json_file) == ImportFormat.JSON
        assert service.detect_file_format(csv_file) == ImportFormat.CSV
        assert service.detect_file_format(hosts_file) == ImportFormat.HOSTS
        assert service.detect_file_format(txt_file) == ImportFormat.HOSTS

    def test_detect_file_format_by_content(self, service, temp_dir):
        """Test file format detection by content."""
        # JSON content
        json_file = temp_dir / "no_extension"
        json_file.write_text('{"entries": []}')
        assert service.detect_file_format(json_file) == ImportFormat.JSON

        # CSV content
        csv_file = temp_dir / "csv_no_ext"
        csv_file.write_text('ip_address,hostnames,comment')
        assert service.detect_file_format(csv_file) == ImportFormat.CSV

        # Hosts content
        hosts_file = temp_dir / "hosts_no_ext"
        hosts_file.write_text('127.0.0.1 localhost')
        assert service.detect_file_format(hosts_file) == ImportFormat.HOSTS

    def test_detect_file_format_nonexistent(self, service):
        """Test file format detection for non-existent file."""
        result = service.detect_file_format(Path("/nonexistent/file.txt"))
        assert result is None

    def test_validate_export_path(self, service, temp_dir):
        """Test export path validation."""
        # Valid path
        valid_path = temp_dir / "export.json"
        warnings = service.validate_export_path(valid_path, ExportFormat.JSON)
        assert len(warnings) == 0

        # Existing file
        existing_file = temp_dir / "existing.json"
        existing_file.touch()
        warnings = service.validate_export_path(existing_file, ExportFormat.JSON)
        assert any("already exists" in w for w in warnings)

        # Wrong extension
        wrong_ext = temp_dir / "file.txt"
        warnings = service.validate_export_path(wrong_ext, ExportFormat.JSON)
        assert any("doesn't match format" in w for w in warnings)

    def test_validate_export_path_invalid_directory(self, service):
        """Test export path validation with invalid directory."""
        invalid_path = Path("/invalid/nonexistent/directory/file.json")
        warnings = service.validate_export_path(invalid_path, ExportFormat.JSON)
        assert any("does not exist" in w for w in warnings)

    # Integration Tests

    def test_export_import_roundtrip_json(self, service, sample_hosts_file, temp_dir):
        """Test export-import roundtrip for JSON format."""
        export_path = temp_dir / "roundtrip.json"

        # Export
        export_result = service.export_json_format(sample_hosts_file, export_path)
        assert export_result.success is True

        # Import
        import_result = service.import_json_format(export_path)
        assert import_result.success is True
        assert import_result.successfully_imported == len(sample_hosts_file.entries)

        # Verify data integrity
        original_entries = sample_hosts_file.entries
        imported_entries = import_result.entries

        # Lengths are compared first so the zip() below cannot silently
        # skip trailing entries.
        assert len(imported_entries) == len(original_entries)

        # Check specific entries (order is expected to be preserved)
        for orig, imported in zip(original_entries, imported_entries):
            assert orig.ip_address == imported.ip_address
            assert orig.hostnames == imported.hostnames
            assert orig.comment == imported.comment
            assert orig.is_active == imported.is_active
            assert orig.dns_name == imported.dns_name

    def test_export_import_roundtrip_csv(self, service, sample_hosts_file, temp_dir):
        """Test export-import roundtrip for CSV format."""
        export_path = temp_dir / "roundtrip.csv"

        # Export
        export_result = service.export_csv_format(sample_hosts_file, export_path)
        assert export_result.success is True

        # Import
        import_result = service.import_csv_format(export_path)
        assert import_result.success is True
        assert import_result.successfully_imported == len(sample_hosts_file.entries)

    def test_import_result_properties(self):
        """Test ImportResult properties."""
        # Result with errors
        result_with_errors = ImportResult(
            success=False,
            entries=[],
            errors=["Error 1", "Error 2"],
            warnings=[],
            total_processed=5,
            successfully_imported=0
        )
        assert result_with_errors.has_errors is True
        assert result_with_errors.has_warnings is False

        # Result with warnings
        result_with_warnings = ImportResult(
            success=True,
            entries=[],
            errors=[],
            warnings=["Warning 1"],
            total_processed=5,
            successfully_imported=5
        )
        assert result_with_warnings.has_errors is False
        assert result_with_warnings.has_warnings is True

    def test_empty_hosts_file_export(self, service, temp_dir):
        """Test exporting empty hosts file."""
        empty_hosts_file = HostsFile()
        export_path = temp_dir / "empty.json"

        result = service.export_json_format(empty_hosts_file, export_path)

        assert result.success is True
        assert result.entries_exported == 0
        assert export_path.exists()

        # Verify empty file structure
        with open(export_path, 'r') as f:
            data = json.load(f)
        assert data["metadata"]["total_entries"] == 0
        assert len(data["entries"]) == 0

    def test_large_hostnames_list_csv(self, service, temp_dir):
        """Test CSV export/import with large hostnames list."""
        entry = HostEntry(
            "192.168.1.1",
            ["host1.com", "host2.com", "host3.com", "host4.com", "host5.com"],
            "Multiple hostnames",
            True
        )
        hosts_file = HostsFile()
        hosts_file.entries = [entry]

        export_path = temp_dir / "multi_hostnames.csv"

        # Export
        export_result = service.export_csv_format(hosts_file, export_path)
        assert export_result.success is True

        # Import
        import_result = service.import_csv_format(export_path)
        assert import_result.success is True

        # Fail with a clear message instead of an IndexError when nothing
        # was imported.
        assert import_result.entries, "CSV import returned no entries"
        imported_entry = import_result.entries[0]
        assert len(imported_entry.hostnames) == 5
        assert "host1.com" in imported_entry.hostnames
        assert "host5.com" in imported_entry.hostnames
|