diff --git a/netbox/ipam/api/views.py b/netbox/ipam/api/views.py index 3c8f3c43d..0e975146a 100644 --- a/netbox/ipam/api/views.py +++ b/netbox/ipam/api/views.py @@ -96,28 +96,41 @@ class PrefixViewSet(CustomFieldModelViewSet): if not request.user.has_perm('ipam.add_ipaddress'): raise PermissionDenied() - # Find the first available IP address in the prefix - try: - ipaddress = list(prefix.get_available_ips())[0] - except IndexError: + # Determine if the requested number of IPs is available + requested_count = len(request.data) if isinstance(request.data, list) else 1 + available_ips = list(prefix.get_available_ips()) + if len(available_ips) < requested_count: return Response( { - "detail": "There are no available IPs within this prefix ({})".format(prefix) + "detail": "An insufficient number of IP addresses are available within the prefix {} ({} " + "requested, {} available)".format(prefix, requested_count, len(available_ips)) }, status=status.HTTP_400_BAD_REQUEST ) - # Create the new IP address - data = request.data.copy() - data['address'] = '{}/{}'.format(ipaddress, prefix.prefix.prefixlen) - data['vrf'] = prefix.vrf.pk if prefix.vrf else None - serializer = serializers.WritableIPAddressSerializer(data=data) + # Deserializing multiple IP addresses + if isinstance(request.data, list): + request_data = list(request.data) # Need a mutable copy + for obj in request_data: + obj['address'] = available_ips.pop(0) + obj['vrf'] = prefix.vrf.pk if prefix.vrf else None + serializer = serializers.WritableIPAddressSerializer(data=request_data, many=True) + + # Deserializing a single IP address + else: + request_data = request.data.copy() # Need a mutable copy + request_data['address'] = available_ips.pop(0) + request_data['vrf'] = prefix.vrf.pk if prefix.vrf else None + serializer = serializers.WritableIPAddressSerializer(data=request_data) + + # Create the new IP address(es) if serializer.is_valid(): serializer.save() return Response(serializer.data, status=status.HTTP_201_CREATED) + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - # Determine the maximum amount of IPs to return + # Determine the maximum number of IPs to return else: try: limit = int(request.query_params.get('limit', settings.PAGINATE_COUNT)) diff --git a/netbox/ipam/tests/test_api.py b/netbox/ipam/tests/test_api.py index cba624a59..b39116b9f 100644 --- a/netbox/ipam/tests/test_api.py +++ b/netbox/ipam/tests/test_api.py @@ -365,7 +365,7 @@ class PrefixTest(HttpStatusMixin, APITestCase): self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT) self.assertEqual(Prefix.objects.count(), 2) - def test_available_ips(self): + def test_list_available_ips(self): prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), is_pool=True) url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk}) @@ -380,12 +380,19 @@ class PrefixTest(HttpStatusMixin, APITestCase): response = self.client.get(url, **self.header) self.assertEqual(len(response.data), 6) # 8 - 2 because prefix.is_pool = False - # Create all six available IPs - for i in range(6): + def test_create_single_available_ip(self): + + prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/30'), is_pool=True) + url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk}) + + # Create all four available IPs with individual requests + for i in range(1, 5): data = { 'description': 'Test IP {}'.format(i) } response = self.client.post(url, data, **self.header) + if response.status_code != status.HTTP_201_CREATED: + assert False, response.content self.assertHttpStatus(response, status.HTTP_201_CREATED) self.assertEqual(response.data['description'], data['description']) @@ -394,6 +401,27 @@ class PrefixTest(HttpStatusMixin, APITestCase): self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) self.assertIn('detail', response.data) + def test_create_multiple_available_ips(self): + + prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), is_pool=True) + url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk}) + + # Try to create nine IPs (only eight are available) + data = [{'description': 'Test IP {}'.format(i)} for i in range(1, 10)] # 9 IPs + response = self.client.post(url, data, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + self.assertIn('detail', response.data) + + # Verify that no IPs were created (eight are still available) + response = self.client.get(url, **self.header) + self.assertEqual(len(response.data), 8) + + # Create all eight available IPs in a single request + data = [{'description': 'Test IP {}'.format(i)} for i in range(1, 9)] # 8 IPs + response = self.client.post(url, data, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_201_CREATED) + self.assertEqual(len(response.data), 8) + class IPAddressTest(HttpStatusMixin, APITestCase):