From 8c40148ca730f21ec65b0ffa53d0e5bf924603bc Mon Sep 17 00:00:00 2001 From: Jeremy Stretch Date: Wed, 20 May 2020 16:47:33 -0400 Subject: [PATCH] Add object permission tests for get and list API views --- netbox/netbox/tests/test_authentication.py | 121 +++++++++++++++++++++ netbox/utilities/api.py | 10 +- 2 files changed, 128 insertions(+), 3 deletions(-) diff --git a/netbox/netbox/tests/test_authentication.py b/netbox/netbox/tests/test_authentication.py index 18bf251d4..64dd83783 100644 --- a/netbox/netbox/tests/test_authentication.py +++ b/netbox/netbox/tests/test_authentication.py @@ -371,3 +371,124 @@ class ObjectPermissionViewTestCase(TestCase): response = self.client.post(**request) self.assertHttpStatus(response, 404) self.assertTrue(Prefix.objects.filter(pk=self.prefixes[3].pk).exists()) + + +class ObjectPermissionAPIViewTestCase(TestCase): + client_class = APIClient + + @classmethod + def setUpTestData(cls): + + cls.sites = ( + Site(name='Site 1', slug='site-1'), + Site(name='Site 2', slug='site-2'), + Site(name='Site 3', slug='site-3'), + ) + Site.objects.bulk_create(cls.sites) + + cls.prefixes = ( + Prefix(prefix=IPNetwork('10.0.0.0/24'), site=cls.sites[0]), + Prefix(prefix=IPNetwork('10.0.1.0/24'), site=cls.sites[0]), + Prefix(prefix=IPNetwork('10.0.2.0/24'), site=cls.sites[0]), + Prefix(prefix=IPNetwork('10.0.3.0/24'), site=cls.sites[1]), + Prefix(prefix=IPNetwork('10.0.4.0/24'), site=cls.sites[1]), + Prefix(prefix=IPNetwork('10.0.5.0/24'), site=cls.sites[1]), + Prefix(prefix=IPNetwork('10.0.6.0/24'), site=cls.sites[2]), + Prefix(prefix=IPNetwork('10.0.7.0/24'), site=cls.sites[2]), + Prefix(prefix=IPNetwork('10.0.8.0/24'), site=cls.sites[2]), + ) + Prefix.objects.bulk_create(cls.prefixes) + + def setUp(self): + """ + Create a test user and token for API calls. + """ + self.user = User.objects.create(username='testuser') + self.token = Token.objects.create(user=self.user) + self.header = {'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key)} + + @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + def test_get_object(self): + + # Attempt to retrieve object without permission + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk}) + response = self.client.get(url, **self.header) + self.assertEqual(response.status_code, 403) + + # Assign object permission + obj_perm = ObjectPermission( + model=ContentType.objects.get_for_model(Prefix), + attrs={ + 'site__name': 'Site 1', + }, + can_view=True + ) + obj_perm.save() + obj_perm.users.add(self.user) + + # Retrieve permitted object + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk}) + response = self.client.get(url, **self.header) + self.assertEqual(response.status_code, 200) + + # Attempt to retrieve non-permitted object + url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk}) + response = self.client.get(url, **self.header) + self.assertEqual(response.status_code, 404) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + def test_list_objects(self): + url = reverse('ipam-api:prefix-list') + + # Attempt to list objects without permission + response = self.client.get(url, **self.header) + self.assertEqual(response.status_code, 403) + + # Assign object permission + obj_perm = ObjectPermission( + model=ContentType.objects.get_for_model(Prefix), + attrs={ + 'site__name': 'Site 1', + }, + can_view=True + ) + obj_perm.save() + obj_perm.users.add(self.user) + + # Retrieve all objects. Only permitted objects should be returned. + response = self.client.get(url, **self.header) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data['count'], 3) + + # TODO + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_create_object(self): + # url = reverse('ipam-api:prefix-list') + # data = { + # 'prefix': '10.0.9.0/24', + # 'site': self.sites[1].pk, + # } + # initial_count = Prefix.objects.count() + # + # # Attempt to create an object without permission + # response = self.client.post(url, data, format='json', **self.header) + # self.assertEqual(response.status_code, 403) + # + # # Assign object permission + # obj_perm = ObjectPermission( + # model=ContentType.objects.get_for_model(Prefix), + # attrs={'site__name': 'Site 1'}, + # can_view=True + # ) + # obj_perm.save() + # obj_perm.users.add(self.user) + # + # # Attempt to create a non-permitted object + # response = self.client.post(url, data, format='json', **self.header) + # self.assertEqual(response.status_code, 403) + # self.assertEqual(Prefix.objects.count(), initial_count) + # + # # Create a permitted object + # response = self.client.post(url, data, format='json', **self.header) + # self.assertEqual(response.status_code, 200) + # self.assertEqual(Prefix.objects.count(), initial_count + 1) diff --git a/netbox/utilities/api.py b/netbox/utilities/api.py index 405c26878..9ec587369 100644 --- a/netbox/utilities/api.py +++ b/netbox/utilities/api.py @@ -329,11 +329,15 @@ class ModelViewSet(_ModelViewSet): if not request.user.is_authenticated or request.user.is_superuser: return - permission_required = 'dcim.view_site' + # Determine the required permission + permission_required = "{}.view_{}".format( + self.queryset.model._meta.app_label, + self.queryset.model._meta.model_name + ) # Enforce object-level permissions - if permission_required not in self.request.user._perm_cache: - attrs = ObjectPermission.objects.get_attr_constraints(self.request.user, permission_required) + if permission_required not in {*request.user._user_perm_cache, *request.user._group_perm_cache}: + attrs = ObjectPermission.objects.get_attr_constraints(request.user, permission_required) if attrs: # Update the view's QuerySet to filter only the permitted objects self.queryset = self.queryset.filter(attrs)