From c111c083152db2b99b0507d42218fcc7365f9abb Mon Sep 17 00:00:00 2001 From: Jonathan Senecal Date: Thu, 20 Nov 2025 12:06:16 +0000 Subject: [PATCH] Add dynamic parent resolution for cable CSV imports Replace device-specific fields with generic parent fields to support circuits, power panels, and other cable termination types. --- netbox/dcim/forms/bulk_import.py | 194 ++++++++++++++++++++++------- netbox/dcim/tests/test_forms.py | 206 ++++++++++++++++++++++++++++++- netbox/dcim/tests/test_views.py | 39 +++++- 3 files changed, 389 insertions(+), 50 deletions(-) diff --git a/netbox/dcim/forms/bulk_import.py b/netbox/dcim/forms/bulk_import.py index dc01f26da..e5a103a37 100644 --- a/netbox/dcim/forms/bulk_import.py +++ b/netbox/dcim/forms/bulk_import.py @@ -5,6 +5,7 @@ from django.core.exceptions import ObjectDoesNotExist from django.utils.safestring import mark_safe from django.utils.translation import gettext_lazy as _ +from circuits.models import Circuit from dcim.choices import * from dcim.constants import * from dcim.models import * @@ -1398,19 +1399,52 @@ class MACAddressImportForm(NetBoxModelImportForm): # class CableImportForm(NetBoxModelImportForm): + """ + CSV bulk import form for cables. + + Supports dynamic parent model resolution - terminations are identified by their parent + object (device, circuit, or power panel) and termination name. + + The parent field resolves to different models based on the termination type + See CABLE_PARENT_MAPPING for supported termination types. + """ + + # Map cable termination content types to their parent model and lookup field. + # + # This mapping enables dynamic parent model resolution during cable CSV imports. + # Each entry maps a termination type to a tuple of (parent_content_type, accessor): + # + # Format: 'app.model': ('parent_app.ParentModel', 'accessor') + # + CABLE_PARENT_MAPPING = { + 'dcim.interface': ('dcim.Device', 'name'), + 'dcim.consoleport': ('dcim.Device', 'name'), + 'dcim.consoleserverport': ('dcim.Device', 'name'), + 'dcim.powerport': ('dcim.Device', 'name'), + 'dcim.poweroutlet': ('dcim.Device', 'name'), + 'dcim.frontport': ('dcim.Device', 'name'), + 'dcim.rearport': ('dcim.Device', 'name'), + 'circuits.circuittermination': ('circuits.Circuit', 'cid'), + 'dcim.powerfeed': ('dcim.PowerPanel', 'name'), + } + # Map parent model name to (parent_field_name, termination_name_field, value_transform) + TERMINATION_FIELDS = { + 'Circuit': ('circuit', 'term_side', str.upper), + 'Device': ('device', 'name', None), + 'PowerPanel': ('power_panel', 'name', None), + } + # Termination A side_a_site = CSVModelChoiceField( label=_('Side A site'), queryset=Site.objects.all(), required=False, to_field_name='name', - help_text=_('Site of parent device A (if any)'), + help_text=_('Site of parent A (if any)') ) - side_a_device = CSVModelChoiceField( - label=_('Side A device'), - queryset=Device.objects.all(), - to_field_name='name', - help_text=_('Device name') + side_a_parent = forms.CharField( + label=_('Side A parent'), + help_text=_('Device name, Circuit CID, or Power Panel name') ) side_a_type = CSVContentTypeField( label=_('Side A type'), @@ -1429,13 +1463,11 @@ class CableImportForm(NetBoxModelImportForm): queryset=Site.objects.all(), required=False, to_field_name='name', - help_text=_('Site of parent device B (if any)'), + help_text=_('Site of parent B (if any)') ) - side_b_device = CSVModelChoiceField( - label=_('Side B device'), - queryset=Device.objects.all(), - to_field_name='name', - help_text=_('Device name') + side_b_parent = forms.CharField( + label=_('Side B parent'), + help_text=_('Device name, Circuit CID, or Power Panel name') ) side_b_type = CSVContentTypeField( label=_('Side B type'), @@ -1484,7 +1516,7 @@ class CableImportForm(NetBoxModelImportForm): class Meta: model = Cable fields = [ - 'side_a_site', 'side_a_device', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_device', 'side_b_type', + 'side_a_site', 'side_a_parent', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_parent', 'side_b_type', 'side_b_name', 'type', 'status', 'tenant', 'label', 'color', 'length', 'length_unit', 'description', 'comments', 'tags', ] @@ -1492,21 +1524,6 @@ class CableImportForm(NetBoxModelImportForm): def __init__(self, data=None, *args, **kwargs): super().__init__(data, *args, **kwargs) - if data: - # Limit choices for side_a_device to the assigned side_a_site - if side_a_site := data.get('side_a_site'): - side_a_device_params = {f'site__{self.fields["side_a_site"].to_field_name}': side_a_site} - self.fields['side_a_device'].queryset = self.fields['side_a_device'].queryset.filter( - **side_a_device_params - ) - - # Limit choices for side_b_device to the assigned side_b_site - if side_b_site := data.get('side_b_site'): - side_b_device_params = {f'site__{self.fields["side_b_site"].to_field_name}': side_b_site} - self.fields['side_b_device'].queryset = self.fields['side_b_device'].queryset.filter( - **side_b_device_params - ) - def _clean_side(self, side): """ Derive a Cable's A/B termination objects. @@ -1515,31 +1532,118 @@ class CableImportForm(NetBoxModelImportForm): """ assert side in 'ab', f"Invalid side designation: {side}" - device = self.cleaned_data.get(f'side_{side}_device') content_type = self.cleaned_data.get(f'side_{side}_type') + site = self.cleaned_data.get(f'side_{side}_site') + parent_value = self.cleaned_data.get(f'side_{side}_parent') name = self.cleaned_data.get(f'side_{side}_name') - if not device or not content_type or not name: + + if not parent_value or not content_type or not name: # pragma: no cover return None - model = content_type.model_class() + # Get the parent model mapping from the submitted content_type + parent_map = self.CABLE_PARENT_MAPPING.get(f'{content_type.app_label}.{content_type.model}') + # This should never happen + assert parent_map, ( + 'Unknown cable termination content type parent mapping: ' + f'{content_type.app_label}.{content_type.model}' + ) + + parent_content_type, parent_accessor = parent_map + parent_app_label, parent_model_name = parent_content_type.split('.') + + # Get the parent model class try: - if device.virtual_chassis and device.virtual_chassis.master == device and \ - model.objects.filter(device=device, name=name).count() == 0: - termination_object = model.objects.get(device__in=device.virtual_chassis.members.all(), name=name) - else: - termination_object = model.objects.get(device=device, name=name) - if termination_object.cable is not None and termination_object.cable != self.instance: - raise forms.ValidationError( - _("Side {side_upper}: {device} {termination_object} is already connected").format( - side_upper=side.upper(), device=device, termination_object=termination_object - ) - ) - except ObjectDoesNotExist: + parent_ct = ContentType.objects.get(app_label=parent_app_label.lower(), model=parent_model_name.lower()) + parent_model: Device | PowerPanel | Circuit = parent_ct.model_class() + except ContentType.DoesNotExist: # pragma: no cover + # This should never happen + raise AssertionError(f'Unknown cable termination parent content type: {parent_content_type}') + + # Build query for parent lookup + parent_query = {parent_accessor: parent_value} + # Add site to query if provided + if site: + parent_query['site'] = site + + # Look up the parent object + try: + parent_object = parent_model.objects.get(**parent_query) + except parent_model.DoesNotExist: raise forms.ValidationError( - _("{side_upper} side termination not found: {device} {name}").format( - side_upper=side.upper(), device=device, name=name + _('Side {side_upper}: {model_name} not found: {value}').format( + side_upper=side.upper(), model_name=parent_model.__name__, value=parent_value ) ) + except parent_model.MultipleObjectsReturned: + raise forms.ValidationError( + _('Side {side_upper}: Multiple {model_name} objects found: {value}').format( + side_upper=side.upper(), model_name=parent_model.__name__, value=parent_value + ) + ) + + # Get the termination model class + termination_model = content_type.model_class() + + # Build the query to find the termination object + field_mapping = self.TERMINATION_FIELDS.get(parent_model.__name__) + if not field_mapping: # pragma: no cover + return None + + parent_field, name_field, value_transform = field_mapping + query = {parent_field: parent_object} + + if value_transform: + name = value_transform(name) + + if name: + query[name_field] = name + + # Add site to query if provided (for site-scoped parents) + if site and parent_field in ('device', 'power_panel'): + query[f'{parent_field}__site'] = site + + # Look up the termination object + try: + # Handle virtual chassis for device-based terminations + if (parent_field == 'device' and + parent_object.virtual_chassis and + parent_object.virtual_chassis.master == parent_object and + termination_model.objects.filter(**query).count() == 0): + query[f'{parent_field}__in'] = parent_object.virtual_chassis.members.all() + query.pop(parent_field, None) + termination_object = termination_model.objects.get(**query) + else: + termination_object = termination_model.objects.get(**query) + + # Check if already connected to a cable + if termination_object.cable is not None and termination_object.cable != self.instance: + raise forms.ValidationError( + _('Side {side_upper}: {parent} {termination} is already connected').format( + side_upper=side.upper(), parent=parent_object, termination=termination_object + ) + ) + + # Circuit terminations can also be connected to provider networks + if (name_field == 'term_side' and + hasattr(termination_object, '_provider_network') and + termination_object._provider_network is not None): + raise forms.ValidationError( + _('Side {side_upper}: {parent} {termination} is already connected to a provider network').format( + side_upper=side.upper(), parent=parent_object, termination=termination_object + ) + ) + except termination_model.DoesNotExist: + raise forms.ValidationError( + _('Side {side_upper}: {model_name} not found: {parent} {name}').format( + side_upper=side.upper(), + model_name=termination_model.__name__, + parent=parent_object, name=name or '', + ), + ) + except termination_model.MultipleObjectsReturned: # pragma: no cover + # This should never happen + raise AssertionError('Multiple termination objects returned for query: {query}'.format(query=query)) + setattr(self.instance, f'{side}_terminations', [termination_object]) return termination_object diff --git a/netbox/dcim/tests/test_forms.py b/netbox/dcim/tests/test_forms.py index fa654f789..6f230ce9a 100644 --- a/netbox/dcim/tests/test_forms.py +++ b/netbox/dcim/tests/test_forms.py @@ -1,8 +1,9 @@ from django.test import TestCase +from circuits.models import Circuit, CircuitTermination, CircuitType, Provider, ProviderNetwork from dcim.choices import ( - DeviceFaceChoices, DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices, PortTypeChoices, - PowerOutletStatusChoices, + CableTypeChoices, DeviceFaceChoices, DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices, + PortTypeChoices, PowerOutletStatusChoices, ) from dcim.forms import * from dcim.models import * @@ -411,3 +412,204 @@ class InterfaceTestCase(TestCase): self.assertNotIn('untagged_vlan', form.cleaned_data.keys()) self.assertNotIn('tagged_vlans', form.cleaned_data.keys()) self.assertNotIn('qinq_svlan', form.cleaned_data.keys()) + + +class CableImportFormTestCase(TestCase): + """ + Test cases for CableImportForm error handling and edge cases. + + Note: Happy path scenarios (successful cable creation) are covered by + dcim.tests.test_views.CableTestCase which tests the bulk import view. + These tests focus on validation errors and edge cases not covered by the view tests. + """ + + @classmethod + def setUpTestData(cls): + # Create sites + cls.site_a = Site.objects.create(name='Site A', slug='site-a') + cls.site_b = Site.objects.create(name='Site B', slug='site-b') + + # Create manufacturer and device type + manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1') + device_type = DeviceType.objects.create( + manufacturer=manufacturer, + model='Device Type 1', + slug='device-type-1', + ) + role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1', color='ff0000') + + # Create devices + cls.device_a1 = Device.objects.create( + name='Device-A1', + device_type=device_type, + role=role, + site=cls.site_a, + ) + cls.device_a2 = Device.objects.create( + name='Device-A2', + device_type=device_type, + role=role, + site=cls.site_a, + ) + # Device with same name in different site + cls.device_b_duplicate = Device.objects.create( + name='Device-A1', # Same name as device_a1 + device_type=device_type, + role=role, + site=cls.site_b, + ) + + # Create interfaces + cls.interface_a1_eth0 = Interface.objects.create( + device=cls.device_a1, + name='eth0', + type=InterfaceTypeChoices.TYPE_1GE_FIXED, + ) + cls.interface_a2_eth0 = Interface.objects.create( + device=cls.device_a2, + name='eth0', + type=InterfaceTypeChoices.TYPE_1GE_FIXED, + ) + + # Create circuit for testing circuit not found error + provider = Provider.objects.create(name='Provider 1', slug='provider-1') + circuit_type = CircuitType.objects.create(name='Circuit Type 1', slug='circuit-type-1') + cls.circuit = Circuit.objects.create( + provider=provider, + type=circuit_type, + cid='CIRCUIT-001', + ) + cls.circuit_term_a = CircuitTermination.objects.create( + circuit=cls.circuit, + term_side='A', + ) + + # Create provider network for testing provider network validation + cls.provider_network = ProviderNetwork.objects.create( + provider=provider, + name='Provider Network 1', + ) + + def test_device_not_found(self): + """Test error when parent device is not found.""" + form = CableImportForm(data={ + 'side_a_site': 'Site A', + 'side_a_parent': 'NonexistentDevice', + 'side_a_type': 'dcim.interface', + 'side_a_name': 'eth0', + 'side_b_site': 'Site A', + 'side_b_parent': 'Device-A2', + 'side_b_type': 'dcim.interface', + 'side_b_name': 'eth0', + 'type': CableTypeChoices.TYPE_CAT6, + 'status': 'connected', + }) + self.assertFalse(form.is_valid()) + self.assertIn('Side A: Device not found: NonexistentDevice', str(form.errors)) + + def test_circuit_not_found(self): + """Test error when circuit is not found.""" + form = CableImportForm(data={ + 'side_a_site': None, + 'side_a_parent': 'NONEXISTENT-CID', + 'side_a_type': 'circuits.circuittermination', + 'side_a_name': 'A', + 'side_b_site': 'Site A', + 'side_b_parent': 'Device-A1', + 'side_b_type': 'dcim.interface', + 'side_b_name': 'eth0', + 'type': CableTypeChoices.TYPE_MMF_OM4, + 'status': 'connected', + }) + self.assertFalse(form.is_valid()) + self.assertIn('Side A: Circuit not found: NONEXISTENT-CID', str(form.errors)) + + def test_termination_not_found(self): + """Test error when termination is not found on parent.""" + form = CableImportForm(data={ + 'side_a_site': 'Site A', + 'side_a_parent': 'Device-A1', + 'side_a_type': 'dcim.interface', + 'side_a_name': 'eth999', # Nonexistent interface + 'side_b_site': 'Site A', + 'side_b_parent': 'Device-A2', + 'side_b_type': 'dcim.interface', + 'side_b_name': 'eth0', + 'type': CableTypeChoices.TYPE_CAT6, + 'status': 'connected', + }) + self.assertFalse(form.is_valid()) + self.assertIn('Side A: Interface not found', str(form.errors)) + + def test_termination_already_cabled(self): + """Test error when termination is already connected to a cable.""" + # Create an existing cable + existing_cable = Cable.objects.create(type=CableTypeChoices.TYPE_CAT6, status='connected') + self.interface_a1_eth0.cable = existing_cable + self.interface_a1_eth0.save() + + form = CableImportForm(data={ + 'side_a_site': 'Site A', + 'side_a_parent': 'Device-A1', + 'side_a_type': 'dcim.interface', + 'side_a_name': 'eth0', + 'side_b_site': 'Site A', + 'side_b_parent': 'Device-A2', + 'side_b_type': 'dcim.interface', + 'side_b_name': 'eth0', + 'type': CableTypeChoices.TYPE_CAT6, + 'status': 'connected', + }) + self.assertFalse(form.is_valid()) + self.assertIn('already connected', str(form.errors)) + + def test_circuit_termination_with_provider_network(self): + """Test error when circuit termination is already connected to a provider network.""" + from django.contrib.contenttypes.models import ContentType + + # Connect circuit termination to provider network + circuit_term = CircuitTermination.objects.get(pk=self.circuit_term_a.pk) + pn_ct = ContentType.objects.get_for_model(ProviderNetwork) + circuit_term.termination_type = pn_ct + circuit_term.termination_id = self.provider_network.pk + circuit_term.save() + + try: + form = CableImportForm(data={ + 'side_a_site': None, + 'side_a_parent': 'CIRCUIT-001', + 'side_a_type': 'circuits.circuittermination', + 'side_a_name': 'A', + 'side_b_site': 'Site A', + 'side_b_parent': 'Device-A1', + 'side_b_type': 'dcim.interface', + 'side_b_name': 'eth0', + 'type': CableTypeChoices.TYPE_MMF_OM4, + 'status': 'connected', + }) + self.assertFalse(form.is_valid()) + self.assertIn('already connected to a provider network', str(form.errors)) + finally: + # Clean up: remove provider network connection + circuit_term.termination_type = None + circuit_term.termination_id = None + circuit_term.save() + + def test_multiple_parents_without_site(self): + """Test error when multiple parent objects are found without site scoping.""" + # Device-A1 exists in both site_a and site_b + # Try to find device without specifying site + form = CableImportForm(data={ + 'side_a_site': '', # Empty site - should cause multiple matches + 'side_a_parent': 'Device-A1', + 'side_a_type': 'dcim.interface', + 'side_a_name': 'eth0', + 'side_b_site': 'Site A', + 'side_b_parent': 'Device-A2', + 'side_b_type': 'dcim.interface', + 'side_b_name': 'eth0', + 'type': CableTypeChoices.TYPE_CAT6, + 'status': 'connected', + }) + self.assertFalse(form.is_valid()) + self.assertIn('Multiple Device objects found', str(form.errors)) diff --git a/netbox/dcim/tests/test_views.py b/netbox/dcim/tests/test_views.py index 6a34df652..785b12456 100644 --- a/netbox/dcim/tests/test_views.py +++ b/netbox/dcim/tests/test_views.py @@ -11,6 +11,7 @@ from core.models import ObjectType from dcim.choices import * from dcim.constants import * from dcim.models import * +from circuits.models import Circuit, CircuitTermination, CircuitType, Provider from ipam.models import ASN, RIR, VLAN, VRF from netbox.choices import CSVDelimiterChoices, ImportFormatChoices, WeightUnitChoices from tenancy.models import Tenant @@ -3495,7 +3496,7 @@ class CableTestCase( Interface(device=devices[4], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[4], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED), - # Device 1, Site 2 + # Device 5, Site 2 Interface(device=devices[5], name='Interface 1', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[5], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[5], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED), @@ -3507,6 +3508,22 @@ class CableTestCase( ) Interface.objects.bulk_create(interfaces) + ConsolePort.objects.create(device=devices[0], name='Console 1') + ConsoleServerPort.objects.create(device=devices[1], name='Console Server 1') + + power_panel = PowerPanel.objects.create(site=sites[0], name='Power Panel 1') + PowerFeed.objects.create(power_panel=power_panel, name='Feed 1') + PowerPort.objects.create(device=devices[0], name='PSU1') + + provider = Provider.objects.create(name='Provider 1', slug='provider-1') + circuit_type = CircuitType.objects.create(name='Circuit Type 1', slug='circuit-type-1') + circuit = Circuit.objects.create(provider=provider, type=circuit_type, cid='CIRCUIT-001') + circuit_terminations = ( + CircuitTermination(circuit=circuit, term_side='A'), + CircuitTermination(circuit=circuit, term_side='Z'), + ) + CircuitTermination.objects.bulk_create(circuit_terminations) + cable1 = Cable(a_terminations=[interfaces[0]], b_terminations=[interfaces[3]], type=CableTypeChoices.TYPE_CAT6) cable1.save() cable2 = Cable(a_terminations=[interfaces[1]], b_terminations=[interfaces[4]], type=CableTypeChoices.TYPE_CAT6) @@ -3532,7 +3549,7 @@ class CableTestCase( cls.csv_data = { 'default': ( - "side_a_device,side_a_type,side_a_name,side_b_device,side_b_type,side_b_name", + "side_a_parent,side_a_type,side_a_name,side_b_parent,side_b_type,side_b_name", "Device 4,dcim.interface,Interface 1,Device 5,dcim.interface,Interface 1", "Device 3,dcim.interface,Interface 2,Device 4,dcim.interface,Interface 2", "Device 3,dcim.interface,Interface 3,Device 4,dcim.interface,Interface 3", @@ -3545,12 +3562,28 @@ class CableTestCase( 'site-filtering': ( # Ensure that CSV bulk import supports assigning terminations from parent devices # that share the same device name, provided those devices belong to different sites. - "side_a_site,side_a_device,side_a_type,side_a_name,side_b_site,side_b_device,side_b_type,side_b_name", + "side_a_site,side_a_parent,side_a_type,side_a_name,side_b_site,side_b_parent,side_b_type,side_b_name", "Site 1,Device 3,dcim.interface,Interface 1,Site 2,Device 1,dcim.interface,Interface 1", "Site 1,Device 3,dcim.interface,Interface 2,Site 2,Device 1,dcim.interface,Interface 2", "Site 1,Device 3,dcim.interface,Interface 3,Site 2,Device 1,dcim.interface,Interface 3", "Site 1,Device 1,dcim.interface,Device 2 Interface,Site 2,Device 1,dcim.interface,Interface 4", "Site 1,Device 1,dcim.interface,Device 3 Interface,Site 2,Device 1,dcim.interface,Interface 5", + ), + 'circuits': ( + # Test circuit termination to interface cables + "side_a_parent,side_a_type,side_a_name,side_b_site,side_b_parent,side_b_type,side_b_name", + "CIRCUIT-001,circuits.circuittermination,A,Site 1,Device 4,dcim.interface,Interface 2", + "CIRCUIT-001,circuits.circuittermination,z,Site 2,Device 5,dcim.interface,Interface 2", + ), + 'power': ( + # Test power feed to power port cables + "side_a_site,side_a_parent,side_a_type,side_a_name,side_b_site,side_b_parent,side_b_type,side_b_name", + "Site 1,Power Panel 1,dcim.powerfeed,Feed 1,Site 1,Device 1,dcim.powerport,PSU1", + ), + 'console': ( + # Test console port to console server port cables + "side_a_site,side_a_parent,side_a_type,side_a_name,side_b_site,side_b_parent,side_b_type,side_b_name", + "Site 1,Device 1,dcim.consoleport,Console 1,Site 1,Device 2,dcim.consoleserverport,Console Server 1", ) }