Add object permission support, tests for bulk import/edit/delete views

This commit is contained in:
Jeremy Stretch 2020-05-21 11:49:50 -04:00
parent a928d337d9
commit 5486cff441
3 changed files with 183 additions and 14 deletions

View File

@ -605,14 +605,15 @@ class PrefixDeleteView(ObjectPermissionRequiredMixin, ObjectDeleteView):
default_return_url = 'ipam:prefix_list'
class PrefixBulkImportView(PermissionRequiredMixin, BulkImportView):
class PrefixBulkImportView(ObjectPermissionRequiredMixin, BulkImportView):
permission_required = 'ipam.add_prefix'
queryset = Prefix.objects.all()
model_form = forms.PrefixCSVForm
table = tables.PrefixTable
default_return_url = 'ipam:prefix_list'
class PrefixBulkEditView(PermissionRequiredMixin, BulkEditView):
class PrefixBulkEditView(ObjectPermissionRequiredMixin, BulkEditView):
permission_required = 'ipam.change_prefix'
queryset = Prefix.objects.prefetch_related('site', 'vrf__tenant', 'tenant', 'vlan', 'role')
filterset = filters.PrefixFilterSet
@ -621,7 +622,7 @@ class PrefixBulkEditView(PermissionRequiredMixin, BulkEditView):
default_return_url = 'ipam:prefix_list'
class PrefixBulkDeleteView(PermissionRequiredMixin, BulkDeleteView):
class PrefixBulkDeleteView(ObjectPermissionRequiredMixin, BulkDeleteView):
permission_required = 'ipam.delete_prefix'
queryset = Prefix.objects.prefetch_related('site', 'vrf__tenant', 'tenant', 'vlan', 'role')
filterset = filters.PrefixFilterSet

View File

