From ad3df60fdd381a2cd5d6369ba4be40869a8551d7 Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Thu, 5 Sep 2024 15:56:30 -0400 Subject: [PATCH] Add new INET lookups for net_host_lt/gt/lte/gte comparisons irrespective of subnet inclusion --- netbox/ipam/fields.py | 4 ++++ netbox/ipam/lookups.py | 40 ++++++++++++++++++++++++++++++++ netbox/ipam/models/ip.py | 14 +++++------ netbox/ipam/tests/test_models.py | 29 +++++++++++++++++++++++ 4 files changed, 80 insertions(+), 7 deletions(-) diff --git a/netbox/ipam/fields.py b/netbox/ipam/fields.py index 20341005d..a8682e600 100644 --- a/netbox/ipam/fields.py +++ b/netbox/ipam/fields.py @@ -105,6 +105,10 @@ IPAddressField.register_lookup(lookups.NetIn) IPAddressField.register_lookup(lookups.NetHostContained) IPAddressField.register_lookup(lookups.NetFamily) IPAddressField.register_lookup(lookups.NetMaskLength) +IPAddressField.register_lookup(lookups.NetHostLessThan) +IPAddressField.register_lookup(lookups.NetHostLessThanOrEqual) +IPAddressField.register_lookup(lookups.NetHostGreaterThan) +IPAddressField.register_lookup(lookups.NetHostGreaterThanOrEqual) class ASNField(models.BigIntegerField): diff --git a/netbox/ipam/lookups.py b/netbox/ipam/lookups.py index c6abb5a26..95fd1ce05 100644 --- a/netbox/ipam/lookups.py +++ b/netbox/ipam/lookups.py @@ -154,6 +154,46 @@ class NetHostContained(Lookup): return 'CAST(HOST(%s) AS INET) <<= %s' % (lhs, rhs), params +class NetHostGreaterThan(Lookup): + lookup_name = 'net_host_gt' + + def as_sql(self, qn, connection): + lhs, lhs_params = self.process_lhs(qn, connection) + rhs, rhs_params = self.process_rhs(qn, connection) + params = lhs_params + rhs_params + return 'CAST(HOST(%s) AS INET) > INET %s' % (lhs, rhs), params + + +class NetHostLessThan(Lookup): + lookup_name = 'net_host_lt' + + def as_sql(self, qn, connection): + lhs, lhs_params = self.process_lhs(qn, connection) + rhs, rhs_params = self.process_rhs(qn, connection) + params = lhs_params + rhs_params + return 'CAST(HOST(%s) AS INET) < INET %s' % (lhs, rhs), params + + +class NetHostGreaterThanOrEqual(Lookup): + lookup_name = 'net_host_gte' + + def as_sql(self, qn, connection): + lhs, lhs_params = self.process_lhs(qn, connection) + rhs, rhs_params = self.process_rhs(qn, connection) + params = lhs_params + rhs_params + return 'CAST(HOST(%s) AS INET) >= INET %s' % (lhs, rhs), params + + +class NetHostLessThanOrEqual(Lookup): + lookup_name = 'net_host_lte' + + def as_sql(self, qn, connection): + lhs, lhs_params = self.process_lhs(qn, connection) + rhs, rhs_params = self.process_rhs(qn, connection) + params = lhs_params + rhs_params + return 'CAST(HOST(%s) AS INET) <= INET %s' % (lhs, rhs), params + + class NetFamily(Transform): lookup_name = 'family' function = 'FAMILY' diff --git a/netbox/ipam/models/ip.py b/netbox/ipam/models/ip.py index 0b8e3a8df..09d0ba1e4 100644 --- a/netbox/ipam/models/ip.py +++ b/netbox/ipam/models/ip.py @@ -580,15 +580,15 @@ class IPRange(ContactsMixin, PrimaryModel): }) # Check for overlapping ranges - overlapping_range = IPRange.objects.exclude(pk=self.pk).filter(vrf=self.vrf).filter( - Q(start_address__gte=self.start_address, start_address__lte=self.end_address) | # Starts inside - Q(end_address__gte=self.start_address, end_address__lte=self.end_address) | # Ends inside - Q(start_address__lte=self.start_address, end_address__gte=self.end_address) # Starts & ends outside - ).first() - if overlapping_range: + overlapping_ranges = IPRange.objects.exclude(pk=self.pk).filter(vrf=self.vrf).filter( + Q(start_address__net_host_gte=self.start_address.ip, start_address__net_host_lte=self.end_address.ip) | # Starts inside + Q(end_address__net_host_gte=self.start_address.ip, end_address__net_host_lte=self.end_address.ip) | # Ends inside + Q(start_address__net_host_lte=self.start_address.ip, end_address__net_host_gte=self.end_address.ip) # Starts & ends outside + ) + if overlapping_ranges.exists(): raise ValidationError( _("Defined addresses overlap with range {overlapping_range} in VRF {vrf}").format( - overlapping_range=overlapping_range, + overlapping_range=overlapping_ranges.first(), vrf=self.vrf )) diff --git a/netbox/ipam/tests/test_models.py b/netbox/ipam/tests/test_models.py index 39eb33a4f..5c2c7c79a 100644 --- a/netbox/ipam/tests/test_models.py +++ b/netbox/ipam/tests/test_models.py @@ -36,6 +36,35 @@ class TestAggregate(TestCase): self.assertEqual(aggregate.get_utilization(), 100) +class TestIPRange(TestCase): + + def test_overlapping_range(self): + iprange_192_168 = IPRange.objects.create(start_address=IPNetwork('192.168.0.1/22'), end_address=IPNetwork('192.168.0.49/22')) + iprange_192_168.clean() + iprange_3_1_99 = IPRange.objects.create(start_address=IPNetwork('1.2.3.1/24'), end_address=IPNetwork('1.2.3.99/24')) + iprange_3_1_99.clean() + iprange_3_100_199 = IPRange.objects.create(start_address=IPNetwork('1.2.3.100/24'), end_address=IPNetwork('1.2.3.199/24')) + iprange_3_100_199.clean() + iprange_3_200_255 = IPRange.objects.create(start_address=IPNetwork('1.2.3.200/24'), end_address=IPNetwork('1.2.3.255/24')) + iprange_3_200_255.clean() + iprange_4_1_99 = IPRange.objects.create(start_address=IPNetwork('1.2.4.1/24'), end_address=IPNetwork('1.2.4.99/24')) + iprange_4_1_99.clean() + iprange_4_200 = IPRange.objects.create(start_address=IPNetwork('1.2.4.200/24'), end_address=IPNetwork('1.2.4.255/24')) + iprange_4_200.clean() + # Overlapping range entirely within existing + with self.assertRaises(ValidationError): + iprange_3_123_124 = IPRange.objects.create(start_address=IPNetwork('1.2.3.123/26'), end_address=IPNetwork('1.2.3.124/26')) + iprange_3_123_124.clean() + # Overlapping range starting within existing + with self.assertRaises(ValidationError): + iprange_4_98_101 = IPRange.objects.create(start_address=IPNetwork('1.2.4.98/24'), end_address=IPNetwork('1.2.4.101/24')) + iprange_4_98_101.clean() + # Overlapping range ending within existing + with self.assertRaises(ValidationError): + iprange_4_198_201 = IPRange.objects.create(start_address=IPNetwork('1.2.4.198/24'), end_address=IPNetwork('1.2.4.201/24')) + iprange_4_198_201.clean() + + class TestPrefix(TestCase): def test_get_duplicates(self):