Replace add_available_ipaddresses() with annotate_ip_space()

This commit is contained in:
Jeremy Stretch 2025-04-02 11:30:41 -04:00
parent d82040bdc4
commit ecb56dda14
4 changed files with 77 additions and 42 deletions

View File

@ -383,14 +383,15 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
else: else:
return Prefix.objects.filter(prefix__net_contained=str(self.prefix), vrf=self.vrf) return Prefix.objects.filter(prefix__net_contained=str(self.prefix), vrf=self.vrf)
def get_child_ranges(self): def get_child_ranges(self, **kwargs):
""" """
Return all IPRanges within this Prefix and VRF. Return all IPRanges within this Prefix and VRF.
""" """
return IPRange.objects.filter( return IPRange.objects.filter(
vrf=self.vrf, vrf=self.vrf,
start_address__net_host_contained=str(self.prefix), start_address__net_host_contained=str(self.prefix),
end_address__net_host_contained=str(self.prefix) end_address__net_host_contained=str(self.prefix),
**kwargs
) )
def get_child_ips(self): def get_child_ips(self):

View File

@ -26,12 +26,14 @@ PREFIX_LINK_WITH_DEPTH = """
""" + PREFIX_LINK """ + PREFIX_LINK
IPADDRESS_LINK = """ IPADDRESS_LINK = """
{% if record.pk %} {% if record.address %}
<a href="{{ record.get_absolute_url }}" id="ipaddress_{{ record.pk }}">{{ record.address }}</a> <a href="{{ record.get_absolute_url }}" id="ipaddress_{{ record.pk }}">{{ record.address }}</a>
{% elif record.start_address %}
<a href="{{ record.get_absolute_url }}" id="range_{{ record.pk }}">{{ record }}</a>
{% elif perms.ipam.add_ipaddress %} {% elif perms.ipam.add_ipaddress %}
<a href="{% url 'ipam:ipaddress_add' %}?address={{ record.1 }}{% if object.vrf %}&vrf={{ object.vrf.pk }}{% endif %}{% if object.tenant %}&tenant={{ object.tenant.pk }}{% endif %}&return_url={% url 'ipam:prefix_ipaddresses' pk=object.pk %}" class="btn btn-sm btn-success">{% if record.0 <= 65536 %}{{ record.0 }}{% else %}Many{% endif %} IP{{ record.0|pluralize }} available</a> <a href="{% url 'ipam:ipaddress_add' %}?address={{ record.first_ip }}{% if object.vrf %}&vrf={{ object.vrf.pk }}{% endif %}{% if object.tenant %}&tenant={{ object.tenant.pk }}{% endif %}&return_url={% url 'ipam:prefix_ipaddresses' pk=object.pk %}" class="btn btn-sm btn-success">{% if record.size <= 65536 %}{{ record.size }}{% else %}Many{% endif %} IP{{ record.size|pluralize }} available</a>
{% else %} {% else %}
{% if record.0 <= 65536 %}{{ record.0 }}{% else %}Many{% endif %} IP{{ record.0|pluralize }} available {% if record.size <= 65536 %}{{ record.size }}{% else %}Many{% endif %} IP{{ record.size|pluralize }} available
{% endif %} {% endif %}
""" """

View File

@ -1,17 +1,25 @@
from dataclasses import dataclass
import netaddr import netaddr
from .constants import * from .constants import *
from .models import Prefix, VLAN from .models import Prefix, VLAN
__all__ = ( __all__ = (
'add_available_ipaddresses', 'AvailableIPSpace',
'add_available_vlans', 'add_available_vlans',
'add_requested_prefixes', 'add_requested_prefixes',
'annotate_ip_space',
'get_next_available_prefix', 'get_next_available_prefix',
'rebuild_prefixes', 'rebuild_prefixes',
) )
@dataclass
class AvailableIPSpace:
size: int
first_ip: str
def add_requested_prefixes(parent, prefix_list, show_available=True, show_assigned=True): def add_requested_prefixes(parent, prefix_list, show_available=True, show_assigned=True):
""" """
Return a list of requested prefixes using show_available, show_assigned filters. If available prefixes are Return a list of requested prefixes using show_available, show_assigned filters. If available prefixes are
@ -42,50 +50,74 @@ def add_requested_prefixes(parent, prefix_list, show_available=True, show_assign
return child_prefixes return child_prefixes
def add_available_ipaddresses(prefix, ipaddress_list, is_pool=False): def annotate_ip_space(prefix):
""" # Compile child objects
Annotate ranges of available IP addresses within a given prefix. If is_pool is True, the first and last IP will be records = []
considered usable (regardless of mask length). records.extend([
""" (iprange.start_address.ip, iprange) for iprange in prefix.get_child_ranges(mark_populated=True)
])
records.extend([
(ip.address.ip, ip) for ip in prefix.get_child_ips()
])
records = sorted(records, key=lambda x: x[0])
# Determine the first & last valid IP addresses in the prefix
if prefix.family == 4 and prefix.mask_length < 31 and not prefix.is_pool:
# Ignore the network and broadcast addresses for non-pool IPv4 prefixes larger than /31
first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first + 1)
last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last - 1)
else:
first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first)
last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last)
if not records:
return [
AvailableIPSpace(
size=int(last_ip_in_prefix - first_ip_in_prefix + 1),
first_ip=f'{first_ip_in_prefix}/{prefix.mask_length}'
)
]
output = [] output = []
prev_ip = None prev_ip = None
# Ignore the network and broadcast addresses for non-pool IPv4 prefixes larger than /31.
if prefix.version == 4 and prefix.prefixlen < 31 and not is_pool:
first_ip_in_prefix = netaddr.IPAddress(prefix.first + 1)
last_ip_in_prefix = netaddr.IPAddress(prefix.last - 1)
else:
first_ip_in_prefix = netaddr.IPAddress(prefix.first)
last_ip_in_prefix = netaddr.IPAddress(prefix.last)
if not ipaddress_list:
return [(
int(last_ip_in_prefix - first_ip_in_prefix + 1),
'{}/{}'.format(first_ip_in_prefix, prefix.prefixlen)
)]
# Account for any available IPs before the first real IP # Account for any available IPs before the first real IP
if ipaddress_list[0].address.ip > first_ip_in_prefix: if records[0][0] > first_ip_in_prefix:
skipped_count = int(ipaddress_list[0].address.ip - first_ip_in_prefix) output.append(AvailableIPSpace(
first_skipped = '{}/{}'.format(first_ip_in_prefix, prefix.prefixlen) size=int(records[0][0] - first_ip_in_prefix),
output.append((skipped_count, first_skipped)) first_ip=f'{first_ip_in_prefix}/{prefix.mask_length}'
))
# Iterate through existing IPs and annotate free ranges # Iterate through existing IPs and annotate free ranges
for ip in ipaddress_list: for record in records:
if prev_ip: if prev_ip:
diff = int(ip.address.ip - prev_ip.address.ip) # We've already listed a range which covers this IP
if diff > 1: if record[0] < prev_ip:
first_skipped = '{}/{}'.format(prev_ip.address.ip + 1, prefix.prefixlen) continue
output.append((diff - 1, first_skipped))
output.append(ip) # Annotate available space
prev_ip = ip if (diff := int(record[0] - prev_ip)) > 1:
first_skipped = f'{prev_ip + 1}/{prefix.mask_length}'
output.append(AvailableIPSpace(
size=diff - 1,
first_ip=first_skipped
))
output.append(record[1])
# Update the previous IP address
if hasattr(record[1], 'end_address'):
prev_ip = record[1].end_address.ip
else:
prev_ip = record[0]
print(f'prev_ip: {prev_ip}')
# Include any remaining available IPs # Include any remaining available IPs
if prev_ip.address.ip < last_ip_in_prefix: if prev_ip < last_ip_in_prefix:
skipped_count = int(last_ip_in_prefix - prev_ip.address.ip) output.append(AvailableIPSpace(
first_skipped = '{}/{}'.format(prev_ip.address.ip + 1, prefix.prefixlen) size=int(last_ip_in_prefix - prev_ip),
output.append((skipped_count, first_skipped)) first_ip=f'{prev_ip + 1}/{prefix.mask_length}'
))
return output return output

View File

@ -21,7 +21,7 @@ from . import filtersets, forms, tables
from .choices import PrefixStatusChoices from .choices import PrefixStatusChoices
from .constants import * from .constants import *
from .models import * from .models import *
from .utils import add_requested_prefixes, add_available_ipaddresses, add_available_vlans from .utils import add_requested_prefixes, add_available_vlans, annotate_ip_space
# #
@ -635,7 +635,7 @@ class PrefixIPAddressesView(generic.ObjectChildrenView):
def prep_table_data(self, request, queryset, parent): def prep_table_data(self, request, queryset, parent):
if not request.GET.get('q') and not get_table_ordering(request, self.table): if not request.GET.get('q') and not get_table_ordering(request, self.table):
return add_available_ipaddresses(parent.prefix, queryset, parent.is_pool) return annotate_ip_space(parent)
return queryset return queryset
def get_extra_context(self, request, instance): def get_extra_context(self, request, instance):