- Created `test_filters.py` to test the EntryFilter and FilterOptions classes, covering default values, custom values, filtering by status, DNS type, resolution status, and search functionality. - Implemented tests for combined filters and edge cases in filtering. - Added `test_import_export.py` to test the ImportExportService class, including exporting to hosts, JSON, and CSV formats, as well as importing from these formats. - Included tests for handling invalid formats, missing required columns, and warnings during import. - Updated `uv.lock` to include `pytest-asyncio` as a dependency for asynchronous testing.
427 lines
17 KiB
Python
427 lines
17 KiB
Python
"""
Tests for the filtering system.

This module contains comprehensive tests for the EntryFilter class
and filtering functionality.
"""
|
|
|
|
import pytest
|
|
from datetime import datetime, timedelta
|
|
from src.hosts.core.filters import EntryFilter, FilterOptions
|
|
from src.hosts.core.models import HostEntry
|
|
|
|
class TestFilterOptions:
    """Exercise the FilterOptions dataclass: defaults, overrides and dict conversion."""

    def test_default_values(self):
        """A FilterOptions built with no arguments exposes the expected defaults."""
        defaults = FilterOptions()

        # Checked in declaration order; identity comparison keeps the
        # strictness of ``is True`` / ``is False`` / ``is None``.
        expected_defaults = {
            "show_active": True,
            "show_inactive": True,
            "active_only": False,
            "inactive_only": False,
            "show_dns_entries": True,
            "show_ip_entries": True,
            "dns_only": False,
            "ip_only": False,
            "show_resolved": True,
            "show_unresolved": True,
            "show_resolving": True,
            "show_failed": True,
            "show_mismatched": True,
            "mismatch_only": False,
            "resolved_only": False,
            "search_term": None,
            "preset_name": None,
        }
        for field_name, expected in expected_defaults.items():
            assert getattr(defaults, field_name) is expected, field_name

    def test_custom_values(self):
        """Keyword arguments passed to the constructor are stored unchanged."""
        configured = FilterOptions(
            active_only=True,
            dns_only=True,
            search_term="test",
            preset_name="Active DNS Only",
        )

        assert configured.active_only is True
        assert configured.dns_only is True
        assert configured.search_term == "test"
        assert configured.preset_name == "Active DNS Only"

    def test_to_dict(self):
        """to_dict() returns every field, with overrides applied on top of defaults."""
        serialized = FilterOptions(
            active_only=True,
            search_term="test",
            preset_name="Test Preset",
        ).to_dict()

        assert serialized == {
            'show_active': True,
            'show_inactive': True,
            'active_only': True,
            'inactive_only': False,
            'show_dns_entries': True,
            'show_ip_entries': True,
            'dns_only': False,
            'ip_only': False,
            'show_resolved': True,
            'show_unresolved': True,
            'show_resolving': True,
            'show_failed': True,
            'show_mismatched': True,
            'mismatch_only': False,
            'resolved_only': False,
            'search_term': 'test',
            'search_in_hostnames': True,
            'search_in_comments': True,
            'search_in_ips': True,
            'case_sensitive': False,
            'preset_name': 'Test Preset',
        }

    def test_from_dict(self):
        """from_dict() applies the supplied keys and leaves the rest at defaults."""
        payload = {
            'active_only': True,
            'dns_only': True,
            'search_term': 'test',
            'preset_name': 'Test Preset',
        }

        restored = FilterOptions.from_dict(payload)

        assert restored.active_only is True
        assert restored.dns_only is True
        assert restored.search_term == 'test'
        assert restored.preset_name == 'Test Preset'
        # A key absent from the payload keeps its default.
        assert restored.inactive_only is False

    def test_from_dict_partial(self):
        """from_dict() accepts a dictionary holding a single key."""
        restored = FilterOptions.from_dict({'active_only': True})

        assert restored.active_only is True
        assert restored.inactive_only is False  # Default value
        assert restored.search_term is None  # Default value

    def test_is_empty(self):
        """is_empty() is True only when no filter and no search term is set."""
        # Untouched defaults count as empty.
        assert FilterOptions().is_empty() is True

        # A search term alone makes the options non-empty.
        assert FilterOptions(search_term="test").is_empty() is False

        # So does any enabled filter flag.
        assert FilterOptions(active_only=True).is_empty() is False
