Merge pull request #6760 from netbox-community/834-ip-ranges

Closes #834: Add support for IP ranges
This commit is contained in:
Jeremy Stretch 2021-07-19 09:47:19 -04:00 committed by GitHub
commit 3cfb0bacf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1267 additions and 197 deletions

View File

@ -10,6 +10,7 @@
---
{!docs/models/ipam/iprange.md!}
{!docs/models/ipam/ipaddress.md!}
---

View File

@ -0,0 +1,11 @@
# IP Ranges
This model represents an arbitrary range of individual IPv4 or IPv6 addresses, inclusive of its starting and ending addresses. For instance, the range 192.0.2.10 to 192.0.2.20 has eleven members. (The total member count is available as the `size` property on an IPRange instance.) Like prefixes and IP addresses, each IP range may optionally be assigned to a VRF and/or tenant.
IP also ranges share the same [functional roles](role.md) as prefixes and VLANs, although the assignment of a role is optional. Each IP range must be assigned an operational status, which is one of the following:
* Active - Provisioned and in use
* Reserved - Designated for future use
* Deprecated - No longer in use
The status of a range does _not_ have any impact on its member IP addresses, which may have their statuses modified separately.

181
netbox/ipam/api/mixins.py Normal file
View File

@ -0,0 +1,181 @@
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.db import transaction
from django.shortcuts import get_object_or_404
from django_pglocks import advisory_lock
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
from ipam.models import *
from utilities.constants import ADVISORY_LOCK_KEYS
from . import serializers
class AvailablePrefixesMixin:
@swagger_auto_schema(method='get', responses={200: serializers.AvailablePrefixSerializer(many=True)})
@swagger_auto_schema(method='post', responses={201: serializers.PrefixSerializer(many=False)})
@action(detail=True, url_path='available-prefixes', methods=['get', 'post'])
@advisory_lock(ADVISORY_LOCK_KEYS['available-prefixes'])
def available_prefixes(self, request, pk=None):
"""
A convenience method for returning available child prefixes within a parent.
The advisory lock decorator uses a PostgreSQL advisory lock to prevent this API from being
invoked in parallel, which results in a race condition where multiple insertions can occur.
"""
prefix = get_object_or_404(self.queryset, pk=pk)
available_prefixes = prefix.get_available_prefixes()
if request.method == 'POST':
# Validate Requested Prefixes' length
serializer = serializers.PrefixLengthSerializer(
data=request.data if isinstance(request.data, list) else [request.data],
many=True,
context={
'request': request,
'prefix': prefix,
}
)
if not serializer.is_valid():
return Response(
serializer.errors,
status=status.HTTP_400_BAD_REQUEST
)
requested_prefixes = serializer.validated_data
# Allocate prefixes to the requested objects based on availability within the parent
for i, requested_prefix in enumerate(requested_prefixes):
# Find the first available prefix equal to or larger than the requested size
for available_prefix in available_prefixes.iter_cidrs():
if requested_prefix['prefix_length'] >= available_prefix.prefixlen:
allocated_prefix = '{}/{}'.format(available_prefix.network, requested_prefix['prefix_length'])
requested_prefix['prefix'] = allocated_prefix
requested_prefix['vrf'] = prefix.vrf.pk if prefix.vrf else None
break
else:
return Response(
{
"detail": "Insufficient space is available to accommodate the requested prefix size(s)"
},
status=status.HTTP_204_NO_CONTENT
)
# Remove the allocated prefix from the list of available prefixes
available_prefixes.remove(allocated_prefix)
# Initialize the serializer with a list or a single object depending on what was requested
context = {'request': request}
if isinstance(request.data, list):
serializer = serializers.PrefixSerializer(data=requested_prefixes, many=True, context=context)
else:
serializer = serializers.PrefixSerializer(data=requested_prefixes[0], context=context)
# Create the new Prefix(es)
if serializer.is_valid():
try:
with transaction.atomic():
created = serializer.save()
self._validate_objects(created)
except ObjectDoesNotExist:
raise PermissionDenied()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
else:
serializer = serializers.AvailablePrefixSerializer(available_prefixes.iter_cidrs(), many=True, context={
'request': request,
'vrf': prefix.vrf,
})
return Response(serializer.data)
class AvailableIPsMixin:
parent_model = Prefix
@swagger_auto_schema(method='get', responses={200: serializers.AvailableIPSerializer(many=True)})
@swagger_auto_schema(method='post', responses={201: serializers.AvailableIPSerializer(many=True)},
request_body=serializers.AvailableIPSerializer(many=True))
@action(detail=True, url_path='available-ips', methods=['get', 'post'], queryset=IPAddress.objects.all())
@advisory_lock(ADVISORY_LOCK_KEYS['available-ips'])
def available_ips(self, request, pk=None):
"""
A convenience method for returning available IP addresses within a Prefix or IPRange. By default, the number of
IPs returned will be equivalent to PAGINATE_COUNT. An arbitrary limit (up to MAX_PAGE_SIZE, if set) may be
passed, however results will not be paginated.
The advisory lock decorator uses a PostgreSQL advisory lock to prevent this API from being
invoked in parallel, which results in a race condition where multiple insertions can occur.
"""
parent = get_object_or_404(self.parent_model.objects.restrict(request.user), pk=pk)
# Create the next available IP
if request.method == 'POST':
# Normalize to a list of objects
requested_ips = request.data if isinstance(request.data, list) else [request.data]
# Determine if the requested number of IPs is available
available_ips = parent.get_available_ips()
if available_ips.size < len(requested_ips):
return Response(
{
"detail": f"An insufficient number of IP addresses are available within {parent} "
f"({len(requested_ips)} requested, {len(available_ips)} available)"
},
status=status.HTTP_204_NO_CONTENT
)
# Assign addresses from the list of available IPs and copy VRF assignment from the parent
available_ips = iter(available_ips)
for requested_ip in requested_ips:
requested_ip['address'] = f'{next(available_ips)}/{parent.mask_length}'
requested_ip['vrf'] = parent.vrf.pk if parent.vrf else None
# Initialize the serializer with a list or a single object depending on what was requested
context = {'request': request}
if isinstance(request.data, list):
serializer = serializers.IPAddressSerializer(data=requested_ips, many=True, context=context)
else:
serializer = serializers.IPAddressSerializer(data=requested_ips[0], context=context)
# Create the new IP address(es)
if serializer.is_valid():
try:
with transaction.atomic():
created = serializer.save()
self._validate_objects(created)
except ObjectDoesNotExist:
raise PermissionDenied()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# Determine the maximum number of IPs to return
else:
try:
limit = int(request.query_params.get('limit', settings.PAGINATE_COUNT))
except ValueError:
limit = settings.PAGINATE_COUNT
if settings.MAX_PAGE_SIZE:
limit = min(limit, settings.MAX_PAGE_SIZE)
# Calculate available IPs within the parent
ip_list = []
for index, ip in enumerate(parent.get_available_ips(), start=1):
ip_list.append(ip)
if index == limit:
break
serializer = serializers.AvailableIPSerializer(ip_list, many=True, context={
'request': request,
'parent': parent,
})
return Response(serializer.data)

View File

