4347 initial code for json import

This commit is contained in:
Arthur 2022-09-13 16:43:58 -07:00 committed by jeremystretch
parent 0ce7f84ec1
commit ead26d09d3
8 changed files with 94 additions and 526 deletions

View File

@ -936,7 +936,7 @@ class DeviceTypeDeleteView(generic.ObjectDeleteView):
queryset = DeviceType.objects.all()
class DeviceTypeImportView(generic.ObjectImportView):
class DeviceTypeImportView(generic.BulkImportView):
additional_permissions = [
'dcim.add_devicetype',
'dcim.add_consoleporttemplate',
@ -952,6 +952,7 @@ class DeviceTypeImportView(generic.ObjectImportView):
]
queryset = DeviceType.objects.all()
model_form = forms.DeviceTypeImportForm
table = tables.DeviceTypeTable
related_object_forms = {
'console-ports': forms.ConsolePortTemplateImportForm,
'console-server-ports': forms.ConsoleServerPortTemplateImportForm,
@ -1069,7 +1070,7 @@ class ModuleTypeDeleteView(generic.ObjectDeleteView):
queryset = ModuleType.objects.all()
class ModuleTypeImportView(generic.ObjectImportView):
class ModuleTypeImportView(generic.BulkImportView):
additional_permissions = [
'dcim.add_moduletype',
'dcim.add_consoleporttemplate',
@ -1082,6 +1083,7 @@ class ModuleTypeImportView(generic.ObjectImportView):
]
queryset = ModuleType.objects.all()
model_form = forms.ModuleTypeImportForm
table = tables.ModuleTypeTable
related_object_forms = {
'console-ports': forms.ConsolePortTemplateImportForm,
'console-server-ports': forms.ConsoleServerPortTemplateImportForm,

View File

@ -887,7 +887,7 @@ class CustomFieldImportTest(TestCase):
)
csv_data = '\n'.join(','.join(row) for row in data)
response = self.client.post(reverse('dcim:site_import'), {'csv': csv_data})
response = self.client.post(reverse('dcim:site_import'), {'data': csv_data, 'format': 'csv'})
self.assertEqual(response.status_code, 200)
self.assertEqual(Site.objects.count(), 3)

View File

@ -26,6 +26,7 @@ from utilities.forms import (
from utilities.htmx import is_htmx
from utilities.permissions import get_permission_for_model
from utilities.views import GetReturnURLMixin
from utilities.forms.choices import ImportFormatChoices
from .base import BaseMultiObjectView
from .mixins import ActionsMixin, TableMixin
from .utils import get_prerequisite_model
@ -288,134 +289,6 @@ class BulkCreateView(GetReturnURLMixin, BaseMultiObjectView):
})
class OldBulkImportView(GetReturnURLMixin, BaseMultiObjectView):
"""
Import objects in bulk (CSV format).
Attributes:
model_form: The form used to create each imported object
"""
template_name = 'generic/bulk_import.html'
model_form = None
def _import_form(self, *args, **kwargs):
class ImportForm(BootstrapMixin, Form):
csv = CSVDataField(
from_form=self.model_form
)
csv_file = CSVFileField(
label="CSV file",
from_form=self.model_form,
required=False
)
def clean(self):
csv_rows = self.cleaned_data['csv'][1] if 'csv' in self.cleaned_data else None
csv_file = self.files.get('csv_file')
# Check that the user has not submitted both text data and a file
if csv_rows and csv_file:
raise ValidationError(
"Cannot process CSV text and file attachment simultaneously. Please choose only one import "
"method."
)
return ImportForm(*args, **kwargs)
def _create_objects(self, form, request):
new_objs = []
if request.FILES:
headers, records = form.cleaned_data['csv_file']
else:
headers, records = form.cleaned_data['csv']
for row, data in enumerate(records, start=1):
obj_form = self.model_form(data, headers=headers)
restrict_form_fields(obj_form, request.user)
if obj_form.is_valid():
obj = self._save_obj(obj_form, request)
new_objs.append(obj)
else:
for field, err in obj_form.errors.items():
form.add_error('csv', f'Row {row} {field}: {err[0]}')
raise ValidationError("")
return new_objs
def _save_obj(self, obj_form, request):
"""
Provide a hook to modify the object immediately before saving it (e.g. to encrypt secret data).
"""
return obj_form.save()
def get_required_permission(self):
return get_permission_for_model(self.queryset.model, 'add')
#
# Request handlers
#
def get(self, request):
return render(request, self.template_name, {
'model': self.model_form._meta.model,
'form': self._import_form(),
'fields': self.model_form().fields,
'return_url': self.get_return_url(request),
**self.get_extra_context(request),
})
def post(self, request):
logger = logging.getLogger('netbox.views.BulkImportView')
form = self._import_form(request.POST, request.FILES)
if form.is_valid():
logger.debug("Form validation was successful")
try:
# Iterate through CSV data and bind each row to a new model form instance.
with transaction.atomic():
new_objs = self._create_objects(form, request)
# Enforce object-level permissions
if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs):
raise PermissionsViolation
# Compile a table containing the imported objects
obj_table = self.table(new_objs)
if new_objs:
msg = 'Imported {} {}'.format(len(new_objs), new_objs[0]._meta.verbose_name_plural)
logger.info(msg)
messages.success(request, msg)
return render(request, "import_success.html", {
'table': obj_table,
'return_url': self.get_return_url(request),
})
except ValidationError:
clear_webhooks.send(sender=self)
except (AbortRequest, PermissionsViolation) as e:
logger.debug(e.message)
form.add_error(None, e.message)
clear_webhooks.send(sender=self)
else:
logger.debug("Form validation failed")
return render(request, self.template_name, {
'model': self.model_form._meta.model,
'form': form,
'fields': self.model_form().fields,
'return_url': self.get_return_url(request),
**self.get_extra_context(request),
})
class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
"""
Import objects in bulk (CSV format).
@ -425,19 +298,14 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
"""
template_name = 'generic/bulk_import.html'
model_form = None
related_object_forms = dict()
'''
supported_formats = [
{
'name': 'CSV',
'help_text': 'Enter the list of column headers followed by one line per record to be imported, using ' \
'commas to separate values. Multi-line data and values containing commas may be wrapped ' \
'in double quotes.'
},
{'name': 'JSON', },
{'name': 'YAML', },
]
'''
def prep_related_object_data(self, parent, data):
"""
Hook to modify the data for related objects before it's passed to the related object form (for example, to
assign a parent object).
"""
return data
def _create_object(self, request, model_form):
@ -478,16 +346,16 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
return obj
def _create_objects(self, form, request):
def _create_objects(self, form, format, data, request):
new_objs = []
for row_num, record in enumerate(data['data'], start=1):
if format == 'csv':
model_form = self.model_form(record, headers=headers)
if format == ImportFormatChoices.CSV:
model_form = self.model_form(record, headers=data['headers'])
else:
model_form = self.model_form(record)
restrict_form_fields(model_form, request.user)
if format == 'json' or format == 'yaml':
if format == ImportFormatChoices.JSON or format == ImportFormatChoices.YAML:
# Assign default values for any fields which were not specified.
# We have to do this manually because passing 'initial=' to the form
# on initialization merely sets default values for the widgets.
@ -505,8 +373,8 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
# Replicate model form errors for display
for field, errors in model_form.errors.items():
for err in errors:
if format == 'csv':
form.add_error('csv', f'Row {row} {field}: {err[0]}')
if format == ImportFormatChoices.CSV:
form.add_error(None, f'Row {row_num} {field}: {err}')
else:
if field == '__all__':
form.add_error(None, err)
@ -530,10 +398,13 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
# Request handlers
#
def get_context(self, request, data_form, file_form):
def get_context(self, request, data_form, file_form, form=None):
# small hack - need to return 'form' set to either the file or data form
# as the bulk_import base view relies on it for error reporting.
return {
'model': self.model_form._meta.model,
'data_form': data_form,
'form': form,
'file_form': file_form,
'fields': self.model_form().fields,
'return_url': self.get_return_url(request),
@ -541,34 +412,36 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
}
def get(self, request):
data_form = ImportForm()
file_form = FileUploadImportForm()
data_form = ImportForm(related=self.related_object_forms)
file_form = FileUploadImportForm(related=self.related_object_forms)
return render(request, self.template_name, self.get_context(request, data_form, file_form))
def post(self, request):
logger = logging.getLogger('netbox.views.BulkImportView')
data_form = ImportForm(request.POST)
file_form = FileUploadImportForm(request.POST, request.FILES)
data_form = ImportForm(request.POST, related=self.related_object_forms)
file_form = FileUploadImportForm(request.POST, request.FILES, related=self.related_object_forms)
data = None
if 'data_submit' in request.POST:
if data_form.is_valid():
logger.debug("Data Import form validation was successful")
data = data_form.cleaned_data
elif 'file_submit' in request.POST:
form = None
if 'file_submit' in request.POST:
form = file_form
if file_form.is_valid():
logger.debug("File Import form validation was successful")
data = file_form.cleaned_data
else: # data_submit
form = data_form
if data_form.is_valid():
logger.debug("Data Import form validation was successful")
data = data_form.cleaned_data
if data:
format = data['format']
headers = data['headers'] if format == 'csv' else None
try:
# Iterate through data and bind each row to a new model form instance.
with transaction.atomic():
new_objs = self._create_objects(form, request)
new_objs = self._create_objects(form, format, data, request)
# Enforce object-level permissions
if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs):
@ -598,7 +471,7 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
else:
logger.debug("Form validation failed")
return render(request, self.template_name, self.get_context(request, data_form, file_form))
return render(request, self.template_name, self.get_context(request, data_form, file_form, form))
class BulkEditView(GetReturnURLMixin, BaseMultiObjectView):

