From b3693099dc106502d0a020b640eb58295e2fc806 Mon Sep 17 00:00:00 2001 From: sleepinggenius2 Date: Tue, 13 Dec 2022 11:33:09 -0500 Subject: [PATCH] Adds replication and adoption for module import (#9498) * Adds replication and adoption for module import * Moves common Module form clean logic to new class * Adds tests for replication and adoption for module import * Fix test Co-authored-by: jeremystretch --- netbox/dcim/forms/bulk_import.py | 21 +++++++- netbox/dcim/forms/common.py | 58 ++++++++++++++++++++ netbox/dcim/forms/models.py | 66 +---------------------- netbox/dcim/tests/test_views.py | 90 ++++++++++++++++++++++++++++++++ 4 files changed, 169 insertions(+), 66 deletions(-) diff --git a/netbox/dcim/forms/bulk_import.py b/netbox/dcim/forms/bulk_import.py index f0fd9bf86..5c6cbb0b0 100644 --- a/netbox/dcim/forms/bulk_import.py +++ b/netbox/dcim/forms/bulk_import.py @@ -13,6 +13,7 @@ from tenancy.models import Tenant from utilities.forms import CSVChoiceField, CSVContentTypeField, CSVModelChoiceField, CSVTypedChoiceField, SlugField from virtualization.models import Cluster from wireless.choices import WirelessRoleChoices +from .common import ModuleCommonForm __all__ = ( 'CableCSVForm', @@ -407,7 +408,7 @@ class DeviceCSVForm(BaseDeviceCSVForm): self.fields['rack'].queryset = self.fields['rack'].queryset.filter(**params) -class ModuleCSVForm(NetBoxModelCSVForm): +class ModuleCSVForm(ModuleCommonForm, NetBoxModelCSVForm): device = CSVModelChoiceField( queryset=Device.objects.all(), to_field_name='name' @@ -420,11 +421,20 @@ class ModuleCSVForm(NetBoxModelCSVForm): queryset=ModuleType.objects.all(), to_field_name='model' ) + replicate_components = forms.BooleanField( + required=False, + help_text="Automatically populate components associated with this module type (default: true)" + ) + adopt_components = forms.BooleanField( + required=False, + help_text="Adopt already existing components" + ) class Meta: model = Module fields = ( - 'device', 'module_bay', 'module_type', 'serial', 'asset_tag', 'comments', + 'device', 'module_bay', 'module_type', 'serial', 'asset_tag', 'replicate_components', + 'adopt_components', 'comments', ) def __init__(self, data=None, *args, **kwargs): @@ -435,6 +445,13 @@ class ModuleCSVForm(NetBoxModelCSVForm): params = {f"device__{self.fields['device'].to_field_name}": data.get('device')} self.fields['module_bay'].queryset = self.fields['module_bay'].queryset.filter(**params) + def clean_replicate_components(self): + # Make sure replicate_components is True when it's not included in the uploaded data + if 'replicate_components' not in self.data: + return True + else: + return self.cleaned_data['replicate_components'] + class ChildDeviceCSVForm(BaseDeviceCSVForm): parent = CSVModelChoiceField( diff --git a/netbox/dcim/forms/common.py b/netbox/dcim/forms/common.py index f484b48e1..bfe09ab71 100644 --- a/netbox/dcim/forms/common.py +++ b/netbox/dcim/forms/common.py @@ -5,6 +5,7 @@ from dcim.constants import * __all__ = ( 'InterfaceCommonForm', + 'ModuleCommonForm' ) @@ -47,3 +48,60 @@ class InterfaceCommonForm(forms.Form): 'tagged_vlans': f"The tagged VLANs ({', '.join(invalid_vlans)}) must belong to the same site as " f"the interface's parent device/VM, or they must be global" }) + + +class ModuleCommonForm(forms.Form): + def clean(self): + super().clean() + + replicate_components = self.cleaned_data.get("replicate_components") + adopt_components = self.cleaned_data.get("adopt_components") + device = self.cleaned_data['device'] + module_type = self.cleaned_data['module_type'] + module_bay = self.cleaned_data['module_bay'] + + if adopt_components: + self.instance._adopt_components = True + + # Bail out if we are not installing a new module or if we are not replicating components + if self.instance.pk or not replicate_components: + self.instance._disable_replication = True + return + + for templates, component_attribute in [ + ("consoleporttemplates", "consoleports"), + ("consoleserverporttemplates", "consoleserverports"), + ("interfacetemplates", "interfaces"), + ("powerporttemplates", "powerports"), + ("poweroutlettemplates", "poweroutlets"), + ("rearporttemplates", "rearports"), + ("frontporttemplates", "frontports") + ]: + # Prefetch installed components + installed_components = { + component.name: component for component in getattr(device, component_attribute).all() + } + + # Get the templates for the module type. + for template in getattr(module_type, templates).all(): + # Installing modules with placeholders require that the bay has a position value + if MODULE_TOKEN in template.name and not module_bay.position: + raise forms.ValidationError( + "Cannot install module with placeholder values in a module bay with no position defined" + ) + + resolved_name = template.name.replace(MODULE_TOKEN, module_bay.position) + existing_item = installed_components.get(resolved_name) + + # It is not possible to adopt components already belonging to a module + if adopt_components and existing_item and existing_item.module: + raise forms.ValidationError( + f"Cannot adopt {template.component_model.__name__} '{resolved_name}' as it already belongs " + f"to a module" + ) + + # If we are not adopting components we error if the component exists + if not adopt_components and resolved_name in installed_components: + raise forms.ValidationError( + f"{template.component_model.__name__} - {resolved_name} already exists" + ) diff --git a/netbox/dcim/forms/models.py b/netbox/dcim/forms/models.py index 579851651..620806397 100644 --- a/netbox/dcim/forms/models.py +++ b/netbox/dcim/forms/models.py @@ -17,7 +17,7 @@ from utilities.forms import ( ) from virtualization.models import Cluster, ClusterGroup from wireless.models import WirelessLAN, WirelessLANGroup -from .common import InterfaceCommonForm +from .common import InterfaceCommonForm, ModuleCommonForm __all__ = ( 'CableForm', @@ -657,7 +657,7 @@ class DeviceForm(TenancyForm, NetBoxModelForm): self.fields['position'].widget.choices = [(position, f'U{position}')] -class ModuleForm(NetBoxModelForm): +class ModuleForm(ModuleCommonForm, NetBoxModelForm): device = DynamicModelChoiceField( queryset=Device.objects.all(), initial_params={ @@ -722,68 +722,6 @@ class ModuleForm(NetBoxModelForm): self.fields['adopt_components'].initial = False self.fields['adopt_components'].disabled = True - def save(self, *args, **kwargs): - - # If replicate_components is False, disable automatic component replication on the instance - if self.instance.pk or not self.cleaned_data['replicate_components']: - self.instance._disable_replication = True - - if self.cleaned_data['adopt_components']: - self.instance._adopt_components = True - - return super().save(*args, **kwargs) - - def clean(self): - super().clean() - - replicate_components = self.cleaned_data.get("replicate_components") - adopt_components = self.cleaned_data.get("adopt_components") - device = self.cleaned_data['device'] - module_type = self.cleaned_data['module_type'] - module_bay = self.cleaned_data['module_bay'] - - # Bail out if we are not installing a new module or if we are not replicating components - if self.instance.pk or not replicate_components: - return - - for templates, component_attribute in [ - ("consoleporttemplates", "consoleports"), - ("consoleserverporttemplates", "consoleserverports"), - ("interfacetemplates", "interfaces"), - ("powerporttemplates", "powerports"), - ("poweroutlettemplates", "poweroutlets"), - ("rearporttemplates", "rearports"), - ("frontporttemplates", "frontports") - ]: - # Prefetch installed components - installed_components = { - component.name: component for component in getattr(device, component_attribute).all() - } - - # Get the templates for the module type. - for template in getattr(module_type, templates).all(): - # Installing modules with placeholders require that the bay has a position value - if MODULE_TOKEN in template.name and not module_bay.position: - raise forms.ValidationError( - "Cannot install module with placeholder values in a module bay with no position defined" - ) - - resolved_name = template.name.replace(MODULE_TOKEN, module_bay.position) - existing_item = installed_components.get(resolved_name) - - # It is not possible to adopt components already belonging to a module - if adopt_components and existing_item and existing_item.module: - raise forms.ValidationError( - f"Cannot adopt {template.component_model.__name__} '{resolved_name}' as it already belongs " - f"to a module" - ) - - # If we are not adopting components we error if the component exists - if not adopt_components and resolved_name in installed_components: - raise forms.ValidationError( - f"{template.component_model.__name__} - {resolved_name} already exists" - ) - class CableForm(TenancyForm, NetBoxModelForm): diff --git a/netbox/dcim/tests/test_views.py b/netbox/dcim/tests/test_views.py index db3495521..16540af87 100644 --- a/netbox/dcim/tests/test_views.py +++ b/netbox/dcim/tests/test_views.py @@ -1848,6 +1848,53 @@ class ModuleTestCase( self.assertHttpStatus(self.client.post(**request), 302) self.assertEqual(Interface.objects.filter(device=device).count(), 5) + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_module_bulk_replication(self): + self.add_permissions('dcim.add_module') + + # Add 5 InterfaceTemplates to a ModuleType + module_type = ModuleType.objects.first() + interface_templates = [ + InterfaceTemplate(module_type=module_type, name=f'Interface {i}') for i in range(1, 6) + ] + InterfaceTemplate.objects.bulk_create(interface_templates) + + form_data = self.form_data.copy() + device = Device.objects.get(pk=form_data['device']) + + # Create a module *without* replicating components + module_bay = ModuleBay.objects.get(pk=form_data['module_bay']) + csv_data = [ + "device,module_bay,module_type,replicate_components", + f"{device.name},{module_bay.name},{module_type.model},false" + ] + request = { + 'path': self._get_url('import'), + 'data': { + 'csv': '\n'.join(csv_data), + } + } + + initial_count = self._get_queryset().count() + self.assertHttpStatus(self.client.post(**request), 200) + self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1) + self.assertEqual(Interface.objects.filter(device=device).count(), 0) + + # Create a second module (in the next bay) with replicated components + module_bay = ModuleBay.objects.get(pk=(form_data['module_bay'] + 1)) + csv_data[1] = f"{device.name},{module_bay.name},{module_type.model},true" + request = { + 'path': self._get_url('import'), + 'data': { + 'csv': '\n'.join(csv_data), + } + } + + initial_count = self._get_queryset().count() + self.assertHttpStatus(self.client.post(**request), 200) + self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1) + self.assertEqual(Interface.objects.filter(device=device).count(), 5) + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) def test_module_component_adoption(self): self.add_permissions('dcim.add_module') @@ -1885,6 +1932,49 @@ class ModuleTestCase( # Check that the Interface now has a module self.assertIsNotNone(interface.module) + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_module_bulk_adoption(self): + self.add_permissions('dcim.add_module') + + interface_name = "Interface-1" + + # Add an interface to the ModuleType + module_type = ModuleType.objects.first() + InterfaceTemplate(module_type=module_type, name=interface_name).save() + + form_data = self.form_data.copy() + device = Device.objects.get(pk=form_data['device']) + + # Create an interface to be adopted + interface = Interface(device=device, name=interface_name, type=InterfaceTypeChoices.TYPE_10GE_FIXED) + interface.save() + + # Ensure that interface is created with no module + self.assertIsNone(interface.module) + + # Create a module with adopted components + module_bay = ModuleBay.objects.get(device=device, name='Module Bay 4') + csv_data = [ + "device,module_bay,module_type,replicate_components,adopt_components", + f"{device.name},{module_bay.name},{module_type.model},false,true" + ] + request = { + 'path': self._get_url('import'), + 'data': { + 'csv': '\n'.join(csv_data), + } + } + + initial_count = self._get_queryset().count() + self.assertHttpStatus(self.client.post(**request), 200) + self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1) + + # Re-retrieve interface to get new module id + interface.refresh_from_db() + + # Check that the Interface now has a module + self.assertIsNotNone(interface.module) + class ConsolePortTestCase(ViewTestCases.DeviceComponentViewTestCase): model = ConsolePort