Modify default CSV export to render from tables

This commit is contained in:
jeremystretch 2021-06-09 15:00:28 -04:00
parent 569041a4c4
commit 9d3cac43b7
3 changed files with 40 additions and 114 deletions

View File

@ -2530,23 +2530,6 @@ class ConsoleConnectionsListView(generic.ObjectListView):
table = tables.ConsoleConnectionTable table = tables.ConsoleConnectionTable
template_name = 'dcim/connections_list.html' template_name = 'dcim/connections_list.html'
def queryset_to_csv(self):
csv_data = [
# Headers
','.join(['console_server', 'port', 'device', 'console_port', 'reachable'])
]
for obj in self.queryset:
csv = csv_format([
obj._path.destination.device.identifier if obj._path.destination else None,
obj._path.destination.name if obj._path.destination else None,
obj.device.identifier,
obj.name,
obj._path.is_active
])
csv_data.append(csv)
return '\n'.join(csv_data)
def extra_context(self): def extra_context(self):
return { return {
'title': 'Console Connections' 'title': 'Console Connections'
@ -2560,23 +2543,6 @@ class PowerConnectionsListView(generic.ObjectListView):
table = tables.PowerConnectionTable table = tables.PowerConnectionTable
template_name = 'dcim/connections_list.html' template_name = 'dcim/connections_list.html'
def queryset_to_csv(self):
csv_data = [
# Headers
','.join(['pdu', 'outlet', 'device', 'power_port', 'reachable'])
]
for obj in self.queryset:
csv = csv_format([
obj._path.destination.device.identifier if obj._path.destination else None,
obj._path.destination.name if obj._path.destination else None,
obj.device.identifier,
obj.name,
obj._path.is_active
])
csv_data.append(csv)
return '\n'.join(csv_data)
def extra_context(self): def extra_context(self):
return { return {
'title': 'Power Connections' 'title': 'Power Connections'
@ -2594,25 +2560,6 @@ class InterfaceConnectionsListView(generic.ObjectListView):
table = tables.InterfaceConnectionTable table = tables.InterfaceConnectionTable
template_name = 'dcim/connections_list.html' template_name = 'dcim/connections_list.html'
def queryset_to_csv(self):
csv_data = [
# Headers
','.join([
'device_a', 'interface_a', 'device_b', 'interface_b', 'reachable'
])
]
for obj in self.queryset:
csv = csv_format([
obj._path.destination.device.identifier if obj._path.destination else None,
obj._path.destination.name if obj._path.destination else None,
obj.device.identifier,
obj.name,
obj._path.is_active
])
csv_data.append(csv)
return '\n'.join(csv_data)
def extra_context(self): def extra_context(self):
return { return {
'title': 'Interface Connections' 'title': 'Interface Connections'

View File

@ -92,7 +92,7 @@ class ObjectListView(ObjectPermissionRequiredMixin, View):
def get_required_permission(self): def get_required_permission(self):
return get_permission_for_model(self.queryset.model, 'view') return get_permission_for_model(self.queryset.model, 'view')
def queryset_to_yaml(self): def export_yaml(self):
""" """
Export the queryset of objects as concatenated YAML documents. Export the queryset of objects as concatenated YAML documents.
""" """
@ -100,34 +100,32 @@ class ObjectListView(ObjectPermissionRequiredMixin, View):
return '---\n'.join(yaml_data) return '---\n'.join(yaml_data)
def queryset_to_csv(self): def export_table(self, table, columns=None):
""" """
Export the queryset of objects as comma-separated value (CSV), using the model's to_csv() method. Export all table data in CSV format.
:param table: The Table instance to export
:param columns: A list of specific columns to include. If not specified, the default view
will be exported.
""" """
csv_data = [] exclude_columns = {'pk'}
custom_fields = [] if columns:
all_columns = [col_name for col_name, _ in table.selected_columns + table.available_columns]
# Start with the column headers exclude_columns.update({
headers = self.queryset.model.csv_headers.copy() col for col in all_columns if col not in columns
})
# Add custom field headers, if any else:
if hasattr(self.queryset.model, 'custom_field_data'): exclude_columns.update({
for custom_field in CustomField.objects.get_for_model(self.queryset.model): name for name, _ in table.available_columns
headers.append(custom_field.name) })
custom_fields.append(custom_field.name) exporter = TableExport(
export_format=TableExport.CSV,
csv_data.append(','.join(headers)) table=table,
exclude_columns=exclude_columns
# Iterate through the queryset appending each object )
for obj in self.queryset: return exporter.response(
data = obj.to_csv() filename=f'netbox_{self.queryset.model._meta.verbose_name_plural}.csv'
)
for custom_field in custom_fields:
data += (obj.cf.get(custom_field, ''),)
csv_data.append(csv_format(data))
return '\n'.join(csv_data)
def get(self, request): def get(self, request):
@ -137,7 +135,13 @@ class ObjectListView(ObjectPermissionRequiredMixin, View):
if self.filterset: if self.filterset:
self.queryset = self.filterset(request.GET, self.queryset).qs self.queryset = self.filterset(request.GET, self.queryset).qs
# Check for export rendering (except for table-based) # Compile a dictionary indicating which permissions are available to the current user for this model
permissions = {}
for action in ('add', 'change', 'delete', 'view'):
perm_name = get_permission_for_model(model, action)
permissions[action] = request.user.has_perm(perm_name)
# Export template/YAML rendering
if 'export' in request.GET and request.GET['export'] != 'table': if 'export' in request.GET and request.GET['export'] != 'table':
# An export template has been specified # An export template has been specified
@ -155,44 +159,21 @@ class ObjectListView(ObjectPermissionRequiredMixin, View):
# Check for YAML export support # Check for YAML export support
elif hasattr(model, 'to_yaml'): elif hasattr(model, 'to_yaml'):
response = HttpResponse(self.queryset_to_yaml(), content_type='text/yaml') response = HttpResponse(self.export_yaml(), content_type='text/yaml')
filename = 'netbox_{}.yaml'.format(self.queryset.model._meta.verbose_name_plural) filename = 'netbox_{}.yaml'.format(self.queryset.model._meta.verbose_name_plural)
response['Content-Disposition'] = 'attachment; filename="{}"'.format(filename) response['Content-Disposition'] = 'attachment; filename="{}"'.format(filename)
return response return response
# Fall back to built-in CSV formatting if export requested but no template specified
elif 'export' in request.GET and hasattr(model, 'to_csv'):
response = HttpResponse(self.queryset_to_csv(), content_type='text/csv')
filename = 'netbox_{}.csv'.format(self.queryset.model._meta.verbose_name_plural)
response['Content-Disposition'] = 'attachment; filename="{}"'.format(filename)
return response
# Compile a dictionary indicating which permissions are available to the current user for this model
permissions = {}
for action in ('add', 'change', 'delete', 'view'):
perm_name = get_permission_for_model(model, action)
permissions[action] = request.user.has_perm(perm_name)
# Construct the objects table # Construct the objects table
table = self.table(self.queryset, user=request.user) table = self.table(self.queryset, user=request.user)
if 'pk' in table.base_columns and (permissions['change'] or permissions['delete']): if 'pk' in table.base_columns and (permissions['change'] or permissions['delete']):
table.columns.show('pk') table.columns.show('pk')
# Handle table-based export # Handle table-based exports (current view or static CSV-based)
if request.GET.get('export') == 'table': if request.GET.get('export') == 'table':
exclude_columns = {'pk'} return self.export_table(table)
exclude_columns.update({ elif 'export' in request.GET:
name for name, _ in table.available_columns return self.export_table(table, model.csv_headers)
})
exporter = TableExport(
export_format=TableExport.CSV,
table=table,
exclude_columns=exclude_columns,
dataset_kwargs={},
)
return exporter.response(
filename=f'netbox_{self.queryset.model._meta.verbose_name_plural}.csv'
)
# Paginate the objects table # Paginate the objects table
paginate_table(table, request) paginate_table(table, request)

View File

@ -452,12 +452,10 @@ class ViewTestCases:
url = self._get_url('list') url = self._get_url('list')
# Test default CSV export # Test default CSV export
response = self.client.get(f'{url}?export')
self.assertHttpStatus(response, 200)
if hasattr(self.model, 'csv_headers'): if hasattr(self.model, 'csv_headers'):
self.assertEqual(response.get('Content-Type'), 'text/csv') response = self.client.get(f'{url}?export')
content = response.content.decode('utf-8') self.assertHttpStatus(response, 200)
self.assertEqual(content.splitlines()[0], ','.join(self.model.csv_headers)) self.assertEqual(response.get('Content-Type'), 'text/csv; charset=utf-8')
# Test table-based export # Test table-based export
response = self.client.get(f'{url}?export=table') response = self.client.get(f'{url}?export=table')