diff --git a/netbox/ipam/filtersets.py b/netbox/ipam/filtersets.py
index d618c8eab..bca3f08c9 100644
--- a/netbox/ipam/filtersets.py
+++ b/netbox/ipam/filtersets.py
@@ -209,6 +209,12 @@ class PrefixFilterSet(PrimaryModelFilterSet, TenancyFilterSet):
method='search_contains',
label='Prefixes which contain this prefix or IP',
)
+ depth = django_filters.NumberFilter(
+ field_name='_depth'
+ )
+ children = django_filters.NumberFilter(
+ field_name='_children'
+ )
mask_length = django_filters.NumberFilter(
field_name='prefix',
lookup_expr='net_mask_length'
diff --git a/netbox/ipam/migrations/0047_prefix_depth_children.py b/netbox/ipam/migrations/0047_prefix_depth_children.py
new file mode 100644
index 000000000..4c49b1358
--- /dev/null
+++ b/netbox/ipam/migrations/0047_prefix_depth_children.py
@@ -0,0 +1,21 @@
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('ipam', '0046_set_vlangroup_scope_types'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='prefix',
+ name='_children',
+ field=models.PositiveBigIntegerField(default=0, editable=False),
+ ),
+ migrations.AddField(
+ model_name='prefix',
+ name='_depth',
+ field=models.PositiveSmallIntegerField(default=0, editable=False),
+ ),
+ ]
diff --git a/netbox/ipam/migrations/0048_prefix_populate_depth_children.py b/netbox/ipam/migrations/0048_prefix_populate_depth_children.py
new file mode 100644
index 000000000..b265e7f6f
--- /dev/null
+++ b/netbox/ipam/migrations/0048_prefix_populate_depth_children.py
@@ -0,0 +1,86 @@
+from django.db import migrations
+
+
+def push_to_stack(stack, prefix):
+ # Increment child count on parent nodes
+ for n in stack:
+ n['children'] += 1
+ stack.append({
+ 'pk': prefix['pk'],
+ 'prefix': prefix['prefix'],
+ 'children': 0,
+ })
+
+
+def populate_prefix_hierarchy(apps, schema_editor):
+ """
+ Populate _depth and _children attrs for all Prefixes.
+ """
+ Prefix = apps.get_model('ipam', 'Prefix')
+ VRF = apps.get_model('ipam', 'VRF')
+
+ total_count = Prefix.objects.count()
+ print(f'\nUpdating {total_count} prefixes...')
+
+ # Iterate through all VRFs and the global table
+ vrfs = [None] + list(VRF.objects.values_list('pk', flat=True))
+ for vrf in vrfs:
+
+ stack = []
+ update_queue = []
+
+ # Iterate through all Prefixes in the VRF, growing and shrinking the stack as we go
+ prefixes = Prefix.objects.filter(vrf=vrf).values('pk', 'prefix')
+ for i, p in enumerate(prefixes):
+
+ # Grow the stack if this is a child of the most recent prefix
+ if not stack or p['prefix'] in stack[-1]['prefix']:
+ push_to_stack(stack, p)
+
+ # If this is a sibling or parent of the most recent prefix, pop nodes from the
+ # stack until we reach a parent prefix (or the root)
+ else:
+ while stack and p['prefix'] not in stack[-1]['prefix'] and p['prefix'] != stack[-1]['prefix']:
+ node = stack.pop()
+ update_queue.append(
+ Prefix(
+ pk=node['pk'],
+ _depth=len(stack),
+ _children=node['children']
+ )
+ )
+ push_to_stack(stack, p)
+
+ # Flush the update queue once it reaches 100 Prefixes
+ if len(update_queue) >= 100:
+ Prefix.objects.bulk_update(update_queue, ['_depth', '_children'])
+ update_queue = []
+ print(f' [{i}/{total_count}]')
+
+ # Clear out any prefixes remaining in the stack
+ while stack:
+ node = stack.pop()
+ update_queue.append(
+ Prefix(
+ pk=node['pk'],
+ _depth=len(stack),
+ _children=node['children']
+ )
+ )
+
+ # Final flush of any remaining Prefixes
+ Prefix.objects.bulk_update(update_queue, ['_depth', '_children'])
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('ipam', '0047_prefix_depth_children'),
+ ]
+
+ operations = [
+ migrations.RunPython(
+ code=populate_prefix_hierarchy,
+ reverse_code=migrations.RunPython.noop
+ ),
+ ]
diff --git a/netbox/ipam/models/ip.py b/netbox/ipam/models/ip.py
index 7df84c98b..c6c8cf74c 100644
--- a/netbox/ipam/models/ip.py
+++ b/netbox/ipam/models/ip.py
@@ -293,6 +293,16 @@ class Prefix(PrimaryModel):
blank=True
)
+ # Cached depth & child counts
+ _depth = models.PositiveSmallIntegerField(
+ default=0,
+ editable=False
+ )
+ _children = models.PositiveBigIntegerField(
+ default=0,
+ editable=False
+ )
+
objects = PrefixQuerySet.as_manager()
csv_headers = [
@@ -306,6 +316,13 @@ class Prefix(PrimaryModel):
ordering = (F('vrf').asc(nulls_first=True), 'prefix', 'pk') # (vrf, prefix) may be non-unique
verbose_name_plural = 'prefixes'
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ # Cache the original prefix and VRF so we can check if they have changed on post_save
+ self._prefix = self.prefix
+ self._vrf = self.vrf
+
def __str__(self):
return str(self.prefix)
@@ -373,6 +390,14 @@ class Prefix(PrimaryModel):
return self.prefix.version
return None
+ @property
+ def depth(self):
+ return self._depth
+
+ @property
+ def children(self):
+ return self._children
+
def _set_prefix_length(self, value):
"""
Expose the IPNetwork object's prefixlen attribute on the parent model so that it can be manipulated directly,
@@ -385,6 +410,26 @@ class Prefix(PrimaryModel):
def get_status_class(self):
return PrefixStatusChoices.CSS_CLASSES.get(self.status)
+ def get_parents(self, include_self=False):
+ """
+ Return all containing Prefixes in the hierarchy.
+ """
+ lookup = 'net_contains_or_equals' if include_self else 'net_contains'
+ return Prefix.objects.filter(**{
+ 'vrf': self.vrf,
+ f'prefix__{lookup}': self.prefix
+ })
+
+ def get_children(self, include_self=False):
+ """
+ Return all covered Prefixes in the hierarchy.
+ """
+ lookup = 'net_contained_or_equal' if include_self else 'net_contained'
+ return Prefix.objects.filter(**{
+ 'vrf': self.vrf,
+ f'prefix__{lookup}': self.prefix
+ })
+
def get_duplicates(self):
return Prefix.objects.filter(vrf=self.vrf, prefix=str(self.prefix)).exclude(pk=self.pk)
diff --git a/netbox/ipam/querysets.py b/netbox/ipam/querysets.py
index 784d58342..7edac2eff 100644
--- a/netbox/ipam/querysets.py
+++ b/netbox/ipam/querysets.py
@@ -1,27 +1,32 @@
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
+from django.db.models.expressions import RawSQL
from utilities.querysets import RestrictedQuerySet
class PrefixQuerySet(RestrictedQuerySet):
- def annotate_tree(self):
+ def annotate_hierarchy(self):
"""
- Annotate the number of parent and child prefixes for each Prefix. Raw SQL is needed for these subqueries
- because we need to cast NULL VRF values to integers for comparison. (NULL != NULL).
+ Annotate the depth and number of child prefixes for each Prefix. Cast null VRF values to zero for
+ comparison. (NULL != NULL).
"""
- return self.extra(
- select={
- 'parents': 'SELECT COUNT(U0."prefix") AS "c" '
- 'FROM "ipam_prefix" U0 '
- 'WHERE (U0."prefix" >> "ipam_prefix"."prefix" '
- 'AND COALESCE(U0."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
- 'children': 'SELECT COUNT(U1."prefix") AS "c" '
- 'FROM "ipam_prefix" U1 '
- 'WHERE (U1."prefix" << "ipam_prefix"."prefix" '
- 'AND COALESCE(U1."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
- }
+ return self.annotate(
+ hierarchy_depth=RawSQL(
+ 'SELECT COUNT(DISTINCT U0."prefix") AS "c" '
+ 'FROM "ipam_prefix" U0 '
+ 'WHERE (U0."prefix" >> "ipam_prefix"."prefix" '
+ 'AND COALESCE(U0."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
+ ()
+ ),
+ hierarchy_children=RawSQL(
+ 'SELECT COUNT(U1."prefix") AS "c" '
+ 'FROM "ipam_prefix" U1 '
+ 'WHERE (U1."prefix" << "ipam_prefix"."prefix" '
+ 'AND COALESCE(U1."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
+ ()
+ )
)
diff --git a/netbox/ipam/signals.py b/netbox/ipam/signals.py
index a8fce8310..f8673b10e 100644
--- a/netbox/ipam/signals.py
+++ b/netbox/ipam/signals.py
@@ -1,9 +1,52 @@
-from django.db.models.signals import pre_delete
+from django.db.models.signals import post_delete, post_save, pre_delete
from django.dispatch import receiver
from dcim.models import Device
from virtualization.models import VirtualMachine
-from .models import IPAddress
+from .models import IPAddress, Prefix
+
+
+def update_parents_children(prefix):
+ """
+ Update depth on prefix & containing prefixes
+ """
+ parents = prefix.get_parents(include_self=True).annotate_hierarchy()
+ for parent in parents:
+ parent._children = parent.hierarchy_children
+ Prefix.objects.bulk_update(parents, ['_children'])
+
+
+def update_children_depth(prefix):
+ """
+ Update children count on prefix & contained prefixes
+ """
+ children = prefix.get_children(include_self=True).annotate_hierarchy()
+ for child in children:
+ child._depth = child.hierarchy_depth
+ Prefix.objects.bulk_update(children, ['_depth'])
+
+
+@receiver(post_save, sender=Prefix)
+def handle_prefix_saved(instance, created, **kwargs):
+
+ # Prefix has changed (or new instance has been created)
+ if created or instance.vrf != instance._vrf or instance.prefix != instance._prefix:
+
+ update_parents_children(instance)
+ update_children_depth(instance)
+
+ # If this is not a new prefix, clean up parent/children of previous prefix
+ if not created:
+ old_prefix = Prefix(vrf=instance._vrf, prefix=instance._prefix)
+ update_parents_children(old_prefix)
+ update_children_depth(old_prefix)
+
+
+@receiver(post_delete, sender=Prefix)
+def handle_prefix_deleted(instance, **kwargs):
+
+ update_parents_children(instance)
+ update_children_depth(instance)
@receiver(pre_delete, sender=IPAddress)
diff --git a/netbox/ipam/tables.py b/netbox/ipam/tables.py
index 82e8751a9..12c835e6c 100644
--- a/netbox/ipam/tables.py
+++ b/netbox/ipam/tables.py
@@ -15,7 +15,7 @@ AVAILABLE_LABEL = mark_safe('Available'
PREFIX_LINK = """
{% load helpers %}
-{% for i in record.parents|as_range %}
+{% for i in record.depth|as_range %}
{% endfor %}
{{ record.prefix }}
@@ -262,6 +262,14 @@ class PrefixTable(BaseTable):
template_code=PREFIX_LINK,
attrs={'td': {'class': 'text-nowrap'}}
)
+ depth = tables.Column(
+ accessor=Accessor('_depth'),
+ verbose_name='Depth'
+ )
+ children = tables.Column(
+ accessor=Accessor('_children'),
+ verbose_name='Children'
+ )
status = ChoiceFieldColumn(
default=AVAILABLE_LABEL
)
@@ -287,7 +295,8 @@ class PrefixTable(BaseTable):
class Meta(BaseTable.Meta):
model = Prefix
fields = (
- 'pk', 'prefix', 'status', 'children', 'vrf', 'tenant', 'site', 'vlan', 'role', 'is_pool', 'description',
+ 'pk', 'prefix', 'status', 'depth', 'children', 'vrf', 'tenant', 'site', 'vlan', 'role', 'is_pool',
+ 'description',
)
default_columns = ('pk', 'prefix', 'status', 'vrf', 'tenant', 'site', 'vlan', 'role', 'description')
row_attrs = {
diff --git a/netbox/ipam/tests/test_models.py b/netbox/ipam/tests/test_models.py
index a47862165..4fefdec54 100644
--- a/netbox/ipam/tests/test_models.py
+++ b/netbox/ipam/tests/test_models.py
@@ -1,4 +1,4 @@
-import netaddr
+from netaddr import IPNetwork, IPSet
from django.core.exceptions import ValidationError
from django.test import TestCase, override_settings
@@ -10,27 +10,27 @@ class TestAggregate(TestCase):
def test_get_utilization(self):
rir = RIR.objects.create(name='RIR 1', slug='rir-1')
- aggregate = Aggregate(prefix=netaddr.IPNetwork('10.0.0.0/8'), rir=rir)
+ aggregate = Aggregate(prefix=IPNetwork('10.0.0.0/8'), rir=rir)
aggregate.save()
# 25% utilization
Prefix.objects.bulk_create((
- Prefix(prefix=netaddr.IPNetwork('10.0.0.0/12')),
- Prefix(prefix=netaddr.IPNetwork('10.16.0.0/12')),
- Prefix(prefix=netaddr.IPNetwork('10.32.0.0/12')),
- Prefix(prefix=netaddr.IPNetwork('10.48.0.0/12')),
+ Prefix(prefix=IPNetwork('10.0.0.0/12')),
+ Prefix(prefix=IPNetwork('10.16.0.0/12')),
+ Prefix(prefix=IPNetwork('10.32.0.0/12')),
+ Prefix(prefix=IPNetwork('10.48.0.0/12')),
))
self.assertEqual(aggregate.get_utilization(), 25)
# 50% utilization
Prefix.objects.bulk_create((
- Prefix(prefix=netaddr.IPNetwork('10.64.0.0/10')),
+ Prefix(prefix=IPNetwork('10.64.0.0/10')),
))
self.assertEqual(aggregate.get_utilization(), 50)
# 100% utilization
Prefix.objects.bulk_create((
- Prefix(prefix=netaddr.IPNetwork('10.128.0.0/9')),
+ Prefix(prefix=IPNetwork('10.128.0.0/9')),
))
self.assertEqual(aggregate.get_utilization(), 100)
@@ -39,9 +39,9 @@ class TestPrefix(TestCase):
def test_get_duplicates(self):
prefixes = Prefix.objects.bulk_create((
- Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
- Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
- Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
+ Prefix(prefix=IPNetwork('192.0.2.0/24')),
+ Prefix(prefix=IPNetwork('192.0.2.0/24')),
+ Prefix(prefix=IPNetwork('192.0.2.0/24')),
))
duplicate_prefix_pks = [p.pk for p in prefixes[0].get_duplicates()]
@@ -54,11 +54,11 @@ class TestPrefix(TestCase):
VRF(name='VRF 3'),
))
prefixes = Prefix.objects.bulk_create((
- Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER),
- Prefix(prefix=netaddr.IPNetwork('10.0.0.0/24'), vrf=None),
- Prefix(prefix=netaddr.IPNetwork('10.0.1.0/24'), vrf=vrfs[0]),
- Prefix(prefix=netaddr.IPNetwork('10.0.2.0/24'), vrf=vrfs[1]),
- Prefix(prefix=netaddr.IPNetwork('10.0.3.0/24'), vrf=vrfs[2]),
+ Prefix(prefix=IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER),
+ Prefix(prefix=IPNetwork('10.0.0.0/24'), vrf=None),
+ Prefix(prefix=IPNetwork('10.0.1.0/24'), vrf=vrfs[0]),
+ Prefix(prefix=IPNetwork('10.0.2.0/24'), vrf=vrfs[1]),
+ Prefix(prefix=IPNetwork('10.0.3.0/24'), vrf=vrfs[2]),
))
child_prefix_pks = {p.pk for p in prefixes[0].get_child_prefixes()}
@@ -79,13 +79,13 @@ class TestPrefix(TestCase):
VRF(name='VRF 3'),
))
parent_prefix = Prefix.objects.create(
- prefix=netaddr.IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER
+ prefix=IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER
)
ips = IPAddress.objects.bulk_create((
- IPAddress(address=netaddr.IPNetwork('10.0.0.1/24'), vrf=None),
- IPAddress(address=netaddr.IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
- IPAddress(address=netaddr.IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
- IPAddress(address=netaddr.IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
+ IPAddress(address=IPNetwork('10.0.0.1/24'), vrf=None),
+ IPAddress(address=IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
+ IPAddress(address=IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
+ IPAddress(address=IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
))
child_ip_pks = {p.pk for p in parent_prefix.get_child_ips()}
@@ -102,16 +102,16 @@ class TestPrefix(TestCase):
def test_get_available_prefixes(self):
prefixes = Prefix.objects.bulk_create((
- Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16')), # Parent prefix
- Prefix(prefix=netaddr.IPNetwork('10.0.0.0/20')),
- Prefix(prefix=netaddr.IPNetwork('10.0.32.0/20')),
- Prefix(prefix=netaddr.IPNetwork('10.0.128.0/18')),
+ Prefix(prefix=IPNetwork('10.0.0.0/16')), # Parent prefix
+ Prefix(prefix=IPNetwork('10.0.0.0/20')),
+ Prefix(prefix=IPNetwork('10.0.32.0/20')),
+ Prefix(prefix=IPNetwork('10.0.128.0/18')),
))
- missing_prefixes = netaddr.IPSet([
- netaddr.IPNetwork('10.0.16.0/20'),
- netaddr.IPNetwork('10.0.48.0/20'),
- netaddr.IPNetwork('10.0.64.0/18'),
- netaddr.IPNetwork('10.0.192.0/18'),
+ missing_prefixes = IPSet([
+ IPNetwork('10.0.16.0/20'),
+ IPNetwork('10.0.48.0/20'),
+ IPNetwork('10.0.64.0/18'),
+ IPNetwork('10.0.192.0/18'),
])
available_prefixes = prefixes[0].get_available_prefixes()
@@ -119,17 +119,17 @@ class TestPrefix(TestCase):
def test_get_available_ips(self):
- parent_prefix = Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.0.0/28'))
+ parent_prefix = Prefix.objects.create(prefix=IPNetwork('10.0.0.0/28'))
IPAddress.objects.bulk_create((
- IPAddress(address=netaddr.IPNetwork('10.0.0.1/26')),
- IPAddress(address=netaddr.IPNetwork('10.0.0.3/26')),
- IPAddress(address=netaddr.IPNetwork('10.0.0.5/26')),
- IPAddress(address=netaddr.IPNetwork('10.0.0.7/26')),
- IPAddress(address=netaddr.IPNetwork('10.0.0.9/26')),
- IPAddress(address=netaddr.IPNetwork('10.0.0.11/26')),
- IPAddress(address=netaddr.IPNetwork('10.0.0.13/26')),
+ IPAddress(address=IPNetwork('10.0.0.1/26')),
+ IPAddress(address=IPNetwork('10.0.0.3/26')),
+ IPAddress(address=IPNetwork('10.0.0.5/26')),
+ IPAddress(address=IPNetwork('10.0.0.7/26')),
+ IPAddress(address=IPNetwork('10.0.0.9/26')),
+ IPAddress(address=IPNetwork('10.0.0.11/26')),
+ IPAddress(address=IPNetwork('10.0.0.13/26')),
))
- missing_ips = netaddr.IPSet([
+ missing_ips = IPSet([
'10.0.0.2/32',
'10.0.0.4/32',
'10.0.0.6/32',
@@ -145,39 +145,39 @@ class TestPrefix(TestCase):
def test_get_first_available_prefix(self):
prefixes = Prefix.objects.bulk_create((
- Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16')), # Parent prefix
- Prefix(prefix=netaddr.IPNetwork('10.0.0.0/24')),
- Prefix(prefix=netaddr.IPNetwork('10.0.1.0/24')),
- Prefix(prefix=netaddr.IPNetwork('10.0.2.0/24')),
+ Prefix(prefix=IPNetwork('10.0.0.0/16')), # Parent prefix
+ Prefix(prefix=IPNetwork('10.0.0.0/24')),
+ Prefix(prefix=IPNetwork('10.0.1.0/24')),
+ Prefix(prefix=IPNetwork('10.0.2.0/24')),
))
- self.assertEqual(prefixes[0].get_first_available_prefix(), netaddr.IPNetwork('10.0.3.0/24'))
+ self.assertEqual(prefixes[0].get_first_available_prefix(), IPNetwork('10.0.3.0/24'))
- Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.3.0/24'))
- self.assertEqual(prefixes[0].get_first_available_prefix(), netaddr.IPNetwork('10.0.4.0/22'))
+ Prefix.objects.create(prefix=IPNetwork('10.0.3.0/24'))
+ self.assertEqual(prefixes[0].get_first_available_prefix(), IPNetwork('10.0.4.0/22'))
def test_get_first_available_ip(self):
- parent_prefix = Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.0.0/24'))
+ parent_prefix = Prefix.objects.create(prefix=IPNetwork('10.0.0.0/24'))
IPAddress.objects.bulk_create((
- IPAddress(address=netaddr.IPNetwork('10.0.0.1/24')),
- IPAddress(address=netaddr.IPNetwork('10.0.0.2/24')),
- IPAddress(address=netaddr.IPNetwork('10.0.0.3/24')),
+ IPAddress(address=IPNetwork('10.0.0.1/24')),
+ IPAddress(address=IPNetwork('10.0.0.2/24')),
+ IPAddress(address=IPNetwork('10.0.0.3/24')),
))
self.assertEqual(parent_prefix.get_first_available_ip(), '10.0.0.4/24')
- IPAddress.objects.create(address=netaddr.IPNetwork('10.0.0.4/24'))
+ IPAddress.objects.create(address=IPNetwork('10.0.0.4/24'))
self.assertEqual(parent_prefix.get_first_available_ip(), '10.0.0.5/24')
def test_get_utilization(self):
# Container Prefix
prefix = Prefix.objects.create(
- prefix=netaddr.IPNetwork('10.0.0.0/24'),
+ prefix=IPNetwork('10.0.0.0/24'),
status=PrefixStatusChoices.STATUS_CONTAINER
)
Prefix.objects.bulk_create((
- Prefix(prefix=netaddr.IPNetwork('10.0.0.0/26')),
- Prefix(prefix=netaddr.IPNetwork('10.0.0.128/26')),
+ Prefix(prefix=IPNetwork('10.0.0.0/26')),
+ Prefix(prefix=IPNetwork('10.0.0.128/26')),
))
self.assertEqual(prefix.get_utilization(), 50)
@@ -186,7 +186,7 @@ class TestPrefix(TestCase):
prefix.save()
IPAddress.objects.bulk_create(
# Create 32 IPAddresses within the Prefix
- [IPAddress(address=netaddr.IPNetwork('10.0.0.{}/24'.format(i))) for i in range(1, 33)]
+ [IPAddress(address=IPNetwork('10.0.0.{}/24'.format(i))) for i in range(1, 33)]
)
self.assertEqual(prefix.get_utilization(), 12) # ~= 12%
@@ -196,36 +196,234 @@ class TestPrefix(TestCase):
@override_settings(ENFORCE_GLOBAL_UNIQUE=False)
def test_duplicate_global(self):
- Prefix.objects.create(prefix=netaddr.IPNetwork('192.0.2.0/24'))
- duplicate_prefix = Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24'))
+ Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
+ duplicate_prefix = Prefix(prefix=IPNetwork('192.0.2.0/24'))
self.assertIsNone(duplicate_prefix.clean())
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_global_unique(self):
- Prefix.objects.create(prefix=netaddr.IPNetwork('192.0.2.0/24'))
- duplicate_prefix = Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24'))
+ Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
+ duplicate_prefix = Prefix(prefix=IPNetwork('192.0.2.0/24'))
self.assertRaises(ValidationError, duplicate_prefix.clean)
def test_duplicate_vrf(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False)
- Prefix.objects.create(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
- duplicate_prefix = Prefix(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
+ Prefix.objects.create(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
+ duplicate_prefix = Prefix(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
self.assertIsNone(duplicate_prefix.clean())
def test_duplicate_vrf_unique(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True)
- Prefix.objects.create(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
- duplicate_prefix = Prefix(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
+ Prefix.objects.create(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
+ duplicate_prefix = Prefix(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
self.assertRaises(ValidationError, duplicate_prefix.clean)
+class TestPrefixHierarchy(TestCase):
+ """
+ Test the automatic updating of depth and child count in response to changes made within
+ the prefix hierarchy.
+ """
+ @classmethod
+ def setUpTestData(cls):
+
+ prefixes = (
+
+ # IPv4
+ Prefix(prefix='10.0.0.0/8', _depth=0, _children=2),
+ Prefix(prefix='10.0.0.0/16', _depth=1, _children=1),
+ Prefix(prefix='10.0.0.0/24', _depth=2, _children=0),
+
+ # IPv6
+ Prefix(prefix='2001:db8::/32', _depth=0, _children=2),
+ Prefix(prefix='2001:db8::/40', _depth=1, _children=1),
+ Prefix(prefix='2001:db8::/48', _depth=2, _children=0),
+
+ )
+ Prefix.objects.bulk_create(prefixes)
+
+ def test_create_prefix4(self):
+ # Create 10.0.0.0/12
+ Prefix(prefix='10.0.0.0/12').save()
+
+ prefixes = Prefix.objects.filter(prefix__family=4)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 3)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/12'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 2)
+ self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
+ self.assertEqual(prefixes[2]._depth, 2)
+ self.assertEqual(prefixes[2]._children, 1)
+ self.assertEqual(prefixes[3].prefix, IPNetwork('10.0.0.0/24'))
+ self.assertEqual(prefixes[3]._depth, 3)
+ self.assertEqual(prefixes[3]._children, 0)
+
+ def test_create_prefix6(self):
+ # Create 2001:db8::/36
+ Prefix(prefix='2001:db8::/36').save()
+
+ prefixes = Prefix.objects.filter(prefix__family=6)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 3)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/36'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 2)
+ self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
+ self.assertEqual(prefixes[2]._depth, 2)
+ self.assertEqual(prefixes[2]._children, 1)
+ self.assertEqual(prefixes[3].prefix, IPNetwork('2001:db8::/48'))
+ self.assertEqual(prefixes[3]._depth, 3)
+ self.assertEqual(prefixes[3]._children, 0)
+
+ def test_update_prefix4(self):
+ # Change 10.0.0.0/24 to 10.0.0.0/12
+ p = Prefix.objects.get(prefix='10.0.0.0/24')
+ p.prefix = '10.0.0.0/12'
+ p.save()
+
+ prefixes = Prefix.objects.filter(prefix__family=4)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 2)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/12'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 1)
+ self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
+ self.assertEqual(prefixes[2]._depth, 2)
+ self.assertEqual(prefixes[2]._children, 0)
+
+ def test_update_prefix6(self):
+ # Change 2001:db8::/48 to 2001:db8::/36
+ p = Prefix.objects.get(prefix='2001:db8::/48')
+ p.prefix = '2001:db8::/36'
+ p.save()
+
+ prefixes = Prefix.objects.filter(prefix__family=6)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 2)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/36'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 1)
+ self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
+ self.assertEqual(prefixes[2]._depth, 2)
+ self.assertEqual(prefixes[2]._children, 0)
+
+ def test_update_prefix_vrf4(self):
+ vrf = VRF(name='VRF A')
+ vrf.save()
+
+ # Move 10.0.0.0/16 to a VRF
+ p = Prefix.objects.get(prefix='10.0.0.0/16')
+ p.vrf = vrf
+ p.save()
+
+ prefixes = Prefix.objects.filter(vrf__isnull=True, prefix__family=4)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 1)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/24'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 0)
+
+ prefixes = Prefix.objects.filter(vrf=vrf)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/16'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 0)
+
+ def test_update_prefix_vrf6(self):
+ vrf = VRF(name='VRF A')
+ vrf.save()
+
+ # Move 2001:db8::/40 to a VRF
+ p = Prefix.objects.get(prefix='2001:db8::/40')
+ p.vrf = vrf
+ p.save()
+
+ prefixes = Prefix.objects.filter(vrf__isnull=True, prefix__family=6)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 1)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/48'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 0)
+
+ prefixes = Prefix.objects.filter(vrf=vrf)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/40'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 0)
+
+ def test_delete_prefix4(self):
+ # Delete 10.0.0.0/16
+ Prefix.objects.filter(prefix='10.0.0.0/16').delete()
+
+ prefixes = Prefix.objects.filter(prefix__family=4)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 1)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/24'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 0)
+
+ def test_delete_prefix6(self):
+ # Delete 2001:db8::/40
+ Prefix.objects.filter(prefix='2001:db8::/40').delete()
+
+ prefixes = Prefix.objects.filter(prefix__family=6)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 1)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/48'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 0)
+
+ def test_duplicate_prefix4(self):
+ # Duplicate 10.0.0.0/16
+ Prefix(prefix='10.0.0.0/16').save()
+
+ prefixes = Prefix.objects.filter(prefix__family=4)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 3)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/16'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 1)
+ self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
+ self.assertEqual(prefixes[2]._depth, 1)
+ self.assertEqual(prefixes[2]._children, 1)
+ self.assertEqual(prefixes[3].prefix, IPNetwork('10.0.0.0/24'))
+ self.assertEqual(prefixes[3]._depth, 2)
+ self.assertEqual(prefixes[3]._children, 0)
+
+ def test_duplicate_prefix6(self):
+ # Duplicate 2001:db8::/40
+ Prefix(prefix='2001:db8::/40').save()
+
+ prefixes = Prefix.objects.filter(prefix__family=6)
+ self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
+ self.assertEqual(prefixes[0]._depth, 0)
+ self.assertEqual(prefixes[0]._children, 3)
+ self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/40'))
+ self.assertEqual(prefixes[1]._depth, 1)
+ self.assertEqual(prefixes[1]._children, 1)
+ self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
+ self.assertEqual(prefixes[2]._depth, 1)
+ self.assertEqual(prefixes[2]._children, 1)
+ self.assertEqual(prefixes[3].prefix, IPNetwork('2001:db8::/48'))
+ self.assertEqual(prefixes[3]._depth, 2)
+ self.assertEqual(prefixes[3]._children, 0)
+
+
class TestIPAddress(TestCase):
def test_get_duplicates(self):
ips = IPAddress.objects.bulk_create((
- IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
- IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
- IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
+ IPAddress(address=IPNetwork('192.0.2.1/24')),
+ IPAddress(address=IPNetwork('192.0.2.1/24')),
+ IPAddress(address=IPNetwork('192.0.2.1/24')),
))
duplicate_ip_pks = [p.pk for p in ips[0].get_duplicates()]
@@ -237,44 +435,44 @@ class TestIPAddress(TestCase):
@override_settings(ENFORCE_GLOBAL_UNIQUE=False)
def test_duplicate_global(self):
- IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
- duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
+ duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertIsNone(duplicate_ip.clean())
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_global_unique(self):
- IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
- duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
+ duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
def test_duplicate_vrf(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False)
- IPAddress.objects.create(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
- duplicate_ip = IPAddress(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
+ IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
+ duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertIsNone(duplicate_ip.clean())
def test_duplicate_vrf_unique(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True)
- IPAddress.objects.create(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
- duplicate_ip = IPAddress(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
+ IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
+ duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_nonrole_role(self):
- IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
- duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
+ duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_role_nonrole(self):
- IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
- duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
+ duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_role(self):
- IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
- IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
class TestVLANGroup(TestCase):
diff --git a/netbox/ipam/views.py b/netbox/ipam/views.py
index 168933af7..7c1a06cf3 100644
--- a/netbox/ipam/views.py
+++ b/netbox/ipam/views.py
@@ -238,7 +238,7 @@ class AggregateView(generic.ObjectView):
'site', 'role'
).order_by(
'prefix'
- ).annotate_tree()
+ )
# Add available prefixes to the table if requested
if request.GET.get('show_available', 'true') == 'true':
@@ -352,7 +352,7 @@ class RoleBulkDeleteView(generic.BulkDeleteView):
#
class PrefixListView(generic.ObjectListView):
- queryset = Prefix.objects.annotate_tree()
+ queryset = Prefix.objects.all()
filterset = filtersets.PrefixFilterSet
filterset_form = forms.PrefixFilterForm
table = tables.PrefixDetailTable
@@ -377,7 +377,7 @@ class PrefixView(generic.ObjectView):
prefix__net_contains=str(instance.prefix)
).prefetch_related(
'site', 'role'
- ).annotate_tree()
+ )
parent_prefix_table = tables.PrefixTable(list(parent_prefixes), orderable=False)
parent_prefix_table.exclude = ('vrf',)
@@ -407,7 +407,7 @@ class PrefixPrefixesView(generic.ObjectView):
# Child prefixes table
child_prefixes = instance.get_child_prefixes().restrict(request.user, 'view').prefetch_related(
'site', 'vlan', 'role',
- ).annotate_tree()
+ )
# Add available prefixes to the table if requested
if child_prefixes and request.GET.get('show_available', 'true') == 'true':