diff --git a/netbox/ipam/views.py b/netbox/ipam/views.py index 0c7d0770f..ace85bc1a 100644 --- a/netbox/ipam/views.py +++ b/netbox/ipam/views.py @@ -605,14 +605,15 @@ class PrefixDeleteView(ObjectPermissionRequiredMixin, ObjectDeleteView): default_return_url = 'ipam:prefix_list' -class PrefixBulkImportView(PermissionRequiredMixin, BulkImportView): +class PrefixBulkImportView(ObjectPermissionRequiredMixin, BulkImportView): permission_required = 'ipam.add_prefix' + queryset = Prefix.objects.all() model_form = forms.PrefixCSVForm table = tables.PrefixTable default_return_url = 'ipam:prefix_list' -class PrefixBulkEditView(PermissionRequiredMixin, BulkEditView): +class PrefixBulkEditView(ObjectPermissionRequiredMixin, BulkEditView): permission_required = 'ipam.change_prefix' queryset = Prefix.objects.prefetch_related('site', 'vrf__tenant', 'tenant', 'vlan', 'role') filterset = filters.PrefixFilterSet @@ -621,7 +622,7 @@ class PrefixBulkEditView(PermissionRequiredMixin, BulkEditView): default_return_url = 'ipam:prefix_list' -class PrefixBulkDeleteView(PermissionRequiredMixin, BulkDeleteView): +class PrefixBulkDeleteView(ObjectPermissionRequiredMixin, BulkDeleteView): permission_required = 'ipam.delete_prefix' queryset = Prefix.objects.prefetch_related('site', 'vrf__tenant', 'tenant', 'vlan', 'role') filterset = filters.PrefixFilterSet diff --git a/netbox/netbox/tests/test_authentication.py b/netbox/netbox/tests/test_authentication.py index 03d0a1dc3..d82ef6752 100644 --- a/netbox/netbox/tests/test_authentication.py +++ b/netbox/netbox/tests/test_authentication.py @@ -342,6 +342,14 @@ class ObjectPermissionViewTestCase(TestCase): 'confirm': True } + # Attempt to delete object without permission + request = { + 'path': reverse('ipam:prefix_delete', kwargs={'pk': self.prefixes[0].pk}), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 403) + # Assign object permission obj_perm = ObjectPermission( model=ContentType.objects.get_for_model(Prefix), @@ -372,6 +380,147 @@ class ObjectPermissionViewTestCase(TestCase): self.assertHttpStatus(response, 404) self.assertTrue(Prefix.objects.filter(pk=self.prefixes[3].pk).exists()) + @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + def test_bulk_import_objects(self): + initial_count = Prefix.objects.count() + form_data = { + 'csv': "prefix,status,site\n" + "10.0.9.0/24,Active,Site 1\n" + "10.0.10.0/24,Active,Site 2\n" + "10.0.11.0/24,Active,Site 3\n", + } + + # Attempt to import objects without permission + request = { + 'path': reverse('ipam:prefix_import'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 403) + self.assertEqual(initial_count, Prefix.objects.count()) + + # Assign object permission + obj_perm = ObjectPermission( + model=ContentType.objects.get_for_model(Prefix), + attrs={'site__name': 'Site 1'}, + can_add=True + ) + obj_perm.save() + obj_perm.users.add(self.user) + + # Attempt to create non-permitted objects + request = { + 'path': reverse('ipam:prefix_import'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 200) + self.assertEqual(Prefix.objects.count(), initial_count) + + # Create a permitted object + form_data = { + 'csv': "prefix,status,site\n" + "10.0.9.0/24,Active,Site 1\n" + "10.0.10.0/24,Active,Site 1\n" + "10.0.11.0/24,Active,Site 1\n", + } + request = { + 'path': reverse('ipam:prefix_import'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 200) + self.assertEqual(Prefix.objects.count(), initial_count + 3) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + def test_bulk_edit_objects(self): + form_data = { + 'pk': [p.pk for p in self.prefixes], + 'status': 'reserved', + '_apply': True, + } + + # Attempt to edit objects without permission + request = { + 'path': reverse('ipam:prefix_bulk_edit'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 403) + + # Assign object permission + obj_perm = ObjectPermission( + model=ContentType.objects.get_for_model(Prefix), + attrs={'site__name': 'Site 1'}, + can_change=True + ) + obj_perm.save() + obj_perm.users.add(self.user) + + # Attempt to edit non-permitted objects + request = { + 'path': reverse('ipam:prefix_bulk_edit'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 200) + self.assertEqual(Prefix.objects.get(pk=self.prefixes[3].pk).status, 'active') + + # Edit permitted objects + form_data['pk'] = [p.pk for p in self.prefixes[:3]] + request = { + 'path': reverse('ipam:prefix_bulk_edit'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 302) + self.assertEqual(Prefix.objects.get(pk=self.prefixes[0].pk).status, 'reserved') + + @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + def test_bulk_delete_objects(self): + form_data = { + 'pk': [p.pk for p in self.prefixes], + 'confirm': True, + '_confirm': True, + } + + # Attempt to delete objects without permission + request = { + 'path': reverse('ipam:prefix_bulk_delete'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 403) + + # Assign object permission + obj_perm = ObjectPermission( + model=ContentType.objects.get_for_model(Prefix), + attrs={'site__name': 'Site 1'}, + can_view=True, + can_delete=True + ) + obj_perm.save() + obj_perm.users.add(self.user) + + # Attempt to delete non-permitted object + request = { + 'path': reverse('ipam:prefix_bulk_delete'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 200) + self.assertTrue(Prefix.objects.filter(pk=self.prefixes[3].pk).exists()) + + # Delete permitted objects + form_data['pk'] = [p.pk for p in self.prefixes[:3]] + request = { + 'path': reverse('ipam:prefix_bulk_delete'), + 'data': form_data, + } + response = self.client.post(**request) + self.assertHttpStatus(response, 302) + self.assertFalse(Prefix.objects.filter(pk=self.prefixes[0].pk).exists()) + class ObjectPermissionAPIViewTestCase(TestCase): client_class = APIClient diff --git a/netbox/utilities/views.py b/netbox/utilities/views.py index d9eace90b..44dd40d90 100644 --- a/netbox/utilities/views.py +++ b/netbox/utilities/views.py @@ -297,9 +297,9 @@ class ObjectEditView(GetReturnURLMixin, View): return redirect(self.get_return_url(request, obj)) except ObjectDoesNotExist: - logger.debug("Object save failed due to object-level permissions violation") - # TODO: Link user to personal permissions view - form.add_error(None, "Object save failed due to object-level permissions violation") + msg = "Object save failed due to object-level permissions violation" + logger.debug(msg) + form.add_error(None, msg) else: logger.debug("Form validation failed") @@ -576,11 +576,13 @@ class BulkImportView(GetReturnURLMixin, View): """ Import objects in bulk (CSV format). - model_form: The form used to create each imported object - table: The django-tables2 Table used to render the list of imported objects - template_name: The name of the template - widget_attrs: A dict of attributes to apply to the import widget (e.g. to require a session key) + :param queryset: Base queryset for the model + :param model_form: The form used to create each imported object + :param table: The django-tables2 Table used to render the list of imported objects + :param template_name: The name of the template + :param widget_attrs: A dict of attributes to apply to the import widget (e.g. to require a session key) """ + queryset = None model_form = None table = None template_name = 'utilities/obj_bulk_import.html' @@ -634,6 +636,10 @@ class BulkImportView(GetReturnURLMixin, View): form.add_error('csv', "Row {} {}: {}".format(row, field, err[0])) raise ValidationError("") + # Enforce object-level permissions + if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs): + raise ObjectDoesNotExist + # Compile a table containing the imported objects obj_table = self.table(new_objs) @@ -650,6 +656,11 @@ class BulkImportView(GetReturnURLMixin, View): except ValidationError: pass + except ObjectDoesNotExist: + msg = "Object import failed due to object-level permissions violation" + logger.debug(msg) + form.add_error(None, msg) + else: logger.debug("Form validation failed") @@ -707,7 +718,7 @@ class BulkEditView(GetReturnURLMixin, View): with transaction.atomic(): - updated_count = 0 + updated_objects = [] for obj in model.objects.filter(pk__in=form.cleaned_data['pk']): # Update standard fields. If a field is listed in _nullify, delete its value. @@ -736,6 +747,7 @@ class BulkEditView(GetReturnURLMixin, View): obj.full_clean() obj.save() + updated_objects.append(obj) logger.debug(f"Saved {obj} (PK: {obj.pk})") # Update custom fields @@ -765,10 +777,12 @@ class BulkEditView(GetReturnURLMixin, View): if form.cleaned_data.get('remove_tags', None): obj.tags.remove(*form.cleaned_data['remove_tags']) - updated_count += 1 + # Enforce object-level permissions + if self.queryset.filter(pk__in=[obj.pk for obj in updated_objects]).count() != len(updated_objects): + raise ObjectDoesNotExist - if updated_count: - msg = 'Updated {} {}'.format(updated_count, model._meta.verbose_name_plural) + if updated_objects: + msg = 'Updated {} {}'.format(len(updated_objects), model._meta.verbose_name_plural) logger.info(msg) messages.success(self.request, msg) @@ -777,6 +791,11 @@ class BulkEditView(GetReturnURLMixin, View): except ValidationError as e: messages.error(self.request, "{} failed validation: {}".format(obj, e)) + except ObjectDoesNotExist: + msg = "Object update failed due to object-level permissions violation" + logger.debug(msg) + form.add_error(None, msg) + else: logger.debug("Form validation failed")