@ -6,6 +6,7 @@ from netbox.api import WritableNestedSerializer
__all__ = [
'NestedAggregateSerializer',
'NestedIPAddressSerializer',
'NestedIPRangeSerializer',
'NestedPrefixSerializer',
'NestedRIRSerializer',
'NestedRoleSerializer',
@ -109,6 +110,19 @@ class NestedPrefixSerializer(WritableNestedSerializer):
fields = ['id', 'url', 'display', 'family', 'prefix', '_depth']
#
# IP ranges
#
class NestedIPRangeSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:iprange-detail')
family = serializers.IntegerField(read_only=True)
class Meta:
model = models.IPRange
fields = ['id', 'url', 'display', 'family', 'start_address', 'end_address']
#
# IP addresses
#

View File

@ -8,7 +8,7 @@ from rest_framework.validators import UniqueTogetherValidator
from dcim.api.nested_serializers import NestedDeviceSerializer, NestedSiteSerializer
from ipam.choices import *
from ipam.constants import IPADDRESS_ASSIGNMENT_MODELS, VLANGROUP_SCOPE_TYPES
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from ipam.models import *
from netbox.api import ChoiceField, ContentTypeField, SerializedPKRelatedField
from netbox.api.serializers import OrganizationalModelSerializer
from netbox.api.serializers import PrimaryModelSerializer
@ -255,6 +255,28 @@ class AvailablePrefixSerializer(serializers.Serializer):
])
#
# IP ranges
#
class IPRangeSerializer(PrimaryModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:iprange-detail')
family = ChoiceField(choices=IPAddressFamilyChoices, read_only=True)
vrf = NestedVRFSerializer(required=False, allow_null=True)
tenant = NestedTenantSerializer(required=False, allow_null=True)
status = ChoiceField(choices=IPRangeStatusChoices, required=False)
role = NestedRoleSerializer(required=False, allow_null=True)
children = serializers.IntegerField(read_only=True)
class Meta:
model = IPRange
fields = [
'id', 'url', 'display', 'family', 'start_address', 'end_address', 'size', 'vrf', 'tenant', 'status', 'role',
'description', 'tags', 'custom_fields', 'created', 'last_updated', 'children',
]
read_only_fields = ['family']
#
# IP addresses
#
@ -307,9 +329,9 @@ class AvailableIPSerializer(serializers.Serializer):
else:
vrf = None
return OrderedDict([
('family', self.context['prefix'].version),
('address', '{}/{}'.format(instance, self.context['prefix'].prefixlen)),
('vrf', vrf),
('family', self.context['parent'].family),
('address', f"{instance}/{self.context['parent'].mask_length}"),
('vrf', self.context['parent'].vrf),
])

View File

@ -21,6 +21,9 @@ router.register('aggregates', views.AggregateViewSet)
router.register('roles', views.RoleViewSet)
router.register('prefixes', views.PrefixViewSet)
# IP ranges
router.register('ip-ranges', views.IPRangeViewSet)
# IP addresses
router.register('ip-addresses', views.IPAddressViewSet)

View File

@ -1,21 +1,11 @@
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.db import transaction
from django.shortcuts import get_object_or_404
from django_pglocks import advisory_lock
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.routers import APIRootView
from extras.api.views import CustomFieldModelViewSet
from ipam import filtersets
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from ipam.models import *
from netbox.api.views import ModelViewSet
from utilities.constants import ADVISORY_LOCK_KEYS
from utilities.utils import count_related
from . import serializers
from . import mixins, serializers
class IPAMRootView(APIRootView):
@ -90,180 +80,31 @@ class RoleViewSet(CustomFieldModelViewSet):
# Prefixes
#
class PrefixViewSet(CustomFieldModelViewSet):
class PrefixViewSet(mixins.AvailableIPsMixin, mixins.AvailablePrefixesMixin, CustomFieldModelViewSet):
queryset = Prefix.objects.prefetch_related(
'site', 'vrf__tenant', 'tenant', 'vlan', 'role', 'tags'
)
serializer_class = serializers.PrefixSerializer
filterset_class = filtersets.PrefixFilterSet
parent_model = Prefix # AvailableIPsMixin
def get_serializer_class(self):
if self.action == "available_prefixes" and self.request.method == "POST":
return serializers.PrefixLengthSerializer
return super().get_serializer_class()
@swagger_auto_schema(method='get', responses={200: serializers.AvailablePrefixSerializer(many=True)})
@swagger_auto_schema(method='post', responses={201: serializers.PrefixSerializer(many=False)})
@action(detail=True, url_path='available-prefixes', methods=['get', 'post'])
@advisory_lock(ADVISORY_LOCK_KEYS['available-prefixes'])
def available_prefixes(self, request, pk=None):
"""
A convenience method for returning available child prefixes within a parent.
The advisory lock decorator uses a PostgreSQL advisory lock to prevent this API from being
invoked in parallel, which results in a race condition where multiple insertions can occur.
"""
prefix = get_object_or_404(self.queryset, pk=pk)
available_prefixes = prefix.get_available_prefixes()
#
# IP ranges
#
if request.method == 'POST':
class IPRangeViewSet(mixins.AvailableIPsMixin, CustomFieldModelViewSet):
queryset = IPRange.objects.prefetch_related('vrf', 'role', 'tenant', 'tags')
serializer_class = serializers.IPRangeSerializer
filterset_class = filtersets.IPRangeFilterSet
# Validate Requested Prefixes' length
serializer = serializers.PrefixLengthSerializer(
data=request.data if isinstance(request.data, list) else [request.data],
many=True,
context={
'request': request,
'prefix': prefix,
}
)
if not serializer.is_valid():
return Response(
serializer.errors,
status=status.HTTP_400_BAD_REQUEST
)
requested_prefixes = serializer.validated_data
# Allocate prefixes to the requested objects based on availability within the parent
for i, requested_prefix in enumerate(requested_prefixes):
# Find the first available prefix equal to or larger than the requested size
for available_prefix in available_prefixes.iter_cidrs():
if requested_prefix['prefix_length'] >= available_prefix.prefixlen:
allocated_prefix = '{}/{}'.format(available_prefix.network, requested_prefix['prefix_length'])
requested_prefix['prefix'] = allocated_prefix
requested_prefix['vrf'] = prefix.vrf.pk if prefix.vrf else None
break
else:
return Response(
{
"detail": "Insufficient space is available to accommodate the requested prefix size(s)"
},
status=status.HTTP_204_NO_CONTENT
)
# Remove the allocated prefix from the list of available prefixes
available_prefixes.remove(allocated_prefix)
# Initialize the serializer with a list or a single object depending on what was requested
context = {'request': request}
if isinstance(request.data, list):
serializer = serializers.PrefixSerializer(data=requested_prefixes, many=True, context=context)
else:
serializer = serializers.PrefixSerializer(data=requested_prefixes[0], context=context)
# Create the new Prefix(es)
if serializer.is_valid():
try:
with transaction.atomic():
created = serializer.save()
self._validate_objects(created)
except ObjectDoesNotExist:
raise PermissionDenied()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
else:
serializer = serializers.AvailablePrefixSerializer(available_prefixes.iter_cidrs(), many=True, context={
'request': request,
'vrf': prefix.vrf,
})
return Response(serializer.data)
@swagger_auto_schema(method='get', responses={200: serializers.AvailableIPSerializer(many=True)})
@swagger_auto_schema(method='post', responses={201: serializers.AvailableIPSerializer(many=True)},
request_body=serializers.AvailableIPSerializer(many=True))
@action(detail=True, url_path='available-ips', methods=['get', 'post'], queryset=IPAddress.objects.all())
@advisory_lock(ADVISORY_LOCK_KEYS['available-ips'])
def available_ips(self, request, pk=None):
"""
A convenience method for returning available IP addresses within a prefix. By default, the number of IPs
returned will be equivalent to PAGINATE_COUNT. An arbitrary limit (up to MAX_PAGE_SIZE, if set) may be passed,
however results will not be paginated.
The advisory lock decorator uses a PostgreSQL advisory lock to prevent this API from being
invoked in parallel, which results in a race condition where multiple insertions can occur.
"""
prefix = get_object_or_404(Prefix.objects.restrict(request.user), pk=pk)
# Create the next available IP within the prefix
if request.method == 'POST':
# Normalize to a list of objects
requested_ips = request.data if isinstance(request.data, list) else [request.data]
# Determine if the requested number of IPs is available
available_ips = prefix.get_available_ips()
if available_ips.size < len(requested_ips):
return Response(
{
"detail": "An insufficient number of IP addresses are available within the prefix {} ({} "
"requested, {} available)".format(prefix, len(requested_ips), len(available_ips))
},
status=status.HTTP_204_NO_CONTENT
)
# Assign addresses from the list of available IPs and copy VRF assignment from the parent prefix
available_ips = iter(available_ips)
prefix_length = prefix.prefix.prefixlen
for requested_ip in requested_ips:
requested_ip['address'] = '{}/{}'.format(next(available_ips), prefix_length)
requested_ip['vrf'] = prefix.vrf.pk if prefix.vrf else None
# Initialize the serializer with a list or a single object depending on what was requested
context = {'request': request}
if isinstance(request.data, list):
serializer = serializers.IPAddressSerializer(data=requested_ips, many=True, context=context)
else:
serializer = serializers.IPAddressSerializer(data=requested_ips[0], context=context)
# Create the new IP address(es)
if serializer.is_valid():
try:
with transaction.atomic():
created = serializer.save()
self._validate_objects(created)
except ObjectDoesNotExist:
raise PermissionDenied()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# Determine the maximum number of IPs to return
else:
try:
limit = int(request.query_params.get('limit', settings.PAGINATE_COUNT))
except ValueError:
limit = settings.PAGINATE_COUNT
if settings.MAX_PAGE_SIZE:
limit = min(limit, settings.MAX_PAGE_SIZE)
# Calculate available IPs within the prefix
ip_list = []
for index, ip in enumerate(prefix.get_available_ips(), start=1):
ip_list.append(ip)
if index == limit:
break
serializer = serializers.AvailableIPSerializer(ip_list, many=True, context={
'request': request,
'prefix': prefix.prefix,
'vrf': prefix.vrf,
})
return Response(serializer.data)
parent_model = IPRange # AvailableIPsMixin
#

View File

@ -39,7 +39,30 @@ class PrefixStatusChoices(ChoiceSet):
#
# IPAddresses
# IP Ranges
#
class IPRangeStatusChoices(ChoiceSet):
STATUS_ACTIVE = 'active'
STATUS_RESERVED = 'reserved'
STATUS_DEPRECATED = 'deprecated'
CHOICES = (
(STATUS_ACTIVE, 'Active'),
(STATUS_RESERVED, 'Reserved'),
(STATUS_DEPRECATED, 'Deprecated'),
)
CSS_CLASSES = {
STATUS_ACTIVE: 'primary',
STATUS_RESERVED: 'info',
STATUS_DEPRECATED: 'danger',
}
#
# IP Addresses
#
class IPAddressStatusChoices(ChoiceSet):

View File

@ -14,12 +14,13 @@ from utilities.filters import (
)
from virtualization.models import VirtualMachine, VMInterface
from .choices import *
from .models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from .models import *
__all__ = (
'AggregateFilterSet',
'IPAddressFilterSet',
'IPRangeFilterSet',
'PrefixFilterSet',
'RIRFilterSet',
'RoleFilterSet',
@ -375,6 +376,73 @@ class PrefixFilterSet(PrimaryModelFilterSet, TenancyFilterSet):
)
class IPRangeFilterSet(TenancyFilterSet, PrimaryModelFilterSet):
q = django_filters.CharFilter(
method='search',
label='Search',
)
family = django_filters.NumberFilter(
field_name='start_address',
lookup_expr='family'
)
contains = django_filters.CharFilter(
method='search_contains',
label='Ranges which contain this prefix or IP',
)
vrf_id = django_filters.ModelMultipleChoiceFilter(
queryset=VRF.objects.all(),
label='VRF',
)
vrf = django_filters.ModelMultipleChoiceFilter(
field_name='vrf__rd',
queryset=VRF.objects.all(),
to_field_name='rd',
label='VRF (RD)',
)
role_id = django_filters.ModelMultipleChoiceFilter(
queryset=Role.objects.all(),
label='Role (ID)',
)
role = django_filters.ModelMultipleChoiceFilter(
field_name='role__slug',
queryset=Role.objects.all(),
to_field_name='slug',
label='Role (slug)',
)
status = django_filters.MultipleChoiceFilter(
choices=IPRangeStatusChoices,
null_value=None
)
tag = TagFilter()
class Meta:
model = IPRange
fields = ['id']
def search(self, queryset, name, value):
if not value.strip():
return queryset
qs_filter = Q(description__icontains=value)
try:
ipaddress = str(netaddr.IPNetwork(value.strip()).cidr)
qs_filter |= Q(start_address=ipaddress)
qs_filter |= Q(end_address=ipaddress)
except (AddrFormatError, ValueError):
pass
return queryset.filter(qs_filter)
def search_contains(self, queryset, name, value):
value = value.strip()
if not value:
return queryset
try:
# Strip mask
ipaddress = netaddr.IPNetwork(value)
return queryset.filter(start_address__lte=ipaddress, end_address__gte=ipaddress)
except (AddrFormatError, ValueError):
return queryset.none()
class IPAddressFilterSet(PrimaryModelFilterSet, TenancyFilterSet):
q = django_filters.CharFilter(
method='search',

View File

@ -18,7 +18,7 @@ from utilities.forms import (
from virtualization.models import Cluster, ClusterGroup, VirtualMachine, VMInterface
from .choices import *
from .constants import *
from .models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from .models import *
PREFIX_MASK_LENGTH_CHOICES = add_blank_choice([
(i, i) for i in range(PREFIX_LENGTH_MIN, PREFIX_LENGTH_MAX + 1)
@ -696,6 +696,144 @@ class PrefixFilterForm(BootstrapMixin, TenancyFilterForm, CustomFieldModelFilter
tag = TagFilterField(model)
#
# IP ranges
#
class IPRangeForm(BootstrapMixin, TenancyForm, CustomFieldModelForm):
vrf = DynamicModelChoiceField(
queryset=VRF.objects.all(),
required=False,
label='VRF'
)
role = DynamicModelChoiceField(
queryset=Role.objects.all(),
required=False
)
tags = DynamicModelMultipleChoiceField(
queryset=Tag.objects.all(),
required=False
)
class Meta:
model = IPRange
fields = [
'vrf', 'start_address', 'end_address', 'status', 'role', 'description', 'tenant_group', 'tenant', 'tags',
]
fieldsets = (
('IP Range', ('vrf', 'start_address', 'end_address', 'role', 'status', 'description', 'tags')),
('Tenancy', ('tenant_group', 'tenant')),
)
widgets = {
'status': StaticSelect2(),
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields['vrf'].empty_label = 'Global'
class IPRangeCSVForm(CustomFieldModelCSVForm):
vrf = CSVModelChoiceField(
queryset=VRF.objects.all(),
to_field_name='name',
required=False,
help_text='Assigned VRF'
)
tenant = CSVModelChoiceField(
queryset=Tenant.objects.all(),
required=False,
to_field_name='name',
help_text='Assigned tenant'
)
status = CSVChoiceField(
choices=IPRangeStatusChoices,
help_text='Operational status'
)
role = CSVModelChoiceField(
queryset=Role.objects.all(),
required=False,
to_field_name='name',
help_text='Functional role'
)
class Meta:
model = IPRange
fields = (
'start_address', 'end_address', 'vrf', 'tenant', 'status', 'role', 'description',
)
class IPRangeBulkEditForm(BootstrapMixin, AddRemoveTagsForm, CustomFieldModelBulkEditForm):
pk = forms.ModelMultipleChoiceField(
queryset=IPRange.objects.all(),
widget=forms.MultipleHiddenInput()
)
vrf = DynamicModelChoiceField(
queryset=VRF.objects.all(),
required=False,
label='VRF'
)
tenant = DynamicModelChoiceField(
queryset=Tenant.objects.all(),
required=False
)
status = forms.ChoiceField(
choices=add_blank_choice(IPRangeStatusChoices),
required=False,
widget=StaticSelect2()
)
role = DynamicModelChoiceField(
queryset=Role.objects.all(),
required=False
)
description = forms.CharField(
max_length=100,
required=False
)
class Meta:
nullable_fields = [
'vrf', 'tenant', 'role', 'description',
]
class IPRangeFilterForm(BootstrapMixin, TenancyFilterForm, CustomFieldModelFilterForm):
model = IPRange
field_order = [
'family', 'vrf_id', 'status', 'role_id', 'tenant_group_id', 'tenant_id',
]
field_groups = [
['family', 'vrf_id', 'status', 'role_id'],
['tenant_group_id', 'tenant_id', 'tag'],
]
family = forms.ChoiceField(
required=False,
choices=add_blank_choice(IPAddressFamilyChoices),
label=_('Address family'),
widget=StaticSelect2()
)
vrf_id = DynamicModelMultipleChoiceField(
queryset=VRF.objects.all(),
required=False,
label=_('Assigned VRF'),
null_option='Global'
)
status = forms.MultipleChoiceField(
choices=PrefixStatusChoices,
required=False,
widget=StaticSelect2Multiple()
)
role_id = DynamicModelMultipleChoiceField(
queryset=Role.objects.all(),
required=False,
null_option='None',
label=_('Role')
)
tag = TagFilterField(model)
#
# IP addresses
#

View File

@ -11,6 +11,9 @@ class IPAMQuery(graphene.ObjectType):
ip_address = ObjectField(IPAddressType)
ip_address_list = ObjectListField(IPAddressType)
ip_range = ObjectField(IPRangeType)
ip_range_list = ObjectListField(IPRangeType)
prefix = ObjectField(PrefixType)
prefix_list = ObjectListField(PrefixType)

View File

@ -4,6 +4,7 @@ from netbox.graphql.types import ObjectType, TaggedObjectType
__all__ = (
'AggregateType',
'IPAddressType',
'IPRangeType',
'PrefixType',
'RIRType',
'RoleType',
@ -34,6 +35,17 @@ class IPAddressType(TaggedObjectType):
return self.role or None
class IPRangeType(TaggedObjectType):
class Meta:
model = models.IPRange
fields = '__all__'
filterset_class = filtersets.IPRangeFilterSet
def resolve_role(self, info):
return self.role or None
class PrefixType(TaggedObjectType):
class Meta:

View File

@ -0,0 +1,43 @@
# Generated by Django 3.2.5 on 2021-07-16 14:15
import django.core.serializers.json
from django.db import migrations, models
import django.db.models.deletion
import django.db.models.expressions
import ipam.fields
import taggit.managers
class Migration(migrations.Migration):
dependencies = [
('extras', '0061_extras_change_logging'),
('tenancy', '0001_squashed_0012'),
('ipam', '0049_prefix_mark_utilized'),
]
operations = [
migrations.CreateModel(
name='IPRange',
fields=[
('created', models.DateField(auto_now_add=True, null=True)),
('last_updated', models.DateTimeField(auto_now=True, null=True)),
('custom_field_data', models.JSONField(blank=True, default=dict, encoder=django.core.serializers.json.DjangoJSONEncoder)),
('id', models.BigAutoField(primary_key=True, serialize=False)),
('start_address', ipam.fields.IPAddressField()),
('end_address', ipam.fields.IPAddressField()),
('size', models.PositiveIntegerField(editable=False)),
('status', models.CharField(default='active', max_length=50)),
('description', models.CharField(blank=True, max_length=200)),
('role', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='ip_ranges', to='ipam.role')),
('tags', taggit.managers.TaggableManager(through='extras.TaggedItem', to='extras.Tag')),
('tenant', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='ip_ranges', to='tenancy.tenant')),
('vrf', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='ip_ranges', to='ipam.vrf')),
],
options={
'verbose_name': 'IP range',
'verbose_name_plural': 'IP ranges',
'ordering': (django.db.models.expressions.OrderBy(django.db.models.expressions.F('vrf'), nulls_first=True), 'start_address', 'pk'),
},
),
]

View File

@ -6,6 +6,7 @@ from .vrfs import *
__all__ = (
'Aggregate',
'IPAddress',
'IPRange',
'Prefix',
'RIR',
'Role',

View File

@ -4,8 +4,9 @@ from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models import F
from django.db.models import F, Q
from django.urls import reverse
from django.utils.functional import cached_property
from dcim.models import Device
from extras.utils import extras_features
@ -23,6 +24,7 @@ from virtualization.models import VirtualMachine
__all__ = (
'Aggregate',
'IPAddress',
'IPRange',
'Prefix',
'RIR',
'Role',
@ -333,9 +335,11 @@ class Prefix(PrimaryModel):
@property
def family(self):
if self.prefix:
return self.prefix.version
return None
return self.prefix.version if self.prefix else None
@property
def mask_length(self):
return self.prefix.prefixlen if self.prefix else None
@property
def depth(self):
@ -475,6 +479,195 @@ class Prefix(PrimaryModel):
return int(float(child_count) / prefix_size * 100)
@extras_features('custom_fields', 'custom_links', 'export_templates', 'tags', 'webhooks')
class IPRange(PrimaryModel):
"""
A range of IP addresses, defined by start and end addresses.
"""
start_address = IPAddressField(
help_text='IPv4 or IPv6 address (with mask)'
)
end_address = IPAddressField(
help_text='IPv4 or IPv6 address (with mask)'
)
size = models.PositiveIntegerField(
editable=False
)
vrf = models.ForeignKey(
to='ipam.VRF',
on_delete=models.PROTECT,
related_name='ip_ranges',
blank=True,
null=True,
verbose_name='VRF'
)
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='ip_ranges',
blank=True,
null=True
)
status = models.CharField(
max_length=50,
choices=IPRangeStatusChoices,
default=IPRangeStatusChoices.STATUS_ACTIVE,
help_text='Operational status of this range'
)
role = models.ForeignKey(
to='ipam.Role',
on_delete=models.SET_NULL,
related_name='ip_ranges',
blank=True,
null=True,
help_text='The primary function of this range'
)
description = models.CharField(
max_length=200,
blank=True
)
objects = RestrictedQuerySet.as_manager()
clone_fields = [
'vrf', 'tenant', 'status', 'role', 'description',
]
class Meta:
ordering = (F('vrf').asc(nulls_first=True), 'start_address', 'pk') # (vrf, start_address) may be non-unique
verbose_name = 'IP range'
verbose_name_plural = 'IP ranges'
def __str__(self):
return self.name
def get_absolute_url(self):
return reverse('ipam:iprange', args=[self.pk])
def clean(self):
super().clean()
if self.start_address and self.end_address:
# Check that start & end IP versions match
if self.start_address.version != self.end_address.version:
raise ValidationError({
'end_address': f"Ending address version (IPv{self.end_address.version}) does not match starting "
f"address (IPv{self.start_address.version})"
})
# Check that the start & end IP prefix lengths match
if self.start_address.prefixlen != self.end_address.prefixlen:
raise ValidationError({
'end_address': f"Ending address mask (/{self.end_address.prefixlen}) does not match starting "
f"address mask (/{self.start_address.prefixlen})"
})
# Check that the ending address is greater than the starting address
if not self.end_address > self.start_address:
raise ValidationError({
'end_address': f"Ending address must be lower than the starting address ({self.start_address})"
})
# Check for overlapping ranges
overlapping_range = IPRange.objects.exclude(pk=self.pk).filter(vrf=self.vrf).filter(
Q(start_address__gte=self.start_address, start_address__lte=self.end_address) | # Starts inside
Q(end_address__gte=self.start_address, end_address__lte=self.end_address) | # Ends inside
Q(start_address__lte=self.start_address, end_address__gte=self.end_address) # Starts & ends outside
).first()
if overlapping_range:
raise ValidationError(f"Defined addresses overlap with range {overlapping_range} in VRF {self.vrf}")
def save(self, *args, **kwargs):
# Record the range's size (number of IP addresses)
self.size = int(self.end_address.ip - self.start_address.ip) + 1
super().save(*args, **kwargs)
@property
def family(self):
return self.start_address.version if self.start_address else None
@property
def mask_length(self):
return self.start_address.prefixlen if self.start_address else None
@cached_property
def name(self):
"""
Return an efficient string representation of the IP range.
"""
separator = ':' if self.family == 6 else '.'
start_chunks = str(self.start_address.ip).split(separator)
end_chunks = str(self.end_address.ip).split(separator)
base_chunks = []
for a, b in zip(start_chunks, end_chunks):
if a == b:
base_chunks.append(a)
base_str = separator.join(base_chunks)
start_str = separator.join(start_chunks[len(base_chunks):])
end_str = separator.join(end_chunks[len(base_chunks):])
return f'{base_str}{separator}{start_str}-{end_str}/{self.start_address.prefixlen}'
def _set_prefix_length(self, value):
"""
Expose the IPRange object's prefixlen attribute on the parent model so that it can be manipulated directly,
e.g. for bulk editing.
"""
self.start_address.prefixlen = value
self.end_address.prefixlen = value
prefix_length = property(fset=_set_prefix_length)
def get_status_class(self):
return IPRangeStatusChoices.CSS_CLASSES.get(self.status)
def get_child_ips(self):
"""
Return all IPAddresses within this IPRange and VRF.
"""
return IPAddress.objects.filter(
address__gte=self.start_address,
address__lte=self.end_address,
vrf=self.vrf
)
def get_available_ips(self):
"""
Return all available IPs within this range as an IPSet.
"""
range = netaddr.IPRange(self.start_address.ip, self.end_address.ip)
child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
return netaddr.IPSet(range) - child_ips
@cached_property
def first_available_ip(self):
"""
Return the first available IP within the range (or None).
"""
available_ips = self.get_available_ips()
if not available_ips:
return None
return '{}/{}'.format(next(available_ips.__iter__()), self.start_address.prefixlen)
@cached_property
def utilization(self):
"""
Determine the utilization of the range and return it as a percentage.
"""
# Compile an IPSet to avoid counting duplicate IPs
child_count = netaddr.IPSet([
ip.address.ip for ip in self.get_child_ips()
]).size
return int(float(child_count) / self.size * 100)
@extras_features('custom_fields', 'custom_links', 'export_templates', 'tags', 'webhooks')
class IPAddress(PrimaryModel):
"""

View File

@ -9,7 +9,7 @@ from utilities.tables import (
ToggleColumn, UtilizationColumn,
)
from virtualization.models import VMInterface
from .models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from .models import *
AVAILABLE_LABEL = mark_safe('<span class="badge bg-success">Available</span>')
@ -351,6 +351,39 @@ class PrefixDetailTable(PrefixTable):
)
#
# IP ranges
#
class IPRangeTable(BaseTable):
pk = ToggleColumn()
start_address = tables.Column(
linkify=True
)
vrf = tables.TemplateColumn(
template_code=VRF_LINK,
verbose_name='VRF'
)
status = ChoiceFieldColumn(
default=AVAILABLE_LABEL
)
role = tables.TemplateColumn(
template_code=PREFIX_ROLE_LINK
)
tenant = TenantColumn()
class Meta(BaseTable.Meta):
model = IPRange
fields = (
'pk', 'start_address', 'end_address', 'size', 'vrf', 'status', 'role', 'tenant', 'description',
)
default_columns = (
'pk', 'start_address', 'end_address', 'size', 'vrf', 'status', 'role', 'tenant', 'description',
)
row_attrs = {
'class': lambda record: 'success' if not record.pk else '',
}
#
# IPAddresses
#

View File

@ -6,7 +6,7 @@ from rest_framework import status
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from ipam.choices import *
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from ipam.models import *
from utilities.testing import APITestCase, APIViewTestCases, disable_warnings
@ -305,6 +305,7 @@ class PrefixTest(APIViewTestCases.APIViewTestCase):
# Retrieve all available IPs
response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data), 8) # 8 because prefix.is_pool = True
# Change the prefix to not be a pool and try again
@ -358,6 +359,105 @@ class PrefixTest(APIViewTestCases.APIViewTestCase):
self.assertEqual(len(response.data), 8)
class IPRangeTest(APIViewTestCases.APIViewTestCase):
model = IPRange
brief_fields = ['display', 'end_address', 'family', 'id', 'start_address', 'url']
create_data = [
{
'start_address': '192.168.4.10/24',
'end_address': '192.168.4.50/24',
},
{
'start_address': '192.168.5.10/24',
'end_address': '192.168.5.50/24',
},
{
'start_address': '192.168.6.10/24',
'end_address': '192.168.6.50/24',
},
]
bulk_update_data = {
'description': 'New description',
}
@classmethod
def setUpTestData(cls):
ip_ranges = (
IPRange(start_address=IPNetwork('192.168.1.10/24'), end_address=IPNetwork('192.168.1.50/24'), size=51),
IPRange(start_address=IPNetwork('192.168.2.10/24'), end_address=IPNetwork('192.168.2.50/24'), size=51),
IPRange(start_address=IPNetwork('192.168.3.10/24'), end_address=IPNetwork('192.168.3.50/24'), size=51),
)
IPRange.objects.bulk_create(ip_ranges)
def test_list_available_ips(self):
"""
Test retrieval of all available IP addresses within a parent IP range.
"""
iprange = IPRange.objects.create(
start_address=IPNetwork('192.0.2.10/24'),
end_address=IPNetwork('192.0.2.19/24')
)
url = reverse('ipam-api:iprange-available-ips', kwargs={'pk': iprange.pk})
self.add_permissions('ipam.view_iprange', 'ipam.view_ipaddress')
# Retrieve all available IPs
response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data), 10)
def test_create_single_available_ip(self):
"""
Test retrieval of the first available IP address within a parent IP range.
"""
vrf = VRF.objects.create(name='Test VRF 1', rd='1234')
iprange = IPRange.objects.create(
start_address=IPNetwork('192.0.2.1/24'),
end_address=IPNetwork('192.0.2.3/24'),
vrf=vrf
)
url = reverse('ipam-api:iprange-available-ips', kwargs={'pk': iprange.pk})
self.add_permissions('ipam.view_iprange', 'ipam.add_ipaddress')
# Create all three available IPs with individual requests
for i in range(1, 4):
data = {
'description': f'Test IP #{i}'
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(response.data['vrf']['id'], vrf.pk)
self.assertEqual(response.data['description'], data['description'])
# Try to create one more IP
response = self.client.post(url, {}, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
self.assertIn('detail', response.data)
def test_create_multiple_available_ips(self):
"""
Test the creation of available IP addresses within a parent IP range.
"""
iprange = IPRange.objects.create(
start_address=IPNetwork('192.0.2.1/24'),
end_address=IPNetwork('192.0.2.8/24')
)
url = reverse('ipam-api:iprange-available-ips', kwargs={'pk': iprange.pk})
self.add_permissions('ipam.view_iprange', 'ipam.add_ipaddress')
# Try to create nine IPs (only eight are available)
data = [{'description': f'Test IP #{i}'} for i in range(1, 10)] # 9 IPs
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
self.assertIn('detail', response.data)
# Create all eight available IPs in a single request
data = [{'description': f'Test IP #{i}'} for i in range(1, 9)] # 8 IPs
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(len(response.data), 8)
class IPAddressTest(APIViewTestCases.APIViewTestCase):
model = IPAddress
brief_fields = ['address', 'display', 'family', 'id', 'url']

View File

@ -3,7 +3,7 @@ from django.test import TestCase
from dcim.models import Device, DeviceRole, DeviceType, Interface, Location, Manufacturer, Rack, Region, Site, SiteGroup
from ipam.choices import *
from ipam.filtersets import *
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from ipam.models import *
from utilities.testing import ChangeLoggedFilterSetTests
from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine, VMInterface
from tenancy.models import Tenant, TenantGroup
@ -524,6 +524,97 @@ class PrefixTestCase(TestCase, ChangeLoggedFilterSetTests):
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
class IPRangeTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = IPRange.objects.all()
filterset = IPRangeFilterSet
@classmethod
def setUpTestData(cls):
vrfs = (
VRF(name='VRF 1', rd='65000:100'),
VRF(name='VRF 2', rd='65000:200'),
VRF(name='VRF 3', rd='65000:300'),
)
VRF.objects.bulk_create(vrfs)
roles = (
Role(name='Role 1', slug='role-1'),
Role(name='Role 2', slug='role-2'),
Role(name='Role 3', slug='role-3'),
)
Role.objects.bulk_create(roles)
tenant_groups = (
TenantGroup(name='Tenant group 1', slug='tenant-group-1'),
TenantGroup(name='Tenant group 2', slug='tenant-group-2'),
TenantGroup(name='Tenant group 3', slug='tenant-group-3'),
)
for tenantgroup in tenant_groups:
tenantgroup.save()
tenants = (
Tenant(name='Tenant 1', slug='tenant-1', group=tenant_groups[0]),
Tenant(name='Tenant 2', slug='tenant-2', group=tenant_groups[1]),
Tenant(name='Tenant 3', slug='tenant-3', group=tenant_groups[2]),
)
Tenant.objects.bulk_create(tenants)
ip_ranges = (
IPRange(start_address='10.0.1.100/24', end_address='10.0.1.199/24', size=100, vrf=None, tenant=None, role=None, status=IPRangeStatusChoices.STATUS_ACTIVE),
IPRange(start_address='10.0.2.100/24', end_address='10.0.2.199/24', size=100, vrf=vrfs[0], tenant=tenants[0], role=roles[0], status=IPRangeStatusChoices.STATUS_ACTIVE),
IPRange(start_address='10.0.3.100/24', end_address='10.0.3.199/24', size=100, vrf=vrfs[1], tenant=tenants[1], role=roles[1], status=IPRangeStatusChoices.STATUS_DEPRECATED),
IPRange(start_address='10.0.4.100/24', end_address='10.0.4.199/24', size=100, vrf=vrfs[2], tenant=tenants[2], role=roles[2], status=IPRangeStatusChoices.STATUS_RESERVED),
IPRange(start_address='2001:db8:0:1::1/64', end_address='2001:db8:0:1::100/64', size=100, vrf=None, tenant=None, role=None, status=IPRangeStatusChoices.STATUS_ACTIVE),
IPRange(start_address='2001:db8:0:2::1/64', end_address='2001:db8:0:2::100/64', size=100, vrf=vrfs[0], tenant=tenants[0], role=roles[0], status=IPRangeStatusChoices.STATUS_ACTIVE),
IPRange(start_address='2001:db8:0:3::1/64', end_address='2001:db8:0:3::100/64', size=100, vrf=vrfs[1], tenant=tenants[1], role=roles[1], status=IPRangeStatusChoices.STATUS_DEPRECATED),
IPRange(start_address='2001:db8:0:4::1/64', end_address='2001:db8:0:4::100/64', size=100, vrf=vrfs[2], tenant=tenants[2], role=roles[2], status=IPRangeStatusChoices.STATUS_RESERVED),
)
IPRange.objects.bulk_create(ip_ranges)
def test_family(self):
params = {'family': '6'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_contains(self):
params = {'contains': '10.0.1.150/24'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'contains': '2001:db8:0:1::50/64'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_vrf(self):
vrfs = VRF.objects.all()[:2]
params = {'vrf_id': [vrfs[0].pk, vrfs[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'vrf': [vrfs[0].rd, vrfs[1].rd]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_role(self):
roles = Role.objects.all()[:2]
params = {'role_id': [roles[0].pk, roles[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'role': [roles[0].slug, roles[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_status(self):
params = {'status': [PrefixStatusChoices.STATUS_DEPRECATED, PrefixStatusChoices.STATUS_RESERVED]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_tenant(self):
tenants = Tenant.objects.all()[:2]
params = {'tenant_id': [tenants[0].pk, tenants[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'tenant': [tenants[0].slug, tenants[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_tenant_group(self):
tenant_groups = TenantGroup.objects.all()[:2]
params = {'tenant_group_id': [tenant_groups[0].pk, tenant_groups[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'tenant_group': [tenant_groups[0].slug, tenant_groups[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
class IPAddressTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = IPAddress.objects.all()
filterset = IPAddressFilterSet

View File

@ -4,7 +4,7 @@ from netaddr import IPNetwork
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from ipam.choices import *
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from ipam.models import *
from tenancy.models import Tenant
from utilities.testing import ViewTestCases, create_tags
@ -259,6 +259,64 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
}
class IPRangeTestCase(ViewTestCases.PrimaryObjectViewTestCase):
model = IPRange
@classmethod
def setUpTestData(cls):
vrfs = (
VRF(name='VRF 1', rd='65000:1'),
VRF(name='VRF 2', rd='65000:2'),
)
VRF.objects.bulk_create(vrfs)
roles = (
Role(name='Role 1', slug='role-1'),
Role(name='Role 2', slug='role-2'),
)
Role.objects.bulk_create(roles)
ip_ranges = (
IPRange(start_address='192.168.0.10/24', end_address='192.168.0.100/24', size=91),
IPRange(start_address='192.168.1.10/24', end_address='192.168.1.100/24', size=91),
IPRange(start_address='192.168.2.10/24', end_address='192.168.2.100/24', size=91),
IPRange(start_address='192.168.3.10/24', end_address='192.168.3.100/24', size=91),
IPRange(start_address='192.168.4.10/24', end_address='192.168.4.100/24', size=91),
)
IPRange.objects.bulk_create(ip_ranges)
tags = create_tags('Alpha', 'Bravo', 'Charlie')
cls.form_data = {
'start_address': IPNetwork('192.0.5.10/24'),
'end_address': IPNetwork('192.0.5.100/24'),
'vrf': vrfs[1].pk,
'tenant': None,
'vlan': None,
'status': IPRangeStatusChoices.STATUS_RESERVED,
'role': roles[1].pk,
'is_pool': True,
'description': 'A new IP range',
'tags': [t.pk for t in tags],
}
cls.csv_data = (
"vrf,start_address,end_address,status",
"VRF 1,10.1.0.1/16,10.1.9.254/16,active",
"VRF 1,10.2.0.1/16,10.2.9.254/16,active",
"VRF 1,10.3.0.1/16,10.3.9.254/16,active",
)
cls.bulk_edit_data = {
'vrf': vrfs[1].pk,
'tenant': None,
'status': IPRangeStatusChoices.STATUS_RESERVED,
'role': roles[1].pk,
'description': 'New description',
}
class IPAddressTestCase(ViewTestCases.PrimaryObjectViewTestCase):
model = IPAddress

View File

@ -2,7 +2,7 @@ from django.urls import path
from extras.views import ObjectChangeLogView, ObjectJournalView
from . import views
from .models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from .models import *
app_name = 'ipam'
urlpatterns = [
@ -79,6 +79,19 @@ urlpatterns = [
path('prefixes/<int:pk>/prefixes/', views.PrefixPrefixesView.as_view(), name='prefix_prefixes'),
path('prefixes/<int:pk>/ip-addresses/', views.PrefixIPAddressesView.as_view(), name='prefix_ipaddresses'),
# IP ranges
path('ip-ranges/', views.IPRangeListView.as_view(), name='iprange_list'),
path('ip-ranges/add/', views.IPRangeEditView.as_view(), name='iprange_add'),
path('ip-ranges/import/', views.IPRangeBulkImportView.as_view(), name='iprange_import'),
path('ip-ranges/edit/', views.IPRangeBulkEditView.as_view(), name='iprange_bulk_edit'),
path('ip-ranges/delete/', views.IPRangeBulkDeleteView.as_view(), name='iprange_bulk_delete'),
path('ip-ranges/<int:pk>/', views.IPRangeView.as_view(), name='iprange'),
path('ip-ranges/<int:pk>/edit/', views.IPRangeEditView.as_view(), name='iprange_edit'),
path('ip-ranges/<int:pk>/delete/', views.IPRangeDeleteView.as_view(), name='iprange_delete'),
path('ip-ranges/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='iprange_changelog', kwargs={'model': IPRange}),
path('ip-ranges/<int:pk>/journal/', ObjectJournalView.as_view(), name='iprange_journal', kwargs={'model': IPRange}),
path('ip-ranges/<int:pk>/ip-addresses/', views.IPRangeIPAddressesView.as_view(), name='iprange_ipaddresses'),
# IP addresses
path('ip-addresses/', views.IPAddressListView.as_view(), name='ipaddress_list'),
path('ip-addresses/add/', views.IPAddressEditView.as_view(), name='ipaddress_add'),

View File

@ -9,7 +9,7 @@ from utilities.utils import count_related
from virtualization.models import VirtualMachine, VMInterface
from . import filtersets, forms, tables
from .constants import *
from .models import Aggregate, IPAddress, Prefix, RIR, Role, RouteTarget, Service, VLAN, VLANGroup, VRF
from .models import *
from .utils import add_available_ipaddresses, add_available_prefixes, add_available_vlans
@ -503,6 +503,83 @@ class PrefixBulkDeleteView(generic.BulkDeleteView):
table = tables.PrefixTable
#
# IP Ranges
#
class IPRangeListView(generic.ObjectListView):
queryset = IPRange.objects.all()
filterset = filtersets.IPRangeFilterSet
filterset_form = forms.IPRangeFilterForm
table = tables.IPRangeTable
class IPRangeView(generic.ObjectView):
queryset = IPRange.objects.all()
class IPRangeIPAddressesView(generic.ObjectView):
queryset = IPRange.objects.all()
template_name = 'ipam/iprange/ip_addresses.html'
def get_extra_context(self, request, instance):
# Find all IPAddresses within this range
ipaddresses = instance.get_child_ips().restrict(request.user, 'view').prefetch_related(
'vrf', 'primary_ip4_for', 'primary_ip6_for'
)
# Add available IP addresses to the table if requested
# if request.GET.get('show_available', 'true') == 'true':
# ipaddresses = add_available_ipaddresses(instance.prefix, ipaddresses, instance.is_pool)
ip_table = tables.IPAddressTable(ipaddresses)
if request.user.has_perm('ipam.change_ipaddress') or request.user.has_perm('ipam.delete_ipaddress'):
ip_table.columns.show('pk')
paginate_table(ip_table, request)
# Compile permissions list for rendering the object table
permissions = {
'add': request.user.has_perm('ipam.add_ipaddress'),
'change': request.user.has_perm('ipam.change_ipaddress'),
'delete': request.user.has_perm('ipam.delete_ipaddress'),
}
return {
'ip_table': ip_table,
'permissions': permissions,
'active_tab': 'ip-addresses',
'show_available': request.GET.get('show_available', 'true') == 'true',
}
class IPRangeEditView(generic.ObjectEditView):
queryset = IPRange.objects.all()
model_form = forms.IPRangeForm
class IPRangeDeleteView(generic.ObjectDeleteView):
queryset = IPRange.objects.all()
class IPRangeBulkImportView(generic.BulkImportView):
queryset = IPRange.objects.all()
model_form = forms.IPRangeCSVForm
table = tables.IPRangeTable
class IPRangeBulkEditView(generic.BulkEditView):
queryset = IPRange.objects.prefetch_related('vrf', 'tenant')
filterset = filtersets.IPRangeFilterSet
table = tables.IPRangeTable
form = forms.IPRangeBulkEditForm
class IPRangeBulkDeleteView(generic.BulkDeleteView):
queryset = IPRange.objects.prefetch_related('vrf', 'tenant')
filterset = filtersets.IPRangeFilterSet
table = tables.IPRangeTable
#
# IP addresses
#

View File

@ -153,6 +153,8 @@ IPAM_MENU = Menu(
MenuGroup(
label="IP Addresses",
items=(
MenuItem(label="IP Ranges", url="ipam:iprange_list",
add_url="ipam:iprange_add", import_url="ipam:iprange_import"),
MenuItem(label="IP Addresses", url="ipam:ipaddress_list",
add_url="ipam:ipaddress_add", import_url="ipam:ipaddress_import"),
),

View File

@ -21,7 +21,7 @@ from dcim.models import (
)
from extras.choices import JobResultStatusChoices
from extras.models import ObjectChange, JobResult
from ipam.models import Aggregate, IPAddress, Prefix, VLAN, VRF
from ipam.models import Aggregate, IPAddress, IPRange, Prefix, VLAN, VRF
from netbox.constants import SEARCH_MAX_RESULTS, SEARCH_TYPES
from netbox.forms import SearchForm
from tenancy.models import Tenant
@ -68,6 +68,7 @@ class HomeView(View):
("ipam.view_vrf", "VRFs", VRF.objects.restrict(request.user, 'view').count),
("ipam.view_aggregate", "Aggregates", Aggregate.objects.restrict(request.user, 'view').count),
("ipam.view_prefix", "Prefixes", Prefix.objects.restrict(request.user, 'view').count),
("ipam.view_iprange", "IP Ranges", IPRange.objects.restrict(request.user, 'view').count),
("ipam.view_ipaddress", "IP Addresses", IPAddress.objects.restrict(request.user, 'view').count),
("ipam.view_vlan", "VLANs", VLAN.objects.restrict(request.user, 'view').count)

View File

@ -0,0 +1,95 @@
{% extends 'ipam/iprange/base.html' %}
{% load helpers %}
{% load plugins %}
{% block content %}
<div class="row">
<div class="col col-md-6">
<div class="card">
<h5 class="card-header">
IP Range
</h5>
<div class="card-body">
<table class="table table-hover attr-table">
<tr>
<th scope="row">Family</th>
<td>IPv{{ object.family }}</td>
</tr>
<tr>
<th scope="row">Starting Address</th>
<td>{{ object.start_address }}</td>
</tr>
<tr>
<th scope="row">Ending Address</th>
<td>{{ object.end_address }}</td>
</tr>
<tr>
<th scope="row">Size</th>
<td>{{ object.size }}</td>
</tr>
<tr>
<th scope="row">Utilization</th>
<td>
{% utilization_graph object.utilization %}
</td>
</tr>
<tr>
<th scope="row">VRF</th>
<td>
{% if object.vrf %}
<a href="{{ object.vrf.get_absolute_url }}">{{ object.vrf }}</a> ({{ object.vrf.rd }})
{% else %}
<span>Global</span>
{% endif %}
</td>
</tr>
<tr>
<th scope="row">Role</th>
<td>
{% if object.role %}
<a href="{{ object.role.get_absolute_url }}">{{ object.role }}</a>
{% else %}
<span class="text-muted">None</span>
{% endif %}
</td>
</tr>
<tr>
<th scope="row">Status</th>
<td>
<span class="badge bg-{{ object.get_status_class }}">{{ object.get_status_display }}</span>
</td>
</tr>
<tr>
<th scope="row">Tenant</th>
<td>
{% if object.tenant %}
{% if object.tenant.group %}
<a href="{{ object.tenant.group.get_absolute_url }}">{{ object.tenant.group }}</a> /
{% endif %}
<a href="{{ object.tenant.get_absolute_url }}">{{ object.tenant }}</a>
{% else %}
<span class="text-muted">None</span>
{% endif %}
</td>
</tr>
<tr>
<th scope="row">Description</th>
<td>{{ object.description|placeholder }}</td>
</tr>
</table>
</div>
</div>
{% plugin_left_page object %}
</div>
<div class="col col-md-6">
{% include 'extras/inc/tags_panel.html' with tags=object.tags.all url='ipam:prefix_list' %}
{% include 'inc/custom_fields_panel.html' %}
{% plugin_right_page object %}
</div>
</div>
<div class="row">
<div class="col col-md-12">
{% plugin_full_width_page object %}
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,32 @@
{% extends 'generic/object.html' %}
{% load buttons %}
{% load helpers %}
{% block breadcrumbs %}
<li class="breadcrumb-item">
<a href="{% url 'ipam:iprange_list' %}">IP Ranges</a>
</li>
{% if object.vrf %}
<li class="breadcrumb-item">
<a href="{% url 'ipam:vrf' pk=object.vrf.pk %}">{{ object.vrf }}</a>
</li>
{% endif %}
<li class="breadcrumb-item">
{{ object }}
</li>
{% endblock %}
{% block tab_items %}
<li role="presentation" class="nav-item">
<a class="nav-link{% if not active_tab %} active{% endif %}" href="{{ object.get_absolute_url }}">
IP Range
</a>
</li>
{% if perms.ipam.view_ipaddress %}
<li role="presentation" class="nav-item">
<a class="nav-link{% if active_tab == 'ip-addresses' %} active{% endif %}" href="{% url 'ipam:iprange_ipaddresses' pk=object.pk %}">
IP Addresses <span class="badge bg-primary">{{ object.get_child_ips.count }}</span>
</a>
</li>
{% endif %}
{% endblock %}

View File

@ -0,0 +1,18 @@
{% extends 'ipam/iprange/base.html' %}
{% block extra_controls %}
{% if perms.ipam.add_ipaddress and active_tab == 'ip-addresses' and object.first_available_ip %}
<a href="{% url 'ipam:ipaddress_add' %}?address={{ object.first_available_ip }}&vrf={{ object.vrf.pk }}&tenant_group={{ object.tenant.group.pk }}&tenant={{ object.tenant.pk }}" class="btn btn-sm btn-outline-success m-1">
<i class="mdi mdi-plus-thick" aria-hidden="true"></i>
Add an IP Address
</a>
{% endif %}
{% endblock %}
{% block content %}
<div class="row">
<div class="col col-md-12">
{% include 'utilities/obj_table.html' with table=ip_table heading='IP Addresses' bulk_edit_url='ipam:ipaddress_bulk_edit' bulk_delete_url='ipam:ipaddress_bulk_delete' %}
</div>
</div>
{% endblock %}

View File

@ -440,12 +440,8 @@ class ViewTestCases:
response = self.client.get(self._get_url('list'))
self.assertHttpStatus(response, 200)
content = str(response.content)
if hasattr(self.model, 'name'):
self.assertIn(instance1.name, content)
self.assertNotIn(instance2.name, content)
elif hasattr(self.model, 'get_absolute_url'):
self.assertIn(instance1.get_absolute_url(), content)
self.assertNotIn(instance2.get_absolute_url(), content)
self.assertIn(instance1.get_absolute_url(), content)
self.assertNotIn(instance2.get_absolute_url(), content)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_export_objects(self):
@ -641,7 +637,7 @@ class ViewTestCases:
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_bulk_edit_objects_with_permission(self):
pk_list = self._get_queryset().values_list('pk', flat=True)[:3]
pk_list = list(self._get_queryset().values_list('pk', flat=True)[:3])
data = {
'pk': pk_list,
'_apply': True, # Form button