Closed #9763: Treat IP ranges as fully populated (#19064)

This commit is contained in:
Jeremy Stretch
2025-04-09 10:30:11 -04:00
committed by GitHub
parent 076d16ca6b
commit f8f2ad1d14
16 changed files with 256 additions and 77 deletions

View File

@@ -147,7 +147,8 @@ class IPRangeSerializer(NetBoxModelSerializer):
fields = [
'id', 'url', 'display_url', 'display', 'family', 'start_address', 'end_address', 'size', 'vrf', 'tenant',
'status', 'role', 'description', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
'mark_utilized', 'description', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
'mark_populated', 'mark_utilized', 'description', 'comments', 'tags', 'custom_fields', 'created',
'last_updated',
]
brief_fields = ('id', 'url', 'display', 'family', 'start_address', 'end_address', 'description')

View File

@@ -478,7 +478,7 @@ class IPRangeFilterSet(TenancyFilterSet, NetBoxModelFilterSet):
class Meta:
model = IPRange
fields = ('id', 'mark_utilized', 'size', 'description')
fields = ('id', 'mark_populated', 'mark_utilized', 'size', 'description')
def search(self, queryset, name, value):
if not value.strip():

View File

@@ -296,6 +296,11 @@ class IPRangeBulkEditForm(NetBoxModelBulkEditForm):
queryset=Role.objects.all(),
required=False
)
mark_populated = forms.NullBooleanField(
required=False,
widget=BulkEditNullBooleanSelect(),
label=_('Treat as populated')
)
mark_utilized = forms.NullBooleanField(
required=False,
widget=BulkEditNullBooleanSelect(),

View File

@@ -268,8 +268,8 @@ class IPRangeImportForm(NetBoxModelImportForm):
class Meta:
model = IPRange
fields = (
'start_address', 'end_address', 'vrf', 'tenant', 'status', 'role', 'mark_utilized', 'description',
'comments', 'tags',
'start_address', 'end_address', 'vrf', 'tenant', 'status', 'role', 'mark_populated', 'mark_utilized',
'description', 'comments', 'tags',
)

View File

@@ -266,7 +266,7 @@ class IPRangeFilterForm(TenancyFilterForm, NetBoxModelFilterSetForm):
model = IPRange
fieldsets = (
FieldSet('q', 'filter_id', 'tag'),
FieldSet('family', 'vrf_id', 'status', 'role_id', 'mark_utilized', name=_('Attributes')),
FieldSet('family', 'vrf_id', 'status', 'role_id', 'mark_populated', 'mark_utilized', name=_('Attributes')),
FieldSet('tenant_group_id', 'tenant_id', name=_('Tenant')),
)
family = forms.ChoiceField(
@@ -291,6 +291,13 @@ class IPRangeFilterForm(TenancyFilterForm, NetBoxModelFilterSetForm):
null_option='None',
label=_('Role')
)
mark_populated = forms.NullBooleanField(
required=False,
label=_('Treat as populated'),
widget=forms.Select(
choices=BOOLEAN_WITH_BLANK_CHOICES
)
)
mark_utilized = forms.NullBooleanField(
required=False,
label=_('Treat as fully utilized'),

View File

@@ -257,8 +257,8 @@ class IPRangeForm(TenancyForm, NetBoxModelForm):
fieldsets = (
FieldSet(
'vrf', 'start_address', 'end_address', 'role', 'status', 'mark_utilized', 'description', 'tags',
name=_('IP Range')
'vrf', 'start_address', 'end_address', 'role', 'status', 'mark_populated', 'mark_utilized', 'description',
'tags', name=_('IP Range')
),
FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
)
@@ -266,8 +266,8 @@ class IPRangeForm(TenancyForm, NetBoxModelForm):
class Meta:
model = IPRange
fields = [
'vrf', 'start_address', 'end_address', 'status', 'role', 'tenant_group', 'tenant', 'mark_utilized',
'description', 'comments', 'tags',
'vrf', 'start_address', 'end_address', 'status', 'role', 'tenant_group', 'tenant', 'mark_populated',
'mark_utilized', 'description', 'comments', 'tags',
]

View File

@@ -0,0 +1,16 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ipam', '0077_vlangroup_tenant'),
]
operations = [
migrations.AddField(
model_name='iprange',
name='mark_populated',
field=models.BooleanField(default=False),
),
]

View File

@@ -383,14 +383,15 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
else:
return Prefix.objects.filter(prefix__net_contained=str(self.prefix), vrf=self.vrf)
def get_child_ranges(self):
def get_child_ranges(self, **kwargs):
"""
Return all IPRanges within this Prefix and VRF.
"""
return IPRange.objects.filter(
vrf=self.vrf,
start_address__net_host_contained=str(self.prefix),
end_address__net_host_contained=str(self.prefix)
end_address__net_host_contained=str(self.prefix),
**kwargs
)
def get_child_ips(self):
@@ -407,15 +408,14 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
"""
Return all available IPs within this prefix as an IPSet.
"""
if self.mark_utilized:
return netaddr.IPSet()
prefix = netaddr.IPSet(self.prefix)
child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
child_ranges = []
for iprange in self.get_child_ranges():
child_ranges.append(iprange.range)
available_ips = prefix - child_ips - netaddr.IPSet(child_ranges)
child_ips = netaddr.IPSet([
ip.address.ip for ip in self.get_child_ips()
])
child_ranges = netaddr.IPSet([
iprange.range for iprange in self.get_child_ranges().filter(mark_populated=True)
])
available_ips = prefix - child_ips - child_ranges
# IPv6 /127's, pool, or IPv4 /31-/32 sets are fully usable
if (self.family == 6 and self.prefix.prefixlen >= 127) or self.is_pool or (
@@ -433,6 +433,7 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
# For IPv6 prefixes, omit the Subnet-Router anycast address
# per RFC 4291
available_ips -= netaddr.IPSet([netaddr.IPAddress(self.prefix.first)])
return available_ips
def get_first_available_ip(self):
@@ -461,9 +462,11 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
utilization = float(child_prefixes.size) / self.prefix.size * 100
else:
# Compile an IPSet to avoid counting duplicate IPs
child_ips = netaddr.IPSet(
[_.range for _ in self.get_child_ranges()] + [_.address.ip for _ in self.get_child_ips()]
)
child_ips = netaddr.IPSet()
for iprange in self.get_child_ranges().filter(mark_utilized=True):
child_ips.add(iprange.range)
for ip in self.get_child_ips():
child_ips.add(ip.address.ip)
prefix_size = self.prefix.size
if self.prefix.version == 4 and self.prefix.prefixlen < 31 and not self.is_pool:
@@ -519,14 +522,19 @@ class IPRange(ContactsMixin, PrimaryModel):
null=True,
help_text=_('The primary function of this range')
)
mark_populated = models.BooleanField(
verbose_name=_('mark populated'),
default=False,
help_text=_("Prevent the creation of IP addresses within this range")
)
mark_utilized = models.BooleanField(
verbose_name=_('mark utilized'),
default=False,
help_text=_("Treat as fully utilized")
help_text=_("Report space as 100% utilized")
)
clone_fields = (
'vrf', 'tenant', 'status', 'role', 'description',
'vrf', 'tenant', 'status', 'role', 'description', 'mark_populated', 'mark_utilized',
)
class Meta:
@@ -663,6 +671,9 @@ class IPRange(ContactsMixin, PrimaryModel):
"""
Return all available IPs within this range as an IPSet.
"""
if self.mark_populated:
return netaddr.IPSet()
range = netaddr.IPRange(self.start_address.ip, self.end_address.ip)
child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
@@ -875,6 +886,20 @@ class IPAddress(ContactsMixin, PrimaryModel):
)
})
# Disallow the creation of IPAddresses within an IPRange with mark_populated=True
parent_range = IPRange.objects.filter(
start_address__lte=self.address,
end_address__gte=self.address,
vrf=self.vrf,
mark_populated=True
).first()
if parent_range:
raise ValidationError({
'address': _(
"Cannot create IP address {ip} inside range {range}."
).format(ip=self.address, range=parent_range)
})
if self._original_assigned_object_id and self._original_assigned_object_type_id:
parent = getattr(self.assigned_object, 'parent_object', None)
ct = ObjectType.objects.get_for_id(self._original_assigned_object_type_id)

