From 4ea1251d8201ca2fe56d73e6a8f0a3ef3ace1e2b Mon Sep 17 00:00:00 2001 From: Jeremy Stretch Date: Wed, 20 Mar 2024 14:18:54 -0400 Subject: [PATCH] Closes #14279: Pass current request to custom validators --- netbox/extras/signals.py | 23 +++++- netbox/extras/tests/test_customvalidation.py | 31 ++++++++ netbox/extras/validators.py | 74 ++++++++++++-------- 3 files changed, 98 insertions(+), 30 deletions(-) diff --git a/netbox/extras/signals.py b/netbox/extras/signals.py index 833ce0036..ed3da7bfd 100644 --- a/netbox/extras/signals.py +++ b/netbox/extras/signals.py @@ -1,3 +1,4 @@ +import importlib import logging from django.contrib.contenttypes.models import ContentType @@ -13,7 +14,6 @@ from core.signals import job_end, job_start from extras.constants import EVENT_JOB_END, EVENT_JOB_START from extras.events import process_event_rules from extras.models import EventRule -from extras.validators import run_validators from netbox.config import get_config from netbox.context import current_request, events_queue from netbox.models.features import ChangeLoggingMixin @@ -22,6 +22,27 @@ from utilities.exceptions import AbortRequest from .choices import ObjectChangeActionChoices from .events import enqueue_object, get_snapshots, serialize_for_event from .models import CustomField, ObjectChange, TaggedItem +from .validators import CustomValidator + + +def run_validators(instance, validators): + """ + Run the provided iterable of validators for the instance. + """ + request = current_request.get() + for validator in validators: + + # Loading a validator class by dotted path + if type(validator) is str: + module, cls = validator.rsplit('.', 1) + validator = getattr(importlib.import_module(module), cls)() + + # Constructing a new instance on the fly from a ruleset + elif type(validator) is dict: + validator = CustomValidator(validator) + + validator(instance, request) + # # Change logging/webhooks diff --git a/netbox/extras/tests/test_customvalidation.py b/netbox/extras/tests/test_customvalidation.py index d74ad599b..a5b321b08 100644 --- a/netbox/extras/tests/test_customvalidation.py +++ b/netbox/extras/tests/test_customvalidation.py @@ -7,7 +7,9 @@ from ipam.models import ASN, RIR from dcim.choices import SiteStatusChoices from dcim.models import Site from extras.validators import CustomValidator +from users.models import User from utilities.exceptions import AbortRequest +from utilities.utils import NetBoxFakeRequest class MyValidator(CustomValidator): @@ -79,6 +81,13 @@ prohibited_validator = CustomValidator({ } }) + +request_validator = CustomValidator({ + 'request.user.username': { + 'eq': 'Bob' + } +}) + custom_validator = MyValidator() @@ -154,6 +163,28 @@ class CustomValidatorTest(TestCase): def test_custom_valid(self): Site(name='foo', slug='foo').clean() + @override_settings(CUSTOM_VALIDATORS={'dcim.site': [request_validator]}) + def test_request_validation(self): + alice = User.objects.create(username='Alice') + bob = User.objects.create(username='Bob') + request = NetBoxFakeRequest({ + 'META': {}, + 'POST': {}, + 'GET': {}, + 'FILES': {}, + 'user': alice, + 'path': '', + }) + site = Site(name='abc', slug='abc') + + # Attempt to create the Site as Alice + with self.assertRaises(ValidationError): + request_validator(site, request) + + # Creating the Site as Bob should succeed + request.user = bob + request_validator(site, request) + class CustomValidatorConfigTest(TestCase): diff --git a/netbox/extras/validators.py b/netbox/extras/validators.py index 30c9397d5..8d91ca66b 100644 --- a/netbox/extras/validators.py +++ b/netbox/extras/validators.py @@ -1,4 +1,5 @@ -import importlib +import inspect +import operator from django.core import validators from django.core.exceptions import ValidationError @@ -74,6 +75,8 @@ class CustomValidator: :param validation_rules: A dictionary mapping object attributes to validation rules """ + REQUEST_TOKEN = 'request' + VALIDATORS = { 'eq': IsEqualValidator, 'neq': IsNotEqualValidator, @@ -88,25 +91,56 @@ class CustomValidator: def __init__(self, validation_rules=None): self.validation_rules = validation_rules or {} - assert type(self.validation_rules) is dict, "Validation rules must be passed as a dictionary" + if type(self.validation_rules) is not dict: + raise ValueError(_("Validation rules must be passed as a dictionary")) - def __call__(self, instance): - # Validate instance attributes per validation rules - for attr_name, rules in self.validation_rules.items(): - attr = self._getattr(instance, attr_name) + def __call__(self, instance, request=None): + """ + Validate the instance and (optional) request against the validation rule(s). + """ + for attr_path, rules in self.validation_rules.items(): + + # The rule applies to the current request + if attr_path.split('.')[0] == self.REQUEST_TOKEN: + # Skip if no request has been provided (we can't validate) + if request is None: + continue + attr = self._get_request_attr(request, attr_path) + # The rule applies to the instance + else: + attr = self._get_instance_attr(instance, attr_path) + + # Validate the attribute's value against each of the rules defined for it for descriptor, value in rules.items(): validator = self.get_validator(descriptor, value) try: validator(attr) except ValidationError as exc: - # Re-package the raised ValidationError to associate it with the specific attr - raise ValidationError({attr_name: exc}) + raise ValidationError( + _("Custom validation failed for {attribute}: {exception}").format( + attribute=attr_path, exception=exc + ) + ) # Execute custom validation logic (if any) - self.validate(instance) + # TODO: Remove in v4.1 + # Inspect the validate() method, which may have been overridden, to determine + # whether we should pass the request (maintains backward compatibility for pre-v4.0) + if 'request' in inspect.signature(self.validate).parameters: + self.validate(instance, request) + else: + self.validate(instance) @staticmethod - def _getattr(instance, name): + def _get_request_attr(request, name): + name = name.split('.', maxsplit=1)[1] # Remove token + try: + return operator.attrgetter(name)(request) + except AttributeError: + raise ValidationError(_('Invalid attribute "{name}" for request').format(name=name)) + + @staticmethod + def _get_instance_attr(instance, name): # Attempt to resolve many-to-many fields to their stored values m2m_fields = [f.name for f in instance._meta.local_many_to_many] if name in m2m_fields: @@ -137,7 +171,7 @@ class CustomValidator: validator_cls = self.VALIDATORS.get(descriptor) return validator_cls(value) - def validate(self, instance): + def validate(self, instance, request): """ Custom validation method, to be overridden by the user. Validation failures should raise a ValidationError exception. @@ -151,21 +185,3 @@ class CustomValidator: if field is not None: raise ValidationError({field: message}) raise ValidationError(message) - - -def run_validators(instance, validators): - """ - Run the provided iterable of validators for the instance. - """ - for validator in validators: - - # Loading a validator class by dotted path - if type(validator) is str: - module, cls = validator.rsplit('.', 1) - validator = getattr(importlib.import_module(module), cls)() - - # Constructing a new instance on the fly from a ruleset - elif type(validator) is dict: - validator = CustomValidator(validator) - - validator(instance)