This commit is contained in:
m-hau 2025-09-05 22:33:26 +00:00 committed by GitHub
commit f8032f1cd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 399 additions and 4 deletions

View File

@ -84,6 +84,12 @@ class InterfaceTemplateImportForm(forms.ModelForm):
label=_('Type'),
choices=InterfaceTypeChoices.CHOICES
)
bridge = forms.ModelChoiceField(
label=_('Bridge'),
queryset=InterfaceTemplate.objects.all(),
to_field_name='name',
required=False
)
poe_mode = forms.ChoiceField(
choices=InterfacePoEModeChoices,
required=False,
@ -103,10 +109,24 @@ class InterfaceTemplateImportForm(forms.ModelForm):
class Meta:
model = InterfaceTemplate
fields = [
'device_type', 'module_type', 'name', 'label', 'type', 'enabled', 'mgmt_only', 'description', 'poe_mode',
'poe_type', 'rf_role'
'device_type', 'module_type', 'name', 'label', 'type', 'enabled', 'mgmt_only', 'description', 'bridge',
'poe_mode', 'poe_type', 'rf_role'
]
def clean_device_type(self):
if device_type := self.cleaned_data['device_type']:
bridge = self.fields['bridge']
bridge.queryset = bridge.queryset.filter(device_type=device_type)
return device_type
def clean_module_type(self):
if module_type := self.cleaned_data['module_type']:
bridge = self.fields['bridge']
bridge.queryset = bridge.queryset.filter(module_type=module_type)
return module_type
class FrontPortTemplateImportForm(forms.ModelForm):
type = forms.ChoiceField(

View File

@ -985,6 +985,262 @@ inventory-items:
ii1 = InventoryItemTemplate.objects.first()
self.assertEqual(ii1.name, 'Inventory Item 1')
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_import_nolist(self):
# Add all required permissions to the test user
self.add_permissions(
'dcim.view_devicetype',
'dcim.add_devicetype',
'dcim.add_consoleporttemplate',
'dcim.add_consoleserverporttemplate',
'dcim.add_powerporttemplate',
'dcim.add_poweroutlettemplate',
'dcim.add_interfacetemplate',
'dcim.add_frontporttemplate',
'dcim.add_rearporttemplate',
'dcim.add_modulebaytemplate',
'dcim.add_devicebaytemplate',
'dcim.add_inventoryitemtemplate',
)
for value in ('', 'null', '3', '"My console port"', '{name: "My other console port"}'):
with self.subTest(value=value):
import_data = f'''
manufacturer: Manufacturer 1
model: TEST-2000
slug: test-2000
u_height: 1
console-ports: {value}
'''
form_data = {
'data': import_data,
'format': 'yaml'
}
response = self.client.post(reverse('dcim:devicetype_bulk_import'), data=form_data, follow=True)
self.assertHttpStatus(response, 200)
self.assertContains(response, "console-ports: Must be a list.")
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_import_nodict(self):
# Add all required permissions to the test user
self.add_permissions(
'dcim.view_devicetype',
'dcim.add_devicetype',
'dcim.add_consoleporttemplate',
'dcim.add_consoleserverporttemplate',
'dcim.add_powerporttemplate',
'dcim.add_poweroutlettemplate',
'dcim.add_interfacetemplate',
'dcim.add_frontporttemplate',
'dcim.add_rearporttemplate',
'dcim.add_modulebaytemplate',
'dcim.add_devicebaytemplate',
'dcim.add_inventoryitemtemplate',
)
for value in ('', 'null', '3', '"My console port"', '["My other console port"]'):
with self.subTest(value=value):
import_data = f'''
manufacturer: Manufacturer 1
model: TEST-3000
slug: test-3000
u_height: 1
console-ports:
- {value}
'''
form_data = {
'data': import_data,
'format': 'yaml'
}
response = self.client.post(reverse('dcim:devicetype_bulk_import'), data=form_data, follow=True)
self.assertHttpStatus(response, 200)
self.assertContains(response, "console-ports[0]: Must be a dictionary.")
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_import_interfacebridge(self):
IMPORT_DATA = """
manufacturer: Manufacturer 1
model: TEST-4000
slug: test-4000
u_height: 1
interfaces:
- name: Standalone 1
type: 1000base-t
- name: Bridge Interface 2
type: 1000base-t
bridge: Bridge
- name: Bridge Interface 1
type: 1000base-t
bridge: Bridge
- name: Standalone 2
type: 1000base-t
- name: Sub-Bridge Interface 2
type: 1000base-t
bridge: Sub-Bridge
- name: Bridge
type: 1000base-t
- name: Sub-Bridge
type: 1000base-t
bridge: Bridge
- name: Bridge Interface 3
type: 1000base-t
bridge: Bridge
- name: Sub-Bridge Interface 1
type: 1000base-t
bridge: Sub-Bridge
- name: Standalone 3
type: 1000base-t
"""
# Add all required permissions to the test user
self.add_permissions(
'dcim.view_devicetype',
'dcim.add_devicetype',
'dcim.add_consoleporttemplate',
'dcim.add_consoleserverporttemplate',
'dcim.add_powerporttemplate',
'dcim.add_poweroutlettemplate',
'dcim.add_interfacetemplate',
'dcim.add_frontporttemplate',
'dcim.add_rearporttemplate',
'dcim.add_modulebaytemplate',
'dcim.add_devicebaytemplate',
'dcim.add_inventoryitemtemplate',
)
form_data = {
'data': IMPORT_DATA,
'format': 'yaml'
}
response = self.client.post(reverse('dcim:devicetype_bulk_import'), data=form_data, follow=True)
self.assertHttpStatus(response, 200)
self.assertContains(response, "Imported 1 device types")
device_type = DeviceType.objects.get(model='TEST-4000')
self.assertEqual(device_type.interfacetemplates.count(), 10)
interfaces = InterfaceTemplate.objects.all().order_by('id')
self.assertEqual(interfaces[0].name, 'Standalone 1')
self.assertIsNone(interfaces[0].bridge)
self.assertEqual(interfaces[1].name, 'Standalone 2')
self.assertIsNone(interfaces[1].bridge)
self.assertEqual(interfaces[2].name, 'Bridge')
self.assertIsNone(interfaces[2].bridge)
self.assertEqual(interfaces[3].name, 'Standalone 3')
self.assertIsNone(interfaces[3].bridge)
self.assertEqual(interfaces[4].name, 'Bridge Interface 2')
self.assertEqual(interfaces[4].bridge.name, "Bridge")
self.assertEqual(interfaces[5].name, 'Bridge Interface 1')
self.assertEqual(interfaces[5].bridge.name, "Bridge")
self.assertEqual(interfaces[6].name, 'Sub-Bridge')
self.assertEqual(interfaces[6].bridge.name, "Bridge")
self.assertEqual(interfaces[7].name, 'Bridge Interface 3')
self.assertEqual(interfaces[7].bridge.name, "Bridge")
self.assertEqual(interfaces[8].name, 'Sub-Bridge Interface 2')
self.assertEqual(interfaces[8].bridge.name, "Sub-Bridge")
self.assertEqual(interfaces[9].name, 'Sub-Bridge Interface 1')
self.assertEqual(interfaces[9].bridge.name, "Sub-Bridge")
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_import_interfacebridge_cycle(self):
IMPORT_DATA = """
manufacturer: Manufacturer 1
model: TEST-5000
slug: test-5000
u_height: 1
interfaces:
- name: Cycle Interface 1
type: 1000base-t
bridge: Cycle Interface 2
- name: Cycle Interface 2
type: 1000base-t
bridge: Cycle Interface 3
- name: Cycle Interface 3
type: 1000base-t
bridge: Cycle Interface 1
- name: Unrelated Interface 1
type: 1000base-t
- name: Unrelated Interface 2
type: 1000base-t
bridge: Cycle Interface 1
- name: Unrelated Interface 3
type: 1000base-t
bridge: Cycle Interface 3
"""
# Add all required permissions to the test user
self.add_permissions(
'dcim.view_devicetype',
'dcim.add_devicetype',
'dcim.add_consoleporttemplate',
'dcim.add_consoleserverporttemplate',
'dcim.add_powerporttemplate',
'dcim.add_poweroutlettemplate',
'dcim.add_interfacetemplate',
'dcim.add_frontporttemplate',
'dcim.add_rearporttemplate',
'dcim.add_modulebaytemplate',
'dcim.add_devicebaytemplate',
'dcim.add_inventoryitemtemplate',
)
form_data = {
'data': IMPORT_DATA,
'format': 'yaml'
}
response = self.client.post(reverse('dcim:devicetype_bulk_import'), data=form_data, follow=True)
self.assertHttpStatus(response, 200)
self.assertContains(response, "interfaces: Dependency cycle [Cycle Interface 1, Cycle Interface 2, "
"Cycle Interface 3, Cycle Interface 1] detected")
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_import_interfacebridge_invalid(self):
IMPORT_DATA = """
manufacturer: Manufacturer 1
model: TEST-6000
slug: test-6000
u_height: 1
interfaces:
- name: Interface 1
type: 1000base-t
- name: Interface 2
type: 1000base-t
bridge: Non-existent Bridge
- name: Interface 3
type: 1000base-t
"""
# Add all required permissions to the test user
self.add_permissions(
'dcim.view_devicetype',
'dcim.add_devicetype',
'dcim.add_consoleporttemplate',
'dcim.add_consoleserverporttemplate',
'dcim.add_powerporttemplate',
'dcim.add_poweroutlettemplate',
'dcim.add_interfacetemplate',
'dcim.add_frontporttemplate',
'dcim.add_rearporttemplate',
'dcim.add_modulebaytemplate',
'dcim.add_devicebaytemplate',
'dcim.add_inventoryitemtemplate',
)
form_data = {
'data': IMPORT_DATA,
'format': 'yaml'
}
response = self.client.post(reverse('dcim:devicetype_bulk_import'), data=form_data, follow=True)
self.assertHttpStatus(response, 200)
self.assertContains(response, "interfaces[1] bridge: Select a valid choice. "
"That choice is not one of the available choices.")
def test_export_objects(self):
url = reverse('dcim:devicetype_list')
self.add_permissions('dcim.view_devicetype')

View File

@ -1,9 +1,11 @@
from collections import deque
from django.contrib import messages
from django.contrib.contenttypes.models import ContentType
from django.core.paginator import EmptyPage, PageNotAnInteger
from django.db import router, transaction
from django.db.models import Prefetch
from django.forms import ModelMultipleChoiceField, MultipleHiddenInput, modelformset_factory
from django.forms import ModelMultipleChoiceField, MultipleHiddenInput, ValidationError, modelformset_factory
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils.html import escape
@ -1317,6 +1319,99 @@ class DeviceTypeImportView(generic.BulkImportView):
'inventory-items': forms.InventoryItemTemplateImportForm,
}
def _sort_interfaces(self, interfaces):
"""Sort the given enumerated interface list to satisfy any dependencies."""
if not interfaces:
return
# build the dependency graph
all_interface_names = set()
sorting_required = False
required_by = dict() # ifname to list of dependant subinterfaces # TODO replace list with ordered set
requires = dict() # ifname to set of depended-on superinterfaces
for idx, interface in interfaces:
if not isinstance(interface, dict): # TODO isinstance(MutableMapping)?
# not a dict, will fail validation anyway
# but still sort if required, to prevent false "bridge is invalid" errors on any other valid interfaces
continue
if not interface.get('name'):
# no interface name, will fail validation anyway
continue
ifname = interface['name']
all_interface_names.add(ifname)
requirements = set()
if bridge := interface.get('bridge'):
requirements.add(bridge)
requires[ifname] = requirements
for requirement in list(requirements):
required_by.setdefault(requirement, list()).append(ifname)
# if we haven't seen all requirements yet, sorting is needed
sorting_required |= not requirements.issubset(all_interface_names)
if not sorting_required:
return
# use Kahn's algorithm to build a topological sorting
workqueue = deque(ifname for ifname, requirements in requires.items() if not requirements)
ifname_ordering = list()
while workqueue:
ifname = workqueue.popleft()
ifname_ordering.append(ifname)
for dependant in list(required_by.get(ifname, list())):
requires[dependant].remove(ifname)
if not requires[dependant]:
workqueue.append(dependant)
if len(ifname_ordering) != len(set(ifname_ordering)):
# should never happen
raise ValueError("Interface ordering contains duplicates")
unsatisfied_requirements = list(ifname for ifname, deps in requires.items() if deps)
if unsatisfied_requirements:
def find_cycle(visited_interfaces):
"""Recursive depth-first search to identify cycles."""
for ifname in list(required_by.get(visited_interfaces[-1], list())):
if ifname in visited_interfaces:
# found a cycle and its start
start_index = visited_interfaces.index(ifname)
visited_interfaces.append(ifname)
return list(reversed(visited_interfaces[start_index:]))
result = find_cycle(visited_interfaces + [ifname])
if result:
return result
return None
# Check if there is a cycle
for ifname in unsatisfied_requirements:
cycle = find_cycle([ifname])
if cycle:
# stop at the first one, finding all while avoiding duplicates would be hard
raise ValidationError(
_("Dependency cycle [%(interfaces)s] detected"),
params={"interfaces": ", ".join(cycle)},
)
# no cycle, so the unsatisfied requirements must be due to requirements on non-existent interfaces,
# which will cause a validation error later when checking the individual interface objects.
# apply the topological sorting to the actual list
def get_sort_key(interface):
try:
return ifname_ordering.index(interface['name'])
except Exception:
# Everything broken/invalid goes to the beginning, so validation fails fast
return -1
interfaces.sort(key=lambda entry_tuple: get_sort_key(entry_tuple[1]))
def prep_related_object_list(self, field_name, enumerated_list):
if field_name == 'interfaces':
self._sort_interfaces(enumerated_list)
def prep_related_object_data(self, parent, data):
data.update({'device_type': parent})
return data

View File

@ -335,6 +335,13 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
def get_required_permission(self):
return get_permission_for_model(self.queryset.model, 'add')
def prep_related_object_list(self, field_name, enumerated_list):
"""
Hook to modify the enumerated list of related objects before it's passed to the related object form (for
example, to change the order).
"""
pass # TODO keep in-place only, or return modified list?
def prep_related_object_data(self, parent, data):
"""
Hook to modify the data for related objects before it's passed to the related object form (for example, to
@ -381,8 +388,25 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
# Iterate through the related object forms (if any), validating and saving each instance.
for field_name, related_object_form in self.related_object_forms.items():
related_objects = model_form.data.get(field_name, list())
if not isinstance(related_objects, list): # TODO isinstance(Sequence)?
import_form.add_error(None, f"{field_name}: {_('Must be a list.')}")
raise AbortTransaction()
related_objects = list(enumerate(related_objects))
try:
self.prep_related_object_list(field_name, related_objects)
except ValidationError as e:
for message in e.messages:
import_form.add_error(None, f"{field_name}: {message}")
raise AbortTransaction()
related_obj_pks = []
for i, rel_obj_data in enumerate(model_form.data.get(field_name, list())):
for i, rel_obj_data in related_objects:
if not isinstance(rel_obj_data, dict): # TODO isinstance(MutableMapping)?
import_form.add_error(None, f"{field_name}[{i}]: {_('Must be a dictionary.')}")
raise AbortTransaction()
rel_obj_data = self.prep_related_object_data(obj, rel_obj_data)
f = related_object_form(rel_obj_data)