From a928d337d902ee72bd4a1e5127fde6c0e9c4694b Mon Sep 17 00:00:00 2001 From: Jeremy Stretch Date: Thu, 21 May 2020 10:51:40 -0400 Subject: [PATCH] Add object permission support for create/update/delete API views --- netbox/netbox/tests/test_authentication.py | 127 +++++++++++++++------ netbox/utilities/api.py | 67 +++++++---- 2 files changed, 138 insertions(+), 56 deletions(-) diff --git a/netbox/netbox/tests/test_authentication.py b/netbox/netbox/tests/test_authentication.py index 64dd83783..03d0a1dc3 100644 --- a/netbox/netbox/tests/test_authentication.py +++ b/netbox/netbox/tests/test_authentication.py @@ -460,35 +460,98 @@ class ObjectPermissionAPIViewTestCase(TestCase): self.assertEqual(response.status_code, 200) self.assertEqual(response.data['count'], 3) - # TODO - # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - # def test_create_object(self): - # url = reverse('ipam-api:prefix-list') - # data = { - # 'prefix': '10.0.9.0/24', - # 'site': self.sites[1].pk, - # } - # initial_count = Prefix.objects.count() - # - # # Attempt to create an object without permission - # response = self.client.post(url, data, format='json', **self.header) - # self.assertEqual(response.status_code, 403) - # - # # Assign object permission - # obj_perm = ObjectPermission( - # model=ContentType.objects.get_for_model(Prefix), - # attrs={'site__name': 'Site 1'}, - # can_view=True - # ) - # obj_perm.save() - # obj_perm.users.add(self.user) - # - # # Attempt to create a non-permitted object - # response = self.client.post(url, data, format='json', **self.header) - # self.assertEqual(response.status_code, 403) - # self.assertEqual(Prefix.objects.count(), initial_count) - # - # # Create a permitted object - # response = self.client.post(url, data, format='json', **self.header) - # self.assertEqual(response.status_code, 200) - # self.assertEqual(Prefix.objects.count(), initial_count + 1) + @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + def test_create_object(self): + url = reverse('ipam-api:prefix-list') + data = { + 'prefix': '10.0.9.0/24', + 'site': self.sites[1].pk, + } + initial_count = Prefix.objects.count() + + # Attempt to create an object without permission + response = self.client.post(url, data, format='json', **self.header) + self.assertEqual(response.status_code, 403) + + # Assign object permission + obj_perm = ObjectPermission( + model=ContentType.objects.get_for_model(Prefix), + attrs={'site__name': 'Site 1'}, + can_add=True + ) + obj_perm.save() + obj_perm.users.add(self.user) + + # Attempt to create a non-permitted object + response = self.client.post(url, data, format='json', **self.header) + self.assertEqual(response.status_code, 403) + self.assertEqual(Prefix.objects.count(), initial_count) + + # Create a permitted object + data['site'] = self.sites[0].pk + response = self.client.post(url, data, format='json', **self.header) + self.assertEqual(response.status_code, 201) + self.assertEqual(Prefix.objects.count(), initial_count + 1) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + def test_edit_object(self): + + # Attempt to edit an object without permission + data = {'site': self.sites[0].pk} + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk}) + response = self.client.patch(url, data, format='json', **self.header) + self.assertEqual(response.status_code, 403) + + # Assign object permission + obj_perm = ObjectPermission( + model=ContentType.objects.get_for_model(Prefix), + attrs={'site__name': 'Site 1'}, + can_change=True + ) + obj_perm.save() + obj_perm.users.add(self.user) + + # Attempt to edit a non-permitted object + data = {'site': self.sites[0].pk} + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk}) + response = self.client.patch(url, data, format='json', **self.header) + self.assertEqual(response.status_code, 404) + + # Edit a permitted object + data['status'] = 'reserved' + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk}) + response = self.client.patch(url, data, format='json', **self.header) + self.assertEqual(response.status_code, 200) + + # Attempt to modify a permitted object to a non-permitted object + data['site'] = self.sites[1].pk + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk}) + response = self.client.patch(url, data, format='json', **self.header) + self.assertEqual(response.status_code, 403) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + def test_delete_object(self): + + # Attempt to delete an object without permission + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk}) + response = self.client.delete(url, format='json', **self.header) + self.assertEqual(response.status_code, 403) + + # Assign object permission + obj_perm = ObjectPermission( + model=ContentType.objects.get_for_model(Prefix), + attrs={'site__name': 'Site 1'}, + can_delete=True + ) + obj_perm.save() + obj_perm.users.add(self.user) + + # Attempt to delete a non-permitted object + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk}) + response = self.client.delete(url, format='json', **self.header) + self.assertEqual(response.status_code, 404) + + # Delete a permitted object + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk}) + response = self.client.delete(url, format='json', **self.header) + self.assertEqual(response.status_code, 204) diff --git a/netbox/utilities/api.py b/netbox/utilities/api.py index 9ec587369..745f812ff 100644 --- a/netbox/utilities/api.py +++ b/netbox/utilities/api.py @@ -4,7 +4,8 @@ from collections import OrderedDict import pytz from django.conf import settings from django.contrib.contenttypes.models import ContentType -from django.core.exceptions import FieldError, MultipleObjectsReturned, ObjectDoesNotExist +from django.core.exceptions import FieldError, MultipleObjectsReturned, ObjectDoesNotExist, PermissionDenied +from django.db import transaction from django.db.models import ManyToManyField, ProtectedError from django.urls import reverse from rest_framework.exceptions import APIException @@ -14,6 +15,7 @@ from rest_framework.response import Response from rest_framework.serializers import Field, ModelSerializer, ValidationError from rest_framework.viewsets import ModelViewSet as _ModelViewSet +from netbox.api import TokenPermissions from users.models import ObjectPermission from .utils import dict_to_filter_params, dynamic_import @@ -329,11 +331,13 @@ class ModelViewSet(_ModelViewSet): if not request.user.is_authenticated or request.user.is_superuser: return - # Determine the required permission - permission_required = "{}.view_{}".format( - self.queryset.model._meta.app_label, - self.queryset.model._meta.model_name - ) + # TODO: Move this to a cleaner function + # Determine the required permission based on the request method + kwargs = { + 'app_label': self.queryset.model._meta.app_label, + 'model_name': self.queryset.model._meta.model_name + } + permission_required = TokenPermissions.perms_map[request.method][0] % kwargs # Enforce object-level permissions if permission_required not in {*request.user._user_perm_cache, *request.user._group_perm_cache}: @@ -361,34 +365,49 @@ class ModelViewSet(_ModelViewSet): **kwargs ) - def list(self, *args, **kwargs): + def _validate_objects(self, instance): """ - Call to super to allow for caching + Check that the provided instance or list of instances are matched by the current queryset. This confirms that + any newly created or modified objects abide by the attributes granted by any applicable ObjectPermissions. """ - return super().list(*args, **kwargs) - - def retrieve(self, *args, **kwargs): - """ - Call to super to allow for caching - """ - return super().retrieve(*args, **kwargs) - - # - # Logging - # + if type(instance) is list: + # Check that all instances are still included in the view's queryset + conforming_count = self.queryset.filter(pk__in=[obj.pk for obj in instance]).count() + if conforming_count != len(instance): + raise ObjectDoesNotExist + else: + # Check that the instance is matched by the view's queryset + self.queryset.get(pk=instance.pk) def perform_create(self, serializer): - model = serializer.child.Meta.model if hasattr(serializer, 'many') else serializer.Meta.model + model = self.queryset.model logger = logging.getLogger('netbox.api.views.ModelViewSet') logger.info(f"Creating new {model._meta.verbose_name}") - return super().perform_create(serializer) + + # Enforce object-level permissions on save() + try: + with transaction.atomic(): + instance = serializer.save() + self._validate_objects(instance) + except ObjectDoesNotExist: + raise PermissionDenied() def perform_update(self, serializer): + model = self.queryset.model logger = logging.getLogger('netbox.api.views.ModelViewSet') - logger.info(f"Updating {serializer.instance} (PK: {serializer.instance.pk})") - return super().perform_update(serializer) + logger.info(f"Updating {model._meta.verbose_name} {serializer.instance} (PK: {serializer.instance.pk})") + + # Enforce object-level permissions on save() + try: + with transaction.atomic(): + instance = serializer.save() + self._validate_objects(instance) + except ObjectDoesNotExist: + raise PermissionDenied() def perform_destroy(self, instance): + model = self.queryset.model logger = logging.getLogger('netbox.api.views.ModelViewSet') - logger.info(f"Deleting {instance} (PK: {instance.pk})") + logger.info(f"Deleting {model._meta.verbose_name} {instance} (PK: {instance.pk})") + return super().perform_destroy(instance)