mirror of
https://github.com/netbox-community/netbox.git
synced 2025-08-28 10:16:10 -06:00
parent
f2b29273d0
commit
8b397f3b42
@ -1,4 +1,5 @@
|
|||||||
from django.db.models import CharField, Lookup
|
from django.db.models import CharField, JSONField, Lookup
|
||||||
|
from django.db.models.fields.json import KeyTextTransform
|
||||||
|
|
||||||
from .fields import CachedValueField
|
from .fields import CachedValueField
|
||||||
|
|
||||||
@ -18,6 +19,30 @@ class Empty(Lookup):
|
|||||||
return f"CAST(LENGTH({sql}) AS BOOLEAN) IS TRUE", params
|
return f"CAST(LENGTH({sql}) AS BOOLEAN) IS TRUE", params
|
||||||
|
|
||||||
|
|
||||||
|
class JSONEmpty(Lookup):
|
||||||
|
"""
|
||||||
|
Support "empty" lookups for JSONField keys.
|
||||||
|
|
||||||
|
A key is considered empty if it is "", null, or does not exist.
|
||||||
|
"""
|
||||||
|
lookup_name = "empty"
|
||||||
|
|
||||||
|
def as_sql(self, compiler, connection):
|
||||||
|
# self.lhs.lhs is the parent expression (could be a JSONField or another KeyTransform)
|
||||||
|
# Rebuild the expression using KeyTextTransform to guarantee ->> (text)
|
||||||
|
text_expr = KeyTextTransform(self.lhs.key_name, self.lhs.lhs)
|
||||||
|
lhs_sql, lhs_params = compiler.compile(text_expr)
|
||||||
|
|
||||||
|
value = self.rhs
|
||||||
|
if value not in (True, False):
|
||||||
|
raise ValueError("The 'empty' lookup only accepts True or False.")
|
||||||
|
|
||||||
|
condition = '' if value else 'NOT '
|
||||||
|
sql = f"(NULLIF({lhs_sql}, '') IS {condition}NULL)"
|
||||||
|
|
||||||
|
return sql, lhs_params
|
||||||
|
|
||||||
|
|
||||||
class NetHost(Lookup):
|
class NetHost(Lookup):
|
||||||
"""
|
"""
|
||||||
Similar to ipam.lookups.NetHost, but casts the field to INET.
|
Similar to ipam.lookups.NetHost, but casts the field to INET.
|
||||||
@ -45,5 +70,6 @@ class NetContainsOrEquals(Lookup):
|
|||||||
|
|
||||||
|
|
||||||
CharField.register_lookup(Empty)
|
CharField.register_lookup(Empty)
|
||||||
|
JSONField.register_lookup(JSONEmpty)
|
||||||
CachedValueField.register_lookup(NetHost)
|
CachedValueField.register_lookup(NetHost)
|
||||||
CachedValueField.register_lookup(NetContainsOrEquals)
|
CachedValueField.register_lookup(NetContainsOrEquals)
|
||||||
|
@ -600,11 +600,19 @@ class CustomField(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
|
|||||||
kwargs = {
|
kwargs = {
|
||||||
'field_name': f'custom_field_data__{self.name}'
|
'field_name': f'custom_field_data__{self.name}'
|
||||||
}
|
}
|
||||||
|
# Native numeric filters will use `isnull` by default for empty lookups, but
|
||||||
|
# JSON fields require `empty` (see bug #20012).
|
||||||
|
if lookup_expr == 'isnull':
|
||||||
|
lookup_expr = 'empty'
|
||||||
if lookup_expr is not None:
|
if lookup_expr is not None:
|
||||||
kwargs['lookup_expr'] = lookup_expr
|
kwargs['lookup_expr'] = lookup_expr
|
||||||
|
|
||||||
|
# 'Empty' lookup is always a boolean
|
||||||
|
if lookup_expr == 'empty':
|
||||||
|
filter_class = django_filters.BooleanFilter
|
||||||
|
|
||||||
# Text/URL
|
# Text/URL
|
||||||
if self.type in (
|
elif self.type in (
|
||||||
CustomFieldTypeChoices.TYPE_TEXT,
|
CustomFieldTypeChoices.TYPE_TEXT,
|
||||||
CustomFieldTypeChoices.TYPE_LONGTEXT,
|
CustomFieldTypeChoices.TYPE_LONGTEXT,
|
||||||
CustomFieldTypeChoices.TYPE_URL,
|
CustomFieldTypeChoices.TYPE_URL,
|
||||||
|
@ -1615,6 +1615,7 @@ class CustomFieldModelFilterTest(TestCase):
|
|||||||
'cf11': manufacturers[2].pk,
|
'cf11': manufacturers[2].pk,
|
||||||
'cf12': [manufacturers[2].pk, manufacturers[3].pk],
|
'cf12': [manufacturers[2].pk, manufacturers[3].pk],
|
||||||
}),
|
}),
|
||||||
|
Site(name='Site 4', slug='site-4'),
|
||||||
])
|
])
|
||||||
|
|
||||||
def test_filter_integer(self):
|
def test_filter_integer(self):
|
||||||
@ -1624,6 +1625,7 @@ class CustomFieldModelFilterTest(TestCase):
|
|||||||
self.assertEqual(self.filterset({'cf_cf1__gte': [200]}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf1__gte': [200]}, self.queryset).qs.count(), 2)
|
||||||
self.assertEqual(self.filterset({'cf_cf1__lt': [200]}, self.queryset).qs.count(), 1)
|
self.assertEqual(self.filterset({'cf_cf1__lt': [200]}, self.queryset).qs.count(), 1)
|
||||||
self.assertEqual(self.filterset({'cf_cf1__lte': [200]}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf1__lte': [200]}, self.queryset).qs.count(), 2)
|
||||||
|
self.assertEqual(self.filterset({'cf_cf1__empty': True}, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
def test_filter_decimal(self):
|
def test_filter_decimal(self):
|
||||||
self.assertEqual(self.filterset({'cf_cf2': [100.1, 200.2]}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf2': [100.1, 200.2]}, self.queryset).qs.count(), 2)
|
||||||
@ -1632,6 +1634,7 @@ class CustomFieldModelFilterTest(TestCase):
|
|||||||
self.assertEqual(self.filterset({'cf_cf2__gte': [200.2]}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf2__gte': [200.2]}, self.queryset).qs.count(), 2)
|
||||||
self.assertEqual(self.filterset({'cf_cf2__lt': [200.2]}, self.queryset).qs.count(), 1)
|
self.assertEqual(self.filterset({'cf_cf2__lt': [200.2]}, self.queryset).qs.count(), 1)
|
||||||
self.assertEqual(self.filterset({'cf_cf2__lte': [200.2]}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf2__lte': [200.2]}, self.queryset).qs.count(), 2)
|
||||||
|
self.assertEqual(self.filterset({'cf_cf2__empty': True}, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
def test_filter_boolean(self):
|
def test_filter_boolean(self):
|
||||||
self.assertEqual(self.filterset({'cf_cf3': True}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf3': True}, self.queryset).qs.count(), 2)
|
||||||
@ -1648,6 +1651,7 @@ class CustomFieldModelFilterTest(TestCase):
|
|||||||
self.assertEqual(self.filterset({'cf_cf4__niew': ['bar']}, self.queryset).qs.count(), 1)
|
self.assertEqual(self.filterset({'cf_cf4__niew': ['bar']}, self.queryset).qs.count(), 1)
|
||||||
self.assertEqual(self.filterset({'cf_cf4__ie': ['FOO']}, self.queryset).qs.count(), 1)
|
self.assertEqual(self.filterset({'cf_cf4__ie': ['FOO']}, self.queryset).qs.count(), 1)
|
||||||
self.assertEqual(self.filterset({'cf_cf4__nie': ['FOO']}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf4__nie': ['FOO']}, self.queryset).qs.count(), 2)
|
||||||
|
self.assertEqual(self.filterset({'cf_cf4__empty': True}, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
def test_filter_text_loose(self):
|
def test_filter_text_loose(self):
|
||||||
self.assertEqual(self.filterset({'cf_cf5': ['foo']}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf5': ['foo']}, self.queryset).qs.count(), 2)
|
||||||
@ -1659,6 +1663,7 @@ class CustomFieldModelFilterTest(TestCase):
|
|||||||
self.assertEqual(self.filterset({'cf_cf6__gte': ['2016-06-27']}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf6__gte': ['2016-06-27']}, self.queryset).qs.count(), 2)
|
||||||
self.assertEqual(self.filterset({'cf_cf6__lt': ['2016-06-27']}, self.queryset).qs.count(), 1)
|
self.assertEqual(self.filterset({'cf_cf6__lt': ['2016-06-27']}, self.queryset).qs.count(), 1)
|
||||||
self.assertEqual(self.filterset({'cf_cf6__lte': ['2016-06-27']}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf6__lte': ['2016-06-27']}, self.queryset).qs.count(), 2)
|
||||||
|
self.assertEqual(self.filterset({'cf_cf6__empty': True}, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
def test_filter_url_strict(self):
|
def test_filter_url_strict(self):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
@ -1674,17 +1679,20 @@ class CustomFieldModelFilterTest(TestCase):
|
|||||||
self.assertEqual(self.filterset({'cf_cf7__niew': ['.com']}, self.queryset).qs.count(), 0)
|
self.assertEqual(self.filterset({'cf_cf7__niew': ['.com']}, self.queryset).qs.count(), 0)
|
||||||
self.assertEqual(self.filterset({'cf_cf7__ie': ['HTTP://A.EXAMPLE.COM']}, self.queryset).qs.count(), 1)
|
self.assertEqual(self.filterset({'cf_cf7__ie': ['HTTP://A.EXAMPLE.COM']}, self.queryset).qs.count(), 1)
|
||||||
self.assertEqual(self.filterset({'cf_cf7__nie': ['HTTP://A.EXAMPLE.COM']}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf7__nie': ['HTTP://A.EXAMPLE.COM']}, self.queryset).qs.count(), 2)
|
||||||
|
self.assertEqual(self.filterset({'cf_cf7__empty': True}, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
def test_filter_url_loose(self):
|
def test_filter_url_loose(self):
|
||||||
self.assertEqual(self.filterset({'cf_cf8': ['example.com']}, self.queryset).qs.count(), 3)
|
self.assertEqual(self.filterset({'cf_cf8': ['example.com']}, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
def test_filter_select(self):
|
def test_filter_select(self):
|
||||||
self.assertEqual(self.filterset({'cf_cf9': ['A', 'B']}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf9': ['A', 'B']}, self.queryset).qs.count(), 2)
|
||||||
|
self.assertEqual(self.filterset({'cf_cf9__empty': True}, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
def test_filter_multiselect(self):
|
def test_filter_multiselect(self):
|
||||||
self.assertEqual(self.filterset({'cf_cf10': ['A']}, self.queryset).qs.count(), 1)
|
self.assertEqual(self.filterset({'cf_cf10': ['A']}, self.queryset).qs.count(), 1)
|
||||||
self.assertEqual(self.filterset({'cf_cf10': ['A', 'C']}, self.queryset).qs.count(), 2)
|
self.assertEqual(self.filterset({'cf_cf10': ['A', 'C']}, self.queryset).qs.count(), 2)
|
||||||
self.assertEqual(self.filterset({'cf_cf10': ['null']}, self.queryset).qs.count(), 1)
|
self.assertEqual(self.filterset({'cf_cf10': ['null']}, self.queryset).qs.count(), 1) # Contains a literal null
|
||||||
|
self.assertEqual(self.filterset({'cf_cf10__empty': True}, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
def test_filter_object(self):
|
def test_filter_object(self):
|
||||||
manufacturer_ids = Manufacturer.objects.values_list('id', flat=True)
|
manufacturer_ids = Manufacturer.objects.values_list('id', flat=True)
|
||||||
@ -1692,6 +1700,7 @@ class CustomFieldModelFilterTest(TestCase):
|
|||||||
self.filterset({'cf_cf11': [manufacturer_ids[0], manufacturer_ids[1]]}, self.queryset).qs.count(),
|
self.filterset({'cf_cf11': [manufacturer_ids[0], manufacturer_ids[1]]}, self.queryset).qs.count(),
|
||||||
2
|
2
|
||||||
)
|
)
|
||||||
|
self.assertEqual(self.filterset({'cf_cf11__empty': True}, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
def test_filter_multiobject(self):
|
def test_filter_multiobject(self):
|
||||||
manufacturer_ids = Manufacturer.objects.values_list('id', flat=True)
|
manufacturer_ids = Manufacturer.objects.values_list('id', flat=True)
|
||||||
@ -1703,3 +1712,4 @@ class CustomFieldModelFilterTest(TestCase):
|
|||||||
self.filterset({'cf_cf12': [manufacturer_ids[3]]}, self.queryset).qs.count(),
|
self.filterset({'cf_cf12': [manufacturer_ids[3]]}, self.queryset).qs.count(),
|
||||||
3
|
3
|
||||||
)
|
)
|
||||||
|
self.assertEqual(self.filterset({'cf_cf12__empty': True}, self.queryset).qs.count(), 1)
|
||||||
|
@ -29,6 +29,13 @@ __all__ = (
|
|||||||
'OrganizationalModelFilterSet',
|
'OrganizationalModelFilterSet',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
STANDARD_LOOKUPS = (
|
||||||
|
'exact',
|
||||||
|
'iexact',
|
||||||
|
'in',
|
||||||
|
'contains',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# FilterSets
|
# FilterSets
|
||||||
@ -159,7 +166,7 @@ class BaseFilterSet(django_filters.FilterSet):
|
|||||||
return {}
|
return {}
|
||||||
|
|
||||||
# Skip nonstandard lookup expressions
|
# Skip nonstandard lookup expressions
|
||||||
if existing_filter.method is not None or existing_filter.lookup_expr not in ['exact', 'iexact', 'in']:
|
if existing_filter.method is not None or existing_filter.lookup_expr not in STANDARD_LOOKUPS:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
# Choose the lookup expression map based on the filter type
|
# Choose the lookup expression map based on the filter type
|
||||||
|
Loading…
Reference in New Issue
Block a user