Add object permission support for create/update/delete API views

This commit is contained in:
Jeremy Stretch 2020-05-21 10:51:40 -04:00
parent fa8407371b
commit a928d337d9
2 changed files with 138 additions and 56 deletions

View File

@ -460,35 +460,98 @@ class ObjectPermissionAPIViewTestCase(TestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['count'], 3) self.assertEqual(response.data['count'], 3)
# TODO @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
# @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_create_object(self):
# def test_create_object(self): url = reverse('ipam-api:prefix-list')
# url = reverse('ipam-api:prefix-list') data = {
# data = { 'prefix': '10.0.9.0/24',
# 'prefix': '10.0.9.0/24', 'site': self.sites[1].pk,
# 'site': self.sites[1].pk, }
# } initial_count = Prefix.objects.count()
# initial_count = Prefix.objects.count()
# # Attempt to create an object without permission
# # Attempt to create an object without permission response = self.client.post(url, data, format='json', **self.header)
# response = self.client.post(url, data, format='json', **self.header) self.assertEqual(response.status_code, 403)
# self.assertEqual(response.status_code, 403)
# # Assign object permission
# # Assign object permission obj_perm = ObjectPermission(
# obj_perm = ObjectPermission( model=ContentType.objects.get_for_model(Prefix),
# model=ContentType.objects.get_for_model(Prefix), attrs={'site__name': 'Site 1'},
# attrs={'site__name': 'Site 1'}, can_add=True
# can_view=True )
# ) obj_perm.save()
# obj_perm.save() obj_perm.users.add(self.user)
# obj_perm.users.add(self.user)
# # Attempt to create a non-permitted object
# # Attempt to create a non-permitted object response = self.client.post(url, data, format='json', **self.header)
# response = self.client.post(url, data, format='json', **self.header) self.assertEqual(response.status_code, 403)
# self.assertEqual(response.status_code, 403) self.assertEqual(Prefix.objects.count(), initial_count)
# self.assertEqual(Prefix.objects.count(), initial_count)
# # Create a permitted object
# # Create a permitted object data['site'] = self.sites[0].pk
# response = self.client.post(url, data, format='json', **self.header) response = self.client.post(url, data, format='json', **self.header)
# self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 201)
# self.assertEqual(Prefix.objects.count(), initial_count + 1) self.assertEqual(Prefix.objects.count(), initial_count + 1)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_edit_object(self):
# Attempt to edit an object without permission
data = {'site': self.sites[0].pk}
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
response = self.client.patch(url, data, format='json', **self.header)
self.assertEqual(response.status_code, 403)
# Assign object permission
obj_perm = ObjectPermission(
model=ContentType.objects.get_for_model(Prefix),
attrs={'site__name': 'Site 1'},
can_change=True
)
obj_perm.save()
obj_perm.users.add(self.user)
# Attempt to edit a non-permitted object
data = {'site': self.sites[0].pk}
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk})
response = self.client.patch(url, data, format='json', **self.header)
self.assertEqual(response.status_code, 404)
# Edit a permitted object
data['status'] = 'reserved'
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
response = self.client.patch(url, data, format='json', **self.header)
self.assertEqual(response.status_code, 200)
# Attempt to modify a permitted object to a non-permitted object
data['site'] = self.sites[1].pk
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
response = self.client.patch(url, data, format='json', **self.header)
self.assertEqual(response.status_code, 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_delete_object(self):
# Attempt to delete an object without permission
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
response = self.client.delete(url, format='json', **self.header)
self.assertEqual(response.status_code, 403)
# Assign object permission
obj_perm = ObjectPermission(
model=ContentType.objects.get_for_model(Prefix),
attrs={'site__name': 'Site 1'},
can_delete=True
)
obj_perm.save()
obj_perm.users.add(self.user)
# Attempt to delete a non-permitted object
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk})
response = self.client.delete(url, format='json', **self.header)
self.assertEqual(response.status_code, 404)
# Delete a permitted object
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
response = self.client.delete(url, format='json', **self.header)
self.assertEqual(response.status_code, 204)

View File

