Optimize prefix assignment. Fix tests
Some checks failed
CI / build (20.x, 3.10) (push) Has been cancelled
CI / build (20.x, 3.11) (push) Has been cancelled
CI / build (20.x, 3.12) (push) Has been cancelled

This commit is contained in:
Daniel Sheppard 2025-07-10 12:59:59 -05:00
parent 76e85683ac
commit 0d31449df8
3 changed files with 50 additions and 62 deletions

View File

@ -786,6 +786,14 @@ class IPRange(ContactsMixin, PrimaryModel):
return min(float(child_count) / self.size * 100, 100)
@classmethod
def find_prefix(self, address):
prefixes = Prefix.objects.filter(
models.Q(prefix__net_contains=address.start_address) & Q(prefix__net_contains=address.end_address),
vrf=address.vrf,
)
return prefixes.last()
class IPAddress(ContactsMixin, PrimaryModel):
"""
@ -1085,3 +1093,8 @@ class IPAddress(ContactsMixin, PrimaryModel):
def get_role_color(self):
return IPAddressRoleChoices.colors.get(self.role)
@classmethod
def find_prefix(self, address):
prefixes = Prefix.objects.filter(prefix__net_contains=address.address, vrf=address.vrf)
return prefixes.last()

View File

@ -29,12 +29,11 @@ def update_children_depth(prefix):
Prefix.objects.bulk_update(children, ['_depth'], batch_size=100)
def update_ipaddress_prefix(prefix, delete=False):
def update_object_prefix(prefix, delete=False, parent_model=Prefix, child_model=IPAddress):
if delete:
# Get all possible addresses
addresses = IPAddress.objects.filter(prefix=prefix)
# Find a new containing prefix
prefix = Prefix.objects.filter(
addresses = child_model.objects.filter(prefix=prefix)
prefix = parent_model.objects.filter(
prefix__net_contains_or_equals=prefix.prefix,
vrf=prefix.vrf
).exclude(pk=prefix.pk).last()
@ -43,71 +42,38 @@ def update_ipaddress_prefix(prefix, delete=False):
# Set contained addresses to the containing prefix if it exists
address.prefix = prefix
else:
# Get all possible modified addresses
addresses = IPAddress.objects.filter(
Q(address__net_contained_or_equal=prefix.prefix, vrf=prefix.vrf) |
Q(prefix=prefix)
filter = Q(prefix=prefix)
if child_model == IPAddress:
filter |= Q(address__net_contained_or_equal=prefix.prefix, vrf=prefix.vrf)
elif child_model == IPRange:
filter |= Q(
start_address__net_contained_or_equal=prefix.prefix,
end_address__net_contained_or_equal=prefix.prefix,
vrf=prefix.vrf
)
addresses = child_model.objects.filter(filter)
for address in addresses:
if not address.prefix or (prefix.prefix in address.prefix.prefix and address.address in prefix.prefix):
# Set to new Prefix as the prefix is a child of the old prefix and the address is contained in the
# prefix
# If addresses prefix is not set then this model is the only option
if not address.prefix:
address.prefix = prefix
elif address.prefix and address.address not in prefix.prefix:
# Find a new prefix as the prefix no longer contains the address
address.prefix = Prefix.objects.filter(
prefix__net_contains_or_equals=address.address,
vrf=prefix.vrf
).last()
# This address has a different VRF so the prefix cannot be the parent prefix
elif address.prefix != address.find_prefix(address):
address.prefix = address.find_prefix(address)
else:
# No-OP as the prefix does not require modification
pass
# Update the addresses
IPAddress.objects.bulk_update(addresses, ['prefix'], batch_size=100)
child_model.objects.bulk_update(addresses, ['prefix'], batch_size=100)
def update_ipaddress_prefix(prefix, delete=False):
update_object_prefix(prefix, delete, child_model=IPAddress)
def update_iprange_prefix(prefix, delete=False):
if delete:
# Get all possible addresses
addresses = IPRange.objects.filter(prefix=prefix)
# Find a new containing prefix
prefix = Prefix.objects.filter(
prefix__net_contains_or_equals=prefix.prefix,
vrf=prefix.vrf
).exclude(pk=prefix.pk).last()
for address in addresses:
# Set contained addresses to the containing prefix if it exists
address.prefix = prefix
else:
# Get all possible modified addresses
addresses = IPRange.objects.filter(
Q(start_address__net_contained_or_equal=prefix.prefix, vrf=prefix.vrf) |
Q(prefix=prefix)
)
for address in addresses:
if not address.prefix or (
prefix.prefix in address.prefix.prefix and address.start_address in prefix.prefix and
address.end_address in prefix.prefix
):
# Set to new Prefix as the prefix is a child of the old prefix and the address is contained in the
# prefix
address.prefix = prefix
elif address.prefix and address.address not in prefix.prefix:
# Find a new prefix as the prefix no longer contains the address
address.prefix = Prefix.objects.filter(Q(prefix__net_contains_or_equals=address.start_address) &
Q(prefix__net_contains_or_equals=address.end_address),
vrf=prefix.vrf
).last()
else:
# No-OP as the prefix does not require modification
pass
# Update the addresses
IPAddress.objects.bulk_update(addresses, ['prefix'], batch_size=100)
update_object_prefix(prefix, delete, child_model=IPRange)
def update_prefix_parents(prefix, delete=False):

View File

@ -156,7 +156,7 @@ class TestIPRange(TestCase):
range.clean()
range.save()
prefix = Prefix(prefix='192.0.1.0/17')
prefix = Prefix(prefix='192.0.0.0/17')
prefix.clean()
prefix.save()
@ -264,6 +264,8 @@ class TestPrefix(TestCase):
parent_prefix.vrf = vrfs[0]
parent_prefix.save()
parent_prefix.refresh_from_db()
child_ip_pks = {p.pk for p in parent_prefix.ip_addresses.all()}
# VRF container is limited to its own VRF
@ -741,13 +743,20 @@ class TestIPAddress(TestCase):
self.assertRaises(ValidationError, duplicate_ip.clean)
def test_duplicate_vrf(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False)
vrf = VRF.objects.get(rd='1:1')
vrf.enforce_unique = False
vrf.clean()
vrf.save()
IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertIsNone(duplicate_ip.clean())
def test_duplicate_vrf_unique(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True)
vrf = VRF.objects.get(rd='1:1')
vrf.enforce_unique = True
vrf.clean()
vrf.save()
IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)