mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-24 17:38:37 -06:00
Enable the specifcation of related objects by arbitrary attribute during CSV import
This commit is contained in:
parent
6ab046ba8f
commit
34a17d4571
@ -405,10 +405,11 @@ class CSVDataField(forms.CharField):
|
|||||||
"""
|
"""
|
||||||
widget = forms.Textarea
|
widget = forms.Textarea
|
||||||
|
|
||||||
def __init__(self, fields, required_fields=[], *args, **kwargs):
|
def __init__(self, model, fields, required_fields=None, *args, **kwargs):
|
||||||
|
|
||||||
|
self.model = model
|
||||||
self.fields = fields
|
self.fields = fields
|
||||||
self.required_fields = required_fields
|
self.required_fields = required_fields or list()
|
||||||
|
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
@ -423,31 +424,49 @@ class CSVDataField(forms.CharField):
|
|||||||
'in double quotes.'
|
'in double quotes.'
|
||||||
|
|
||||||
def to_python(self, value):
|
def to_python(self, value):
|
||||||
|
|
||||||
records = []
|
records = []
|
||||||
reader = csv.reader(StringIO(value))
|
reader = csv.reader(StringIO(value))
|
||||||
|
|
||||||
# Consume and validate the first line of CSV data as column headers
|
# Consume the first line of CSV data as column headers. Create a dictionary mapping each header to an optional
|
||||||
headers = next(reader)
|
# "to" field specifying how the related object is being referenced. For example, importing a Device might use a
|
||||||
for f in self.required_fields:
|
# `site.slug` header, to indicate the related site is being referenced by its slug.
|
||||||
if f not in headers:
|
headers = {}
|
||||||
raise forms.ValidationError('Required column header "{}" not found.'.format(f))
|
for header in next(reader):
|
||||||
for f in headers:
|
if '.' in header:
|
||||||
if f not in self.fields:
|
field, to_field = header.split('.', 1)
|
||||||
raise forms.ValidationError('Unexpected column header "{}" found.'.format(f))
|
headers[field] = to_field
|
||||||
|
else:
|
||||||
|
headers[header] = None
|
||||||
|
|
||||||
# Parse CSV data
|
# Parse CSV data
|
||||||
for i, row in enumerate(reader, start=1):
|
for i, row in enumerate(reader, start=1):
|
||||||
if row:
|
if row:
|
||||||
if len(row) != len(headers):
|
if len(row) != len(headers):
|
||||||
raise forms.ValidationError(
|
raise forms.ValidationError(f"Row {i}: Expected {len(headers)} columns but found {len(row)}")
|
||||||
"Row {}: Expected {} columns but found {}".format(i, len(headers), len(row))
|
|
||||||
)
|
|
||||||
row = [col.strip() for col in row]
|
row = [col.strip() for col in row]
|
||||||
record = dict(zip(headers, row))
|
record = dict(zip(headers.keys(), row))
|
||||||
records.append(record)
|
records.append(record)
|
||||||
|
|
||||||
return records
|
return headers, records
|
||||||
|
|
||||||
|
def validate(self, value):
|
||||||
|
headers, records = value
|
||||||
|
|
||||||
|
# Validate provided column headers
|
||||||
|
for field, to_field in headers.items():
|
||||||
|
if field not in self.fields:
|
||||||
|
raise forms.ValidationError(f'Unexpected column header "{field}" found.')
|
||||||
|
if to_field and not hasattr(self.fields[field], 'to_field_name'):
|
||||||
|
raise forms.ValidationError(f'Column "{field}" is not a related object; cannot use dots')
|
||||||
|
if to_field and not hasattr(self.fields[field].queryset.model, to_field):
|
||||||
|
raise forms.ValidationError(f'Invalid related object attribute for column "{field}": {to_field}')
|
||||||
|
|
||||||
|
# Validate required fields
|
||||||
|
for f in self.required_fields:
|
||||||
|
if f not in headers:
|
||||||
|
raise forms.ValidationError(f'Required column header "{f}" not found.')
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
class CSVChoiceField(forms.ChoiceField):
|
class CSVChoiceField(forms.ChoiceField):
|
||||||
|
@ -557,11 +557,18 @@ class BulkImportView(GetReturnURLMixin, View):
|
|||||||
|
|
||||||
def _import_form(self, *args, **kwargs):
|
def _import_form(self, *args, **kwargs):
|
||||||
|
|
||||||
fields = self.model_form().fields.keys()
|
fields = self.model_form().fields
|
||||||
required_fields = [name for name, field in self.model_form().fields.items() if field.required]
|
required_fields = [
|
||||||
|
name for name, field in self.model_form().fields.items() if field.required
|
||||||
|
]
|
||||||
|
|
||||||
class ImportForm(BootstrapMixin, Form):
|
class ImportForm(BootstrapMixin, Form):
|
||||||
csv = CSVDataField(fields=fields, required_fields=required_fields, widget=Textarea(attrs=self.widget_attrs))
|
csv = CSVDataField(
|
||||||
|
model=self.model_form.Meta.model,
|
||||||
|
fields=fields,
|
||||||
|
required_fields=required_fields,
|
||||||
|
widget=Textarea(attrs=self.widget_attrs)
|
||||||
|
)
|
||||||
|
|
||||||
return ImportForm(*args, **kwargs)
|
return ImportForm(*args, **kwargs)
|
||||||
|
|
||||||
@ -591,8 +598,15 @@ class BulkImportView(GetReturnURLMixin, View):
|
|||||||
try:
|
try:
|
||||||
# Iterate through CSV data and bind each row to a new model form instance.
|
# Iterate through CSV data and bind each row to a new model form instance.
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
for row, data in enumerate(form.cleaned_data['csv'], start=1):
|
headers, records = form.cleaned_data['csv']
|
||||||
|
for row, data in enumerate(records, start=1):
|
||||||
obj_form = self.model_form(data)
|
obj_form = self.model_form(data)
|
||||||
|
|
||||||
|
# Modify the model form to accommodate any customized to_field_name properties
|
||||||
|
for field, to_field in headers.items():
|
||||||
|
if to_field is not None:
|
||||||
|
obj_form.fields[field].to_field_name = to_field
|
||||||
|
|
||||||
if obj_form.is_valid():
|
if obj_form.is_valid():
|
||||||
obj = self._save_obj(obj_form, request)
|
obj = self._save_obj(obj_form, request)
|
||||||
new_objs.append(obj)
|
new_objs.append(obj)
|
||||||
|
Loading…
Reference in New Issue
Block a user