View File

@@ -10,6 +10,7 @@ from .template_code import *
__all__ = (
'AggregateTable',
'AnnotatedIPAddressTable',
'AssignedIPAddressesTable',
'IPAddressAssignTable',
'IPAddressTable',
@@ -268,6 +269,10 @@ class IPRangeTable(TenancyColumnsMixin, NetBoxTable):
verbose_name=_('Role'),
linkify=True
)
mark_populated = columns.BooleanColumn(
verbose_name=_('Marked Populated'),
false_mark=None
)
mark_utilized = columns.BooleanColumn(
verbose_name=_('Marked Utilized'),
false_mark=None
@@ -288,7 +293,8 @@ class IPRangeTable(TenancyColumnsMixin, NetBoxTable):
model = IPRange
fields = (
'pk', 'id', 'start_address', 'end_address', 'size', 'vrf', 'status', 'role', 'tenant', 'tenant_group',
'mark_utilized', 'utilization', 'description', 'comments', 'tags', 'created', 'last_updated',
'mark_populated', 'mark_utilized', 'utilization', 'description', 'comments', 'tags', 'created',
'last_updated',
)
default_columns = (
'pk', 'start_address', 'end_address', 'size', 'vrf', 'status', 'role', 'tenant', 'description',
@@ -303,8 +309,8 @@ class IPRangeTable(TenancyColumnsMixin, NetBoxTable):
#
class IPAddressTable(TenancyColumnsMixin, NetBoxTable):
address = tables.TemplateColumn(
template_code=IPADDRESS_LINK,
address = tables.Column(
linkify=True,
verbose_name=_('IP Address')
)
vrf = tables.TemplateColumn(
@@ -369,6 +375,16 @@ class IPAddressTable(TenancyColumnsMixin, NetBoxTable):
}
class AnnotatedIPAddressTable(IPAddressTable):
address = tables.TemplateColumn(
template_code=IPADDRESS_LINK,
verbose_name=_('IP Address')
)
class Meta(IPAddressTable.Meta):
pass
class IPAddressAssignTable(NetBoxTable):
address = tables.TemplateColumn(
template_code=IPADDRESS_ASSIGN_LINK,

View File

@@ -26,12 +26,12 @@ PREFIX_LINK_WITH_DEPTH = """
""" + PREFIX_LINK
IPADDRESS_LINK = """
{% if record.pk %}
<a href="{{ record.get_absolute_url }}" id="ipaddress_{{ record.pk }}">{{ record.address }}</a>
{% if record.address or record.start_address %}
<a href="{{ record.get_absolute_url }}">{{ record }}</a>
{% elif perms.ipam.add_ipaddress %}
<a href="{% url 'ipam:ipaddress_add' %}?address={{ record.1 }}{% if object.vrf %}&vrf={{ object.vrf.pk }}{% endif %}{% if object.tenant %}&tenant={{ object.tenant.pk }}{% endif %}&return_url={% url 'ipam:prefix_ipaddresses' pk=object.pk %}" class="btn btn-sm btn-success">{% if record.0 <= 65536 %}{{ record.0 }}{% else %}Many{% endif %} IP{{ record.0|pluralize }} available</a>
<a href="{% url 'ipam:ipaddress_add' %}?address={{ record.first_ip }}{% if object.vrf %}&vrf={{ object.vrf.pk }}{% endif %}{% if object.tenant %}&tenant={{ object.tenant.pk }}{% endif %}&return_url={% url 'ipam:prefix_ipaddresses' pk=object.pk %}" class="btn btn-sm btn-success">{{ record.title }}</a>
{% else %}
{% if record.0 <= 65536 %}{{ record.0 }}{% else %}Many{% endif %} IP{{ record.0|pluralize }} available
{{ record.title }}
{% endif %}
"""

View File

@@ -918,7 +918,9 @@ class IPRangeTestCase(TestCase, ChangeLoggedFilterSetTests):
tenant=None,
role=None,
status=IPRangeStatusChoices.STATUS_ACTIVE,
description='foobar1'
description='foobar1',
mark_populated=True,
mark_utilized=True,
),
IPRange(
start_address='10.0.2.100/24',
@@ -955,7 +957,9 @@ class IPRangeTestCase(TestCase, ChangeLoggedFilterSetTests):
vrf=None,
tenant=None,
role=None,
status=IPRangeStatusChoices.STATUS_ACTIVE
status=IPRangeStatusChoices.STATUS_ACTIVE,
mark_populated=True,
mark_utilized=True,
),
IPRange(
start_address='2001:db8:0:2::1/64',
@@ -1051,6 +1055,18 @@ class IPRangeTestCase(TestCase, ChangeLoggedFilterSetTests):
params = {'parent': ['10.0.1.0/25']} # Range 10.0.1.100-199 is not fully contained by 10.0.1.0/25
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 0)
def test_mark_utilized(self):
params = {'mark_utilized': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'mark_utilized': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6)
def test_mark_populated(self):
params = {'mark_populated': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'mark_populated': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6)
class IPAddressTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = IPAddress.objects.all()

View File

@@ -211,16 +211,25 @@ class TestPrefix(TestCase):
IPAddress(address=IPNetwork('10.0.0.5/26')),
IPAddress(address=IPNetwork('10.0.0.7/26')),
))
# Range is not marked as populated, so it doesn't count against available IP space
IPRange.objects.create(
start_address=IPNetwork('10.0.0.9/26'),
end_address=IPNetwork('10.0.0.12/26')
end_address=IPNetwork('10.0.0.10/26')
)
# Populated range reduces available IP space
IPRange.objects.create(
start_address=IPNetwork('10.0.0.12/26'),
end_address=IPNetwork('10.0.0.13/26'),
mark_populated=True
)
missing_ips = IPSet([
'10.0.0.2/32',
'10.0.0.4/32',
'10.0.0.6/32',
'10.0.0.8/32',
'10.0.0.13/32',
'10.0.0.9/32',
'10.0.0.10/32',
'10.0.0.11/32',
'10.0.0.14/32',
])
available_ips = parent_prefix.get_available_ips()
@@ -286,8 +295,12 @@ class TestPrefix(TestCase):
])
self.assertEqual(prefix.get_utilization(), 32 / 254 * 100) # ~12.5% utilization
# Create a child range with 32 additional IPs
IPRange.objects.create(start_address=IPNetwork('10.0.0.33/24'), end_address=IPNetwork('10.0.0.64/24'))
# Create a utilized child range with 32 additional IPs
IPRange.objects.create(
start_address=IPNetwork('10.0.0.33/24'),
end_address=IPNetwork('10.0.0.64/24'),
mark_utilized=True
)
self.assertEqual(prefix.get_utilization(), 64 / 254 * 100) # ~25% utilization
#
@@ -569,6 +582,27 @@ class TestIPAddress(TestCase):
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
#
# Range validation
#
def test_create_ip_in_unpopulated_range(self):
IPRange.objects.create(
start_address=IPNetwork('192.0.2.1/24'),
end_address=IPNetwork('192.0.2.100/24')
)
ip = IPAddress(address=IPNetwork('192.0.2.10/24'))
ip.full_clean()
def test_create_ip_in_populated_range(self):
IPRange.objects.create(
start_address=IPNetwork('192.0.2.1/24'),
end_address=IPNetwork('192.0.2.100/24'),
mark_populated=True
)
ip = IPAddress(address=IPNetwork('192.0.2.10/24'))
self.assertRaises(ValidationError, ip.full_clean)
class TestVLANGroup(TestCase):

