mirror of
https://github.com/netbox-community/netbox.git
synced 2025-08-24 08:25:17 -06:00
11617 add tests for yaml, json bulk import
This commit is contained in:
parent
b8d7c8bc0d
commit
5346583ccc
@ -1,4 +1,7 @@
|
|||||||
import csv
|
import csv
|
||||||
|
import io
|
||||||
|
import json
|
||||||
|
import yaml
|
||||||
|
|
||||||
from django.contrib.contenttypes.models import ContentType
|
from django.contrib.contenttypes.models import ContentType
|
||||||
from django.core.exceptions import ObjectDoesNotExist
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
@ -553,10 +556,29 @@ class ViewTestCases:
|
|||||||
def _get_csv_data(self):
|
def _get_csv_data(self):
|
||||||
return '\n'.join(self.csv_data)
|
return '\n'.join(self.csv_data)
|
||||||
|
|
||||||
def _get_update_csv_data(self):
|
def _get_yaml_data(self):
|
||||||
return self.csv_update_data, '\n'.join(self.csv_update_data)
|
data = [*csv.DictReader(io.StringIO(self._get_csv_data()))]
|
||||||
|
return yaml.dump(data)
|
||||||
|
|
||||||
def test_bulk_import_objects_without_permission(self):
|
def _get_json_data(self):
|
||||||
|
data = [*csv.DictReader(io.StringIO(self._get_csv_data()))]
|
||||||
|
return json.dumps(data)
|
||||||
|
|
||||||
|
def _get_update_csv_data(self):
|
||||||
|
return '\n'.join(self.csv_update_data)
|
||||||
|
|
||||||
|
def _get_update_yaml_data(self):
|
||||||
|
data = [*csv.DictReader(io.StringIO(self._get_update_csv_data()))]
|
||||||
|
return yaml.dump(data)
|
||||||
|
|
||||||
|
def _get_update_json_data(self):
|
||||||
|
data = [*csv.DictReader(io.StringIO(self._get_update_csv_data()))]
|
||||||
|
return json.dumps(data)
|
||||||
|
|
||||||
|
def _get_update_array(self):
|
||||||
|
return self.csv_update_data
|
||||||
|
|
||||||
|
def test_bulk_import_objects_without_permission_csv(self):
|
||||||
data = {
|
data = {
|
||||||
'data': self._get_csv_data(),
|
'data': self._get_csv_data(),
|
||||||
'format': 'csv',
|
'format': 'csv',
|
||||||
@ -571,8 +593,38 @@ class ViewTestCases:
|
|||||||
with disable_warnings('django.request'):
|
with disable_warnings('django.request'):
|
||||||
self.assertHttpStatus(response, 403)
|
self.assertHttpStatus(response, 403)
|
||||||
|
|
||||||
|
def test_bulk_import_objects_without_permission_yaml(self):
|
||||||
|
data = {
|
||||||
|
'data': self._get_yaml_data(),
|
||||||
|
'format': 'yaml',
|
||||||
|
}
|
||||||
|
|
||||||
|
# Test GET without permission
|
||||||
|
with disable_warnings('django.request'):
|
||||||
|
self.assertHttpStatus(self.client.get(self._get_url('import')), 403)
|
||||||
|
|
||||||
|
# Try POST without permission
|
||||||
|
response = self.client.post(self._get_url('import'), data)
|
||||||
|
with disable_warnings('django.request'):
|
||||||
|
self.assertHttpStatus(response, 403)
|
||||||
|
|
||||||
|
def test_bulk_import_objects_without_permission_json(self):
|
||||||
|
data = {
|
||||||
|
'data': self._get_json_data(),
|
||||||
|
'format': 'json',
|
||||||
|
}
|
||||||
|
|
||||||
|
# Test GET without permission
|
||||||
|
with disable_warnings('django.request'):
|
||||||
|
self.assertHttpStatus(self.client.get(self._get_url('import')), 403)
|
||||||
|
|
||||||
|
# Try POST without permission
|
||||||
|
response = self.client.post(self._get_url('import'), data)
|
||||||
|
with disable_warnings('django.request'):
|
||||||
|
self.assertHttpStatus(response, 403)
|
||||||
|
|
||||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
def test_bulk_import_objects_with_permission(self):
|
def test_bulk_import_objects_with_permission_csv(self):
|
||||||
initial_count = self._get_queryset().count()
|
initial_count = self._get_queryset().count()
|
||||||
data = {
|
data = {
|
||||||
'data': self._get_csv_data(),
|
'data': self._get_csv_data(),
|
||||||
@ -596,15 +648,64 @@ class ViewTestCases:
|
|||||||
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
|
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
|
||||||
|
|
||||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
def test_bulk_update_objects_with_permission(self):
|
def test_bulk_import_objects_with_permission_yaml(self):
|
||||||
|
initial_count = self._get_queryset().count()
|
||||||
|
data = {
|
||||||
|
'data': self._get_yaml_data(),
|
||||||
|
'format': 'yaml',
|
||||||
|
}
|
||||||
|
|
||||||
|
# Assign model-level permission
|
||||||
|
obj_perm = ObjectPermission(
|
||||||
|
name='Test permission',
|
||||||
|
actions=['add']
|
||||||
|
)
|
||||||
|
obj_perm.save()
|
||||||
|
obj_perm.users.add(self.user)
|
||||||
|
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
|
||||||
|
|
||||||
|
# Try GET with model-level permission
|
||||||
|
self.assertHttpStatus(self.client.get(self._get_url('import')), 200)
|
||||||
|
|
||||||
|
# Test POST with permission
|
||||||
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
|
||||||
|
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
|
||||||
|
|
||||||
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
|
def test_bulk_import_objects_with_permission_json(self):
|
||||||
|
initial_count = self._get_queryset().count()
|
||||||
|
data = {
|
||||||
|
'data': self._get_json_data(),
|
||||||
|
'format': 'json',
|
||||||
|
}
|
||||||
|
|
||||||
|
# Assign model-level permission
|
||||||
|
obj_perm = ObjectPermission(
|
||||||
|
name='Test permission',
|
||||||
|
actions=['add']
|
||||||
|
)
|
||||||
|
obj_perm.save()
|
||||||
|
obj_perm.users.add(self.user)
|
||||||
|
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
|
||||||
|
|
||||||
|
# Try GET with model-level permission
|
||||||
|
self.assertHttpStatus(self.client.get(self._get_url('import')), 200)
|
||||||
|
|
||||||
|
# Test POST with permission
|
||||||
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
|
||||||
|
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
|
||||||
|
|
||||||
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
|
def test_bulk_update_objects_with_permission_csv(self):
|
||||||
if not hasattr(self, 'csv_update_data'):
|
if not hasattr(self, 'csv_update_data'):
|
||||||
raise NotImplementedError("The test must define csv_update_data.")
|
raise NotImplementedError("The test must define csv_update_data.")
|
||||||
|
|
||||||
initial_count = self._get_queryset().count()
|
initial_count = self._get_queryset().count()
|
||||||
array, csv_data = self._get_update_csv_data()
|
array = self._get_update_array()
|
||||||
|
update_data = self._get_update_csv_data()
|
||||||
data = {
|
data = {
|
||||||
'format': ImportFormatChoices.CSV,
|
'format': ImportFormatChoices.CSV,
|
||||||
'data': csv_data,
|
'data': update_data,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Assign model-level permission
|
# Assign model-level permission
|
||||||
@ -633,7 +734,83 @@ class ViewTestCases:
|
|||||||
self.assertEqual(value, value)
|
self.assertEqual(value, value)
|
||||||
|
|
||||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
def test_bulk_import_objects_with_constrained_permission(self):
|
def test_bulk_update_objects_with_permission_yaml(self):
|
||||||
|
if not hasattr(self, 'csv_update_data'):
|
||||||
|
raise NotImplementedError("The test must define csv_update_data.")
|
||||||
|
|
||||||
|
initial_count = self._get_queryset().count()
|
||||||
|
array = self._get_update_array()
|
||||||
|
update_data = self._get_update_yaml_data()
|
||||||
|
data = {
|
||||||
|
'format': ImportFormatChoices.YAML,
|
||||||
|
'data': update_data,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Assign model-level permission
|
||||||
|
obj_perm = ObjectPermission(
|
||||||
|
name='Test permission',
|
||||||
|
actions=['add']
|
||||||
|
)
|
||||||
|
obj_perm.save()
|
||||||
|
obj_perm.users.add(self.user)
|
||||||
|
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
|
||||||
|
|
||||||
|
# Test POST with permission
|
||||||
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
|
||||||
|
self.assertEqual(initial_count, self._get_queryset().count())
|
||||||
|
|
||||||
|
reader = csv.DictReader(array, delimiter=',')
|
||||||
|
check_data = list(reader)
|
||||||
|
for line in check_data:
|
||||||
|
obj = self.model.objects.get(id=line["id"])
|
||||||
|
for attr, value in line.items():
|
||||||
|
if attr != "id":
|
||||||
|
field = self.model._meta.get_field(attr)
|
||||||
|
value = getattr(obj, attr)
|
||||||
|
# cannot verify FK fields as don't know what name the CSV maps to
|
||||||
|
if value is not None and not isinstance(field, ForeignKey):
|
||||||
|
self.assertEqual(value, value)
|
||||||
|
|
||||||
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
|
def test_bulk_update_objects_with_permission_json(self):
|
||||||
|
if not hasattr(self, 'csv_update_data'):
|
||||||
|
raise NotImplementedError("The test must define csv_update_data.")
|
||||||
|
|
||||||
|
initial_count = self._get_queryset().count()
|
||||||
|
array = self._get_update_array()
|
||||||
|
update_data = self._get_update_json_data()
|
||||||
|
data = {
|
||||||
|
'format': ImportFormatChoices.JSON,
|
||||||
|
'data': update_data,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Assign model-level permission
|
||||||
|
obj_perm = ObjectPermission(
|
||||||
|
name='Test permission',
|
||||||
|
actions=['add']
|
||||||
|
)
|
||||||
|
obj_perm.save()
|
||||||
|
obj_perm.users.add(self.user)
|
||||||
|
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
|
||||||
|
|
||||||
|
# Test POST with permission
|
||||||
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
|
||||||
|
self.assertEqual(initial_count, self._get_queryset().count())
|
||||||
|
|
||||||
|
reader = csv.DictReader(array, delimiter=',')
|
||||||
|
check_data = list(reader)
|
||||||
|
for line in check_data:
|
||||||
|
obj = self.model.objects.get(id=line["id"])
|
||||||
|
for attr, value in line.items():
|
||||||
|
if attr != "id":
|
||||||
|
field = self.model._meta.get_field(attr)
|
||||||
|
value = getattr(obj, attr)
|
||||||
|
# cannot verify FK fields as don't know what name the CSV maps to
|
||||||
|
if value is not None and not isinstance(field, ForeignKey):
|
||||||
|
self.assertEqual(value, value)
|
||||||
|
|
||||||
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
|
def test_bulk_import_objects_with_constrained_permission_csv(self):
|
||||||
initial_count = self._get_queryset().count()
|
initial_count = self._get_queryset().count()
|
||||||
data = {
|
data = {
|
||||||
'data': self._get_csv_data(),
|
'data': self._get_csv_data(),
|
||||||
@ -662,6 +839,66 @@ class ViewTestCases:
|
|||||||
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302)
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 302)
|
||||||
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
|
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
|
||||||
|
|
||||||
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
|
def test_bulk_import_objects_with_constrained_permission_yaml(self):
|
||||||
|
initial_count = self._get_queryset().count()
|
||||||
|
data = {
|
||||||
|
'data': self._get_yaml_data(),
|
||||||
|
'format': 'yaml',
|
||||||
|
}
|
||||||
|
|
||||||
|
# Assign constrained permission
|
||||||
|
obj_perm = ObjectPermission(
|
||||||
|
name='Test permission',
|
||||||
|
constraints={'pk': 0}, # Dummy permission to deny all
|
||||||
|
actions=['add']
|
||||||
|
)
|
||||||
|
obj_perm.save()
|
||||||
|
obj_perm.users.add(self.user)
|
||||||
|
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
|
||||||
|
|
||||||
|
# Attempt to import non-permitted objects
|
||||||
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
|
||||||
|
self.assertEqual(self._get_queryset().count(), initial_count)
|
||||||
|
|
||||||
|
# Update permission constraints
|
||||||
|
obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all
|
||||||
|
obj_perm.save()
|
||||||
|
|
||||||
|
# Import permitted objects
|
||||||
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
|
||||||
|
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
|
||||||
|
|
||||||
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||||
|
def test_bulk_import_objects_with_constrained_permission_json(self):
|
||||||
|
initial_count = self._get_queryset().count()
|
||||||
|
data = {
|
||||||
|
'data': self._get_json_data(),
|
||||||
|
'format': 'json',
|
||||||
|
}
|
||||||
|
|
||||||
|
# Assign constrained permission
|
||||||
|
obj_perm = ObjectPermission(
|
||||||
|
name='Test permission',
|
||||||
|
constraints={'pk': 0}, # Dummy permission to deny all
|
||||||
|
actions=['add']
|
||||||
|
)
|
||||||
|
obj_perm.save()
|
||||||
|
obj_perm.users.add(self.user)
|
||||||
|
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
|
||||||
|
|
||||||
|
# Attempt to import non-permitted objects
|
||||||
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
|
||||||
|
self.assertEqual(self._get_queryset().count(), initial_count)
|
||||||
|
|
||||||
|
# Update permission constraints
|
||||||
|
obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all
|
||||||
|
obj_perm.save()
|
||||||
|
|
||||||
|
# Import permitted objects
|
||||||
|
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
|
||||||
|
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
|
||||||
|
|
||||||
class BulkEditObjectsViewTestCase(ModelViewTestCase):
|
class BulkEditObjectsViewTestCase(ModelViewTestCase):
|
||||||
"""
|
"""
|
||||||
Edit multiple instances.
|
Edit multiple instances.
|
||||||
|
Loading…
Reference in New Issue
Block a user