@ -4,7 +4,8 @@ from collections import OrderedDict
import pytz import pytz
from django.conf import settings from django.conf import settings
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import FieldError, MultipleObjectsReturned, ObjectDoesNotExist from django.core.exceptions import FieldError, MultipleObjectsReturned, ObjectDoesNotExist, PermissionDenied
from django.db import transaction
from django.db.models import ManyToManyField, ProtectedError from django.db.models import ManyToManyField, ProtectedError
from django.urls import reverse from django.urls import reverse
from rest_framework.exceptions import APIException from rest_framework.exceptions import APIException
@ -14,6 +15,7 @@ from rest_framework.response import Response
from rest_framework.serializers import Field, ModelSerializer, ValidationError from rest_framework.serializers import Field, ModelSerializer, ValidationError
from rest_framework.viewsets import ModelViewSet as _ModelViewSet from rest_framework.viewsets import ModelViewSet as _ModelViewSet
from netbox.api import TokenPermissions
from users.models import ObjectPermission from users.models import ObjectPermission
from .utils import dict_to_filter_params, dynamic_import from .utils import dict_to_filter_params, dynamic_import
@ -329,11 +331,13 @@ class ModelViewSet(_ModelViewSet):
if not request.user.is_authenticated or request.user.is_superuser: if not request.user.is_authenticated or request.user.is_superuser:
return return
# Determine the required permission # TODO: Move this to a cleaner function
permission_required = "{}.view_{}".format( # Determine the required permission based on the request method
self.queryset.model._meta.app_label, kwargs = {
self.queryset.model._meta.model_name 'app_label': self.queryset.model._meta.app_label,
) 'model_name': self.queryset.model._meta.model_name
}
permission_required = TokenPermissions.perms_map[request.method][0] % kwargs
# Enforce object-level permissions # Enforce object-level permissions
if permission_required not in {*request.user._user_perm_cache, *request.user._group_perm_cache}: if permission_required not in {*request.user._user_perm_cache, *request.user._group_perm_cache}:
@ -361,34 +365,49 @@ class ModelViewSet(_ModelViewSet):
**kwargs **kwargs
) )
def list(self, *args, **kwargs): def _validate_objects(self, instance):
""" """
Call to super to allow for caching Check that the provided instance or list of instances are matched by the current queryset. This confirms that
any newly created or modified objects abide by the attributes granted by any applicable ObjectPermissions.
""" """
return super().list(*args, **kwargs) if type(instance) is list:
# Check that all instances are still included in the view's queryset
def retrieve(self, *args, **kwargs): conforming_count = self.queryset.filter(pk__in=[obj.pk for obj in instance]).count()
""" if conforming_count != len(instance):
Call to super to allow for caching raise ObjectDoesNotExist
""" else:
return super().retrieve(*args, **kwargs) # Check that the instance is matched by the view's queryset
self.queryset.get(pk=instance.pk)
#
# Logging
#
def perform_create(self, serializer): def perform_create(self, serializer):
model = serializer.child.Meta.model if hasattr(serializer, 'many') else serializer.Meta.model model = self.queryset.model
logger = logging.getLogger('netbox.api.views.ModelViewSet') logger = logging.getLogger('netbox.api.views.ModelViewSet')
logger.info(f"Creating new {model._meta.verbose_name}") logger.info(f"Creating new {model._meta.verbose_name}")
return super().perform_create(serializer)
# Enforce object-level permissions on save()
try:
with transaction.atomic():
instance = serializer.save()
self._validate_objects(instance)
except ObjectDoesNotExist:
raise PermissionDenied()
def perform_update(self, serializer): def perform_update(self, serializer):
model = self.queryset.model
logger = logging.getLogger('netbox.api.views.ModelViewSet') logger = logging.getLogger('netbox.api.views.ModelViewSet')
logger.info(f"Updating {serializer.instance} (PK: {serializer.instance.pk})") logger.info(f"Updating {model._meta.verbose_name} {serializer.instance} (PK: {serializer.instance.pk})")
return super().perform_update(serializer)
# Enforce object-level permissions on save()
try:
with transaction.atomic():
instance = serializer.save()
self._validate_objects(instance)
except ObjectDoesNotExist:
raise PermissionDenied()
def perform_destroy(self, instance): def perform_destroy(self, instance):
model = self.queryset.model
logger = logging.getLogger('netbox.api.views.ModelViewSet') logger = logging.getLogger('netbox.api.views.ModelViewSet')
logger.info(f"Deleting {instance} (PK: {instance.pk})") logger.info(f"Deleting {model._meta.verbose_name} {instance} (PK: {instance.pk})")
return super().perform_destroy(instance) return super().perform_destroy(instance)