Files
netbox/netbox/extras/tests/test_models.py
Aditya Sharma e81ccb9be6
Some checks failed
CodeQL / Analyze (actions) (push) Has been cancelled
CodeQL / Analyze (javascript-typescript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
CI / build (20.x, 3.12) (push) Has been cancelled
CI / build (20.x, 3.13) (push) Has been cancelled
CI / build (20.x, 3.14) (push) Has been cancelled
Lock threads / lock (push) Has been cancelled
Close stale issues/PRs / stale (push) Has been cancelled
Close incomplete issues / stale (push) Has been cancelled
Update translation strings / makemessages (push) Has been cancelled
Fixes #21214: Clean up AutoSyncRecord when detaching from DataSource (#21219)
Co-authored-by: adionit7 <adionit7@users.noreply.github.com>
2026-01-21 16:38:27 -06:00

807 lines
28 KiB
Python

import tempfile
from pathlib import Path
from django.contrib.contenttypes.models import ContentType
from django.core.files.uploadedfile import SimpleUploadedFile
from django.forms import ValidationError
from django.test import tag, TestCase
from core.models import AutoSyncRecord, DataSource, ObjectType
from dcim.models import Device, DeviceRole, DeviceType, Location, Manufacturer, Platform, Region, Site, SiteGroup
from extras.models import ConfigContext, ConfigContextProfile, ConfigTemplate, ImageAttachment, Tag, TaggedItem
from tenancy.models import Tenant, TenantGroup
from utilities.exceptions import AbortRequest
from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine
class ImageAttachmentTests(TestCase):
@classmethod
def setUpTestData(cls):
cls.ct_rack = ContentType.objects.get(app_label='dcim', model='rack')
cls.image_content = b''
def _stub_image_attachment(self, object_id, image_filename, name=None):
"""
Creates an instance of ImageAttachment with the provided object_id and image_name.
This method prepares a stubbed image attachment to test functionalities that
require an ImageAttachment object.
The function initializes the attachment with a specified file name and
pre-defined image content.
"""
ia = ImageAttachment(
object_type=self.ct_rack,
object_id=object_id,
name=name,
image=SimpleUploadedFile(
name=image_filename,
content=self.image_content,
content_type='image/jpeg',
),
)
return ia
def test_filename_strips_expected_prefix(self):
"""
Tests that the filename of the image attachment is stripped of the expected
prefix.
"""
ia = self._stub_image_attachment(12, 'image-attachments/rack_12_My_File.png')
self.assertEqual(ia.filename, 'My_File.png')
def test_filename_legacy_nested_path_returns_basename(self):
"""
Tests if the filename of a legacy-nested path correctly returns only the basename.
"""
# e.g. "image-attachments/rack_12_5/31/23.jpg" -> "23.jpg"
ia = self._stub_image_attachment(12, 'image-attachments/rack_12_5/31/23.jpg')
self.assertEqual(ia.filename, '23.jpg')
def test_filename_no_prefix_returns_basename(self):
"""
Tests that the filename property correctly returns the basename for an image
attachment that has no leading prefix in its path.
"""
ia = self._stub_image_attachment(42, 'image-attachments/just_name.webp')
self.assertEqual(ia.filename, 'just_name.webp')
def test_mismatched_prefix_is_not_stripped(self):
"""
Tests that a mismatched prefix in the filename is not stripped.
"""
# Prefix does not match object_id -> leave as-is (basename only)
ia = self._stub_image_attachment(12, 'image-attachments/rack_13_other.png')
self.assertEqual('rack_13_other.png', ia.filename)
def test_str_uses_name_when_present(self):
"""
Tests that the `str` representation of the object uses the
`name` attribute when provided.
"""
ia = self._stub_image_attachment(12, 'image-attachments/rack_12_file.png', name='Human title')
self.assertEqual('Human title', str(ia))
def test_str_falls_back_to_filename(self):
"""
Tests that the `str` representation of the object falls back to
the filename if the name attribute is not set.
"""
ia = self._stub_image_attachment(12, 'image-attachments/rack_12_file.png', name='')
self.assertEqual('file.png', str(ia))
class TagTest(TestCase):
def test_default_ordering_weight_then_name_is_set(self):
Tag.objects.create(name='Tag 1', slug='tag-1', weight=3000)
Tag.objects.create(name='Tag 2', slug='tag-2') # Default: 1000
Tag.objects.create(name='Tag 3', slug='tag-3', weight=2000)
Tag.objects.create(name='Tag 4', slug='tag-4', weight=2000)
tags = Tag.objects.all()
self.assertEqual(tags[0].slug, 'tag-2')
self.assertEqual(tags[1].slug, 'tag-3')
self.assertEqual(tags[2].slug, 'tag-4')
self.assertEqual(tags[3].slug, 'tag-1')
def test_tag_related_manager_ordering_weight_then_name(self):
tags = [
Tag.objects.create(name='Tag 1', slug='tag-1', weight=3000),
Tag.objects.create(name='Tag 2', slug='tag-2'), # Default: 1000
Tag.objects.create(name='Tag 3', slug='tag-3', weight=2000),
Tag.objects.create(name='Tag 4', slug='tag-4', weight=2000),
]
site = Site.objects.create(name='Site 1')
for _tag in tags:
site.tags.add(_tag)
site.save()
site = Site.objects.first()
tags = site.tags.all()
self.assertEqual(tags[0].slug, 'tag-2')
self.assertEqual(tags[1].slug, 'tag-3')
self.assertEqual(tags[2].slug, 'tag-4')
self.assertEqual(tags[3].slug, 'tag-1')
def test_create_tag_unicode(self):
tag = Tag(name='Testing Unicode: 台灣')
tag.save()
self.assertEqual(tag.slug, 'testing-unicode-台灣')
def test_object_type_validation(self):
region = Region.objects.create(name='Region 1', slug='region-1')
sitegroup = SiteGroup.objects.create(name='Site Group 1', slug='site-group-1')
# Create a Tag that can only be applied to Regions
tag = Tag.objects.create(name='Tag 1', slug='tag-1')
tag.object_types.add(ObjectType.objects.get_by_natural_key('dcim', 'region'))
# Apply the Tag to a Region
region.tags.add(tag)
self.assertIn(tag, region.tags.all())
# Apply the Tag to a SiteGroup
with self.assertRaises(AbortRequest):
sitegroup.tags.add(tag)
class ConfigContextTest(TestCase):
"""
These test cases deal with the weighting, ordering, and deep merge logic of config context data.
It also ensures the various config context querysets are consistent.
"""
@classmethod
def setUpTestData(cls):
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
devicetype = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1', slug='device-type-1')
role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
region = Region.objects.create(name='Region')
sitegroup = SiteGroup.objects.create(name='Site Group')
site = Site.objects.create(name='Site 1', slug='site-1', region=region, group=sitegroup)
location = Location.objects.create(name='Location 1', slug='location-1', site=site)
Platform.objects.create(name='Platform')
tenantgroup = TenantGroup.objects.create(name='Tenant Group')
Tenant.objects.create(name='Tenant', group=tenantgroup)
Tag.objects.create(name='Tag', slug='tag')
Tag.objects.create(name='Tag2', slug='tag2')
Device.objects.create(
name='Device 1',
device_type=devicetype,
role=role,
site=site,
location=location
)
def test_higher_weight_wins(self):
device = Device.objects.first()
context1 = ConfigContext(
name="context 1",
weight=101,
data={
"a": 123,
"b": 456,
"c": 777
}
)
context2 = ConfigContext(
name="context 2",
weight=100,
data={
"a": 123,
"b": 456,
"c": 789
}
)
ConfigContext.objects.bulk_create([context1, context2])
expected_data = {
"a": 123,
"b": 456,
"c": 777
}
self.assertEqual(device.get_config_context(), expected_data)
def test_name_ordering_after_weight(self):
device = Device.objects.first()
context1 = ConfigContext(
name="context 1",
weight=100,
data={
"a": 123,
"b": 456,
"c": 777
}
)
context2 = ConfigContext(
name="context 2",
weight=100,
data={
"a": 123,
"b": 456,
"c": 789
}
)
ConfigContext.objects.bulk_create([context1, context2])
expected_data = {
"a": 123,
"b": 456,
"c": 789
}
self.assertEqual(device.get_config_context(), expected_data)
def test_schema_validation(self):
"""
Check that the JSON schema defined by the assigned profile is enforced.
"""
profile = ConfigContextProfile.objects.create(
name="Config context profile 1",
schema={
"properties": {
"foo": {
"type": "string"
}
},
"required": [
"foo"
]
}
)
with self.assertRaises(ValidationError):
# Missing required attribute
ConfigContext(name="CC1", profile=profile, data={}).clean()
with self.assertRaises(ValidationError):
# Invalid attribute type
ConfigContext(name="CC1", profile=profile, data={"foo": 123}).clean()
ConfigContext(name="CC1", profile=profile, data={"foo": "bar"}).clean()
def test_annotation_same_as_get_for_object(self):
"""
This test incorporates features from all of the above tests cases to ensure
the annotate_config_context_data() and get_for_object() queryset methods are the same.
"""
device = Device.objects.first()
context1 = ConfigContext(
name="context 1",
weight=101,
data={
"a": 123,
"b": 456,
"c": 777
}
)
context2 = ConfigContext(
name="context 2",
weight=100,
data={
"a": 123,
"b": 456,
"c": 789
}
)
context3 = ConfigContext(
name="context 3",
weight=99,
data={
"d": 1
}
)
context4 = ConfigContext(
name="context 4",
weight=99,
data={
"d": 2
}
)
ConfigContext.objects.bulk_create([context1, context2, context3, context4])
annotated_queryset = Device.objects.filter(name=device.name).annotate_config_context_data()
self.assertEqual(device.get_config_context(), annotated_queryset[0].get_config_context())
def test_annotation_same_as_get_for_object_device_relations(self):
region = Region.objects.first()
sitegroup = SiteGroup.objects.first()
site = Site.objects.first()
location = Location.objects.first()
platform = Platform.objects.first()
tenantgroup = TenantGroup.objects.first()
tenant = Tenant.objects.first()
tag = Tag.objects.first()
region_context = ConfigContext.objects.create(
name="region",
weight=100,
data={
"region": 1
}
)
region_context.regions.add(region)
sitegroup_context = ConfigContext.objects.create(
name="sitegroup",
weight=100,
data={
"sitegroup": 1
}
)
sitegroup_context.site_groups.add(sitegroup)
site_context = ConfigContext.objects.create(
name="site",
weight=100,
data={
"site": 1
}
)
site_context.sites.add(site)
location_context = ConfigContext.objects.create(
name="location",
weight=100,
data={
"location": 1
}
)
location_context.locations.add(location)
platform_context = ConfigContext.objects.create(
name="platform",
weight=100,
data={
"platform": 1
}
)
platform_context.platforms.add(platform)
tenant_group_context = ConfigContext.objects.create(
name="tenant group",
weight=100,
data={
"tenant_group": 1
}
)
tenant_group_context.tenant_groups.add(tenantgroup)
tenant_context = ConfigContext.objects.create(
name="tenant",
weight=100,
data={
"tenant": 1
}
)
tenant_context.tenants.add(tenant)
tag_context = ConfigContext.objects.create(
name="tag",
weight=100,
data={
"tag": 1
}
)
tag_context.tags.add(tag)
device = Device.objects.create(
name="Device 2",
site=site,
location=location,
tenant=tenant,
platform=platform,
role=DeviceRole.objects.first(),
device_type=DeviceType.objects.first()
)
device.tags.add(tag)
annotated_queryset = Device.objects.filter(name=device.name).annotate_config_context_data()
self.assertEqual(device.get_config_context(), annotated_queryset[0].get_config_context())
def test_annotation_same_as_get_for_object_virtualmachine_relations(self):
region = Region.objects.first()
sitegroup = SiteGroup.objects.first()
site = Site.objects.first()
platform = Platform.objects.first()
tenantgroup = TenantGroup.objects.first()
tenant = Tenant.objects.first()
tag = Tag.objects.first()
cluster_type = ClusterType.objects.create(name="Cluster Type")
cluster_group = ClusterGroup.objects.create(name="Cluster Group")
cluster = Cluster.objects.create(
name="Cluster",
group=cluster_group,
type=cluster_type,
scope=site,
)
region_context = ConfigContext.objects.create(
name="region",
weight=100,
data={"region": 1}
)
region_context.regions.add(region)
sitegroup_context = ConfigContext.objects.create(
name="sitegroup",
weight=100,
data={"sitegroup": 1}
)
sitegroup_context.site_groups.add(sitegroup)
site_context = ConfigContext.objects.create(
name="site",
weight=100,
data={"site": 1}
)
site_context.sites.add(site)
platform_context = ConfigContext.objects.create(
name="platform",
weight=100,
data={"platform": 1}
)
platform_context.platforms.add(platform)
tenant_group_context = ConfigContext.objects.create(
name="tenant group",
weight=100,
data={"tenant_group": 1}
)
tenant_group_context.tenant_groups.add(tenantgroup)
tenant_context = ConfigContext.objects.create(
name="tenant",
weight=100,
data={"tenant": 1}
)
tenant_context.tenants.add(tenant)
tag_context = ConfigContext.objects.create(
name="tag",
weight=100,
data={"tag": 1}
)
tag_context.tags.add(tag)
cluster_type_context = ConfigContext.objects.create(
name="cluster type",
weight=100,
data={"cluster_type": 1}
)
cluster_type_context.cluster_types.add(cluster_type)
cluster_group_context = ConfigContext.objects.create(
name="cluster group",
weight=100,
data={"cluster_group": 1}
)
cluster_group_context.cluster_groups.add(cluster_group)
cluster_context = ConfigContext.objects.create(
name="cluster",
weight=100,
data={"cluster": 1}
)
cluster_context.clusters.add(cluster)
virtual_machine = VirtualMachine.objects.create(
name="VM 1",
cluster=cluster,
tenant=tenant,
platform=platform,
role=DeviceRole.objects.first()
)
virtual_machine.tags.add(tag)
annotated_queryset = VirtualMachine.objects.filter(name=virtual_machine.name).annotate_config_context_data()
self.assertEqual(virtual_machine.get_config_context(), annotated_queryset[0].get_config_context())
def test_virtualmachine_site_context(self):
"""
Check that config context associated with a site applies to a VM whether the VM is assigned
directly to that site or via its cluster.
"""
site = Site.objects.first()
cluster_type = ClusterType.objects.create(name="Cluster Type")
cluster = Cluster.objects.create(name="Cluster", type=cluster_type, scope=site)
vm_role = DeviceRole.objects.first()
# Create a ConfigContext associated with the site
context = ConfigContext.objects.create(
name="context1",
weight=100,
data={"foo": True}
)
context.sites.add(site)
# Create one VM assigned directly to the site, and one assigned via the cluster
vm1 = VirtualMachine.objects.create(name="VM 1", site=site, role=vm_role)
vm2 = VirtualMachine.objects.create(name="VM 2", cluster=cluster, role=vm_role)
# Check that their individually rendered config contexts are identical
self.assertEqual(
vm1.get_config_context(),
vm2.get_config_context()
)
# Check that their annotated config contexts are identical
vms = VirtualMachine.objects.filter(pk__in=(vm1.pk, vm2.pk)).annotate_config_context_data()
self.assertEqual(
vms[0].get_config_context(),
vms[1].get_config_context()
)
def test_valid_local_context_data(self):
device = Device.objects.first()
device.local_context_data = None
device.clean()
device.local_context_data = {"foo": "bar"}
device.clean()
def test_invalid_local_context_data(self):
device = Device.objects.first()
device.local_context_data = ""
with self.assertRaises(ValidationError):
device.clean()
device.local_context_data = 0
with self.assertRaises(ValidationError):
device.clean()
device.local_context_data = False
with self.assertRaises(ValidationError):
device.clean()
device.local_context_data = 'foo'
with self.assertRaises(ValidationError):
device.clean()
@tag('regression')
def test_multiple_tags_return_distinct_objects(self):
"""
Tagged items use a generic relationship, which results in duplicate rows being returned when queried.
This is combated by appending distinct() to the config context querysets. This test creates a config
context assigned to two tags and ensures objects related to those same two tags result in only a single
config context record being returned.
See https://github.com/netbox-community/netbox/issues/5314
"""
site = Site.objects.first()
platform = Platform.objects.first()
tenant = Tenant.objects.first()
tags = Tag.objects.all()
tag_context = ConfigContext.objects.create(
name="tag",
weight=100,
data={
"tag": 1
}
)
tag_context.tags.set(tags)
device = Device.objects.create(
name="Device 3",
site=site,
tenant=tenant,
platform=platform,
role=DeviceRole.objects.first(),
device_type=DeviceType.objects.first()
)
device.tags.set(tags)
annotated_queryset = Device.objects.filter(name=device.name).annotate_config_context_data()
self.assertEqual(ConfigContext.objects.get_for_object(device).count(), 1)
self.assertEqual(device.get_config_context(), annotated_queryset[0].get_config_context())
@tag('regression')
def test_multiple_tags_return_distinct_objects_with_separate_config_contexts(self):
"""
Tagged items use a generic relationship, which results in duplicate rows being returned when queried.
This is combated by appending distinct() to the config context querysets. This test creates a config
context assigned to two tags and ensures objects related to those same two tags result in only a single
config context record being returned.
This test case is separate from the above in that it deals with multiple config context objects in play.
See https://github.com/netbox-community/netbox/issues/5387
"""
site = Site.objects.first()
platform = Platform.objects.first()
tenant = Tenant.objects.first()
tag1, tag2 = list(Tag.objects.all())
tag_context_1 = ConfigContext.objects.create(
name="tag-1",
weight=100,
data={
"tag": 1
}
)
tag_context_1.tags.add(tag1)
tag_context_2 = ConfigContext.objects.create(
name="tag-2",
weight=100,
data={
"tag": 1
}
)
tag_context_2.tags.add(tag2)
device = Device.objects.create(
name="Device 3",
site=site,
tenant=tenant,
platform=platform,
role=DeviceRole.objects.first(),
device_type=DeviceType.objects.first()
)
device.tags.set([tag1, tag2])
annotated_queryset = Device.objects.filter(name=device.name).annotate_config_context_data()
self.assertEqual(ConfigContext.objects.get_for_object(device).count(), 2)
self.assertEqual(device.get_config_context(), annotated_queryset[0].get_config_context())
@tag('performance', 'regression')
def test_config_context_annotation_query_optimization(self):
"""
Regression test for issue #20327: Ensure config context annotation
doesn't use expensive DISTINCT on main query.
Verifies that DISTINCT is only used in tag subquery where needed,
not on the main device query which is expensive for large datasets.
"""
device = Device.objects.first()
queryset = Device.objects.filter(pk=device.pk).annotate_config_context_data()
# Main device query should NOT use DISTINCT
self.assertFalse(queryset.query.distinct)
# Check that tag subqueries DO use DISTINCT by inspecting the annotation
config_annotation = queryset.query.annotations.get('config_context_data')
self.assertIsNotNone(config_annotation)
def find_tag_subqueries(where_node):
"""Find subqueries in WHERE clause that relate to tag filtering"""
subqueries = []
def traverse(node):
if hasattr(node, 'children'):
for child in node.children:
try:
if child.rhs.query.model is TaggedItem:
subqueries.append(child.rhs.query)
except AttributeError:
traverse(child)
traverse(where_node)
return subqueries
# Find subqueries in the WHERE clause that should have DISTINCT
tag_subqueries = find_tag_subqueries(config_annotation.query.where)
distinct_subqueries = [sq for sq in tag_subqueries if sq.distinct]
# Verify we found at least one DISTINCT subquery for tags
self.assertEqual(len(distinct_subqueries), 1)
self.assertTrue(distinct_subqueries[0].distinct)
class ConfigTemplateTest(TestCase):
"""
TODO: These test cases deal with the weighting, ordering, and deep merge logic of config context data.
"""
MAIN_TEMPLATE = """
{%- include 'base.j2' %}
""".strip()
BASE_TEMPLATE = """
Hi
""".strip()
@classmethod
def _create_template_file(cls, templates_dir, file_name, content):
template_file_name = file_name
if not template_file_name.endswith('j2'):
template_file_name += '.j2'
temp_file_path = templates_dir / template_file_name
with open(temp_file_path, 'w') as f:
f.write(content)
@classmethod
def setUpTestData(cls):
temp_dir = tempfile.TemporaryDirectory()
templates_dir = Path(temp_dir.name) / "templates"
templates_dir.mkdir(parents=True, exist_ok=True)
cls._create_template_file(templates_dir, 'base.j2', cls.BASE_TEMPLATE)
cls._create_template_file(templates_dir, 'main.j2', cls.MAIN_TEMPLATE)
data_source = DataSource(
name="Test DataSource",
type="local",
source_url=str(templates_dir),
)
data_source.save()
data_source.sync()
base_config_template = ConfigTemplate(
name="BaseTemplate",
data_file=data_source.datafiles.filter(path__endswith='base.j2').first()
)
base_config_template.clean()
base_config_template.save()
cls.base_config_template = base_config_template
main_config_template = ConfigTemplate(
name="MainTemplate",
data_file=data_source.datafiles.filter(path__endswith='main.j2').first()
)
main_config_template.clean()
main_config_template.save()
cls.main_config_template = main_config_template
@tag('regression')
def test_config_template_with_data_source(self):
self.assertEqual(self.BASE_TEMPLATE, self.base_config_template.render({}))
@tag('regression')
def test_config_template_with_data_source_nested_templates(self):
self.assertEqual(self.BASE_TEMPLATE, self.main_config_template.render({}))
@tag('regression')
def test_autosyncrecord_cleanup_on_detach(self):
"""Test that AutoSyncRecord is deleted when detaching from DataSource."""
with tempfile.TemporaryDirectory() as temp_dir:
templates_dir = Path(temp_dir) / "templates"
templates_dir.mkdir(parents=True, exist_ok=True)
self._create_template_file(templates_dir, 'test.j2', 'Test content')
data_source = DataSource(
name="Test DataSource for Detach",
type="local",
source_url=str(templates_dir),
)
data_source.save()
data_source.sync()
data_file = data_source.datafiles.filter(path__endswith='test.j2').first()
# Create a ConfigTemplate with data_file and auto_sync_enabled
config_template = ConfigTemplate(
name="TestTemplateForDetach",
data_file=data_file,
auto_sync_enabled=True
)
config_template.clean()
config_template.save()
# Verify AutoSyncRecord was created
object_type = ObjectType.objects.get_for_model(ConfigTemplate)
autosync_records = AutoSyncRecord.objects.filter(
object_type=object_type,
object_id=config_template.pk
)
self.assertEqual(autosync_records.count(), 1, "AutoSyncRecord should be created")
# Detach from DataSource
config_template.data_file = None
config_template.data_source = None
config_template.auto_sync_enabled = False
config_template.clean()
config_template.save()
# Verify AutoSyncRecord was deleted
autosync_records = AutoSyncRecord.objects.filter(
object_type=object_type,
object_id=config_template.pk
)
self.assertEqual(autosync_records.count(), 0, "AutoSyncRecord should be deleted after detaching")