@ -342,6 +342,14 @@ class ObjectPermissionViewTestCase(TestCase):
'confirm': True
}
# Attempt to delete object without permission
request = {
'path': reverse('ipam:prefix_delete', kwargs={'pk': self.prefixes[0].pk}),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 403)
# Assign object permission
obj_perm = ObjectPermission(
model=ContentType.objects.get_for_model(Prefix),
@ -372,6 +380,147 @@ class ObjectPermissionViewTestCase(TestCase):
self.assertHttpStatus(response, 404)
self.assertTrue(Prefix.objects.filter(pk=self.prefixes[3].pk).exists())
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_import_objects(self):
initial_count = Prefix.objects.count()
form_data = {
'csv': "prefix,status,site\n"
"10.0.9.0/24,Active,Site 1\n"
"10.0.10.0/24,Active,Site 2\n"
"10.0.11.0/24,Active,Site 3\n",
}
# Attempt to import objects without permission
request = {
'path': reverse('ipam:prefix_import'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 403)
self.assertEqual(initial_count, Prefix.objects.count())
# Assign object permission
obj_perm = ObjectPermission(
model=ContentType.objects.get_for_model(Prefix),
attrs={'site__name': 'Site 1'},
can_add=True
)
obj_perm.save()
obj_perm.users.add(self.user)
# Attempt to create non-permitted objects
request = {
'path': reverse('ipam:prefix_import'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 200)
self.assertEqual(Prefix.objects.count(), initial_count)
# Create a permitted object
form_data = {
'csv': "prefix,status,site\n"
"10.0.9.0/24,Active,Site 1\n"
"10.0.10.0/24,Active,Site 1\n"
"10.0.11.0/24,Active,Site 1\n",
}
request = {
'path': reverse('ipam:prefix_import'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 200)
self.assertEqual(Prefix.objects.count(), initial_count + 3)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_edit_objects(self):
form_data = {
'pk': [p.pk for p in self.prefixes],
'status': 'reserved',
'_apply': True,
}
# Attempt to edit objects without permission
request = {
'path': reverse('ipam:prefix_bulk_edit'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 403)
# Assign object permission
obj_perm = ObjectPermission(
model=ContentType.objects.get_for_model(Prefix),
attrs={'site__name': 'Site 1'},
can_change=True
)
obj_perm.save()
obj_perm.users.add(self.user)
# Attempt to edit non-permitted objects
request = {
'path': reverse('ipam:prefix_bulk_edit'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 200)
self.assertEqual(Prefix.objects.get(pk=self.prefixes[3].pk).status, 'active')
# Edit permitted objects
form_data['pk'] = [p.pk for p in self.prefixes[:3]]
request = {
'path': reverse('ipam:prefix_bulk_edit'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
self.assertEqual(Prefix.objects.get(pk=self.prefixes[0].pk).status, 'reserved')
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_delete_objects(self):
form_data = {
'pk': [p.pk for p in self.prefixes],
'confirm': True,
'_confirm': True,
}
# Attempt to delete objects without permission
request = {
'path': reverse('ipam:prefix_bulk_delete'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 403)
# Assign object permission
obj_perm = ObjectPermission(
model=ContentType.objects.get_for_model(Prefix),
attrs={'site__name': 'Site 1'},
can_view=True,
can_delete=True
)
obj_perm.save()
obj_perm.users.add(self.user)
# Attempt to delete non-permitted object
request = {
'path': reverse('ipam:prefix_bulk_delete'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 200)
self.assertTrue(Prefix.objects.filter(pk=self.prefixes[3].pk).exists())
# Delete permitted objects
form_data['pk'] = [p.pk for p in self.prefixes[:3]]
request = {
'path': reverse('ipam:prefix_bulk_delete'),
'data': form_data,
}
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
self.assertFalse(Prefix.objects.filter(pk=self.prefixes[0].pk).exists())
class ObjectPermissionAPIViewTestCase(TestCase):
client_class = APIClient

View File

@ -297,9 +297,9 @@ class ObjectEditView(GetReturnURLMixin, View):
return redirect(self.get_return_url(request, obj))
except ObjectDoesNotExist:
logger.debug("Object save failed due to object-level permissions violation")
# TODO: Link user to personal permissions view
form.add_error(None, "Object save failed due to object-level permissions violation")
msg = "Object save failed due to object-level permissions violation"
logger.debug(msg)
form.add_error(None, msg)
else:
logger.debug("Form validation failed")
@ -576,11 +576,13 @@ class BulkImportView(GetReturnURLMixin, View):
"""
Import objects in bulk (CSV format).
model_form: The form used to create each imported object
table: The django-tables2 Table used to render the list of imported objects
template_name: The name of the template
widget_attrs: A dict of attributes to apply to the import widget (e.g. to require a session key)
:param queryset: Base queryset for the model
:param model_form: The form used to create each imported object
:param table: The django-tables2 Table used to render the list of imported objects
:param template_name: The name of the template
:param widget_attrs: A dict of attributes to apply to the import widget (e.g. to require a session key)
"""
queryset = None
model_form = None
table = None
template_name = 'utilities/obj_bulk_import.html'
@ -634,6 +636,10 @@ class BulkImportView(GetReturnURLMixin, View):
form.add_error('csv', "Row {} {}: {}".format(row, field, err[0]))
raise ValidationError("")
# Enforce object-level permissions
if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs):
raise ObjectDoesNotExist
# Compile a table containing the imported objects
obj_table = self.table(new_objs)
@ -650,6 +656,11 @@ class BulkImportView(GetReturnURLMixin, View):
except ValidationError:
pass
except ObjectDoesNotExist:
msg = "Object import failed due to object-level permissions violation"
logger.debug(msg)
form.add_error(None, msg)
else:
logger.debug("Form validation failed")
@ -707,7 +718,7 @@ class BulkEditView(GetReturnURLMixin, View):
with transaction.atomic():
updated_count = 0
updated_objects = []
for obj in model.objects.filter(pk__in=form.cleaned_data['pk']):
# Update standard fields. If a field is listed in _nullify, delete its value.
@ -736,6 +747,7 @@ class BulkEditView(GetReturnURLMixin, View):
obj.full_clean()
obj.save()
updated_objects.append(obj)
logger.debug(f"Saved {obj} (PK: {obj.pk})")
# Update custom fields
@ -765,10 +777,12 @@ class BulkEditView(GetReturnURLMixin, View):
if form.cleaned_data.get('remove_tags', None):
obj.tags.remove(*form.cleaned_data['remove_tags'])
updated_count += 1
# Enforce object-level permissions
if self.queryset.filter(pk__in=[obj.pk for obj in updated_objects]).count() != len(updated_objects):
raise ObjectDoesNotExist
if updated_count:
msg = 'Updated {} {}'.format(updated_count, model._meta.verbose_name_plural)
if updated_objects:
msg = 'Updated {} {}'.format(len(updated_objects), model._meta.verbose_name_plural)
logger.info(msg)
messages.success(self.request, msg)
@ -777,6 +791,11 @@ class BulkEditView(GetReturnURLMixin, View):
except ValidationError as e:
messages.error(self.request, "{} failed validation: {}".format(obj, e))
except ObjectDoesNotExist:
msg = "Object update failed due to object-level permissions violation"
logger.debug(msg)
form.add_error(None, msg)
else:
logger.debug("Form validation failed")