Merge branch 'develop' into 2921-tags-select2

This commit is contained in:
Jeremy Stretch
2020-02-04 09:37:31 -05:00
committed by GitHub
49 changed files with 2290 additions and 1546 deletions

View File

@@ -1,7 +1,28 @@
# noinspection PyUnresolvedReferences
from django.core.management.commands.makemigrations import Command
from django.conf import settings
from django.core.management.base import CommandError
from django.core.management.commands.makemigrations import Command as _Command
from django.db import models
from . import custom_deconstruct
models.Field.deconstruct = custom_deconstruct
class Command(_Command):
def handle(self, *args, **kwargs):
"""
This built-in management command enables the creation of new database schema migration files, which should
never be required by and ordinary user. We prevent this command from executing unless the configuration
indicates that the user is a developer (i.e. configuration.DEVELOPER == True).
"""
if not settings.DEVELOPER:
raise CommandError(
"This command is available for development purposes only. It will\n"
"NOT resolve any issues with missing or unapplied migrations. For assistance,\n"
"please post to the NetBox mailing list:\n"
" https://groups.google.com/forum/#!forum/netbox-discuss"
)
super().handle(*args, **kwargs)

View File

@@ -1,6 +1,7 @@
import datetime
import json
import re
import yaml
from django import template
from django.utils.html import strip_tags
@@ -76,6 +77,14 @@ def render_json(value):
return json.dumps(value, indent=4, sort_keys=True)
@register.filter()
def render_yaml(value):
"""
Render a dictionary as formatted YAML.
"""
return yaml.dump(dict(value))
@register.filter()
def model_name(obj):
"""

View File

@@ -1,79 +0,0 @@
import logging
from contextlib import contextmanager
from django.contrib.auth.models import Permission, User
from rest_framework.test import APITestCase as _APITestCase
from users.models import Token
class APITestCase(_APITestCase):
def setUp(self):
"""
Create a superuser and token for API calls.
"""
self.user = User.objects.create(username='testuser', is_superuser=True)
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key)}
def assertHttpStatus(self, response, expected_status):
"""
Provide more detail in the event of an unexpected HTTP response.
"""
err_message = "Expected HTTP status {}; received {}: {}"
self.assertEqual(response.status_code, expected_status, err_message.format(
expected_status, response.status_code, getattr(response, 'data', 'No data')
))
def create_test_user(username='testuser', permissions=list()):
"""
Create a User with the given permissions.
"""
user = User.objects.create_user(username=username)
for perm_name in permissions:
app, codename = perm_name.split('.')
perm = Permission.objects.get(content_type__app_label=app, codename=codename)
user.user_permissions.add(perm)
return user
def choices_to_dict(choices_list):
"""
Convert a list of field choices to a dictionary suitable for direct comparison with a ChoiceSet. For example:
[
{
"value": "choice-1",
"label": "First Choice"
},
{
"value": "choice-2",
"label": "Second Choice"
}
]
Becomes:
{
"choice-1": "First Choice",
"choice-2": "Second Choice
}
"""
return {
choice['value']: choice['label'] for choice in choices_list
}
@contextmanager
def disable_warnings(logger_name):
"""
Temporarily suppress expected warning messages to keep the test output clean.
"""
logger = logging.getLogger(logger_name)
current_level = logger.level
logger.setLevel(logging.ERROR)
yield
logger.setLevel(current_level)

View File

@@ -0,0 +1,2 @@
from .testcases import *
from .utils import *

View File

@@ -0,0 +1,325 @@
from django.contrib.auth.models import Permission, User
from django.core.exceptions import ObjectDoesNotExist
from django.test import Client, TestCase as _TestCase, override_settings
from django.urls import reverse, NoReverseMatch
from rest_framework.test import APIClient
from users.models import Token
from .utils import disable_warnings, model_to_dict, post_data
class TestCase(_TestCase):
user_permissions = ()
def setUp(self):
# Create the test user and assign permissions
self.user = User.objects.create_user(username='testuser')
self.add_permissions(*self.user_permissions)
# Initialize the test client
self.client = Client()
self.client.force_login(self.user)
#
# Permissions management
#
def add_permissions(self, *names):
"""
Assign a set of permissions to the test user. Accepts permission names in the form <app>.<action>_<model>.
"""
for name in names:
app, codename = name.split('.')
perm = Permission.objects.get(content_type__app_label=app, codename=codename)
self.user.user_permissions.add(perm)
def remove_permissions(self, *names):
"""
Remove a set of permissions from the test user, if assigned.
"""
for name in names:
app, codename = name.split('.')
perm = Permission.objects.get(content_type__app_label=app, codename=codename)
self.user.user_permissions.remove(perm)
#
# Convenience methods
#
def assertHttpStatus(self, response, expected_status):
"""
TestCase method. Provide more detail in the event of an unexpected HTTP response.
"""
err_message = "Expected HTTP status {}; received {}: {}"
self.assertEqual(response.status_code, expected_status, err_message.format(
expected_status, response.status_code, getattr(response, 'data', 'No data')
))
class APITestCase(TestCase):
client_class = APIClient
def setUp(self):
"""
Create a superuser and token for API calls.
"""
self.user = User.objects.create(username='testuser', is_superuser=True)
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key)}
class StandardTestCases:
"""
We keep any TestCases with test_* methods inside a class to prevent unittest from trying to run them.
"""
class Views(TestCase):
"""
Stock TestCase suitable for testing all standard View functions:
- List objects
- View single object
- Create new object
- Modify existing object
- Delete existing object
- Import multiple new objects
"""
model = None
# Data to be sent when creating/editing individual objects
form_data = {}
# CSV lines used for bulk import of new objects
csv_data = ()
# Form data to be used when editing multiple objects at once
bulk_edit_data = {}
maxDiff = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.model is None:
raise Exception("Test case requires model to be defined")
def _get_url(self, action, instance=None):
"""
Return the URL name for a specific action. An instance must be specified for
get/edit/delete views.
"""
url_format = '{}:{}_{{}}'.format(
self.model._meta.app_label,
self.model._meta.model_name
)
if action in ('list', 'add', 'import', 'bulk_edit', 'bulk_delete'):
return reverse(url_format.format(action))
elif action in ('get', 'edit', 'delete'):
if instance is None:
raise Exception("Resolving {} URL requires specifying an instance".format(action))
# Attempt to resolve using slug first
if hasattr(self.model, 'slug'):
try:
return reverse(url_format.format(action), kwargs={'slug': instance.slug})
except NoReverseMatch:
pass
return reverse(url_format.format(action), kwargs={'pk': instance.pk})
else:
raise Exception("Invalid action for URL resolution: {}".format(action))
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_list_objects(self):
# Attempt to make the request without required permissions
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.get(self._get_url('list')), 403)
# Assign the required permission and submit again
self.add_permissions(
'{}.view_{}'.format(self.model._meta.app_label, self.model._meta.model_name)
)
response = self.client.get(self._get_url('list'))
self.assertHttpStatus(response, 200)
# Built-in CSV export
if hasattr(self.model, 'csv_headers'):
response = self.client.get('{}?export'.format(self._get_url('list')))
self.assertHttpStatus(response, 200)
self.assertEqual(response.get('Content-Type'), 'text/csv')
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object(self):
instance = self.model.objects.first()
# Attempt to make the request without required permissions
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 403)
# Assign the required permission and submit again
self.add_permissions(
'{}.view_{}'.format(self.model._meta.app_label, self.model._meta.model_name)
)
response = self.client.get(instance.get_absolute_url())
self.assertHttpStatus(response, 200)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_create_object(self):
initial_count = self.model.objects.count()
request = {
'path': self._get_url('add'),
'data': post_data(self.form_data),
'follow': False, # Do not follow 302 redirects
}
# Attempt to make the request without required permissions
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
# Assign the required permission and submit again
self.add_permissions(
'{}.add_{}'.format(self.model._meta.app_label, self.model._meta.model_name)
)
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
self.assertEqual(initial_count + 1, self.model.objects.count())
instance = self.model.objects.order_by('-pk').first()
self.assertDictEqual(model_to_dict(instance), self.form_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_edit_object(self):
instance = self.model.objects.first()
request = {
'path': self._get_url('edit', instance),
'data': post_data(self.form_data),
'follow': False, # Do not follow 302 redirects
}
# Attempt to make the request without required permissions
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
# Assign the required permission and submit again
self.add_permissions(
'{}.change_{}'.format(self.model._meta.app_label, self.model._meta.model_name)
)
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
instance = self.model.objects.get(pk=instance.pk)
self.assertDictEqual(model_to_dict(instance), self.form_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_delete_object(self):
instance = self.model.objects.first()
request = {
'path': self._get_url('delete', instance),
'data': {'confirm': True},
'follow': False, # Do not follow 302 redirects
}
# Attempt to make the request without required permissions
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
# Assign the required permission and submit again
self.add_permissions(
'{}.delete_{}'.format(self.model._meta.app_label, self.model._meta.model_name)
)
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
with self.assertRaises(ObjectDoesNotExist):
self.model.objects.get(pk=instance.pk)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_import_objects(self):
initial_count = self.model.objects.count()
request = {
'path': self._get_url('import'),
'data': {
'csv': '\n'.join(self.csv_data)
}
}
# Attempt to make the request without required permissions
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
# Assign the required permission and submit again
self.add_permissions(
'{}.view_{}'.format(self.model._meta.app_label, self.model._meta.model_name),
'{}.add_{}'.format(self.model._meta.app_label, self.model._meta.model_name)
)
response = self.client.post(**request)
self.assertHttpStatus(response, 200)
self.assertEqual(self.model.objects.count(), initial_count + len(self.csv_data) - 1)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_edit_objects(self):
pk_list = self.model.objects.values_list('pk', flat=True)
request = {
'path': self._get_url('bulk_edit'),
'data': {
'pk': pk_list,
'_apply': True, # Form button
},
'follow': False, # Do not follow 302 redirects
}
# Append the form data to the request
request['data'].update(post_data(self.bulk_edit_data))
# Attempt to make the request without required permissions
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
# Assign the required permission and submit again
self.add_permissions(
'{}.change_{}'.format(self.model._meta.app_label, self.model._meta.model_name)
)
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
bulk_edit_fields = self.bulk_edit_data.keys()
for i, instance in enumerate(self.model.objects.filter(pk__in=pk_list)):
self.assertDictEqual(
model_to_dict(instance, fields=bulk_edit_fields),
self.bulk_edit_data,
msg="Instance {} failed to validate after bulk edit: {}".format(i, instance)
)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_delete_objects(self):
pk_list = self.model.objects.values_list('pk', flat=True)
request = {
'path': self._get_url('bulk_delete'),
'data': {
'pk': pk_list,
'confirm': True,
'_confirm': True, # Form button
},
'follow': False, # Do not follow 302 redirects
}
# Attempt to make the request without required permissions
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
# Assign the required permission and submit again
self.add_permissions(
'{}.delete_{}'.format(self.model._meta.app_label, self.model._meta.model_name)
)
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
# Check that all objects were deleted
self.assertEqual(self.model.objects.count(), 0)

View File

@@ -0,0 +1,102 @@
import logging
from contextlib import contextmanager
from django.contrib.auth.models import Permission, User
from django.forms.models import model_to_dict as _model_to_dict
def model_to_dict(instance, fields=None, exclude=None):
"""
Customized wrapper for Django's built-in model_to_dict(). Does the following:
- Excludes the instance ID field
- Exclude any fields prepended with an underscore
- Convert any assigned tags to a comma-separated string
"""
_exclude = ['id']
if exclude is not None:
_exclude += exclude
model_dict = _model_to_dict(instance, fields=fields, exclude=_exclude)
for key in list(model_dict.keys()):
if key.startswith('_'):
del model_dict[key]
# TODO: Differentiate between tags assigned to the instance and a M2M field for tags (ex: ConfigContext)
elif key == 'tags':
model_dict[key] = ','.join(sorted([tag.name for tag in model_dict['tags']]))
# Convert ManyToManyField to list of instance PKs
elif model_dict[key] and type(model_dict[key]) in (list, tuple) and hasattr(model_dict[key][0], 'pk'):
model_dict[key] = [obj.pk for obj in model_dict[key]]
return model_dict
def post_data(data):
"""
Take a dictionary of test data (suitable for comparison to an instance) and return a dict suitable for POSTing.
"""
ret = {}
for key, value in data.items():
if value is None:
ret[key] = ''
elif type(value) in (list, tuple):
ret[key] = value
else:
ret[key] = str(value)
return ret
def create_test_user(username='testuser', permissions=list()):
"""
Create a User with the given permissions.
"""
user = User.objects.create_user(username=username)
for perm_name in permissions:
app, codename = perm_name.split('.')
perm = Permission.objects.get(content_type__app_label=app, codename=codename)
user.user_permissions.add(perm)
return user
def choices_to_dict(choices_list):
"""
Convert a list of field choices to a dictionary suitable for direct comparison with a ChoiceSet. For example:
[
{
"value": "choice-1",
"label": "First Choice"
},
{
"value": "choice-2",
"label": "Second Choice"
}
]
Becomes:
{
"choice-1": "First Choice",
"choice-2": "Second Choice
}
"""
return {
choice['value']: choice['label'] for choice in choices_list
}
@contextmanager
def disable_warnings(logger_name):
"""
Temporarily suppress expected warning messages to keep the test output clean.
"""
logger = logging.getLogger(logger_name)
current_level = logger.level
logger.setLevel(logging.ERROR)
yield
logger.setLevel(current_level)

View File

@@ -4,9 +4,9 @@ from copy import deepcopy
from django.conf import settings
from django.contrib import messages
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.db import transaction, IntegrityError
from django.db.models import ProtectedError
from django.db.models import Count, ManyToManyField, ProtectedError
from django.db.models.query import QuerySet
from django.forms import CharField, Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea
from django.http import HttpResponse, HttpResponseServerError
@@ -88,15 +88,27 @@ class ObjectListView(View):
Export the queryset of objects as comma-separated value (CSV), using the model's to_csv() method.
"""
csv_data = []
custom_fields = []
# Start with the column headers
headers = ','.join(self.queryset.model.csv_headers)
csv_data.append(headers)
headers = self.queryset.model.csv_headers.copy()
# Add custom field headers, if any
if hasattr(self.queryset.model, 'get_custom_fields'):
for custom_field in self.queryset.model().get_custom_fields():
headers.append(custom_field.name)
custom_fields.append(custom_field.name)
csv_data.append(','.join(headers))
# Iterate through the queryset appending each object
for obj in self.queryset:
data = csv_format(obj.to_csv())
csv_data.append(data)
data = obj.to_csv()
for custom_field in custom_fields:
data += (obj.cf.get(custom_field, ''),)
csv_data.append(csv_format(data))
return '\n'.join(csv_data)
@@ -631,7 +643,9 @@ class BulkEditView(GetReturnURLMixin, View):
if form.is_valid():
custom_fields = form.custom_fields if hasattr(form, 'custom_fields') else []
standard_fields = [field for field in form.fields if field not in custom_fields and field != 'pk']
standard_fields = [
field for field in form.fields if field not in custom_fields + ['pk']
]
nullified_fields = request.POST.getlist('_nullify')
try:
@@ -643,14 +657,29 @@ class BulkEditView(GetReturnURLMixin, View):
# Update standard fields. If a field is listed in _nullify, delete its value.
for name in standard_fields:
if name in form.nullable_fields and name in nullified_fields and isinstance(form.cleaned_data[name], QuerySet):
getattr(obj, name).set([])
elif name in form.nullable_fields and name in nullified_fields:
setattr(obj, name, '' if isinstance(form.fields[name], CharField) else None)
elif isinstance(form.cleaned_data[name], QuerySet) and form.cleaned_data[name]:
try:
model_field = model._meta.get_field(name)
except FieldDoesNotExist:
# The form field is used to modify a field rather than set its value directly,
# so we skip it.
continue
# Handle nullification
if name in form.nullable_fields and name in nullified_fields:
if isinstance(model_field, ManyToManyField):
getattr(obj, name).set([])
else:
setattr(obj, name, None if model_field.null else '')
# ManyToManyFields
elif isinstance(model_field, ManyToManyField):
getattr(obj, name).set(form.cleaned_data[name])
elif form.cleaned_data[name] not in (None, '') and not isinstance(form.cleaned_data[name], QuerySet):
# Normal fields
elif form.cleaned_data[name] not in (None, ''):
setattr(obj, name, form.cleaned_data[name])
obj.full_clean()
obj.save()