mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-22 20:12:00 -06:00
Extended prefix 'available-ips' endpoint to accept multiple objects (related to #1553)
This commit is contained in:
parent
c3e5106b04
commit
4f2dc50b5c
@ -96,28 +96,41 @@ class PrefixViewSet(CustomFieldModelViewSet):
|
||||
if not request.user.has_perm('ipam.add_ipaddress'):
|
||||
raise PermissionDenied()
|
||||
|
||||
# Find the first available IP address in the prefix
|
||||
try:
|
||||
ipaddress = list(prefix.get_available_ips())[0]
|
||||
except IndexError:
|
||||
# Determine if the requested number of IPs is available
|
||||
requested_count = len(request.data) if isinstance(request.data, list) else 1
|
||||
available_ips = list(prefix.get_available_ips())
|
||||
if len(available_ips) < requested_count:
|
||||
return Response(
|
||||
{
|
||||
"detail": "There are no available IPs within this prefix ({})".format(prefix)
|
||||
"detail": "An insufficient number of IP addresses are available within the prefix {} ({} "
|
||||
"requested, {} available)".format(prefix, requested_count, len(available_ips))
|
||||
},
|
||||
status=status.HTTP_400_BAD_REQUEST
|
||||
)
|
||||
|
||||
# Create the new IP address
|
||||
data = request.data.copy()
|
||||
data['address'] = '{}/{}'.format(ipaddress, prefix.prefix.prefixlen)
|
||||
data['vrf'] = prefix.vrf.pk if prefix.vrf else None
|
||||
serializer = serializers.WritableIPAddressSerializer(data=data)
|
||||
# Deserializing multiple IP addresses
|
||||
if isinstance(request.data, list):
|
||||
request_data = list(request.data) # Need a mutable copy
|
||||
for obj in request_data:
|
||||
obj['address'] = available_ips.pop(0)
|
||||
obj['vrf'] = prefix.vrf.pk if prefix.vrf else None
|
||||
serializer = serializers.WritableIPAddressSerializer(data=request_data, many=True)
|
||||
|
||||
# Deserializing a single IP address
|
||||
else:
|
||||
request_data = request.data.copy() # Need a mutable copy
|
||||
request_data['address'] = available_ips.pop(0)
|
||||
request_data['vrf'] = prefix.vrf.pk if prefix.vrf else None
|
||||
serializer = serializers.WritableIPAddressSerializer(data=request_data)
|
||||
|
||||
# Create the new IP address(es)
|
||||
if serializer.is_valid():
|
||||
serializer.save()
|
||||
return Response(serializer.data, status=status.HTTP_201_CREATED)
|
||||
|
||||
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
# Determine the maximum amount of IPs to return
|
||||
# Determine the maximum number of IPs to return
|
||||
else:
|
||||
try:
|
||||
limit = int(request.query_params.get('limit', settings.PAGINATE_COUNT))
|
||||
|
@ -365,7 +365,7 @@ class PrefixTest(HttpStatusMixin, APITestCase):
|
||||
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
|
||||
self.assertEqual(Prefix.objects.count(), 2)
|
||||
|
||||
def test_available_ips(self):
|
||||
def test_list_available_ips(self):
|
||||
|
||||
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), is_pool=True)
|
||||
url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
|
||||
@ -380,12 +380,19 @@ class PrefixTest(HttpStatusMixin, APITestCase):
|
||||
response = self.client.get(url, **self.header)
|
||||
self.assertEqual(len(response.data), 6) # 8 - 2 because prefix.is_pool = False
|
||||
|
||||
# Create all six available IPs
|
||||
for i in range(6):
|
||||
def test_create_single_available_ip(self):
|
||||
|
||||
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/30'), is_pool=True)
|
||||
url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
|
||||
|
||||
# Create all four available IPs with individual requests
|
||||
for i in range(1, 5):
|
||||
data = {
|
||||
'description': 'Test IP {}'.format(i)
|
||||
}
|
||||
response = self.client.post(url, data, **self.header)
|
||||
if response.status_code != status.HTTP_201_CREATED:
|
||||
assert False, response.content
|
||||
self.assertHttpStatus(response, status.HTTP_201_CREATED)
|
||||
self.assertEqual(response.data['description'], data['description'])
|
||||
|
||||
@ -394,6 +401,27 @@ class PrefixTest(HttpStatusMixin, APITestCase):
|
||||
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertIn('detail', response.data)
|
||||
|
||||
def test_create_multiple_available_ips(self):
|
||||
|
||||
prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), is_pool=True)
|
||||
url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
|
||||
|
||||
# Try to create nine IPs (only eight are available)
|
||||
data = [{'description': 'Test IP {}'.format(i)} for i in range(1, 10)] # 9 IPs
|
||||
response = self.client.post(url, data, format='json', **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertIn('detail', response.data)
|
||||
|
||||
# Verify that no IPs were created (eight are still available)
|
||||
response = self.client.get(url, **self.header)
|
||||
self.assertEqual(len(response.data), 8)
|
||||
|
||||
# Create all eight available IPs in a single request
|
||||
data = [{'description': 'Test IP {}'.format(i)} for i in range(1, 9)] # 8 IPs
|
||||
response = self.client.post(url, data, format='json', **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_201_CREATED)
|
||||
self.assertEqual(len(response.data), 8)
|
||||
|
||||
|
||||
class IPAddressTest(HttpStatusMixin, APITestCase):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user