Closes #14279: Pass current request to custom validators

This commit is contained in:
Jeremy Stretch 2024-03-20 14:18:54 -04:00
parent a83b233341
commit 4ea1251d82
3 changed files with 98 additions and 30 deletions

View File

@ -1,3 +1,4 @@
import importlib
import logging import logging
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
@ -13,7 +14,6 @@ from core.signals import job_end, job_start
from extras.constants import EVENT_JOB_END, EVENT_JOB_START from extras.constants import EVENT_JOB_END, EVENT_JOB_START
from extras.events import process_event_rules from extras.events import process_event_rules
from extras.models import EventRule from extras.models import EventRule
from extras.validators import run_validators
from netbox.config import get_config from netbox.config import get_config
from netbox.context import current_request, events_queue from netbox.context import current_request, events_queue
from netbox.models.features import ChangeLoggingMixin from netbox.models.features import ChangeLoggingMixin
@ -22,6 +22,27 @@ from utilities.exceptions import AbortRequest
from .choices import ObjectChangeActionChoices from .choices import ObjectChangeActionChoices
from .events import enqueue_object, get_snapshots, serialize_for_event from .events import enqueue_object, get_snapshots, serialize_for_event
from .models import CustomField, ObjectChange, TaggedItem from .models import CustomField, ObjectChange, TaggedItem
from .validators import CustomValidator
def run_validators(instance, validators):
"""
Run the provided iterable of validators for the instance.
"""
request = current_request.get()
for validator in validators:
# Loading a validator class by dotted path
if type(validator) is str:
module, cls = validator.rsplit('.', 1)
validator = getattr(importlib.import_module(module), cls)()
# Constructing a new instance on the fly from a ruleset
elif type(validator) is dict:
validator = CustomValidator(validator)
validator(instance, request)
# #
# Change logging/webhooks # Change logging/webhooks

View File

@ -7,7 +7,9 @@ from ipam.models import ASN, RIR
from dcim.choices import SiteStatusChoices from dcim.choices import SiteStatusChoices
from dcim.models import Site from dcim.models import Site
from extras.validators import CustomValidator from extras.validators import CustomValidator
from users.models import User
from utilities.exceptions import AbortRequest from utilities.exceptions import AbortRequest
from utilities.utils import NetBoxFakeRequest
class MyValidator(CustomValidator): class MyValidator(CustomValidator):
@ -79,6 +81,13 @@ prohibited_validator = CustomValidator({
} }
}) })
request_validator = CustomValidator({
'request.user.username': {
'eq': 'Bob'
}
})
custom_validator = MyValidator() custom_validator = MyValidator()
@ -154,6 +163,28 @@ class CustomValidatorTest(TestCase):
def test_custom_valid(self): def test_custom_valid(self):
Site(name='foo', slug='foo').clean() Site(name='foo', slug='foo').clean()
@override_settings(CUSTOM_VALIDATORS={'dcim.site': [request_validator]})
def test_request_validation(self):
alice = User.objects.create(username='Alice')
bob = User.objects.create(username='Bob')
request = NetBoxFakeRequest({
'META': {},
'POST': {},
'GET': {},
'FILES': {},
'user': alice,
'path': '',
})
site = Site(name='abc', slug='abc')
# Attempt to create the Site as Alice
with self.assertRaises(ValidationError):
request_validator(site, request)
# Creating the Site as Bob should succeed
request.user = bob
request_validator(site, request)
class CustomValidatorConfigTest(TestCase): class CustomValidatorConfigTest(TestCase):

View File