View File

@@ -1,17 +1,38 @@
from dataclasses import dataclass
import netaddr
from django.utils.translation import gettext_lazy as _
from .constants import *
from .models import Prefix, VLAN
__all__ = (
'add_available_ipaddresses',
'AvailableIPSpace',
'add_available_vlans',
'add_requested_prefixes',
'annotate_ip_space',
'get_next_available_prefix',
'rebuild_prefixes',
)
@dataclass
class AvailableIPSpace:
"""
A representation of available IP space between two IP addresses/ranges.
"""
size: int
first_ip: str
@property
def title(self):
if self.size == 1:
return _('1 IP available')
if self.size <= 65536:
return _('{count} IPs available').format(count=self.size)
return _('Many IPs available')
def add_requested_prefixes(parent, prefix_list, show_available=True, show_assigned=True):
"""
Return a list of requested prefixes using show_available, show_assigned filters. If available prefixes are
@@ -42,50 +63,69 @@ def add_requested_prefixes(parent, prefix_list, show_available=True, show_assign
return child_prefixes
def add_available_ipaddresses(prefix, ipaddress_list, is_pool=False):
"""
Annotate ranges of available IP addresses within a given prefix. If is_pool is True, the first and last IP will be
considered usable (regardless of mask length).
"""
def annotate_ip_space(prefix):
# Compile child objects
records = []
records.extend([
(iprange.start_address.ip, iprange) for iprange in prefix.get_child_ranges(mark_populated=True)
])
records.extend([
(ip.address.ip, ip) for ip in prefix.get_child_ips()
])
records = sorted(records, key=lambda x: x[0])
# Determine the first & last valid IP addresses in the prefix
if prefix.family == 4 and prefix.mask_length < 31 and not prefix.is_pool:
# Ignore the network and broadcast addresses for non-pool IPv4 prefixes larger than /31
first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first + 1)
last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last - 1)
else:
first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first)
last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last)
if not records:
return [
AvailableIPSpace(
size=int(last_ip_in_prefix - first_ip_in_prefix + 1),
first_ip=f'{first_ip_in_prefix}/{prefix.mask_length}'
)
]
output = []
prev_ip = None
# Ignore the network and broadcast addresses for non-pool IPv4 prefixes larger than /31.
if prefix.version == 4 and prefix.prefixlen < 31 and not is_pool:
first_ip_in_prefix = netaddr.IPAddress(prefix.first + 1)
last_ip_in_prefix = netaddr.IPAddress(prefix.last - 1)
else:
first_ip_in_prefix = netaddr.IPAddress(prefix.first)
last_ip_in_prefix = netaddr.IPAddress(prefix.last)
if not ipaddress_list:
return [(
int(last_ip_in_prefix - first_ip_in_prefix + 1),
'{}/{}'.format(first_ip_in_prefix, prefix.prefixlen)
)]
# Account for any available IPs before the first real IP
if ipaddress_list[0].address.ip > first_ip_in_prefix:
skipped_count = int(ipaddress_list[0].address.ip - first_ip_in_prefix)
first_skipped = '{}/{}'.format(first_ip_in_prefix, prefix.prefixlen)
output.append((skipped_count, first_skipped))
if records[0][0] > first_ip_in_prefix:
output.append(AvailableIPSpace(
size=int(records[0][0] - first_ip_in_prefix),
first_ip=f'{first_ip_in_prefix}/{prefix.mask_length}'
))
# Iterate through existing IPs and annotate free ranges
for ip in ipaddress_list:
# Add IP ranges & addresses, annotating available space in between records
for record in records:
if prev_ip:
diff = int(ip.address.ip - prev_ip.address.ip)
if diff > 1:
first_skipped = '{}/{}'.format(prev_ip.address.ip + 1, prefix.prefixlen)
output.append((diff - 1, first_skipped))
output.append(ip)
prev_ip = ip
# Annotate available space
if (diff := int(record[0]) - int(prev_ip)) > 1:
first_skipped = f'{prev_ip + 1}/{prefix.mask_length}'
output.append(AvailableIPSpace(
size=diff - 1,
first_ip=first_skipped
))
output.append(record[1])
# Update the previous IP address
if hasattr(record[1], 'end_address'):
prev_ip = record[1].end_address.ip
else:
prev_ip = record[0]
# Include any remaining available IPs
if prev_ip.address.ip < last_ip_in_prefix:
skipped_count = int(last_ip_in_prefix - prev_ip.address.ip)
first_skipped = '{}/{}'.format(prev_ip.address.ip + 1, prefix.prefixlen)
output.append((skipped_count, first_skipped))
if prev_ip < last_ip_in_prefix:
output.append(AvailableIPSpace(
size=int(last_ip_in_prefix - prev_ip),
first_ip=f'{prev_ip + 1}/{prefix.mask_length}'
))
return output

View File

@@ -21,7 +21,7 @@ from . import filtersets, forms, tables
from .choices import PrefixStatusChoices
from .constants import *
from .models import *
from .utils import add_requested_prefixes, add_available_ipaddresses, add_available_vlans
from .utils import add_requested_prefixes, add_available_vlans, annotate_ip_space
#
@@ -619,7 +619,7 @@ class PrefixIPRangesView(generic.ObjectChildrenView):
class PrefixIPAddressesView(generic.ObjectChildrenView):
queryset = Prefix.objects.all()
child_model = IPAddress
table = tables.IPAddressTable
table = tables.AnnotatedIPAddressTable
filterset = filtersets.IPAddressFilterSet
filterset_form = forms.IPAddressFilterForm
template_name = 'ipam/prefix/ip_addresses.html'
@@ -635,7 +635,7 @@ class PrefixIPAddressesView(generic.ObjectChildrenView):
def prep_table_data(self, request, queryset, parent):
if not request.GET.get('q') and not get_table_ordering(request, self.table):
return add_available_ipaddresses(parent.prefix, queryset, parent.is_pool)
return annotate_ip_space(parent)
return queryset
def get_extra_context(self, request, instance):

View File

@@ -25,12 +25,19 @@
<th scope="row">{% trans "Size" %}</th>
<td>{{ object.size }}</td>
</tr>
<tr>
<th scope="row">{% trans "Marked Populated" %}</th>
<td>{% checkmark object.mark_populated %}</td>
</tr>
<tr>
<th scope="row">{% trans "Marked Utilized" %}</th>
<td>{% checkmark object.mark_utilized %}</td>
</tr>
<tr>
<th scope="row">{% trans "Utilization" %}</th>
<td>
{% if object.mark_utilized %}
{% utilization_graph 100 warning_threshold=0 danger_threshold=0 %}
<small>({% trans "Marked fully utilized" %})</small>
{% else %}
{% utilization_graph object.utilization %}
{% endif %}