View File

@ -28,7 +28,6 @@ __all__ = (
'ObjectChildrenView',
'ObjectDeleteView',
'ObjectEditView',
'ObjectImportView',
'ObjectView',
)
@ -145,149 +144,6 @@ class ObjectChildrenView(ObjectView, ActionsMixin, TableMixin):
})
class ObjectImportView(GetReturnURLMixin, BaseObjectView):
"""
Import a single object (YAML or JSON format).
Attributes:
model_form: The ModelForm used to create individual objects
related_object_forms: A dictionary mapping of forms to be used for the creation of related (child) objects
"""
template_name = 'generic/object_import.html'
model_form = None
related_object_forms = dict()
def get_required_permission(self):
return get_permission_for_model(self.queryset.model, 'add')
def prep_related_object_data(self, parent, data):
"""
Hook to modify the data for related objects before it's passed to the related object form (for example, to
assign a parent object).
"""
return data
def _create_object(self, model_form):
# Save the primary object
obj = model_form.save()
# Enforce object-level permissions
if not self.queryset.filter(pk=obj.pk).first():
raise PermissionsViolation()
# Iterate through the related object forms (if any), validating and saving each instance.
for field_name, related_object_form in self.related_object_forms.items():
related_obj_pks = []
for i, rel_obj_data in enumerate(model_form.data.get(field_name, list())):
rel_obj_data = self.prep_related_object_data(obj, rel_obj_data)
f = related_object_form(rel_obj_data)
for subfield_name, field in f.fields.items():
if subfield_name not in rel_obj_data and hasattr(field, 'initial'):
f.data[subfield_name] = field.initial
if f.is_valid():
related_obj = f.save()
related_obj_pks.append(related_obj.pk)
else:
# Replicate errors on the related object form to the primary form for display
for subfield_name, errors in f.errors.items():
for err in errors:
err_msg = "{}[{}] {}: {}".format(field_name, i, subfield_name, err)
model_form.add_error(None, err_msg)
raise AbortTransaction()
# Enforce object-level permissions on related objects
model = related_object_form.Meta.model
if model.objects.filter(pk__in=related_obj_pks).count() != len(related_obj_pks):
raise ObjectDoesNotExist
return obj
#
# Request handlers
#
def get(self, request):
form = ImportForm()
model = self.queryset.model
return render(request, self.template_name, {
'model': model,
'form': form,
'obj_type': self.queryset.model._meta.verbose_name,
'return_url': self.get_return_url(request),
})
def post(self, request):
logger = logging.getLogger('netbox.views.ObjectImportView')
form = ImportForm(request.POST)
if form.is_valid():
logger.debug("Import form validation was successful")
# Initialize model form
data = form.cleaned_data['data']
model_form = self.model_form(data)
restrict_form_fields(model_form, request.user)
# Assign default values for any fields which were not specified. We have to do this manually because passing
# 'initial=' to the form on initialization merely sets default values for the widgets. Since widgets are not
# used for YAML/JSON import, we first bind the imported data normally, then update the form's data with the
# applicable field defaults as needed prior to form validation.
for field_name, field in model_form.fields.items():
if field_name not in data and hasattr(field, 'initial'):
model_form.data[field_name] = field.initial
if model_form.is_valid():
try:
with transaction.atomic():
obj = self._create_object(model_form)
except AbortTransaction:
clear_webhooks.send(sender=self)
except (AbortRequest, PermissionsViolation) as e:
logger.debug(e.message)
form.add_error(None, e.message)
clear_webhooks.send(sender=self)
if not model_form.errors:
logger.info(f"Import object {obj} (PK: {obj.pk})")
msg = f'Imported object: <a href="{obj.get_absolute_url()}">{obj}</a>'
messages.success(request, mark_safe(msg))
if '_addanother' in request.POST:
return redirect(request.get_full_path())
self.get_return_url(request, obj)
return redirect(self.get_return_url(request, obj))
else:
logger.debug("Model form validation failed")
# Replicate model form errors for display
for field, errors in model_form.errors.items():
for err in errors:
if field == '__all__':
form.add_error(None, err)
else:
form.add_error(None, "{}: {}".format(field, err))
else:
logger.debug("Import form validation failed")
return render(request, self.template_name, {
'model': model,
'form': form,
'obj_type': self.queryset.model._meta.verbose_name,
'return_url': self.get_return_url(request),
})
class ObjectEditView(GetReturnURLMixin, BaseObjectView):
"""
Create or edit a single object.

View File

@ -170,152 +170,3 @@ Context:
</div>
{% endblock content-wrapper %}
{% comment %}
{% block tabs %}
<ul class="nav nav-tabs px-3">
<li class="nav-item" role="presentation">
<a class ="nav-link active" href="#">Bulk Import</a>
</li>
</ul>
{% endblock tabs %}
{% block content-wrapper %}
<div class="tab-content">
{% block content %}
<div class="row">
<div class="col col-md-12 col-lg-10 offset-lg-1">
<ul class="nav nav-pills px-3" role="tablist">
<li class="nav-item" role="presentation">
<button class="nav-link active" role="tab" type="button" data-bs-target="#csv" data-bs-toggle="tab">CSV Data</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" role="tab" type="button" data-bs-target="#csv-file" data-bs-toggle="tab">CSV File Upload</button>
</li>
</ul>
<form action="" method="post" enctype="multipart/form-data" class="form">
{% csrf_token %}
<div class="tab-content border-0">
<div role="tabpanel" class="tab-pane active" id="csv">
{% render_field form.csv %}
</div>
<div role="tabpanel" class="tab-pane" id="csv-file">
{% render_field form.csv_file %}
</div>
</div>
<div class="form-group">
<div class="col col-md-12 text-end">
<button type="submit" class="btn btn-primary">Submit</button>
{% if return_url %}
<a href="{{ return_url }}" class="btn btn-outline-danger">Cancel</a>
{% endif %}
</div>
</div>
</form>
{% if fields %}
<div class="row my-3">
<div class="col col-md-12">
<div class="card">
<h5 class="card-header">
CSV Field Options
</h5>
<div class="card-body">
<table class="table">
<tr>
<th>Field</th>
<th>Required</th>
<th>Accessor</th>
<th>Description</th>
</tr>
{% for name, field in fields.items %}
<tr>
<td>
<code>{{ name }}</code>
</td>
<td>
{% if field.required %}
{% checkmark True true="Required" %}
{% else %}
{{ ''|placeholder }}
{% endif %}
</td>
<td>
{% if field.to_field_name %}
<code>{{ field.to_field_name }}</code>
{% else %}
{{ ''|placeholder }}
{% endif %}
</td>
<td>
{% if field.STATIC_CHOICES %}
<button type="button" class="btn btn-link btn-sm float-end" data-bs-toggle="modal" data-bs-target="#{{ name }}_choices">
<i class="mdi mdi-help-circle"></i>
</button>
<div class="modal fade" id="{{ name }}_choices" tabindex="-1" role="dialog">
<div class="modal-dialog" role="document">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title"><code>{{ name }}</code> Choices</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
</div>
<div class="modal-body">
<table class="table table-striped">
<tr>
<th>Import Value</th>
<th>Label</th>
</tr>
{% for value, label in field.choices %}
{% if value %}
<tr>
<td>
<samp>{{ value }}</samp>
</td>
<td>
{{ label }}
</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
</div>
</div>
</div>
{% endif %}
{% if field.help_text %}
{{ field.help_text }}<br />
{% elif field.label %}
{{ field.label }}<br />
{% endif %}
{% if field|widget_type == 'dateinput' %}
<small class="text-muted">Format: YYYY-MM-DD</small>
{% elif field|widget_type == 'checkboxinput' %}
<small class="text-muted">Specify "true" or "false"</small>
{% endif %}
</td>
</tr>
{% endfor %}
</table>
</div>
</div>
</div>
</div>
<p class="small text-muted">
<i class="mdi mdi-check-bold text-success"></i> Required fields <strong>must</strong> be specified for all
objects.
</p>
<p class="small text-muted">
<i class="mdi mdi-information-outline"></i> Related objects may be referenced by any unique attribute.
For example, <code>vrf.rd</code> would identify a VRF by its route distinguisher.
</p>
{% endif %}
</div>
</div>
{% endblock content %}
</div>
{% endblock content-wrapper %}
{% endcomment %}

View File

@ -0,0 +1,27 @@
from utilities.choices import ChoiceSet
#
# Import Choices
#
class ImportFormatChoices(ChoiceSet):
CSV = 'csv'
JSON = 'json'
YAML = 'yaml'
CHOICES = [
(CSV, 'CSV'),
(JSON, 'JSON'),
(YAML, 'YAML'),
]
class ImportFormatChoicesRelated(ChoiceSet):
JSON = 'json'
YAML = 'yaml'
CHOICES = [
(JSON, 'JSON'),
(YAML, 'YAML'),
]

View File

@ -1,3 +1,4 @@
import csv
import json
import re
from io import StringIO
@ -6,6 +7,7 @@ import yaml
from django import forms
from utilities.forms.utils import parse_csv, validate_csv
from .choices import ImportFormatChoices, ImportFormatChoicesRelated
from .widgets import APISelect, APISelectMultiple, ClearableFileInput, StaticSelect
__all__ = (
@ -133,95 +135,46 @@ class CSVModelForm(forms.ModelForm):
self.fields[field].to_field_name = to_field
class OldImportForm(BootstrapMixin, forms.Form):
"""
Generic form for creating an object from JSON/YAML data
"""
data = forms.CharField(
widget=forms.Textarea(attrs={'class': 'font-monospace'}),
help_text="Enter object data in JSON or YAML format. Note: Only a single object/document is supported."
)
format = forms.ChoiceField(
choices=(
('json', 'JSON'),
('yaml', 'YAML')
),
initial='yaml'
)
def clean(self):
super().clean()
data = self.cleaned_data['data']
format = self.cleaned_data['format']
# Process JSON/YAML data
if format == 'json':
try:
self.cleaned_data['data'] = json.loads(data)
# Check for multiple JSON objects
if type(self.cleaned_data['data']) is not dict:
raise forms.ValidationError({
'data': "Import is limited to one object at a time."
})
except json.decoder.JSONDecodeError as err:
raise forms.ValidationError({
'data': "Invalid JSON data: {}".format(err)
})
else:
# Check for multiple YAML documents
if '\n---' in data:
raise forms.ValidationError({
'data': "Import is limited to one object at a time."
})
try:
self.cleaned_data['data'] = yaml.load(data, Loader=yaml.SafeLoader)
except yaml.error.YAMLError as err:
raise forms.ValidationError({
'data': "Invalid YAML data: {}".format(err)
})
class BaseImportForm(BootstrapMixin, forms.Form):
def __init__(self, *args, **kwargs):
related = kwargs.pop("related", False)
super().__init__(*args, **kwargs)
self.fields['format'].choices = self.get_supported_formats()
if related:
self.fields['format'].choices = ImportFormatChoicesRelated.CHOICES
self.fields['format'].initial = ImportFormatChoicesRelated.YAML
def get_supported_formats(self):
return (
('csv', 'CSV'),
('json', 'JSON'),
('yaml', 'YAML')
)
@property
def data_field(self):
return 'data'
def convert_data(self, data):
format = self.cleaned_data['format']
stream = StringIO(data.strip())
# Process data
if format == 'csv':
if format == ImportFormatChoices.CSV:
reader = csv.reader(stream)
headers, records = parse_csv(reader)
self.cleaned_data['data'] = records
self.cleaned_data['headers'] = headers
elif format == 'json':
elif format == ImportFormatChoices.JSON:
try:
self.cleaned_data['data'] = json.loads(data)
except json.decoder.JSONDecodeError as err:
raise forms.ValidationError({
'data': f"Invalid JSON data: {err}"
self.data_field: f"Invalid JSON data: {err}"
})
elif format == 'yaml':
elif format == ImportFormatChoices.YAML:
try:
self.cleaned_data['data'] = yaml.load_all(data, Loader=yaml.SafeLoader)
except yaml.error.YAMLError as err:
raise forms.ValidationError({
'data': f"Invalid YAML data: {err}"
self.data_field: f"Invalid YAML data: {err}"
})
else:
raise forms.ValidationError({
'data': f"Invalid file format: {format}"
self.data_field: f"Invalid file format: {format}"
})
@ -234,8 +187,8 @@ class ImportForm(BaseImportForm):
help_text="Enter object data in CSV, JSON or YAML format."
)
format = forms.ChoiceField(
choices=(),
initial='csv'
choices=ImportFormatChoices.CHOICES,
initial=ImportFormatChoices.CSV
)
def clean(self):
@ -253,19 +206,22 @@ class FileUploadImportForm(BaseImportForm):
label="data file",
required=False
)
format = forms.ChoiceField(
choices=(),
initial='csv'
choices=ImportFormatChoices.CHOICES,
initial=ImportFormatChoices.CSV
)
@property
def data_field(self):
return 'data_file'
def clean(self):
super().clean()
file = self.files.get('data_file')
data = file.read().decode('utf-8')
self.convert_data()
self.convert_data(data)
class FilterForm(BootstrapMixin, forms.Form):

View File

@ -548,7 +548,8 @@ class ViewTestCases:
def test_bulk_import_objects_without_permission(self):
data = {
'csv': self._get_csv_data(),
'data': self._get_csv_data(),
'format': 'csv',
}
# Test GET without permission
@ -564,7 +565,8 @@ class ViewTestCases:
def test_bulk_import_objects_with_permission(self):
initial_count = self._get_queryset().count()
data = {
'csv': self._get_csv_data(),
'data': self._get_csv_data(),
'format': 'csv',
}
# Assign model-level permission
@ -587,7 +589,8 @@ class ViewTestCases:
def test_bulk_import_objects_with_constrained_permission(self):
initial_count = self._get_queryset().count()
data = {
'csv': self._get_csv_data(),
'data': self._get_csv_data(),
'format': 'csv',
}
# Assign constrained permission