@ -1,4 +1,5 @@
import importlib import inspect
import operator
from django.core import validators from django.core import validators
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
@ -74,6 +75,8 @@ class CustomValidator:
:param validation_rules: A dictionary mapping object attributes to validation rules :param validation_rules: A dictionary mapping object attributes to validation rules
""" """
REQUEST_TOKEN = 'request'
VALIDATORS = { VALIDATORS = {
'eq': IsEqualValidator, 'eq': IsEqualValidator,
'neq': IsNotEqualValidator, 'neq': IsNotEqualValidator,
@ -88,25 +91,56 @@ class CustomValidator:
def __init__(self, validation_rules=None): def __init__(self, validation_rules=None):
self.validation_rules = validation_rules or {} self.validation_rules = validation_rules or {}
assert type(self.validation_rules) is dict, "Validation rules must be passed as a dictionary" if type(self.validation_rules) is not dict:
raise ValueError(_("Validation rules must be passed as a dictionary"))
def __call__(self, instance): def __call__(self, instance, request=None):
# Validate instance attributes per validation rules """
for attr_name, rules in self.validation_rules.items(): Validate the instance and (optional) request against the validation rule(s).
attr = self._getattr(instance, attr_name) """
for attr_path, rules in self.validation_rules.items():
# The rule applies to the current request
if attr_path.split('.')[0] == self.REQUEST_TOKEN:
# Skip if no request has been provided (we can't validate)
if request is None:
continue
attr = self._get_request_attr(request, attr_path)
# The rule applies to the instance
else:
attr = self._get_instance_attr(instance, attr_path)
# Validate the attribute's value against each of the rules defined for it
for descriptor, value in rules.items(): for descriptor, value in rules.items():
validator = self.get_validator(descriptor, value) validator = self.get_validator(descriptor, value)
try: try:
validator(attr) validator(attr)
except ValidationError as exc: except ValidationError as exc:
# Re-package the raised ValidationError to associate it with the specific attr raise ValidationError(
raise ValidationError({attr_name: exc}) _("Custom validation failed for {attribute}: {exception}").format(
attribute=attr_path, exception=exc
)
)
# Execute custom validation logic (if any) # Execute custom validation logic (if any)
self.validate(instance) # TODO: Remove in v4.1
# Inspect the validate() method, which may have been overridden, to determine
# whether we should pass the request (maintains backward compatibility for pre-v4.0)
if 'request' in inspect.signature(self.validate).parameters:
self.validate(instance, request)
else:
self.validate(instance)
@staticmethod @staticmethod
def _getattr(instance, name): def _get_request_attr(request, name):
name = name.split('.', maxsplit=1)[1] # Remove token
try:
return operator.attrgetter(name)(request)
except AttributeError:
raise ValidationError(_('Invalid attribute "{name}" for request').format(name=name))
@staticmethod
def _get_instance_attr(instance, name):
# Attempt to resolve many-to-many fields to their stored values # Attempt to resolve many-to-many fields to their stored values
m2m_fields = [f.name for f in instance._meta.local_many_to_many] m2m_fields = [f.name for f in instance._meta.local_many_to_many]
if name in m2m_fields: if name in m2m_fields:
@ -137,7 +171,7 @@ class CustomValidator:
validator_cls = self.VALIDATORS.get(descriptor) validator_cls = self.VALIDATORS.get(descriptor)
return validator_cls(value) return validator_cls(value)
def validate(self, instance): def validate(self, instance, request):
""" """
Custom validation method, to be overridden by the user. Validation failures should Custom validation method, to be overridden by the user. Validation failures should
raise a ValidationError exception. raise a ValidationError exception.
@ -151,21 +185,3 @@ class CustomValidator:
if field is not None: if field is not None:
raise ValidationError({field: message}) raise ValidationError({field: message})
raise ValidationError(message) raise ValidationError(message)
def run_validators(instance, validators):
"""
Run the provided iterable of validators for the instance.
"""
for validator in validators:
# Loading a validator class by dotted path
if type(validator) is str:
module, cls = validator.rsplit('.', 1)
validator = getattr(importlib.import_module(module), cls)()
# Constructing a new instance on the fly from a ruleset
elif type(validator) is dict:
validator = CustomValidator(validator)
validator(instance)