diff --git a/netbox/ipam/filtersets.py b/netbox/ipam/filtersets.py index 2e9f56bbc..dbda8811f 100644 --- a/netbox/ipam/filtersets.py +++ b/netbox/ipam/filtersets.py @@ -16,6 +16,7 @@ from virtualization.models import VirtualMachine, VMInterface from .choices import * from .models import * +from rest_framework import serializers __all__ = ( 'AggregateFilterSet', @@ -599,7 +600,33 @@ class IPAddressFilterSet(NetBoxModelFilterSet, TenancyFilterSet): return queryset.none() return queryset.filter(q) + def parse_inet_addresses(self, value): + ''' + Parse networks or IP addresses and cast to a format + acceptable by the Postgres inet type. + + Skips invalid values. + ''' + parsed = [] + for addr in value: + if netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr): + parsed.append(addr) + continue + try: + network = netaddr.IPNetwork(addr) + parsed.append(str(network)) + except (AddrFormatError, ValueError): + continue + return parsed + def filter_address(self, queryset, name, value): + # Let's first parse the addresses passed + # as argument. If they are all invalid, + # we return an empty queryset + value = self.parse_inet_addresses(value) + if (len(value) == 0): + return queryset.none() + try: return queryset.filter(address__net_in=value) except ValidationError: diff --git a/netbox/ipam/tests/test_filtersets.py b/netbox/ipam/tests/test_filtersets.py index 13b3ae163..c53522d7a 100644 --- a/netbox/ipam/tests/test_filtersets.py +++ b/netbox/ipam/tests/test_filtersets.py @@ -10,6 +10,7 @@ from ipam.models import * from utilities.testing import ChangeLoggedFilterSetTests, create_test_device, create_test_virtualmachine from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine, VMInterface from tenancy.models import Tenant, TenantGroup +from rest_framework import serializers class ASNTestCase(TestCase, ChangeLoggedFilterSetTests): @@ -851,6 +852,26 @@ class IPAddressTestCase(TestCase, ChangeLoggedFilterSetTests): params = {'address': ['2001:db8::1/64', '2001:db8::1/65']} self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2) + # Check for valid edge cases. Note that Postgres inet type + # only accepts netmasks in the int form, so the filterset + # casts netmasks in the xxx.xxx.xxx.xxx format. + params = {'address': ['24']} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 0) + params = {'address': ['10.0.0.1/255.255.255.0']} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + params = {'address': ['10.0.0.1/255.255.255.0', '10.0.0.1/25']} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2) + + # Check for invalid input. + params = {'address': ['/24']} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 0) + params = {'address': ['10.0.0.1/255.255.999.0']} # Invalid netmask + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 0) + + # Check for partially invalid input. + params = {'address': ['10.0.0.1', '/24', '10.0.0.10/24']} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2) + def test_mask_length(self): params = {'mask_length': '24'} self.assertEqual(self.filterset(params, self.queryset).qs.count(), 5)