From 0d31449df8452d71c7f0b3050b26b0cefdf34d43 Mon Sep 17 00:00:00 2001 From: Daniel Sheppard Date: Thu, 10 Jul 2025 12:59:59 -0500 Subject: [PATCH] Optimize prefix assignment. Fix tests --- netbox/ipam/models/ip.py | 13 +++++ netbox/ipam/signals.py | 84 ++++++++++---------------------- netbox/ipam/tests/test_models.py | 15 ++++-- 3 files changed, 50 insertions(+), 62 deletions(-) diff --git a/netbox/ipam/models/ip.py b/netbox/ipam/models/ip.py index 4073bcf21..b2ef42941 100644 --- a/netbox/ipam/models/ip.py +++ b/netbox/ipam/models/ip.py @@ -786,6 +786,14 @@ class IPRange(ContactsMixin, PrimaryModel): return min(float(child_count) / self.size * 100, 100) + @classmethod + def find_prefix(self, address): + prefixes = Prefix.objects.filter( + models.Q(prefix__net_contains=address.start_address) & Q(prefix__net_contains=address.end_address), + vrf=address.vrf, + ) + return prefixes.last() + class IPAddress(ContactsMixin, PrimaryModel): """ @@ -1085,3 +1093,8 @@ class IPAddress(ContactsMixin, PrimaryModel): def get_role_color(self): return IPAddressRoleChoices.colors.get(self.role) + + @classmethod + def find_prefix(self, address): + prefixes = Prefix.objects.filter(prefix__net_contains=address.address, vrf=address.vrf) + return prefixes.last() diff --git a/netbox/ipam/signals.py b/netbox/ipam/signals.py index d7085f8b5..7e210a756 100644 --- a/netbox/ipam/signals.py +++ b/netbox/ipam/signals.py @@ -29,12 +29,11 @@ def update_children_depth(prefix): Prefix.objects.bulk_update(children, ['_depth'], batch_size=100) -def update_ipaddress_prefix(prefix, delete=False): +def update_object_prefix(prefix, delete=False, parent_model=Prefix, child_model=IPAddress): if delete: # Get all possible addresses - addresses = IPAddress.objects.filter(prefix=prefix) - # Find a new containing prefix - prefix = Prefix.objects.filter( + addresses = child_model.objects.filter(prefix=prefix) + prefix = parent_model.objects.filter( prefix__net_contains_or_equals=prefix.prefix, vrf=prefix.vrf ).exclude(pk=prefix.pk).last() @@ -43,71 +42,38 @@ def update_ipaddress_prefix(prefix, delete=False): # Set contained addresses to the containing prefix if it exists address.prefix = prefix else: - # Get all possible modified addresses - addresses = IPAddress.objects.filter( - Q(address__net_contained_or_equal=prefix.prefix, vrf=prefix.vrf) | - Q(prefix=prefix) - ) + filter = Q(prefix=prefix) + if child_model == IPAddress: + filter |= Q(address__net_contained_or_equal=prefix.prefix, vrf=prefix.vrf) + elif child_model == IPRange: + filter |= Q( + start_address__net_contained_or_equal=prefix.prefix, + end_address__net_contained_or_equal=prefix.prefix, + vrf=prefix.vrf + ) + + addresses = child_model.objects.filter(filter) for address in addresses: - if not address.prefix or (prefix.prefix in address.prefix.prefix and address.address in prefix.prefix): - # Set to new Prefix as the prefix is a child of the old prefix and the address is contained in the - # prefix + # If addresses prefix is not set then this model is the only option + if not address.prefix: address.prefix = prefix - elif address.prefix and address.address not in prefix.prefix: - # Find a new prefix as the prefix no longer contains the address - address.prefix = Prefix.objects.filter( - prefix__net_contains_or_equals=address.address, - vrf=prefix.vrf - ).last() + # This address has a different VRF so the prefix cannot be the parent prefix + elif address.prefix != address.find_prefix(address): + address.prefix = address.find_prefix(address) else: - # No-OP as the prefix does not require modification pass # Update the addresses - IPAddress.objects.bulk_update(addresses, ['prefix'], batch_size=100) + child_model.objects.bulk_update(addresses, ['prefix'], batch_size=100) + + +def update_ipaddress_prefix(prefix, delete=False): + update_object_prefix(prefix, delete, child_model=IPAddress) def update_iprange_prefix(prefix, delete=False): - if delete: - # Get all possible addresses - addresses = IPRange.objects.filter(prefix=prefix) - # Find a new containing prefix - prefix = Prefix.objects.filter( - prefix__net_contains_or_equals=prefix.prefix, - vrf=prefix.vrf - ).exclude(pk=prefix.pk).last() - - for address in addresses: - # Set contained addresses to the containing prefix if it exists - address.prefix = prefix - else: - # Get all possible modified addresses - addresses = IPRange.objects.filter( - Q(start_address__net_contained_or_equal=prefix.prefix, vrf=prefix.vrf) | - Q(prefix=prefix) - ) - - for address in addresses: - if not address.prefix or ( - prefix.prefix in address.prefix.prefix and address.start_address in prefix.prefix and - address.end_address in prefix.prefix - ): - # Set to new Prefix as the prefix is a child of the old prefix and the address is contained in the - # prefix - address.prefix = prefix - elif address.prefix and address.address not in prefix.prefix: - # Find a new prefix as the prefix no longer contains the address - address.prefix = Prefix.objects.filter(Q(prefix__net_contains_or_equals=address.start_address) & - Q(prefix__net_contains_or_equals=address.end_address), - vrf=prefix.vrf - ).last() - else: - # No-OP as the prefix does not require modification - pass - - # Update the addresses - IPAddress.objects.bulk_update(addresses, ['prefix'], batch_size=100) + update_object_prefix(prefix, delete, child_model=IPRange) def update_prefix_parents(prefix, delete=False): diff --git a/netbox/ipam/tests/test_models.py b/netbox/ipam/tests/test_models.py index 764573b48..f348a238b 100644 --- a/netbox/ipam/tests/test_models.py +++ b/netbox/ipam/tests/test_models.py @@ -156,7 +156,7 @@ class TestIPRange(TestCase): range.clean() range.save() - prefix = Prefix(prefix='192.0.1.0/17') + prefix = Prefix(prefix='192.0.0.0/17') prefix.clean() prefix.save() @@ -264,6 +264,8 @@ class TestPrefix(TestCase): parent_prefix.vrf = vrfs[0] parent_prefix.save() + + parent_prefix.refresh_from_db() child_ip_pks = {p.pk for p in parent_prefix.ip_addresses.all()} # VRF container is limited to its own VRF @@ -741,13 +743,20 @@ class TestIPAddress(TestCase): self.assertRaises(ValidationError, duplicate_ip.clean) def test_duplicate_vrf(self): - vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False) + vrf = VRF.objects.get(rd='1:1') + vrf.enforce_unique = False + vrf.clean() + vrf.save() + IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24')) duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24')) self.assertIsNone(duplicate_ip.clean()) def test_duplicate_vrf_unique(self): - vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True) + vrf = VRF.objects.get(rd='1:1') + vrf.enforce_unique = True + vrf.clean() + vrf.save() IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24')) duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24')) self.assertRaises(ValidationError, duplicate_ip.clean)