Merge branch 'main' into feature
CodeQL / Analyze (${{ matrix.language }}) (none, actions) (push) Has been cancelled
CI / build (20.x, 3.12) (push) Has been cancelled
CI / build (20.x, 3.13) (push) Has been cancelled
CodeQL / Analyze (${{ matrix.language }}) (none, javascript-typescript) (push) Has been cancelled
CodeQL / Analyze (${{ matrix.language }}) (none, python) (push) Has been cancelled

This commit is contained in:
Jeremy Stretch
2025-09-16 12:00:48 -04:00
110 changed files with 299711 additions and 37418 deletions
+21 -6
View File
@@ -51,30 +51,45 @@ def serialize_object(obj, resolve_tags=True, extra=None, exclude=None):
return data
def deserialize_object(model, fields, pk=None):
def deserialize_object(model, data, pk=None):
"""
Instantiate an object from the given model and field data. Functions as
the complement to serialize_object().
"""
content_type = ContentType.objects.get_for_model(model)
data = data.copy()
m2m_data = {}
# Account for custom field data
if 'custom_fields' in fields:
fields['custom_field_data'] = fields.pop('custom_fields')
if 'custom_fields' in data:
data['custom_field_data'] = data.pop('custom_fields')
# Pop any assigned tags to handle the M2M relationships manually
if is_taggable(model) and fields.get('tags'):
if is_taggable(model) and data.get('tags'):
Tag = apps.get_model('extras', 'Tag')
m2m_data['tags'] = Tag.objects.filter(name__in=fields.pop('tags'))
m2m_data['tags'] = Tag.objects.filter(name__in=data.pop('tags'))
# Separate any non-field attributes for assignment after deserialization of the object
model_fields = [
field.name for field in model._meta.get_fields()
]
attrs = {
name: data.pop(name) for name in list(data.keys())
if name not in model_fields
}
# Employ Django's native Python deserializer to produce the instance
data = {
'model': '.'.join(content_type.natural_key()),
'pk': pk,
'fields': fields,
'fields': data,
}
instance = list(serializers.deserialize('python', [data]))[0]
# Assign non-field attributes
for name, value in attrs.items():
setattr(instance.object, name, value)
# Apply any additional M2M assignments
instance.m2m_data.update(**m2m_data)
@@ -14,7 +14,7 @@
{{ value|isodatetime }}
{% elif customfield.type == 'url' and value %}
<a href="{{ value }}">{{ value|truncatechars:70 }}</a>
{% elif customfield.type == 'json' and value %}
{% elif customfield.type == 'json' and value is not None %}
<pre>{{ value|json }}</pre>
{% elif customfield.type == 'multiselect' and value %}
{{ value|join:", " }}
+4 -4
View File
@@ -247,9 +247,9 @@ class APIViewTestCases:
if issubclass(self.model, ChangeLoggingMixin):
objectchange = ObjectChange.objects.get(
changed_object_type=ContentType.objects.get_for_model(instance),
changed_object_id=instance.pk
changed_object_id=instance.pk,
action=ObjectChangeActionChoices.ACTION_CREATE,
)
self.assertEqual(objectchange.action, ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(objectchange.message, data['changelog_message'])
def test_bulk_create_objects(self):
@@ -298,11 +298,11 @@ class APIViewTestCases:
]
objectchanges = ObjectChange.objects.filter(
changed_object_type=ContentType.objects.get_for_model(self.model),
changed_object_id__in=id_list
changed_object_id__in=id_list,
action=ObjectChangeActionChoices.ACTION_CREATE,
)
self.assertEqual(len(objectchanges), len(self.create_data))
for oc in objectchanges:
self.assertEqual(oc.action, ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(oc.message, changelog_message)
class UpdateObjectViewTestCase(APITestCase):
+16
View File
@@ -1,9 +1,11 @@
import json
from contextlib import contextmanager
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.fields import ArrayField, RangeField
from django.core.exceptions import FieldDoesNotExist
from django.db import transaction
from django.db.models import ManyToManyField, ManyToManyRel, JSONField
from django.forms.models import model_to_dict
from django.test import Client, TestCase as _TestCase
@@ -36,6 +38,20 @@ class TestCase(_TestCase):
self.client = Client()
self.client.force_login(self.user)
@contextmanager
def cleanupSubTest(self, **params):
"""
Context manager that wraps subTest with automatic cleanup.
All database changes within the context will be rolled back.
"""
sid = transaction.savepoint()
try:
with self.subTest(**params):
yield
finally:
transaction.savepoint_rollback(sid)
#
# Permissions management
#
+111 -38
View File
@@ -152,7 +152,6 @@ class ViewTestCases:
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
def test_create_object_with_permission(self):
# Assign unconstrained permission
obj_perm = ObjectPermission(
name='Test permission',
@@ -586,19 +585,59 @@ class ViewTestCases:
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
self.assertEqual(initial_count + self.bulk_create_count, self._get_queryset().count())
for instance in self._get_queryset().order_by('-pk')[:self.bulk_create_count]:
for instance in self._get_queryset().order_by('-pk')[: self.bulk_create_count]:
self.assertInstanceEqual(instance, self.bulk_create_data, exclude=self.validation_excluded_fields)
class BulkImportObjectsViewTestCase(ModelViewTestCase):
"""
Create multiple instances from imported data.
:csv_data: A list of CSV-formatted lines (starting with the headers) to be used for bulk object import.
:csv_data: CSV data for bulk import testing. Supports two formats:
1. Tuple/list format (backwards compatible):
csv_data = (
"name,slug,description",
"Object 1,object-1,First object",
"Object 2,object-2,Second object",
)
2. Dictionary format for multiple scenarios:
csv_data = {
'default': (
"name,slug,description",
"Object 1,object-1,First object",
),
'with_optional_fields': (
"name,slug,description,comments",
"Object 2,object-2,Second object,With comments",
)
}
When using dictionary format, test_bulk_import_objects_with_permission()
runs each scenario as a separate subtest with clear output:
test_bulk_import_objects_with_permission (scenario=default) ... ok
test_bulk_import_objects_with_permission (scenario=with_optional_fields) ... ok
"""
csv_data = ()
def _get_csv_data(self):
return '\n'.join(self.csv_data)
def get_scenarios(self):
return self.csv_data.keys() if isinstance(self.csv_data, dict) else ['default']
def _get_csv_data(self, scenario_name='default'):
"""
Get CSV data for testing. Supports both tuple/list and dictionary formats.
"""
if isinstance(self.csv_data, dict):
if scenario_name not in self.csv_data:
available = ', '.join(self.csv_data.keys())
raise ValueError(f"Scenario '{scenario_name}' not found in csv_data. Available: {available}")
return '\n'.join(self.csv_data[scenario_name])
elif isinstance(self.csv_data, (tuple, list)):
return '\n'.join(self.csv_data)
else:
raise TypeError(f'csv_data must be a tuple, list, or dictionary, got {type(self.csv_data)}')
def _get_update_csv_data(self):
return self.csv_update_data, '\n'.join(self.csv_update_data)
@@ -620,10 +659,36 @@ class ViewTestCases:
self.assertHttpStatus(response, 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
def test_bulk_import_objects_with_permission(self):
def test_bulk_import_objects_with_permission(self, post_import_callback=None):
# Assign model-level permission once for all scenarios
obj_perm = ObjectPermission(name='Test permission', actions=['add'])
obj_perm.save()
obj_perm.users.add(self.user)
obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
# Try GET with model-level permission (only once)
self.assertHttpStatus(self.client.get(self._get_url('bulk_import')), 200)
# Test each scenario
for scenario_name in self.get_scenarios():
with self.cleanupSubTest(scenario=scenario_name):
self._test_bulk_import_with_permission_scenario(scenario_name)
if post_import_callback:
post_import_callback(scenario_name)
def _test_bulk_import_with_permission_scenario(self, scenario_name):
"""
Helper method to test a single bulk import scenario.
"""
initial_count = self._get_queryset().count()
# Get CSV data for this scenario
scenario_data = self._get_csv_data(scenario_name)
expected_new_objects = len(scenario_data.splitlines()) - 1
data = {
'data': self._get_csv_data(),
'data': scenario_data,
'format': ImportFormatChoices.CSV,
'csv_delimiter': CSVDelimiterChoices.AUTO,
}
@@ -632,34 +697,25 @@ class ViewTestCases:
if issubclass(self.model, ChangeLoggingMixin):
data['changelog_message'] = get_random_string(10)
# Assign model-level permission
obj_perm = ObjectPermission(
name='Test permission',
actions=['add']
)
obj_perm.save()
obj_perm.users.add(self.user)
obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
# Try GET with model-level permission
self.assertHttpStatus(self.client.get(self._get_url('bulk_import')), 200)
# Test POST with permission
response = self.client.post(self._get_url('bulk_import'), data)
self.assertHttpStatus(response, 302)
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
# Verify object count increase
self.assertEqual(self._get_queryset().count(), initial_count + expected_new_objects)
# Verify ObjectChange creation
if issubclass(self.model, ChangeLoggingMixin):
request_id = response.headers.get('X-Request-ID')
self.assertIsNotNone(request_id, "Unable to determine request ID from response")
self.assertIsNotNone(request_id, 'Unable to determine request ID from response')
objectchanges = ObjectChange.objects.filter(
changed_object_type=ContentType.objects.get_for_model(self.model),
request_id=request_id
request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE,
)
self.assertEqual(len(objectchanges), len(self.csv_data) - 1)
self.assertEqual(len(objectchanges), expected_new_objects)
for oc in objectchanges:
self.assertEqual(oc.action, ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(oc.message, data['changelog_message'])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
@@ -701,35 +757,52 @@ class ViewTestCases:
self.assertEqual(value, value)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
def test_bulk_import_objects_with_constrained_permission(self):
initial_count = self._get_queryset().count()
data = {
'data': self._get_csv_data(),
'format': ImportFormatChoices.CSV,
'csv_delimiter': CSVDelimiterChoices.AUTO,
}
# Assign constrained permission
def test_bulk_import_objects_with_constrained_permission(self, post_import_callback=None):
# Assign constrained permission (deny all initially)
obj_perm = ObjectPermission(
name='Test permission',
constraints={'pk': 0}, # Dummy permission to deny all
actions=['add']
actions=['add'],
)
obj_perm.save()
obj_perm.users.add(self.user)
obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
# Attempt to import non-permitted objects
# Test each scenario with constrained permissions
for scenario_name in self.get_scenarios():
with self.cleanupSubTest(scenario=scenario_name):
self._test_bulk_import_constrained_scenario(scenario_name, obj_perm)
if post_import_callback:
post_import_callback(scenario_name)
def _test_bulk_import_constrained_scenario(self, scenario_name, obj_perm):
"""
Helper method to test a single bulk import scenario with constrained permissions.
"""
initial_count = self._get_queryset().count()
# Get CSV data for this scenario
scenario_data = self._get_csv_data(scenario_name)
expected_new_objects = len(scenario_data.splitlines()) - 1
data = {
'data': scenario_data,
'format': ImportFormatChoices.CSV,
'csv_delimiter': CSVDelimiterChoices.AUTO,
}
# Attempt to import non-permitted objects (should fail)
self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 200)
self.assertEqual(self._get_queryset().count(), initial_count)
# Update permission constraints
# Update permission constraints to allow all
obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all
obj_perm.save()
# Import permitted objects
# Import permitted objects (should succeed)
self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 302)
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
self.assertEqual(self._get_queryset().count(), initial_count + expected_new_objects)
class BulkEditObjectsViewTestCase(ModelViewTestCase):
"""
@@ -0,0 +1,49 @@
from django.test import TestCase
from dcim.choices import SiteStatusChoices
from dcim.models import Site
from extras.models import Tag
from utilities.serialization import deserialize_object, serialize_object
class SerializationTestCase(TestCase):
@classmethod
def setUpTestData(cls):
tags = (
Tag(name='Tag 1', slug='tag-1'),
Tag(name='Tag 2', slug='tag-2'),
Tag(name='Tag 3', slug='tag-3'),
)
Tag.objects.bulk_create(tags)
def test_serialize_object(self):
site = Site.objects.create(
name='Site 1',
slug='site=1',
description='Ignore me',
)
site.tags.set(Tag.objects.all())
data = serialize_object(site, extra={'foo': 123}, exclude=['description'])
self.assertEqual(data['name'], site.name)
self.assertEqual(data['slug'], site.slug)
self.assertEqual(data['tags'], [tag.name for tag in Tag.objects.all()])
self.assertEqual(data['foo'], 123)
self.assertNotIn('description', data)
def test_deserialize_object(self):
data = {
'name': 'Site 1',
'slug': 'site-1',
'tags': ['Tag 1', 'Tag 2', 'Tag 3'],
'foo': 123,
}
instance = deserialize_object(Site, data, pk=123)
self.assertEqual(instance.object.pk, 123)
self.assertEqual(instance.object.name, data['name'])
self.assertEqual(instance.object.slug, data['slug'])
self.assertEqual(instance.object.status, SiteStatusChoices.STATUS_ACTIVE) # Default field value
self.assertEqual(instance.object.foo, data['foo']) # Non-field attribute
self.assertEqual(list(instance.m2m_data['tags']), list(Tag.objects.all()))