This commit is contained in:
Jonathan Senecal
2025-12-09 22:06:34 -06:00
committed by GitHub
3 changed files with 389 additions and 50 deletions

View File

@@ -5,6 +5,7 @@ from django.core.exceptions import ObjectDoesNotExist
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from circuits.models import Circuit
from dcim.choices import *
from dcim.constants import *
from dcim.models import *
@@ -1414,19 +1415,52 @@ class MACAddressImportForm(NetBoxModelImportForm):
#
class CableImportForm(NetBoxModelImportForm):
"""
CSV bulk import form for cables.
Supports dynamic parent model resolution - terminations are identified by their parent
object (device, circuit, or power panel) and termination name.
The parent field resolves to different models based on the termination type
See CABLE_PARENT_MAPPING for supported termination types.
"""
# Map cable termination content types to their parent model and lookup field.
#
# This mapping enables dynamic parent model resolution during cable CSV imports.
# Each entry maps a termination type to a tuple of (parent_content_type, accessor):
#
# Format: 'app.model': ('parent_app.ParentModel', 'accessor')
#
CABLE_PARENT_MAPPING = {
'dcim.interface': ('dcim.Device', 'name'),
'dcim.consoleport': ('dcim.Device', 'name'),
'dcim.consoleserverport': ('dcim.Device', 'name'),
'dcim.powerport': ('dcim.Device', 'name'),
'dcim.poweroutlet': ('dcim.Device', 'name'),
'dcim.frontport': ('dcim.Device', 'name'),
'dcim.rearport': ('dcim.Device', 'name'),
'circuits.circuittermination': ('circuits.Circuit', 'cid'),
'dcim.powerfeed': ('dcim.PowerPanel', 'name'),
}
# Map parent model name to (parent_field_name, termination_name_field, value_transform)
TERMINATION_FIELDS = {
'Circuit': ('circuit', 'term_side', str.upper),
'Device': ('device', 'name', None),
'PowerPanel': ('power_panel', 'name', None),
}
# Termination A
side_a_site = CSVModelChoiceField(
label=_('Side A site'),
queryset=Site.objects.all(),
required=False,
to_field_name='name',
help_text=_('Site of parent device A (if any)'),
help_text=_('Site of parent A (if any)')
)
side_a_device = CSVModelChoiceField(
label=_('Side A device'),
queryset=Device.objects.all(),
to_field_name='name',
help_text=_('Device name')
side_a_parent = forms.CharField(
label=_('Side A parent'),
help_text=_('Device name, Circuit CID, or Power Panel name')
)
side_a_type = CSVContentTypeField(
label=_('Side A type'),
@@ -1445,13 +1479,11 @@ class CableImportForm(NetBoxModelImportForm):
queryset=Site.objects.all(),
required=False,
to_field_name='name',
help_text=_('Site of parent device B (if any)'),
help_text=_('Site of parent B (if any)')
)
side_b_device = CSVModelChoiceField(
label=_('Side B device'),
queryset=Device.objects.all(),
to_field_name='name',
help_text=_('Device name')
side_b_parent = forms.CharField(
label=_('Side B parent'),
help_text=_('Device name, Circuit CID, or Power Panel name')
)
side_b_type = CSVContentTypeField(
label=_('Side B type'),
@@ -1500,7 +1532,7 @@ class CableImportForm(NetBoxModelImportForm):
class Meta:
model = Cable
fields = [
'side_a_site', 'side_a_device', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_device', 'side_b_type',
'side_a_site', 'side_a_parent', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_parent', 'side_b_type',
'side_b_name', 'type', 'status', 'tenant', 'label', 'color', 'length', 'length_unit', 'description',
'comments', 'tags',
]
@@ -1508,21 +1540,6 @@ class CableImportForm(NetBoxModelImportForm):
def __init__(self, data=None, *args, **kwargs):
super().__init__(data, *args, **kwargs)
if data:
# Limit choices for side_a_device to the assigned side_a_site
if side_a_site := data.get('side_a_site'):
side_a_device_params = {f'site__{self.fields["side_a_site"].to_field_name}': side_a_site}
self.fields['side_a_device'].queryset = self.fields['side_a_device'].queryset.filter(
**side_a_device_params
)
# Limit choices for side_b_device to the assigned side_b_site
if side_b_site := data.get('side_b_site'):
side_b_device_params = {f'site__{self.fields["side_b_site"].to_field_name}': side_b_site}
self.fields['side_b_device'].queryset = self.fields['side_b_device'].queryset.filter(
**side_b_device_params
)
def _clean_side(self, side):
"""
Derive a Cable's A/B termination objects.
@@ -1531,31 +1548,118 @@ class CableImportForm(NetBoxModelImportForm):
"""
assert side in 'ab', f"Invalid side designation: {side}"
device = self.cleaned_data.get(f'side_{side}_device')
content_type = self.cleaned_data.get(f'side_{side}_type')
site = self.cleaned_data.get(f'side_{side}_site')
parent_value = self.cleaned_data.get(f'side_{side}_parent')
name = self.cleaned_data.get(f'side_{side}_name')
if not device or not content_type or not name:
if not parent_value or not content_type or not name: # pragma: no cover
return None
model = content_type.model_class()
# Get the parent model mapping from the submitted content_type
parent_map = self.CABLE_PARENT_MAPPING.get(f'{content_type.app_label}.{content_type.model}')
# This should never happen
assert parent_map, (
'Unknown cable termination content type parent mapping: '
f'{content_type.app_label}.{content_type.model}'
)
parent_content_type, parent_accessor = parent_map
parent_app_label, parent_model_name = parent_content_type.split('.')
# Get the parent model class
try:
if device.virtual_chassis and device.virtual_chassis.master == device and \
model.objects.filter(device=device, name=name).count() == 0:
termination_object = model.objects.get(device__in=device.virtual_chassis.members.all(), name=name)
else:
termination_object = model.objects.get(device=device, name=name)
if termination_object.cable is not None and termination_object.cable != self.instance:
raise forms.ValidationError(
_("Side {side_upper}: {device} {termination_object} is already connected").format(
side_upper=side.upper(), device=device, termination_object=termination_object
)
)
except ObjectDoesNotExist:
parent_ct = ContentType.objects.get(app_label=parent_app_label.lower(), model=parent_model_name.lower())
parent_model: Device | PowerPanel | Circuit = parent_ct.model_class()
except ContentType.DoesNotExist: # pragma: no cover
# This should never happen
raise AssertionError(f'Unknown cable termination parent content type: {parent_content_type}')
# Build query for parent lookup
parent_query = {parent_accessor: parent_value}
# Add site to query if provided
if site:
parent_query['site'] = site
# Look up the parent object
try:
parent_object = parent_model.objects.get(**parent_query)
except parent_model.DoesNotExist:
raise forms.ValidationError(
_("{side_upper} side termination not found: {device} {name}").format(
side_upper=side.upper(), device=device, name=name
_('Side {side_upper}: {model_name} not found: {value}').format(
side_upper=side.upper(), model_name=parent_model.__name__, value=parent_value
)
)
except parent_model.MultipleObjectsReturned:
raise forms.ValidationError(
_('Side {side_upper}: Multiple {model_name} objects found: {value}').format(
side_upper=side.upper(), model_name=parent_model.__name__, value=parent_value
)
)
# Get the termination model class
termination_model = content_type.model_class()
# Build the query to find the termination object
field_mapping = self.TERMINATION_FIELDS.get(parent_model.__name__)
if not field_mapping: # pragma: no cover
return None
parent_field, name_field, value_transform = field_mapping
query = {parent_field: parent_object}
if value_transform:
name = value_transform(name)
if name:
query[name_field] = name
# Add site to query if provided (for site-scoped parents)
if site and parent_field in ('device', 'power_panel'):
query[f'{parent_field}__site'] = site
# Look up the termination object
try:
# Handle virtual chassis for device-based terminations
if (parent_field == 'device' and
parent_object.virtual_chassis and
parent_object.virtual_chassis.master == parent_object and
termination_model.objects.filter(**query).count() == 0):
query[f'{parent_field}__in'] = parent_object.virtual_chassis.members.all()
query.pop(parent_field, None)
termination_object = termination_model.objects.get(**query)
else:
termination_object = termination_model.objects.get(**query)
# Check if already connected to a cable
if termination_object.cable is not None and termination_object.cable != self.instance:
raise forms.ValidationError(
_('Side {side_upper}: {parent} {termination} is already connected').format(
side_upper=side.upper(), parent=parent_object, termination=termination_object
)
)
# Circuit terminations can also be connected to provider networks
if (name_field == 'term_side' and
hasattr(termination_object, '_provider_network') and
termination_object._provider_network is not None):
raise forms.ValidationError(
_('Side {side_upper}: {parent} {termination} is already connected to a provider network').format(
side_upper=side.upper(), parent=parent_object, termination=termination_object
)
)
except termination_model.DoesNotExist:
raise forms.ValidationError(
_('Side {side_upper}: {model_name} not found: {parent} {name}').format(
side_upper=side.upper(),
model_name=termination_model.__name__,
parent=parent_object, name=name or '',
),
)
except termination_model.MultipleObjectsReturned: # pragma: no cover
# This should never happen
raise AssertionError('Multiple termination objects returned for query: {query}'.format(query=query))
setattr(self.instance, f'{side}_terminations', [termination_object])
return termination_object