Adds replication and adoption for module import (#9498)

* Adds replication and adoption for module import

* Moves common Module form clean logic to new class

* Adds tests for replication and adoption for module import

* Fix test

Co-authored-by: jeremystretch <jstretch@ns1.com>
This commit is contained in:
sleepinggenius2 2022-12-13 11:33:09 -05:00 committed by GitHub
parent 9bb9ac3dec
commit b3693099dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 169 additions and 66 deletions

View File

@ -13,6 +13,7 @@ from tenancy.models import Tenant
from utilities.forms import CSVChoiceField, CSVContentTypeField, CSVModelChoiceField, CSVTypedChoiceField, SlugField
from virtualization.models import Cluster
from wireless.choices import WirelessRoleChoices
from .common import ModuleCommonForm
__all__ = (
'CableCSVForm',
@ -407,7 +408,7 @@ class DeviceCSVForm(BaseDeviceCSVForm):
self.fields['rack'].queryset = self.fields['rack'].queryset.filter(**params)
class ModuleCSVForm(NetBoxModelCSVForm):
class ModuleCSVForm(ModuleCommonForm, NetBoxModelCSVForm):
device = CSVModelChoiceField(
queryset=Device.objects.all(),
to_field_name='name'
@ -420,11 +421,20 @@ class ModuleCSVForm(NetBoxModelCSVForm):
queryset=ModuleType.objects.all(),
to_field_name='model'
)
replicate_components = forms.BooleanField(
required=False,
help_text="Automatically populate components associated with this module type (default: true)"
)
adopt_components = forms.BooleanField(
required=False,
help_text="Adopt already existing components"
)
class Meta:
model = Module
fields = (
'device', 'module_bay', 'module_type', 'serial', 'asset_tag', 'comments',
'device', 'module_bay', 'module_type', 'serial', 'asset_tag', 'replicate_components',
'adopt_components', 'comments',
)
def __init__(self, data=None, *args, **kwargs):
@ -435,6 +445,13 @@ class ModuleCSVForm(NetBoxModelCSVForm):
params = {f"device__{self.fields['device'].to_field_name}": data.get('device')}
self.fields['module_bay'].queryset = self.fields['module_bay'].queryset.filter(**params)
def clean_replicate_components(self):
# Make sure replicate_components is True when it's not included in the uploaded data
if 'replicate_components' not in self.data:
return True
else:
return self.cleaned_data['replicate_components']
class ChildDeviceCSVForm(BaseDeviceCSVForm):
parent = CSVModelChoiceField(

View File

@ -5,6 +5,7 @@ from dcim.constants import *
__all__ = (
'InterfaceCommonForm',
'ModuleCommonForm'
)
@ -47,3 +48,60 @@ class InterfaceCommonForm(forms.Form):
'tagged_vlans': f"The tagged VLANs ({', '.join(invalid_vlans)}) must belong to the same site as "
f"the interface's parent device/VM, or they must be global"
})
class ModuleCommonForm(forms.Form):
def clean(self):
super().clean()
replicate_components = self.cleaned_data.get("replicate_components")
adopt_components = self.cleaned_data.get("adopt_components")
device = self.cleaned_data['device']
module_type = self.cleaned_data['module_type']
module_bay = self.cleaned_data['module_bay']
if adopt_components:
self.instance._adopt_components = True
# Bail out if we are not installing a new module or if we are not replicating components
if self.instance.pk or not replicate_components:
self.instance._disable_replication = True
return
for templates, component_attribute in [
("consoleporttemplates", "consoleports"),
("consoleserverporttemplates", "consoleserverports"),
("interfacetemplates", "interfaces"),
("powerporttemplates", "powerports"),
("poweroutlettemplates", "poweroutlets"),
("rearporttemplates", "rearports"),
("frontporttemplates", "frontports")
]:
# Prefetch installed components
installed_components = {
component.name: component for component in getattr(device, component_attribute).all()
}
# Get the templates for the module type.
for template in getattr(module_type, templates).all():
# Installing modules with placeholders require that the bay has a position value
if MODULE_TOKEN in template.name and not module_bay.position:
raise forms.ValidationError(
"Cannot install module with placeholder values in a module bay with no position defined"
)
resolved_name = template.name.replace(MODULE_TOKEN, module_bay.position)
existing_item = installed_components.get(resolved_name)
# It is not possible to adopt components already belonging to a module
if adopt_components and existing_item and existing_item.module:
raise forms.ValidationError(
f"Cannot adopt {template.component_model.__name__} '{resolved_name}' as it already belongs "
f"to a module"
)
# If we are not adopting components we error if the component exists
if not adopt_components and resolved_name in installed_components:
raise forms.ValidationError(
f"{template.component_model.__name__} - {resolved_name} already exists"
)

View File

@ -17,7 +17,7 @@ from utilities.forms import (
)
from virtualization.models import Cluster, ClusterGroup
from wireless.models import WirelessLAN, WirelessLANGroup
from .common import InterfaceCommonForm
from .common import InterfaceCommonForm, ModuleCommonForm
__all__ = (
'CableForm',
@ -657,7 +657,7 @@ class DeviceForm(TenancyForm, NetBoxModelForm):
self.fields['position'].widget.choices = [(position, f'U{position}')]
class ModuleForm(NetBoxModelForm):
class ModuleForm(ModuleCommonForm, NetBoxModelForm):
device = DynamicModelChoiceField(
queryset=Device.objects.all(),
initial_params={
@ -722,68 +722,6 @@ class ModuleForm(NetBoxModelForm):
self.fields['adopt_components'].initial = False
self.fields['adopt_components'].disabled = True
def save(self, *args, **kwargs):
# If replicate_components is False, disable automatic component replication on the instance
if self.instance.pk or not self.cleaned_data['replicate_components']:
self.instance._disable_replication = True
if self.cleaned_data['adopt_components']:
self.instance._adopt_components = True
return super().save(*args, **kwargs)
def clean(self):
super().clean()
replicate_components = self.cleaned_data.get("replicate_components")
adopt_components = self.cleaned_data.get("adopt_components")
device = self.cleaned_data['device']
module_type = self.cleaned_data['module_type']
module_bay = self.cleaned_data['module_bay']
# Bail out if we are not installing a new module or if we are not replicating components
if self.instance.pk or not replicate_components:
return
for templates, component_attribute in [
("consoleporttemplates", "consoleports"),
("consoleserverporttemplates", "consoleserverports"),
("interfacetemplates", "interfaces"),
("powerporttemplates", "powerports"),
("poweroutlettemplates", "poweroutlets"),
("rearporttemplates", "rearports"),
("frontporttemplates", "frontports")
]:
# Prefetch installed components
installed_components = {
component.name: component for component in getattr(device, component_attribute).all()
}
# Get the templates for the module type.
for template in getattr(module_type, templates).all():
# Installing modules with placeholders require that the bay has a position value
if MODULE_TOKEN in template.name and not module_bay.position:
raise forms.ValidationError(
"Cannot install module with placeholder values in a module bay with no position defined"
)
resolved_name = template.name.replace(MODULE_TOKEN, module_bay.position)
existing_item = installed_components.get(resolved_name)
# It is not possible to adopt components already belonging to a module
if adopt_components and existing_item and existing_item.module:
raise forms.ValidationError(
f"Cannot adopt {template.component_model.__name__} '{resolved_name}' as it already belongs "
f"to a module"
)
# If we are not adopting components we error if the component exists
if not adopt_components and resolved_name in installed_components:
raise forms.ValidationError(
f"{template.component_model.__name__} - {resolved_name} already exists"
)
class CableForm(TenancyForm, NetBoxModelForm):

View File

@ -1848,6 +1848,53 @@ class ModuleTestCase(
self.assertHttpStatus(self.client.post(**request), 302)
self.assertEqual(Interface.objects.filter(device=device).count(), 5)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_module_bulk_replication(self):
self.add_permissions('dcim.add_module')
# Add 5 InterfaceTemplates to a ModuleType
module_type = ModuleType.objects.first()
interface_templates = [
InterfaceTemplate(module_type=module_type, name=f'Interface {i}') for i in range(1, 6)
]
InterfaceTemplate.objects.bulk_create(interface_templates)
form_data = self.form_data.copy()
device = Device.objects.get(pk=form_data['device'])
# Create a module *without* replicating components
module_bay = ModuleBay.objects.get(pk=form_data['module_bay'])
csv_data = [
"device,module_bay,module_type,replicate_components",
f"{device.name},{module_bay.name},{module_type.model},false"
]
request = {
'path': self._get_url('import'),
'data': {
'csv': '\n'.join(csv_data),
}
}
initial_count = self._get_queryset().count()
self.assertHttpStatus(self.client.post(**request), 200)
self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1)
self.assertEqual(Interface.objects.filter(device=device).count(), 0)
# Create a second module (in the next bay) with replicated components
module_bay = ModuleBay.objects.get(pk=(form_data['module_bay'] + 1))
csv_data[1] = f"{device.name},{module_bay.name},{module_type.model},true"
request = {
'path': self._get_url('import'),
'data': {
'csv': '\n'.join(csv_data),
}
}
initial_count = self._get_queryset().count()
self.assertHttpStatus(self.client.post(**request), 200)
self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1)
self.assertEqual(Interface.objects.filter(device=device).count(), 5)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_module_component_adoption(self):
self.add_permissions('dcim.add_module')
@ -1885,6 +1932,49 @@ class ModuleTestCase(
# Check that the Interface now has a module
self.assertIsNotNone(interface.module)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_module_bulk_adoption(self):
self.add_permissions('dcim.add_module')
interface_name = "Interface-1"
# Add an interface to the ModuleType
module_type = ModuleType.objects.first()
InterfaceTemplate(module_type=module_type, name=interface_name).save()
form_data = self.form_data.copy()
device = Device.objects.get(pk=form_data['device'])
# Create an interface to be adopted
interface = Interface(device=device, name=interface_name, type=InterfaceTypeChoices.TYPE_10GE_FIXED)
interface.save()
# Ensure that interface is created with no module
self.assertIsNone(interface.module)
# Create a module with adopted components
module_bay = ModuleBay.objects.get(device=device, name='Module Bay 4')
csv_data = [
"device,module_bay,module_type,replicate_components,adopt_components",
f"{device.name},{module_bay.name},{module_type.model},false,true"
]
request = {
'path': self._get_url('import'),
'data': {
'csv': '\n'.join(csv_data),
}
}
initial_count = self._get_queryset().count()
self.assertHttpStatus(self.client.post(**request), 200)
self.assertEqual(self._get_queryset().count(), initial_count + len(csv_data) - 1)
# Re-retrieve interface to get new module id
interface.refresh_from_db()
# Check that the Interface now has a module
self.assertIsNotNone(interface.module)
class ConsolePortTestCase(ViewTestCases.DeviceComponentViewTestCase):
model = ConsolePort