|
|
|
|
class TestEntryFilter:
    """Test EntryFilter class.

    Most tests run against the ``sample_entries`` fixture, which holds seven
    entries: five active and two inactive. Two of the seven are converted to
    DNS-only entries, one carries an "IP_MATCH" resolution status and one an
    "IP_MISMATCH" status. Exact counts asserted below derive from that data.
    """

    @staticmethod
    def _make_dns_entry(hostname, comment, is_active):
        """Build a DNS-only entry for ``hostname``.

        The entry is created with a temporary IP and then converted: the IP
        is blanked and ``dns_name`` is set to the hostname.
        """
        entry = HostEntry("1.1.1.1", [hostname], comment, is_active)
        entry.ip_address = ""  # Remove IP after creation
        entry.dns_name = hostname  # Set DNS name
        return entry

    @pytest.fixture
    def sample_entries(self):
        """Create sample entries for testing.

        Returns a list, in this order: active IP entry, inactive IP entry,
        active DNS entry, inactive DNS entry, resolved entry ("IP_MATCH"),
        mismatched entry ("IP_MISMATCH"), and an entry with no resolution data.
        """
        active_ip = HostEntry("192.168.1.1", ["example.com"], "Test entry", True)
        inactive_ip = HostEntry("192.168.1.2", ["inactive.com"], "Inactive entry", False)

        active_dns = self._make_dns_entry("dns-only.com", "DNS only entry", True)
        inactive_dns = self._make_dns_entry("inactive-dns.com", "Inactive DNS entry", False)

        # Entry with DNS resolution data
        resolved = HostEntry("10.0.0.1", ["resolved.com"], "Resolved entry", True)
        resolved.resolved_ip = "10.0.0.1"
        resolved.last_resolved = datetime.now()
        resolved.dns_resolution_status = "IP_MATCH"

        # Entry with mismatched DNS
        mismatched = HostEntry("10.0.0.2", ["mismatch.com"], "Mismatch entry", True)
        mismatched.resolved_ip = "10.0.0.3"  # Different from IP address
        mismatched.last_resolved = datetime.now()
        mismatched.dns_resolution_status = "IP_MISMATCH"

        # Entry without DNS resolution
        unresolved = HostEntry("10.0.0.4", ["unresolved.com"], "Unresolved entry", True)

        return [
            active_ip,
            inactive_ip,
            active_dns,
            inactive_dns,
            resolved,
            mismatched,
            unresolved,
        ]

    @pytest.fixture
    def entry_filter(self):
        """Create EntryFilter instance."""
        return EntryFilter()

    def test_apply_filters_no_filters(self, entry_filter, sample_entries):
        """Test applying empty filters returns all entries."""
        options = FilterOptions()
        result = entry_filter.apply_filters(sample_entries, options)

        assert len(result) == len(sample_entries)
        assert result == sample_entries

    def test_filter_by_status_active_only(self, entry_filter, sample_entries):
        """Test filtering by active status only."""
        options = FilterOptions(active_only=True)
        result = entry_filter.filter_by_status(sample_entries, options)

        assert all(entry.is_active for entry in result)
        # Pin the count so an empty result cannot pass vacuously:
        # the fixture holds seven entries, two of them inactive.
        assert len(result) == 5

    def test_filter_by_status_inactive_only(self, entry_filter, sample_entries):
        """Test filtering by inactive status only."""
        options = FilterOptions(inactive_only=True)
        result = entry_filter.filter_by_status(sample_entries, options)

        assert all(not entry.is_active for entry in result)
        assert len(result) == 2  # the inactive IP entry and the inactive DNS entry

    def test_filter_by_dns_type_dns_only(self, entry_filter, sample_entries):
        """Test filtering by DNS entries only."""
        options = FilterOptions(dns_only=True)
        result = entry_filter.filter_by_dns_type(sample_entries, options)

        assert all(entry.dns_name is not None for entry in result)
        assert len(result) == 2  # the two entries converted to DNS-only

    def test_filter_by_dns_type_ip_only(self, entry_filter, sample_entries):
        """Test filtering by IP entries only."""
        options = FilterOptions(ip_only=True)
        result = entry_filter.filter_by_dns_type(sample_entries, options)

        assert all(not entry.has_dns_name() for entry in result)
        # Should exclude the two DNS-only entries
        expected_count = len(sample_entries) - 2
        assert len(result) == expected_count

    def test_filter_by_resolution_status_resolved(self, entry_filter, sample_entries):
        """Test filtering by resolved entries only."""
        options = FilterOptions(resolved_only=True)
        result = entry_filter.filter_by_resolution_status(sample_entries, options)

        assert all(
            entry.dns_resolution_status in ["IP_MATCH", "RESOLVED"] for entry in result
        )
        assert len(result) == 1  # Only the "IP_MATCH" entry has resolved status

    def test_filter_by_resolution_status_unresolved(self, entry_filter, sample_entries):
        """Test filtering by unresolved entries only."""
        options = FilterOptions(
            show_resolved=False,
            show_resolving=False,
            show_failed=False,
            show_mismatched=False,
        )
        result = entry_filter.filter_by_resolution_status(sample_entries, options)

        assert all(
            entry.dns_resolution_status in [None, "NOT_RESOLVED"] for entry in result
        )
        assert len(result) == 5  # All except the resolved and mismatched entries

    def test_filter_by_resolution_status_mismatch(self, entry_filter, sample_entries):
        """Test filtering by DNS mismatch entries only."""
        options = FilterOptions(mismatch_only=True)
        result = entry_filter.filter_by_resolution_status(sample_entries, options)

        # Should only return the entry whose resolved_ip differs from its IP
        assert len(result) == 1
        assert result[0].hostnames[0] == "mismatch.com"

    def test_filter_by_search_hostname(self, entry_filter, sample_entries):
        """Test filtering by search term in hostname."""
        options = FilterOptions(search_term="example")
        result = entry_filter.filter_by_search(sample_entries, options)

        assert len(result) == 1
        assert result[0].hostnames[0] == "example.com"

    def test_filter_by_search_ip(self, entry_filter, sample_entries):
        """Test filtering by search term in IP address."""
        options = FilterOptions(search_term="192.168")
        result = entry_filter.filter_by_search(sample_entries, options)

        assert len(result) == 2  # the two 192.168.1.x entries

    def test_filter_by_search_comment(self, entry_filter, sample_entries):
        """Test filtering by search term in comment."""
        options = FilterOptions(search_term="DNS only")
        result = entry_filter.filter_by_search(sample_entries, options)

        assert len(result) == 1
        assert result[0].comment == "DNS only entry"

    def test_filter_by_search_case_insensitive(self, entry_filter, sample_entries):
        """Test search is case insensitive."""
        options = FilterOptions(search_term="EXAMPLE")
        result = entry_filter.filter_by_search(sample_entries, options)

        assert len(result) == 1
        assert result[0].hostnames[0] == "example.com"

    def test_combined_filters(self, entry_filter, sample_entries):
        """Test applying multiple filters together."""
        # Filter for active DNS entries containing "dns"
        options = FilterOptions(
            active_only=True,
            dns_only=True,
            search_term="dns",
        )
        result = entry_filter.apply_filters(sample_entries, options)

        # Should only return the active DNS entry with "dns" in its hostname
        assert len(result) == 1
        assert result[0].hostnames[0] == "dns-only.com"
        assert result[0].is_active
        assert result[0].dns_name is not None

    def test_count_filtered_entries(self, entry_filter, sample_entries):
        """Test counting filtered entries."""
        options = FilterOptions(active_only=True)
        counts = entry_filter.count_filtered_entries(sample_entries, options)

        assert counts['total'] == len(sample_entries)
        assert counts['filtered'] == 5  # 5 active entries

    def test_get_default_presets(self, entry_filter):
        """Test getting default filter presets."""
        presets = entry_filter.get_default_presets()

        # Check that default presets exist
        expected_names = (
            "All Entries",
            "Active Only",
            "Inactive Only",
            "DNS Entries Only",
            "IP Entries Only",
            "DNS Mismatches",
            "Resolved Entries",
            "Unresolved Entries",
        )
        for name in expected_names:
            assert name in presets

        # Check that presets have correct structure; only the values matter
        for options in presets.values():
            assert isinstance(options, FilterOptions)

    def test_save_and_load_preset(self, entry_filter):
        """Test saving and loading custom presets."""
        # Create custom filter options
        custom_options = FilterOptions(
            active_only=True,
            search_term="test",
            preset_name="My Custom Filter",
        )

        # Save preset
        entry_filter.save_preset("My Custom Filter", custom_options)

        # Check it was saved
        presets = entry_filter.get_saved_presets()
        assert "My Custom Filter" in presets

        # Load and verify
        loaded_options = presets["My Custom Filter"]
        assert loaded_options.active_only is True
        # Note: search_term is not saved in presets
        assert loaded_options.search_term is None

    def test_delete_preset(self, entry_filter):
        """Test deleting custom presets."""
        # Save a preset first
        custom_options = FilterOptions(active_only=True)
        entry_filter.save_preset("To Delete", custom_options)

        # Verify it exists
        presets = entry_filter.get_saved_presets()
        assert "To Delete" in presets

        # Delete it
        result = entry_filter.delete_preset("To Delete")
        assert result is True

        # Verify it's gone
        presets = entry_filter.get_saved_presets()
        assert "To Delete" not in presets

        # Try to delete non-existent preset
        result = entry_filter.delete_preset("Non Existent")
        assert result is False

    def test_filter_edge_cases(self, entry_filter):
        """Test filtering with edge cases."""
        # Empty entry list
        empty_options = FilterOptions()
        result = entry_filter.apply_filters([], empty_options)
        assert result == []

        # A list containing None: the None placeholder is stripped here,
        # before filtering, so apply_filters itself only sees the valid entry.
        entries_with_none = [None, HostEntry("192.168.1.1", ["test.com"], "", True)]
        valid_entries = [e for e in entries_with_none if e is not None]
        result = entry_filter.apply_filters(valid_entries, empty_options)
        assert len(result) == 1  # Only the valid entry
        assert result[0].ip_address == "192.168.1.1"

    def test_search_multiple_hostnames(self, entry_filter):
        """Test search across multiple hostnames in single entry."""
        # Create entry with multiple hostnames
        entry = HostEntry(
            "192.168.1.1",
            ["primary.com", "secondary.com", "alias.org"],
            "Multi-hostname entry",
            True,
        )
        entries = [entry]

        # Search for each hostname
        for hostname in ["primary", "secondary", "alias"]:
            options = FilterOptions(search_term=hostname)
            result = entry_filter.filter_by_search(entries, options)
            assert len(result) == 1
            assert result[0] == entry

    def test_dns_resolution_age_filtering(self, entry_filter, sample_entries):
        """Test filtering based on DNS resolution age."""
        # Modify sample entries to have different resolution times
        old_time = datetime.now() - timedelta(days=1)
        recent_time = datetime.now() - timedelta(minutes=5)

        # Entries with a resolved IP get a timestamp: recent for
        # "resolved.com", old for every other one.
        for entry in sample_entries:
            if not entry.resolved_ip:
                continue
            is_recent = entry.hostnames[0] == "resolved.com"
            entry.last_resolved = recent_time if is_recent else old_time

        # Test that entries are still found regardless of age
        # (Age filtering might be added in future versions)
        options = FilterOptions(resolved_only=True)
        result = entry_filter.filter_by_resolution_status(sample_entries, options)
        assert len(result) == 1  # Only the "IP_MATCH" entry has resolved status

    def test_preset_name_preservation(self, entry_filter):
        """Test that preset names are preserved in FilterOptions."""
        preset_options = FilterOptions(
            active_only=True,
            preset_name="Active Only",
        )

        # Apply filters; the returned list is not needed here, only the
        # state of the options object after the call.
        sample_entry = HostEntry("192.168.1.1", ["test.com"], "Test", True)
        entry_filter.apply_filters([sample_entry], preset_options)

        # The original preset name should be accessible
        assert preset_options.preset_name == "Active Only"
|