make model choice fields work inside csv cable import

This commit is contained in:
hellerve 2021-04-12 11:41:35 +02:00
parent 0e8fb136c3
commit 3c13f81c83
4 changed files with 75 additions and 33 deletions

View File

@ -23,6 +23,7 @@ from tenancy.models import Tenant, TenantGroup
from utilities.forms import (
APISelect, APISelectMultiple, add_blank_choice, BootstrapMixin, BulkEditForm, BulkEditNullBooleanSelect,
ColorSelect, CommentField, CSVChoiceField, CSVContentTypeField, CSVModelChoiceField, CSVModelForm,
DeferredCSVModelChoiceField,
DynamicModelChoiceField, DynamicModelMultipleChoiceField, ExpandableNameField, form_from_model, JSONField,
NumericArrayField, SelectWithPK, SmallTextarea, SlugField, StaticSelect2, StaticSelect2Multiple, TagFilterField,
BOOLEAN_WITH_BLANK_CHOICES,
@ -3824,15 +3825,19 @@ class CableCSVForm(CustomFieldModelCSVForm):
limit_choices_to=CABLE_TERMINATION_PARENT_MODELS,
help_text='Side A parent type'
)
side_a_parent_id = forms.IntegerField(
help_text='Side A parent ID'
side_a_parent = DeferredCSVModelChoiceField(
queryset=Device.objects.none(), # this is a standin
to_field_name='name',
help_text='Side A parent name'
)
side_a_type = CSVContentTypeField(
queryset=ContentType.objects.all(),
limit_choices_to=CABLE_TERMINATION_MODELS,
help_text='Side A type'
)
side_a_name = forms.CharField(
side_a = DeferredCSVModelChoiceField(
queryset=Interface.objects.none(), # this is a standin
to_field_name='name',
help_text='Side A component name'
)
@ -3842,15 +3847,19 @@ class CableCSVForm(CustomFieldModelCSVForm):
limit_choices_to=CABLE_TERMINATION_PARENT_MODELS,
help_text='Side B parent type'
)
side_b_parent_id = forms.IntegerField(
help_text='Side B parent ID'
side_b_parent = DeferredCSVModelChoiceField(
queryset=Device.objects.none(), # this is a standin
to_field_name='name',
help_text='Side B parent name'
)
side_b_type = CSVContentTypeField(
queryset=ContentType.objects.all(),
limit_choices_to=CABLE_TERMINATION_MODELS,
help_text='Side B type'
)
side_b_name = forms.CharField(
side_b = DeferredCSVModelChoiceField(
queryset=Interface.objects.none(), # this is a standin
to_field_name='name',
help_text='Side B component name'
)
@ -3874,24 +3883,14 @@ class CableCSVForm(CustomFieldModelCSVForm):
class Meta:
model = Cable
fields = [
'side_a_parent_type', 'side_a_parent_id', 'side_a_type', 'side_a_name',
'side_b_parent_type', 'side_b_parent_id', 'side_b_type', 'side_b_name',
'side_a_parent_type', 'side_a_parent', 'side_a_type', 'side_a',
'side_b_parent_type', 'side_b_parent', 'side_b_type', 'side_b',
'type', 'status', 'label', 'color', 'length', 'length_unit',
]
help_texts = {
'color': mark_safe('RGB color in hexadecimal (e.g. <code>00ff00</code>)'),
}
def _get_parent(self, side):
parent_type = self.cleaned_data.get(f'side_{side}_parent_type')
parent_id = self.cleaned_data.get(f'side_{side}_parent_id')
if not parent_type or not parent_id:
return None
model = parent_type.model_class()
return model.objects.get(pk=parent_id)
def _translate_model(self, parent_type):
# TODO: maybe we went overboard with making this generic/extensible?
return {
@ -3907,19 +3906,19 @@ class CableCSVForm(CustomFieldModelCSVForm):
assert side in 'ab', f"Invalid side designation: {side}"
parent_type = self.cleaned_data.get(f'side_{side}_parent_type')
parent = self._get_parent(side)
parent = self.cleaned_data.get(f'side_{side}_parent')
content_type = self.cleaned_data.get(f'side_{side}_type')
name = self.cleaned_data.get(f'side_{side}_name')
if not parent or not content_type or not name:
if not parent or not content_type:
return None
model = content_type.model_class()
field = self.fields[f'side_{side}']
try:
# the parent is named like the model, we utilize that fact for the query
termination_object = model.objects.get(**{
"name" if hasattr(model, "name") else "pk": name,
termination_filter = model.objects.filter(**{
self._translate_model(parent_type): parent
})
termination_object = field.get_value(termination_filter)
if termination_object.cable is not None:
raise forms.ValidationError(f"Side {side.upper()}: {parent} {termination_object} is already connected")
except ObjectDoesNotExist:
@ -3928,12 +3927,31 @@ class CableCSVForm(CustomFieldModelCSVForm):
setattr(self.instance, f'termination_{side}', termination_object)
return termination_object
def clean_side_a_name(self):
def clean_side_a(self):
return self._clean_side('a')
def clean_side_b_name(self):
def clean_side_b(self):
return self._clean_side('b')
def _clean_side_parent(self, side):
"""
Derive a Cable's A/B termination parent.
:param side: 'a' or 'b'
"""
assert side in 'ab', f"Invalid side designation: {side}"
parent_type = self.cleaned_data.get(f'side_{side}_parent_type')
field = self.fields[f'side_{side}_parent']
return field.get_value(parent_type.model_class().objects.all())
def clean_side_a_parent(self):
return self._clean_side_parent('a')
def clean_side_b_parent(self):
return self._clean_side_parent('b')
def clean_length_unit(self):
# Avoid trying to save as NULL
length_unit = self.cleaned_data.get('length_unit', None)

View File

@ -112,8 +112,8 @@ class Cable(ChangeLoggedModel, CustomFieldModel):
objects = RestrictedQuerySet.as_manager()
csv_headers = [
'side_a_parent_type', 'side_a_parent_id', 'side_a_type', 'side_a_name',
'side_b_parent_type', 'side_b_parent_id', 'side_b_type', 'side_b_name',
'side_a_parent_type', 'side_a_parent.id', 'side_a_type', 'side_a.id',
'side_b_parent_type', 'side_b_parent.id', 'side_b_type', 'side_b.id',
'type', 'status', 'label', 'color', 'length', 'length_unit',
]
@ -285,11 +285,11 @@ class Cable(ChangeLoggedModel, CustomFieldModel):
'{}.{}'.format(self.termination_a.parent._meta.app_label, self.termination_a.parent._meta.model_name),
self.termination_a.parent.id,
'{}.{}'.format(self.termination_a_type.app_label, self.termination_a_type.model),
self.termination_a.name if hasattr(self.termination_a, "name") else self.termination_a_id,
self.termination_a_id,
'{}.{}'.format(self.termination_b.parent._meta.app_label, self.termination_b.parent._meta.model_name),
self.termination_b.parent.id,
'{}.{}'.format(self.termination_b_type.app_label, self.termination_b_type.model),
self.termination_b.name if hasattr(self.termination_b, "name") else self.termination_b_id,
self.termination_b_id,
self.type,
self.status,
self.label,

View File

@ -1676,10 +1676,10 @@ class CableTestCase(
}
cls.csv_data = (
"side_a_parent_type,side_a_parent_id,side_a_type,side_a_name,side_b_parent_type,side_b_parent_id,side_b_type,side_b_name",
f"dcim.device,{devices[2].pk},dcim.interface,Interface 1,dcim.device,{devices[3].pk},dcim.interface,Interface 1",
f"dcim.device,{devices[2].pk},dcim.interface,Interface 2,dcim.device,{devices[3].pk},dcim.interface,Interface 2",
f"dcim.device,{devices[2].pk},dcim.interface,Interface 3,dcim.device,{devices[3].pk},dcim.interface,Interface 3",
"side_a_parent_type,side_a_parent,side_a_type,side_a,side_b_parent_type,side_b_parent.id,side_b_type,side_b",
f"dcim.device,Device 3,dcim.interface,Interface 1,dcim.device,{devices[3].pk},dcim.interface,Interface 1",
f"dcim.device,Device 3,dcim.interface,Interface 2,dcim.device,{devices[3].pk},dcim.interface,Interface 2",
f"dcim.device,Device 3,dcim.interface,Interface 3,dcim.device,{devices[3].pk},dcim.interface,Interface 3",
)
cls.bulk_edit_data = {

View File

@ -24,6 +24,7 @@ __all__ = (
'CSVContentTypeField',
'CSVDataField',
'CSVModelChoiceField',
'DeferredCSVModelChoiceField',
'DynamicModelChoiceField',
'DynamicModelMultipleChoiceField',
'ExpandableIPAddressField',
@ -142,6 +143,29 @@ class CSVModelChoiceField(forms.ModelChoiceField):
)
class DeferredCSVModelChoiceField(CSVModelChoiceField):
"""
Allows to defer querying from to_python
"""
default_error_messages = {
'invalid_choice': 'Object not found.',
}
def to_python(self, value):
self.value = value
return value
def get_value(self, qs):
self.queryset = qs
try:
return super().to_python(self.value)
except MultipleObjectsReturned:
raise forms.ValidationError(
f'"{self.value}" is not a unique value for this field; multiple objects were found'
)
class CSVContentTypeField(CSVModelChoiceField):
"""
Reference a ContentType in the form <app>.<model>