diff --git a/netbox/utilities/testing/base.py b/netbox/utilities/testing/base.py index 6d17fa1ec..1a0c3f46b 100644 --- a/netbox/utilities/testing/base.py +++ b/netbox/utilities/testing/base.py @@ -1,9 +1,11 @@ import json +from contextlib import contextmanager from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.postgres.fields import ArrayField, RangeField from django.core.exceptions import FieldDoesNotExist +from django.db import transaction from django.db.models import ManyToManyField, ManyToManyRel, JSONField from django.forms.models import model_to_dict from django.test import Client, TestCase as _TestCase @@ -36,6 +38,20 @@ class TestCase(_TestCase): self.client = Client() self.client.force_login(self.user) + @contextmanager + def cleanupSubTest(self, **params): + """ + Context manager that wraps subTest with automatic cleanup. + All database changes within the context will be rolled back. + """ + sid = transaction.savepoint() + + try: + with self.subTest(**params): + yield + finally: + transaction.savepoint_rollback(sid) + # # Permissions management # diff --git a/netbox/utilities/testing/views.py b/netbox/utilities/testing/views.py index da8a87098..e01a2e12d 100644 --- a/netbox/utilities/testing/views.py +++ b/netbox/utilities/testing/views.py @@ -152,7 +152,6 @@ class ViewTestCases: @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) def test_create_object_with_permission(self): - # Assign unconstrained permission obj_perm = ObjectPermission( name='Test permission', @@ -586,19 +585,56 @@ class ViewTestCases: response = self.client.post(**request) self.assertHttpStatus(response, 302) self.assertEqual(initial_count + self.bulk_create_count, self._get_queryset().count()) - for instance in self._get_queryset().order_by('-pk')[:self.bulk_create_count]: + for instance in self._get_queryset().order_by('-pk')[: self.bulk_create_count]: self.assertInstanceEqual(instance, self.bulk_create_data, exclude=self.validation_excluded_fields) class BulkImportObjectsViewTestCase(ModelViewTestCase): """ Create multiple instances from imported data. - :csv_data: A list of CSV-formatted lines (starting with the headers) to be used for bulk object import. + :csv_data: CSV data for bulk import testing. Supports two formats: + + 1. Tuple/list format (backwards compatible): + csv_data = ( + "name,slug,description", + "Object 1,object-1,First object", + "Object 2,object-2,Second object", + ) + + 2. Dictionary format for multiple scenarios: + csv_data = { + 'default': ( + "name,slug,description", + "Object 1,object-1,First object", + ), + 'with_optional_fields': ( + "name,slug,description,comments", + "Object 2,object-2,Second object,With comments", + ) + } + + When using dictionary format, test_bulk_import_objects_with_permission() + runs each scenario as a separate subtest with clear output: + + test_bulk_import_objects_with_permission (scenario=default) ... ok + test_bulk_import_objects_with_permission (scenario=with_optional_fields) ... ok """ + csv_data = () - def _get_csv_data(self): - return '\n'.join(self.csv_data) + def _get_csv_data(self, scenario_name='default'): + """ + Get CSV data for testing. Supports both tuple/list and dictionary formats. + """ + if isinstance(self.csv_data, dict): + if scenario_name not in self.csv_data: + available = ', '.join(self.csv_data.keys()) + raise ValueError(f"Scenario '{scenario_name}' not found in csv_data. Available: {available}") + return '\n'.join(self.csv_data[scenario_name]) + elif isinstance(self.csv_data, (tuple, list)): + return '\n'.join(self.csv_data) + else: + raise TypeError(f'csv_data must be a tuple, list, or dictionary, got {type(self.csv_data)}') def _get_update_csv_data(self): return self.csv_update_data, '\n'.join(self.csv_update_data) @@ -620,10 +656,38 @@ class ViewTestCases: self.assertHttpStatus(response, 403) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) - def test_bulk_import_objects_with_permission(self): + def test_bulk_import_objects_with_permission(self, post_import_callback=None): + scenarios = self.csv_data.keys() if isinstance(self.csv_data, dict) else ['default'] + + # Assign model-level permission once for all scenarios + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model)) + + # Try GET with model-level permission (only once) + self.assertHttpStatus(self.client.get(self._get_url('bulk_import')), 200) + + # Test each scenario + for scenario_name in scenarios: + with self.cleanupSubTest(scenario=scenario_name): + self._test_bulk_import_with_permission_scenario(scenario_name) + + if post_import_callback: + post_import_callback(scenario_name) + + def _test_bulk_import_with_permission_scenario(self, scenario_name): + """ + Helper method to test a single bulk import scenario. + """ initial_count = self._get_queryset().count() + + # Get CSV data for this scenario + scenario_data = self._get_csv_data(scenario_name) + expected_new_objects = len(scenario_data.splitlines()) - 1 + data = { - 'data': self._get_csv_data(), + 'data': scenario_data, 'format': ImportFormatChoices.CSV, 'csv_delimiter': CSVDelimiterChoices.AUTO, } @@ -632,32 +696,22 @@ class ViewTestCases: if issubclass(self.model, ChangeLoggingMixin): data['changelog_message'] = get_random_string(10) - # Assign model-level permission - obj_perm = ObjectPermission( - name='Test permission', - actions=['add'] - ) - obj_perm.save() - obj_perm.users.add(self.user) - obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model)) - - # Try GET with model-level permission - self.assertHttpStatus(self.client.get(self._get_url('bulk_import')), 200) - # Test POST with permission response = self.client.post(self._get_url('bulk_import'), data) self.assertHttpStatus(response, 302) - self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + + # Verify object count increase + self.assertEqual(self._get_queryset().count(), initial_count + expected_new_objects) # Verify ObjectChange creation if issubclass(self.model, ChangeLoggingMixin): request_id = response.headers.get('X-Request-ID') - self.assertIsNotNone(request_id, "Unable to determine request ID from response") + self.assertIsNotNone(request_id, 'Unable to determine request ID from response') objectchanges = ObjectChange.objects.filter( - changed_object_type=ContentType.objects.get_for_model(self.model), - request_id=request_id + changed_object_type=ContentType.objects.get_for_model(self.model), request_id=request_id ) - self.assertEqual(len(objectchanges), len(self.csv_data) - 1) + self.assertEqual(len(objectchanges), expected_new_objects) + for oc in objectchanges: self.assertEqual(oc.action, ObjectChangeActionChoices.ACTION_CREATE) self.assertEqual(oc.message, data['changelog_message']) @@ -701,35 +755,58 @@ class ViewTestCases: self.assertEqual(value, value) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) - def test_bulk_import_objects_with_constrained_permission(self): - initial_count = self._get_queryset().count() - data = { - 'data': self._get_csv_data(), - 'format': ImportFormatChoices.CSV, - 'csv_delimiter': CSVDelimiterChoices.AUTO, - } + def test_bulk_import_objects_with_constrained_permission(self, post_import_callback=None): + scenarios = self.csv_data.keys() if isinstance(self.csv_data, dict) else ['default'] - # Assign constrained permission + # Assign constrained permission (deny all initially) obj_perm = ObjectPermission( name='Test permission', constraints={'pk': 0}, # Dummy permission to deny all - actions=['add'] + actions=['add'], ) obj_perm.save() obj_perm.users.add(self.user) obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model)) - # Attempt to import non-permitted objects + # Test each scenario with constrained permissions + for scenario_name in scenarios: + with self.cleanupSubTest(scenario=scenario_name): + self._test_bulk_import_constrained_scenario(scenario_name, obj_perm) + + if post_import_callback: + post_import_callback(scenario_name) + + def _test_bulk_import_constrained_scenario(self, scenario_name, obj_perm): + """ + Helper method to test a single bulk import scenario with constrained permissions. + """ + initial_count = self._get_queryset().count() + + # Get CSV data for this scenario + scenario_data = self._get_csv_data(scenario_name) + expected_new_objects = len(scenario_data.splitlines()) - 1 + + data = { + 'data': scenario_data, + 'format': ImportFormatChoices.CSV, + 'csv_delimiter': CSVDelimiterChoices.AUTO, + } + + # Attempt to import non-permitted objects (should fail) self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 200) self.assertEqual(self._get_queryset().count(), initial_count) - # Update permission constraints + # Update permission constraints to allow all obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all obj_perm.save() - # Import permitted objects + # Import permitted objects (should succeed) self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 302) - self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + self.assertEqual(self._get_queryset().count(), initial_count + expected_new_objects) + + # Reset permission constraints for next scenario + obj_perm.constraints = {'pk': 0} # Reset to deny all + obj_perm.save() class BulkEditObjectsViewTestCase(ModelViewTestCase): """