Merge branch 'develop-2.3' into develop

This commit is contained in:
Jeremy Stretch
2018-02-26 14:14:47 -05:00
90 changed files with 4624 additions and 1132 deletions

View File

@@ -3,9 +3,11 @@ from __future__ import unicode_literals
from collections import OrderedDict
from rest_framework import serializers
from rest_framework.reverse import reverse
from rest_framework.validators import UniqueTogetherValidator
from dcim.api.serializers import NestedDeviceSerializer, InterfaceSerializer, NestedSiteSerializer
from dcim.models import Interface
from extras.api.customfields import CustomFieldModelSerializer
from ipam.constants import (
IPADDRESS_ROLE_CHOICES, IPADDRESS_STATUS_CHOICES, IP_PROTOCOL_CHOICES, PREFIX_STATUS_CHOICES, VLAN_STATUS_CHOICES,
@@ -25,7 +27,10 @@ class VRFSerializer(CustomFieldModelSerializer):
class Meta:
model = VRF
fields = ['id', 'name', 'rd', 'tenant', 'enforce_unique', 'description', 'display_name', 'custom_fields']
fields = [
'id', 'name', 'rd', 'tenant', 'enforce_unique', 'description', 'display_name', 'custom_fields', 'created',
'last_updated',
]
class NestedVRFSerializer(serializers.ModelSerializer):
@@ -40,7 +45,9 @@ class WritableVRFSerializer(CustomFieldModelSerializer):
class Meta:
model = VRF
fields = ['id', 'name', 'rd', 'tenant', 'enforce_unique', 'description', 'custom_fields']
fields = [
'id', 'name', 'rd', 'tenant', 'enforce_unique', 'description', 'custom_fields', 'created', 'last_updated',
]
#
@@ -90,7 +97,9 @@ class AggregateSerializer(CustomFieldModelSerializer):
class Meta:
model = Aggregate
fields = ['id', 'family', 'prefix', 'rir', 'date_added', 'description', 'custom_fields']
fields = [
'id', 'family', 'prefix', 'rir', 'date_added', 'description', 'custom_fields', 'created', 'last_updated',
]
class NestedAggregateSerializer(serializers.ModelSerializer):
@@ -105,7 +114,7 @@ class WritableAggregateSerializer(CustomFieldModelSerializer):
class Meta:
model = Aggregate
fields = ['id', 'prefix', 'rir', 'date_added', 'description', 'custom_fields']
fields = ['id', 'prefix', 'rir', 'date_added', 'description', 'custom_fields', 'created', 'last_updated']
#
@@ -165,7 +174,7 @@ class VLANSerializer(CustomFieldModelSerializer):
model = VLAN
fields = [
'id', 'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description', 'display_name',
'custom_fields',
'custom_fields', 'created', 'last_updated',
]
@@ -181,7 +190,10 @@ class WritableVLANSerializer(CustomFieldModelSerializer):
class Meta:
model = VLAN
fields = ['id', 'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description', 'custom_fields']
fields = [
'id', 'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description', 'custom_fields', 'created',
'last_updated',
]
validators = []
def validate(self, data):
@@ -215,7 +227,7 @@ class PrefixSerializer(CustomFieldModelSerializer):
model = Prefix
fields = [
'id', 'family', 'prefix', 'site', 'vrf', 'tenant', 'vlan', 'status', 'role', 'is_pool', 'description',
'custom_fields',
'custom_fields', 'created', 'last_updated',
]
@@ -233,23 +245,47 @@ class WritablePrefixSerializer(CustomFieldModelSerializer):
model = Prefix
fields = [
'id', 'prefix', 'site', 'vrf', 'tenant', 'vlan', 'status', 'role', 'is_pool', 'description',
'custom_fields',
'custom_fields', 'created', 'last_updated',
]
class AvailablePrefixSerializer(serializers.Serializer):
def to_representation(self, instance):
if self.context.get('vrf'):
vrf = NestedVRFSerializer(self.context['vrf'], context={'request': self.context['request']}).data
else:
vrf = None
return OrderedDict([
('family', instance.version),
('prefix', str(instance)),
('vrf', vrf),
])
#
# IP addresses
#
class IPAddressInterfaceSerializer(InterfaceSerializer):
class IPAddressInterfaceSerializer(serializers.ModelSerializer):
url = serializers.SerializerMethodField() # We're imitating a HyperlinkedIdentityField here
device = NestedDeviceSerializer()
virtual_machine = NestedVirtualMachineSerializer()
class Meta(InterfaceSerializer.Meta):
model = Interface
fields = [
'id', 'device', 'virtual_machine', 'name', 'form_factor', 'enabled', 'lag', 'mtu', 'mac_address',
'mgmt_only', 'description', 'is_connected', 'interface_connection', 'circuit_termination',
'id', 'url', 'device', 'virtual_machine', 'name',
]
def get_url(self, obj):
"""
Return a link to the Interface via either the DCIM API if the parent is a Device, or via the virtualization API
if the parent is a VirtualMachine.
"""
url_name = 'dcim-api:interface-detail' if obj.device else 'virtualization-api:interface-detail'
return reverse(url_name, kwargs={'pk': obj.pk}, request=self.context['request'])
class IPAddressSerializer(CustomFieldModelSerializer):
vrf = NestedVRFSerializer()
@@ -262,7 +298,7 @@ class IPAddressSerializer(CustomFieldModelSerializer):
model = IPAddress
fields = [
'id', 'family', 'address', 'vrf', 'tenant', 'status', 'role', 'interface', 'description', 'nat_inside',
'nat_outside', 'custom_fields',
'nat_outside', 'custom_fields', 'created', 'last_updated',
]
@@ -284,7 +320,7 @@ class WritableIPAddressSerializer(CustomFieldModelSerializer):
model = IPAddress
fields = [
'id', 'address', 'vrf', 'tenant', 'status', 'role', 'interface', 'description', 'nat_inside',
'custom_fields',
'custom_fields', 'created', 'last_updated',
]
@@ -314,7 +350,10 @@ class ServiceSerializer(serializers.ModelSerializer):
class Meta:
model = Service
fields = ['id', 'device', 'virtual_machine', 'name', 'port', 'protocol', 'ipaddresses', 'description']
fields = [
'id', 'device', 'virtual_machine', 'name', 'port', 'protocol', 'ipaddresses', 'description', 'created',
'last_updated',
]
# TODO: Figure out how to use model validation with ManyToManyFields. Calling clean() yields a ValueError.
@@ -322,4 +361,7 @@ class WritableServiceSerializer(serializers.ModelSerializer):
class Meta:
model = Service
fields = ['id', 'device', 'virtual_machine', 'name', 'port', 'protocol', 'ipaddresses', 'description']
fields = [
'id', 'device', 'virtual_machine', 'name', 'port', 'protocol', 'ipaddresses', 'description', 'created',
'last_updated',
]

View File

@@ -6,12 +6,11 @@ from rest_framework import status
from rest_framework.decorators import detail_route
from rest_framework.exceptions import PermissionDenied
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from extras.api.views import CustomFieldModelViewSet
from ipam import filters
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
from utilities.api import FieldChoicesViewSet, WritableSerializerMixin
from utilities.api import FieldChoicesViewSet, ModelViewSet
from . import serializers
@@ -33,7 +32,7 @@ class IPAMFieldChoicesViewSet(FieldChoicesViewSet):
# VRFs
#
class VRFViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
class VRFViewSet(CustomFieldModelViewSet):
queryset = VRF.objects.select_related('tenant')
serializer_class = serializers.VRFSerializer
write_serializer_class = serializers.WritableVRFSerializer
@@ -54,7 +53,7 @@ class RIRViewSet(ModelViewSet):
# Aggregates
#
class AggregateViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
class AggregateViewSet(CustomFieldModelViewSet):
queryset = Aggregate.objects.select_related('rir')
serializer_class = serializers.AggregateSerializer
write_serializer_class = serializers.WritableAggregateSerializer
@@ -75,12 +74,72 @@ class RoleViewSet(ModelViewSet):
# Prefixes
#
class PrefixViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
class PrefixViewSet(CustomFieldModelViewSet):
queryset = Prefix.objects.select_related('site', 'vrf__tenant', 'tenant', 'vlan', 'role')
serializer_class = serializers.PrefixSerializer
write_serializer_class = serializers.WritablePrefixSerializer
filter_class = filters.PrefixFilter
@detail_route(url_path='available-prefixes', methods=['get', 'post'])
def available_prefixes(self, request, pk=None):
"""
A convenience method for returning available child prefixes within a parent.
"""
prefix = get_object_or_404(Prefix, pk=pk)
available_prefixes = prefix.get_available_prefixes()
if request.method == 'POST':
# Permissions check
if not request.user.has_perm('ipam.add_prefix'):
raise PermissionDenied()
# Normalize to a list of objects
requested_prefixes = request.data if isinstance(request.data, list) else [request.data]
# Allocate prefixes to the requested objects based on availability within the parent
for requested_prefix in requested_prefixes:
# Find the first available prefix equal to or larger than the requested size
for available_prefix in available_prefixes.iter_cidrs():
if requested_prefix['prefix_length'] >= available_prefix.prefixlen:
allocated_prefix = '{}/{}'.format(available_prefix.network, requested_prefix['prefix_length'])
requested_prefix['prefix'] = allocated_prefix
requested_prefix['vrf'] = prefix.vrf.pk if prefix.vrf else None
break
else:
return Response(
{
"detail": "Insufficient space is available to accommodate the requested prefix size(s)"
},
status=status.HTTP_400_BAD_REQUEST
)
# Remove the allocated prefix from the list of available prefixes
available_prefixes.remove(allocated_prefix)
# Initialize the serializer with a list or a single object depending on what was requested
if isinstance(request.data, list):
serializer = serializers.WritablePrefixSerializer(data=requested_prefixes, many=True)
else:
serializer = serializers.WritablePrefixSerializer(data=requested_prefixes[0])
# Create the new Prefix(es)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
else:
serializer = serializers.AvailablePrefixSerializer(available_prefixes.iter_cidrs(), many=True, context={
'request': request,
'vrf': prefix.vrf,
})
return Response(serializer.data)
@detail_route(url_path='available-ips', methods=['get', 'post'])
def available_ips(self, request, pk=None):
"""
@@ -97,28 +156,39 @@ class PrefixViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
if not request.user.has_perm('ipam.add_ipaddress'):
raise PermissionDenied()
# Find the first available IP address in the prefix
try:
ipaddress = list(prefix.get_available_ips())[0]
except IndexError:
# Normalize to a list of objects
requested_ips = request.data if isinstance(request.data, list) else [request.data]
# Determine if the requested number of IPs is available
available_ips = list(prefix.get_available_ips())
if len(available_ips) < len(requested_ips):
return Response(
{
"detail": "There are no available IPs within this prefix ({})".format(prefix)
"detail": "An insufficient number of IP addresses are available within the prefix {} ({} "
"requested, {} available)".format(prefix, len(requested_ips), len(available_ips))
},
status=status.HTTP_400_BAD_REQUEST
)
# Create the new IP address
data = request.data.copy()
data['address'] = '{}/{}'.format(ipaddress, prefix.prefix.prefixlen)
data['vrf'] = prefix.vrf.pk if prefix.vrf else None
serializer = serializers.WritableIPAddressSerializer(data=data)
# Assign addresses from the list of available IPs and copy VRF assignment from the parent prefix
for requested_ip in requested_ips:
requested_ip['address'] = available_ips.pop(0)
requested_ip['vrf'] = prefix.vrf.pk if prefix.vrf else None
# Initialize the serializer with a list or a single object depending on what was requested
if isinstance(request.data, list):
serializer = serializers.WritableIPAddressSerializer(data=requested_ips, many=True)
else:
serializer = serializers.WritableIPAddressSerializer(data=requested_ips[0])
# Create the new IP address(es)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# Determine the maximum amount of IPs to return
# Determine the maximum number of IPs to return
else:
try:
limit = int(request.query_params.get('limit', settings.PAGINATE_COUNT))
@@ -146,11 +216,11 @@ class PrefixViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
# IP addresses
#
class IPAddressViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
class IPAddressViewSet(CustomFieldModelViewSet):
queryset = IPAddress.objects.select_related(
'vrf__tenant', 'tenant', 'nat_inside'
'vrf__tenant', 'tenant', 'nat_inside', 'interface__device__device_type', 'interface__virtual_machine'
).prefetch_related(
'interface__device', 'interface__virtual_machine'
'nat_outside'
)
serializer_class = serializers.IPAddressSerializer
write_serializer_class = serializers.WritableIPAddressSerializer
@@ -161,7 +231,7 @@ class IPAddressViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
# VLAN groups
#
class VLANGroupViewSet(WritableSerializerMixin, ModelViewSet):
class VLANGroupViewSet(ModelViewSet):
queryset = VLANGroup.objects.select_related('site')
serializer_class = serializers.VLANGroupSerializer
write_serializer_class = serializers.WritableVLANGroupSerializer
@@ -172,7 +242,7 @@ class VLANGroupViewSet(WritableSerializerMixin, ModelViewSet):
# VLANs
#
class VLANViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
class VLANViewSet(CustomFieldModelViewSet):
queryset = VLAN.objects.select_related('site', 'group', 'tenant', 'role')
serializer_class = serializers.VLANSerializer
write_serializer_class = serializers.WritableVLANSerializer
@@ -183,7 +253,7 @@ class VLANViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
# Services
#
class ServiceViewSet(WritableSerializerMixin, ModelViewSet):
class ServiceViewSet(ModelViewSet):
queryset = Service.objects.select_related('device')
serializer_class = serializers.ServiceSerializer
write_serializer_class = serializers.WritableServiceSerializer

View File

@@ -99,11 +99,6 @@ class PrefixFilter(CustomFieldFilterSet, django_filters.FilterSet):
method='search',
label='Search',
)
# TODO: Deprecate in v2.3.0
parent = django_filters.CharFilter(
method='search_within_include',
label='Parent prefix (deprecated)',
)
within = django_filters.CharFilter(
method='search_within',
label='Within prefix',
@@ -262,16 +257,15 @@ class IPAddressFilter(CustomFieldFilterSet, django_filters.FilterSet):
to_field_name='slug',
label='Tenant (slug)',
)
device_id = django_filters.ModelMultipleChoiceFilter(
name='interface__device',
queryset=Device.objects.all(),
label='Device (ID)',
device = django_filters.CharFilter(
method='filter_device',
name='name',
label='Device',
)
device = django_filters.ModelMultipleChoiceFilter(
name='interface__device__name',
queryset=Device.objects.all(),
to_field_name='name',
label='Device (name)',
device_id = django_filters.NumberFilter(
method='filter_device',
name='pk',
label='Device (ID)',
)
virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
name='interface__virtual_machine',
@@ -324,6 +318,14 @@ class IPAddressFilter(CustomFieldFilterSet, django_filters.FilterSet):
return queryset
return queryset.filter(address__net_mask_length=value)
def filter_device(self, queryset, name, value):
try:
device = Device.objects.select_related('device_type').get(**{name: value})
vc_interface_ids = [i['id'] for i in device.vc_interfaces.values('id')]
return queryset.filter(interface_id__in=vc_interface_ids)
except Device.DoesNotExist:
return queryset.none()
class VLANGroupFilter(django_filters.FilterSet):
site_id = django_filters.ModelMultipleChoiceFilter(

View File

@@ -931,8 +931,9 @@ class ServiceForm(BootstrapMixin, forms.ModelForm):
# Limit IP address choices to those assigned to interfaces of the parent device/VM
if self.instance.device:
vc_interface_ids = [i['id'] for i in self.instance.device.vc_interfaces.values('id')]
self.fields['ipaddresses'].queryset = IPAddress.objects.filter(
interface__device=self.instance.device
interface_id__in=vc_interface_ids
)
elif self.instance.virtual_machine:
self.fields['ipaddresses'].queryset = IPAddress.objects.filter(

View File

@@ -47,7 +47,7 @@ class VRFTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:vrf-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(VRF.objects.count(), 4)
@@ -55,6 +55,32 @@ class VRFTest(HttpStatusMixin, APITestCase):
self.assertEqual(vrf4.name, data['name'])
self.assertEqual(vrf4.rd, data['rd'])
def test_create_vrf_bulk(self):
data = [
{
'name': 'Test VRF 4',
'rd': '65000:4',
},
{
'name': 'Test VRF 5',
'rd': '65000:5',
},
{
'name': 'Test VRF 6',
'rd': '65000:6',
},
]
url = reverse('ipam-api:vrf-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(VRF.objects.count(), 6)
self.assertEqual(response.data[0]['name'], data[0]['name'])
self.assertEqual(response.data[1]['name'], data[1]['name'])
self.assertEqual(response.data[2]['name'], data[2]['name'])
def test_update_vrf(self):
data = {
@@ -63,7 +89,7 @@ class VRFTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:vrf-detail', kwargs={'pk': self.vrf1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(VRF.objects.count(), 3)
@@ -114,7 +140,7 @@ class RIRTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:rir-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(RIR.objects.count(), 4)
@@ -122,6 +148,32 @@ class RIRTest(HttpStatusMixin, APITestCase):
self.assertEqual(rir4.name, data['name'])
self.assertEqual(rir4.slug, data['slug'])
def test_create_rir_bulk(self):
data = [
{
'name': 'Test RIR 4',
'slug': 'test-rir-4',
},
{
'name': 'Test RIR 5',
'slug': 'test-rir-5',
},
{
'name': 'Test RIR 6',
'slug': 'test-rir-6',
},
]
url = reverse('ipam-api:rir-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(RIR.objects.count(), 6)
self.assertEqual(response.data[0]['name'], data[0]['name'])
self.assertEqual(response.data[1]['name'], data[1]['name'])
self.assertEqual(response.data[2]['name'], data[2]['name'])
def test_update_rir(self):
data = {
@@ -130,7 +182,7 @@ class RIRTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:rir-detail', kwargs={'pk': self.rir1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(RIR.objects.count(), 3)
@@ -183,7 +235,7 @@ class AggregateTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:aggregate-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Aggregate.objects.count(), 4)
@@ -191,6 +243,32 @@ class AggregateTest(HttpStatusMixin, APITestCase):
self.assertEqual(str(aggregate4.prefix), data['prefix'])
self.assertEqual(aggregate4.rir_id, data['rir'])
def test_create_aggregate_bulk(self):
data = [
{
'prefix': '100.0.0.0/8',
'rir': self.rir1.pk,
},
{
'prefix': '101.0.0.0/8',
'rir': self.rir1.pk,
},
{
'prefix': '102.0.0.0/8',
'rir': self.rir1.pk,
},
]
url = reverse('ipam-api:aggregate-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Aggregate.objects.count(), 6)
self.assertEqual(response.data[0]['prefix'], data[0]['prefix'])
self.assertEqual(response.data[1]['prefix'], data[1]['prefix'])
self.assertEqual(response.data[2]['prefix'], data[2]['prefix'])
def test_update_aggregate(self):
data = {
@@ -199,7 +277,7 @@ class AggregateTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:aggregate-detail', kwargs={'pk': self.aggregate1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(Aggregate.objects.count(), 3)
@@ -250,7 +328,7 @@ class RoleTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:role-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Role.objects.count(), 4)
@@ -258,6 +336,32 @@ class RoleTest(HttpStatusMixin, APITestCase):
self.assertEqual(role4.name, data['name'])
self.assertEqual(role4.slug, data['slug'])
def test_create_role_bulk(self):
data = [
{
'name': 'Test Role 4',
'slug': 'test-role-4',
},
{
'name': 'Test Role 5',
'slug': 'test-role-5',
},
{
'name': 'Test Role 6',
'slug': 'test-role-6',
},
]
url = reverse('ipam-api:role-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Role.objects.count(), 6)
self.assertEqual(response.data[0]['name'], data[0]['name'])
self.assertEqual(response.data[1]['name'], data[1]['name'])
self.assertEqual(response.data[2]['name'], data[2]['name'])
def test_update_role(self):
data = {
@@ -266,7 +370,7 @@ class RoleTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:role-detail', kwargs={'pk': self.role1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(Role.objects.count(), 3)
@@ -324,7 +428,7 @@ class PrefixTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:prefix-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Prefix.objects.count(), 4)
@@ -335,6 +439,29 @@ class PrefixTest(HttpStatusMixin, APITestCase):
self.assertEqual(prefix4.vlan_id, data['vlan'])
self.assertEqual(prefix4.role_id, data['role'])
def test_create_prefix_bulk(self):
data = [
{
'prefix': '10.0.1.0/24',
},
{
'prefix': '10.0.2.0/24',
},
{
'prefix': '10.0.3.0/24',
},
]
url = reverse('ipam-api:prefix-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Prefix.objects.count(), 6)
self.assertEqual(response.data[0]['prefix'], data[0]['prefix'])
self.assertEqual(response.data[1]['prefix'], data[1]['prefix'])
self.assertEqual(response.data[2]['prefix'], data[2]['prefix'])
def test_update_prefix(self):
data = {
@@ -346,7 +473,7 @@ class PrefixTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefix1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(Prefix.objects.count(), 3)
@@ -365,7 +492,73 @@ class PrefixTest(HttpStatusMixin, APITestCase):
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
self.assertEqual(Prefix.objects.count(), 2)
def test_available_ips(self):
def test_list_available_prefixes(self):
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
Prefix.objects.create(prefix=IPNetwork('192.0.2.64/26'))
Prefix.objects.create(prefix=IPNetwork('192.0.2.192/27'))
url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
# Retrieve all available IPs
response = self.client.get(url, **self.header)
available_prefixes = ['192.0.2.0/26', '192.0.2.128/26', '192.0.2.224/27']
for i, p in enumerate(response.data):
self.assertEqual(p['prefix'], available_prefixes[i])
def test_create_single_available_prefix(self):
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/28'), is_pool=True)
url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
# Create four available prefixes with individual requests
prefixes_to_be_created = [
'192.0.2.0/30',
'192.0.2.4/30',
'192.0.2.8/30',
'192.0.2.12/30',
]
for i in range(4):
data = {
'prefix_length': 30,
'description': 'Test Prefix {}'.format(i + 1)
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(response.data['prefix'], prefixes_to_be_created[i])
self.assertEqual(response.data['description'], data['description'])
# Try to create one more prefix
response = self.client.post(url, {'prefix_length': 30}, **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertIn('detail', response.data)
def test_create_multiple_available_prefixes(self):
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/28'), is_pool=True)
url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
# Try to create five /30s (only four are available)
data = [
{'prefix_length': 30, 'description': 'Test Prefix 1'},
{'prefix_length': 30, 'description': 'Test Prefix 2'},
{'prefix_length': 30, 'description': 'Test Prefix 3'},
{'prefix_length': 30, 'description': 'Test Prefix 4'},
{'prefix_length': 30, 'description': 'Test Prefix 5'},
]
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertIn('detail', response.data)
# Verify that no prefixes were created (the entire /28 is still available)
response = self.client.get(url, **self.header)
self.assertEqual(response.data[0]['prefix'], '192.0.2.0/28')
# Create four /30s in a single request
response = self.client.post(url, data[:4], format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(len(response.data), 4)
def test_list_available_ips(self):
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), is_pool=True)
url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
@@ -380,12 +573,17 @@ class PrefixTest(HttpStatusMixin, APITestCase):
response = self.client.get(url, **self.header)
self.assertEqual(len(response.data), 6) # 8 - 2 because prefix.is_pool = False
# Create all six available IPs
for i in range(6):
def test_create_single_available_ip(self):
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/30'), is_pool=True)
url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
# Create all four available IPs with individual requests
for i in range(1, 5):
data = {
'description': 'Test IP {}'.format(i)
}
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(response.data['description'], data['description'])
@@ -394,6 +592,27 @@ class PrefixTest(HttpStatusMixin, APITestCase):
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertIn('detail', response.data)
def test_create_multiple_available_ips(self):
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), is_pool=True)
url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
# Try to create nine IPs (only eight are available)
data = [{'description': 'Test IP {}'.format(i)} for i in range(1, 10)] # 9 IPs
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertIn('detail', response.data)
# Verify that no IPs were created (eight are still available)
response = self.client.get(url, **self.header)
self.assertEqual(len(response.data), 8)
# Create all eight available IPs in a single request
data = [{'description': 'Test IP {}'.format(i)} for i in range(1, 9)] # 8 IPs
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(len(response.data), 8)
class IPAddressTest(HttpStatusMixin, APITestCase):
@@ -430,7 +649,7 @@ class IPAddressTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:ipaddress-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(IPAddress.objects.count(), 4)
@@ -438,6 +657,29 @@ class IPAddressTest(HttpStatusMixin, APITestCase):
self.assertEqual(str(ipaddress4.address), data['address'])
self.assertEqual(ipaddress4.vrf_id, data['vrf'])
def test_create_ipaddress_bulk(self):
data = [
{
'address': '192.168.0.4/24',
},
{
'address': '192.168.0.5/24',
},
{
'address': '192.168.0.6/24',
},
]
url = reverse('ipam-api:ipaddress-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(IPAddress.objects.count(), 6)
self.assertEqual(response.data[0]['address'], data[0]['address'])
self.assertEqual(response.data[1]['address'], data[1]['address'])
self.assertEqual(response.data[2]['address'], data[2]['address'])
def test_update_ipaddress(self):
data = {
@@ -446,7 +688,7 @@ class IPAddressTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:ipaddress-detail', kwargs={'pk': self.ipaddress1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(IPAddress.objects.count(), 3)
@@ -497,7 +739,7 @@ class VLANGroupTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:vlangroup-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(VLANGroup.objects.count(), 4)
@@ -505,6 +747,32 @@ class VLANGroupTest(HttpStatusMixin, APITestCase):
self.assertEqual(vlangroup4.name, data['name'])
self.assertEqual(vlangroup4.slug, data['slug'])
def test_create_vlangroup_bulk(self):
data = [
{
'name': 'Test VLAN Group 4',
'slug': 'test-vlan-group-4',
},
{
'name': 'Test VLAN Group 5',
'slug': 'test-vlan-group-5',
},
{
'name': 'Test VLAN Group 6',
'slug': 'test-vlan-group-6',
},
]
url = reverse('ipam-api:vlangroup-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(VLANGroup.objects.count(), 6)
self.assertEqual(response.data[0]['name'], data[0]['name'])
self.assertEqual(response.data[1]['name'], data[1]['name'])
self.assertEqual(response.data[2]['name'], data[2]['name'])
def test_update_vlangroup(self):
data = {
@@ -513,7 +781,7 @@ class VLANGroupTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:vlangroup-detail', kwargs={'pk': self.vlangroup1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(VLANGroup.objects.count(), 3)
@@ -564,7 +832,7 @@ class VLANTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:vlan-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(VLAN.objects.count(), 4)
@@ -572,6 +840,32 @@ class VLANTest(HttpStatusMixin, APITestCase):
self.assertEqual(vlan4.vid, data['vid'])
self.assertEqual(vlan4.name, data['name'])
def test_create_vlan_bulk(self):
data = [
{
'vid': 4,
'name': 'Test VLAN 4',
},
{
'vid': 5,
'name': 'Test VLAN 5',
},
{
'vid': 6,
'name': 'Test VLAN 6',
},
]
url = reverse('ipam-api:vlan-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(VLAN.objects.count(), 6)
self.assertEqual(response.data[0]['name'], data[0]['name'])
self.assertEqual(response.data[1]['name'], data[1]['name'])
self.assertEqual(response.data[2]['name'], data[2]['name'])
def test_update_vlan(self):
data = {
@@ -580,7 +874,7 @@ class VLANTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:vlan-detail', kwargs={'pk': self.vlan1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(VLAN.objects.count(), 3)
@@ -649,7 +943,7 @@ class ServiceTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:service-list')
response = self.client.post(url, data, **self.header)
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Service.objects.count(), 4)
@@ -659,6 +953,38 @@ class ServiceTest(HttpStatusMixin, APITestCase):
self.assertEqual(service4.protocol, data['protocol'])
self.assertEqual(service4.port, data['port'])
def test_create_service_bulk(self):
data = [
{
'device': self.device1.pk,
'name': 'Test Service 4',
'protocol': IP_PROTOCOL_TCP,
'port': 4,
},
{
'device': self.device1.pk,
'name': 'Test Service 5',
'protocol': IP_PROTOCOL_TCP,
'port': 5,
},
{
'device': self.device1.pk,
'name': 'Test Service 6',
'protocol': IP_PROTOCOL_TCP,
'port': 6,
},
]
url = reverse('ipam-api:service-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Service.objects.count(), 6)
self.assertEqual(response.data[0]['name'], data[0]['name'])
self.assertEqual(response.data[1]['name'], data[1]['name'])
self.assertEqual(response.data[2]['name'], data[2]['name'])
def test_update_service(self):
data = {
@@ -669,7 +995,7 @@ class ServiceTest(HttpStatusMixin, APITestCase):
}
url = reverse('ipam-api:service-detail', kwargs={'pk': self.service1.pk})
response = self.client.put(url, data, **self.header)
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(Service.objects.count(), 3)