Merge branch 'develop-2.9' into 2006-scripts-reports-background

This commit is contained in:
Jeremy Stretch
2020-07-02 11:00:59 -04:00
committed by GitHub
144 changed files with 4747 additions and 3590 deletions

View File

@@ -254,8 +254,11 @@ class WritableNestedSerializer(ModelSerializer):
# Dictionary of related object attributes
if isinstance(data, dict):
params = dict_to_filter_params(data)
queryset = self.Meta.model.objects
if hasattr(queryset, 'restrict'):
queryset = queryset.unrestricted()
try:
return self.Meta.model.objects.get(**params)
return queryset.get(**params)
except ObjectDoesNotExist:
raise ValidationError(
"Related object not found using the provided attributes: {}".format(params)
@@ -281,8 +284,11 @@ class WritableNestedSerializer(ModelSerializer):
)
# Look up object by PK
queryset = self.Meta.model.objects
if hasattr(queryset, 'restrict'):
queryset = queryset.unrestricted()
try:
return self.Meta.model.objects.get(pk=int(data))
return queryset.get(pk=int(data))
except ObjectDoesNotExist:
raise ValidationError(
"Related object not found using the provided numeric ID: {}".format(pk)
@@ -327,7 +333,7 @@ class ModelViewSet(_ModelViewSet):
def initial(self, request, *args, **kwargs):
super().initial(request, *args, **kwargs)
if not request.user.is_authenticated or request.user.is_superuser:
if not request.user.is_authenticated:
return
# TODO: Reconcile this with TokenPermissions.perms_map

View File

@@ -5,14 +5,8 @@ from drf_yasg.utils import get_serializer_ref_name
from rest_framework.fields import ChoiceField
from rest_framework.relations import ManyRelatedField
from dcim.api.serializers import InterfaceSerializer as DeviceInterfaceSerializer
from extras.api.customfields import CustomFieldsSerializer
from utilities.api import ChoiceField, SerializedPKRelatedField, WritableNestedSerializer
from virtualization.api.serializers import InterfaceSerializer as VirtualMachineInterfaceSerializer
# this might be ugly, but it limits drf_yasg-specific code to this file
DeviceInterfaceSerializer.Meta.ref_name = 'DeviceInterface'
VirtualMachineInterfaceSerializer.Meta.ref_name = 'VirtualMachineInterface'
class NetBoxSwaggerAutoSchema(SwaggerAutoSchema):

View File

@@ -68,6 +68,6 @@ class NaturalOrderingField(models.CharField):
return (
self.name,
'utilities.fields.NaturalOrderingField',
['target_field'],
[self.target_field],
kwargs,
)

View File

@@ -102,7 +102,7 @@ class TagFilter(django_filters.ModelMultipleChoiceFilter):
kwargs.setdefault('field_name', 'tags__slug')
kwargs.setdefault('to_field_name', 'slug')
kwargs.setdefault('conjoined', True)
kwargs.setdefault('queryset', Tag.objects.all())
kwargs.setdefault('queryset', Tag.objects.unrestricted())
super().__init__(*args, **kwargs)

View File

@@ -15,6 +15,7 @@ from django.forms import BoundField
from django.forms.models import fields_for_model
from django.urls import reverse
from utilities.querysets import RestrictedQuerySet
from .choices import ColorChoices, unpack_grouped_choices
from .validators import EnhancedURLValidator
@@ -138,6 +139,16 @@ def form_from_model(model, fields):
return type('FormFromModel', (forms.Form,), form_fields)
def restrict_form_fields(form, user, action='view'):
"""
Restrict all form fields which reference a RestrictedQuerySet. This ensures that users see only permitted objects
as available choices.
"""
for field in form.fields.values():
if hasattr(field, 'queryset') and issubclass(field.queryset.__class__, RestrictedQuerySet):
field.queryset = field.queryset.restrict(user, action)
#
# Widgets
#
@@ -518,8 +529,8 @@ class ExpandableNameField(forms.CharField):
"""
def to_python(self, value):
if value is None:
return list()
if not value:
return ''
if re.search(ALPHANUMERIC_EXPANSION_PATTERN, value):
return list(expand_alphanumeric_pattern(value))
return [value]
@@ -585,7 +596,7 @@ class TagFilterField(forms.MultipleChoiceField):
def __init__(self, model, *args, **kwargs):
def get_choices():
tags = model.tags.annotate(
tags = model.tags.all().unrestricted().annotate(
count=Count('extras_taggeditem_items')
).order_by('name')
return [
@@ -733,6 +744,30 @@ class BulkEditForm(forms.Form):
self.nullable_fields = self.Meta.nullable_fields
class BulkRenameForm(forms.Form):
"""
An extendable form to be used for renaming objects in bulk.
"""
find = forms.CharField()
replace = forms.CharField()
use_regex = forms.BooleanField(
required=False,
initial=True,
label='Use regular expressions'
)
def clean(self):
# Validate regular expression in "find" field
if self.cleaned_data['use_regex']:
try:
re.compile(self.cleaned_data['find'])
except re.error:
raise forms.ValidationError({
'find': "Invalid regular expression"
})
class CSVModelForm(forms.ModelForm):
"""
ModelForm used for the import of objects in CSV format.
@@ -795,31 +830,6 @@ class ImportForm(BootstrapMixin, forms.Form):
})
class LabeledComponentForm(BootstrapMixin, forms.Form):
"""
Base form for adding label pattern validation to `Create` forms
"""
name_pattern = ExpandableNameField(
label='Name'
)
label_pattern = ExpandableNameField(
label='Label',
required=False
)
def clean(self):
# Validate that the number of components being created from both the name_pattern and label_pattern are equal
name_pattern_count = len(self.cleaned_data['name_pattern'])
label_pattern_count = len(self.cleaned_data['label_pattern'])
if label_pattern_count and name_pattern_count != label_pattern_count:
raise forms.ValidationError({
'label_pattern': 'The provided name pattern will create {} components, however {} labels will '
'be generated. These counts must match.'.format(
name_pattern_count, label_pattern_count)
}, code='label_pattern_mismatch')
class TableConfigForm(BootstrapMixin, forms.Form):
"""
Form for configuring user's table preferences.

View File

@@ -0,0 +1,19 @@
from rest_framework.metadata import SimpleMetadata
from django.utils.encoding import force_str
from utilities.api import ContentTypeField
class ContentTypeMetadata(SimpleMetadata):
def get_field_info(self, field):
field_info = super().get_field_info(field)
if hasattr(field, 'queryset') and not field_info.get('read_only') and isinstance(field, ContentTypeField):
field_info['choices'] = [
{
'value': choice_value,
'display_name': force_str(choice_name, strings_only=True)
}
for choice_value, choice_name in field.choices.items()
]
field_info['choices'].sort(key=lambda item: item['display_name'])
return field_info

View File

@@ -59,13 +59,13 @@ class RestrictedQuerySet(QuerySet):
self.allow_evaluation = True
return self
def restrict(self, user, action):
def restrict(self, user, action='view'):
"""
Filter the QuerySet to return only objects on which the specified user has been granted the specified
permission.
:param user: User instance
:param action: The action which must be permitted (e.g. "view" for "dcim.view_site")
:param action: The action which must be permitted (e.g. "view" for "dcim.view_site"); default is 'view'
"""
# Resolve the full name of the required permission
app_label = self.model._meta.app_label

View File

@@ -123,6 +123,49 @@ class BooleanColumn(tables.Column):
return mark_safe(rendered)
class ButtonsColumn(tables.TemplateColumn):
"""
Render edit, delete, and changelog buttons for an object.
:param model: Model class to use for calculating URL view names
:param prepend_content: Additional template content to render in the column (optional)
"""
attrs = {'td': {'class': 'text-right text-nowrap noprint'}}
# Note that braces are escaped to allow for string formatting prior to template rendering
template_code = """
<a href="{{% url '{app_label}:{model_name}_changelog' {pk_field}=record.{pk_field} %}}" class="btn btn-default btn-xs" title="Change log">
<i class="fa fa-history"></i>
</a>
{{% if perms.{app_label}.change_{model_name} %}}
<a href="{{% url '{app_label}:{model_name}_edit' {pk_field}=record.{pk_field} %}}?return_url={{{{ request.path }}}}" class="btn btn-xs btn-warning" title="Edit">
<i class="fa fa-pencil"></i>
</a>
{{% endif %}}
{{% if perms.{app_label}.delete_{model_name} %}}
<a href="{{% url '{app_label}:{model_name}_delete' {pk_field}=record.{pk_field} %}}?return_url={{{{ request.path }}}}" class="btn btn-xs btn-danger" title="Delete">
<i class="fa fa-trash"></i>
</a>
{{% endif %}}
"""
def __init__(self, model, *args, pk_field='pk', prepend_template=None, **kwargs):
if prepend_template:
prepend_template = prepend_template.replace('{', '{{')
prepend_template = prepend_template.replace('}', '}}')
self.template_code = prepend_template + self.template_code
template_code = self.template_code.format(
app_label=model._meta.app_label,
model_name=model._meta.model_name,
pk_field=pk_field
)
super().__init__(template_code=template_code, *args, **kwargs)
def header(self):
return ''
class ColorColumn(tables.Column):
"""
Display a color (#RRGGBB).

View File

@@ -75,16 +75,24 @@ def meta(obj, attr):
@register.filter()
def url_name(model, action):
def viewname(model, action):
"""
Return the URL name for the given model and action, or None if invalid.
Return the view name for the given model and action. Does not perform any validation.
"""
url_name = '{}:{}_{}'.format(model._meta.app_label, model._meta.model_name, action)
return f'{model._meta.app_label}:{model._meta.model_name}_{action}'
@register.filter()
def validated_viewname(model, action):
"""
Return the view name for the given model and action if valid, or None if invalid.
"""
viewname = f'{model._meta.app_label}:{model._meta.model_name}_{action}'
try:
# Validate and return the URL name. We don't return the actual URL yet because many of the templates
# Validate and return the view name. We don't return the actual URL yet because many of the templates
# are written to pass a name to {% url %}.
reverse(url_name)
return url_name
reverse(viewname)
return viewname
except NoReverseMatch:
return None

View File

@@ -0,0 +1,30 @@
from django import template
register = template.Library()
def _check_permission(user, instance, action):
return user.has_perm(
perm=f'{instance._meta.app_label}.{action}_{instance._meta.model_name}',
obj=instance
)
@register.filter()
def can_view(user, instance):
return _check_permission(user, instance, 'view')
@register.filter()
def can_add(user, instance):
return _check_permission(user, instance, 'add')
@register.filter()
def can_change(user, instance):
return _check_permission(user, instance, 'change')
@register.filter()
def can_delete(user, instance):
return _check_permission(user, instance, 'delete')

View File

@@ -52,7 +52,7 @@ class APIViewTestCases:
"""
GET a single object as an unauthenticated user.
"""
url = self._get_detail_url(self.model.objects.first())
url = self._get_detail_url(self.model.objects.unrestricted().first())
response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
@@ -61,7 +61,7 @@ class APIViewTestCases:
"""
GET a single object as an authenticated user without the required permission.
"""
url = self._get_detail_url(self.model.objects.first())
url = self._get_detail_url(self.model.objects.unrestricted().first())
# Try GET without permission
with disable_warnings('django.request'):
@@ -72,9 +72,9 @@ class APIViewTestCases:
"""
GET a single object as an authenticated user with permission to view the object.
"""
self.assertGreaterEqual(self.model.objects.count(), 2,
self.assertGreaterEqual(self.model.objects.unrestricted().count(), 2,
f"Test requires the creation of at least two {self.model} instances")
instance1, instance2 = self.model.objects.all()[:2]
instance1, instance2 = self.model.objects.unrestricted()[:2]
# Add object-level permission
obj_perm = ObjectPermission(
@@ -104,7 +104,7 @@ class APIViewTestCases:
url = self._get_list_url()
response = self.client.get(url, **self.header)
self.assertEqual(len(response.data['results']), self.model.objects.count())
self.assertEqual(len(response.data['results']), self.model.objects.unrestricted().count())
self.assertHttpStatus(response, status.HTTP_200_OK)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
@@ -115,7 +115,7 @@ class APIViewTestCases:
url = f'{self._get_list_url()}?brief=1'
response = self.client.get(url, **self.header)
self.assertEqual(len(response.data['results']), self.model.objects.count())
self.assertEqual(len(response.data['results']), self.model.objects.unrestricted().count())
self.assertEqual(sorted(response.data['results'][0]), self.brief_fields)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@@ -134,9 +134,9 @@ class APIViewTestCases:
"""
GET a list of objects as an authenticated user with permission to view the objects.
"""
self.assertGreaterEqual(self.model.objects.count(), 3,
self.assertGreaterEqual(self.model.objects.unrestricted().count(), 3,
f"Test requires the creation of at least three {self.model} instances")
instance1, instance2 = self.model.objects.all()[:2]
instance1, instance2 = self.model.objects.unrestricted()[:2]
# Add object-level permission
obj_perm = ObjectPermission(
@@ -178,11 +178,15 @@ class APIViewTestCases:
obj_perm.users.add(self.user)
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
initial_count = self.model.objects.count()
initial_count = self.model.objects.unrestricted().count()
response = self.client.post(self._get_list_url(), self.create_data[0], format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(self.model.objects.count(), initial_count + 1)
self.assertInstanceEqual(self.model.objects.get(pk=response.data['id']), self.create_data[0], api=True)
self.assertEqual(self.model.objects.unrestricted().count(), initial_count + 1)
self.assertInstanceEqual(
self.model.objects.unrestricted().get(pk=response.data['id']),
self.create_data[0],
api=True
)
def test_bulk_create_objects(self):
"""
@@ -196,13 +200,17 @@ class APIViewTestCases:
obj_perm.users.add(self.user)
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
initial_count = self.model.objects.count()
initial_count = self.model.objects.unrestricted().count()
response = self.client.post(self._get_list_url(), self.create_data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(len(response.data), len(self.create_data))
self.assertEqual(self.model.objects.count(), initial_count + len(self.create_data))
self.assertEqual(self.model.objects.unrestricted().count(), initial_count + len(self.create_data))
for i, obj in enumerate(response.data):
self.assertInstanceEqual(self.model.objects.get(pk=obj['id']), self.create_data[i], api=True)
self.assertInstanceEqual(
self.model.objects.unrestricted().get(pk=obj['id']),
self.create_data[i],
api=True
)
class UpdateObjectViewTestCase(APITestCase):
update_data = {}
@@ -211,7 +219,7 @@ class APIViewTestCases:
"""
PATCH a single object without permission.
"""
url = self._get_detail_url(self.model.objects.first())
url = self._get_detail_url(self.model.objects.unrestricted().first())
update_data = self.update_data or getattr(self, 'create_data')[0]
# Try PATCH without permission
@@ -223,7 +231,7 @@ class APIViewTestCases:
"""
PATCH a single object identified by its numeric ID.
"""
instance = self.model.objects.first()
instance = self.model.objects.unrestricted().first()
url = self._get_detail_url(instance)
update_data = self.update_data or getattr(self, 'create_data')[0]
@@ -246,7 +254,7 @@ class APIViewTestCases:
"""
DELETE a single object without permission.
"""
url = self._get_detail_url(self.model.objects.first())
url = self._get_detail_url(self.model.objects.unrestricted().first())
# Try DELETE without permission
with disable_warnings('django.request'):
@@ -257,7 +265,7 @@ class APIViewTestCases:
"""
DELETE a single object identified by its numeric ID.
"""
instance = self.model.objects.first()
instance = self.model.objects.unrestricted().first()
url = self._get_detail_url(instance)
# Add object-level permission
@@ -270,7 +278,7 @@ class APIViewTestCases:
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
self.assertFalse(self.model.objects.filter(pk=instance.pk).exists())
self.assertFalse(self.model.objects.unrestricted().filter(pk=instance.pk).exists())
class APIViewTestCase(
GetObjectViewTestCase,

View File

@@ -165,6 +165,14 @@ class ModelViewTestCase(TestCase):
if self.model is None:
raise Exception("Test case requires model to be defined")
def _get_queryset(self):
"""
Return a base queryset suitable for use in test methods. Call unrestricted() if RestrictedQuerySet is in use.
"""
if hasattr(self.model.objects, 'restrict'):
return self.model.objects.unrestricted()
return self.model.objects.all()
def _get_base_url(self):
"""
Return the base format for a URL for the test's model. Override this to test for a model which belongs
@@ -177,27 +185,23 @@ class ModelViewTestCase(TestCase):
def _get_url(self, action, instance=None):
"""
Return the URL name for a specific action. An instance must be specified for
get/edit/delete views.
Return the URL name for a specific action and optionally a specific instance
"""
url_format = self._get_base_url()
if action in ('list', 'add', 'import', 'bulk_edit', 'bulk_delete'):
# If no instance was provided, assume we don't need a unique identifier
if instance is None:
return reverse(url_format.format(action))
elif action in ('get', 'edit', 'delete'):
if instance is None:
raise Exception("Resolving {} URL requires specifying an instance".format(action))
# Attempt to resolve using slug first
if hasattr(self.model, 'slug'):
try:
return reverse(url_format.format(action), kwargs={'slug': instance.slug})
except NoReverseMatch:
pass
return reverse(url_format.format(action), kwargs={'pk': instance.pk})
# Attempt to resolve using slug as the unique identifier if one exists
if hasattr(self.model, 'slug'):
try:
return reverse(url_format.format(action), kwargs={'slug': instance.slug})
except NoReverseMatch:
pass
else:
raise Exception("Invalid action for URL resolution: {}".format(action))
# Default to using the numeric PK to retrieve the URL for an object
return reverse(url_format.format(action), kwargs={'pk': instance.pk})
class ViewTestCases:
@@ -212,12 +216,12 @@ class ViewTestCases:
def test_get_object_anonymous(self):
# Make the request as an unauthenticated user
self.client.logout()
response = self.client.get(self.model.objects.first().get_absolute_url())
response = self.client.get(self._get_queryset().first().get_absolute_url())
self.assertHttpStatus(response, 200)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object_without_permission(self):
instance = self.model.objects.first()
instance = self._get_queryset().first()
# Try GET without permission
with disable_warnings('django.request'):
@@ -225,7 +229,7 @@ class ViewTestCases:
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object_with_permission(self):
instance = self.model.objects.first()
instance = self._get_queryset().first()
# Add model-level permission
obj_perm = ObjectPermission(
@@ -240,7 +244,7 @@ class ViewTestCases:
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object_with_constrained_permission(self):
instance1, instance2 = self.model.objects.all()[:2]
instance1, instance2 = self._get_queryset().all()[:2]
# Add object-level permission
obj_perm = ObjectPermission(
@@ -257,6 +261,16 @@ class ViewTestCases:
# Try GET to non-permitted object
self.assertHttpStatus(self.client.get(instance2.get_absolute_url()), 404)
class GetObjectChangelogViewTestCase(ModelViewTestCase):
"""
View the changelog for an instance.
"""
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_get_object_changelog(self):
url = self._get_url('changelog', self._get_queryset().first())
response = self.client.get(url)
self.assertHttpStatus(response, 200)
class CreateObjectViewTestCase(ModelViewTestCase):
"""
Create a single new instance.
@@ -265,12 +279,11 @@ class ViewTestCases:
"""
form_data = {}
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_create_object_without_permission(self):
# Try GET without permission
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(self._get_url('add')), 403)
self.assertHttpStatus(self.client.get(self._get_url('add')), 403)
# Try POST without permission
request = {
@@ -281,9 +294,9 @@ class ViewTestCases:
with disable_warnings('django.request'):
self.assertHttpStatus(response, 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_create_object_with_permission(self):
initial_count = self.model.objects.count()
initial_count = self._get_queryset().count()
# Assign unconstrained permission
obj_perm = ObjectPermission(
@@ -302,12 +315,12 @@ class ViewTestCases:
'data': post_data(self.form_data),
}
self.assertHttpStatus(self.client.post(**request), 302)
self.assertEqual(initial_count + 1, self.model.objects.count())
self.assertInstanceEqual(self.model.objects.order_by('pk').last(), self.form_data)
self.assertEqual(initial_count + 1, self._get_queryset().count())
self.assertInstanceEqual(self._get_queryset().order_by('pk').last(), self.form_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_create_object_with_constrained_permission(self):
initial_count = self.model.objects.count()
initial_count = self._get_queryset().count()
# Assign constrained permission
obj_perm = ObjectPermission(
@@ -327,7 +340,7 @@ class ViewTestCases:
'data': post_data(self.form_data),
}
self.assertHttpStatus(self.client.post(**request), 200)
self.assertEqual(initial_count, self.model.objects.count()) # Check that no object was created
self.assertEqual(initial_count, self._get_queryset().count()) # Check that no object was created
# Update the ObjectPermission to allow creation
obj_perm.constraints = {'pk__gt': 0}
@@ -339,8 +352,8 @@ class ViewTestCases:
'data': post_data(self.form_data),
}
self.assertHttpStatus(self.client.post(**request), 302)
self.assertEqual(initial_count + 1, self.model.objects.count())
self.assertInstanceEqual(self.model.objects.order_by('pk').last(), self.form_data)
self.assertEqual(initial_count + 1, self._get_queryset().count())
self.assertInstanceEqual(self._get_queryset().order_by('pk').last(), self.form_data)
class EditObjectViewTestCase(ModelViewTestCase):
"""
@@ -350,13 +363,12 @@ class ViewTestCases:
"""
form_data = {}
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_edit_object_without_permission(self):
instance = self.model.objects.first()
instance = self._get_queryset().first()
# Try GET without permission
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(self._get_url('edit', instance)), 403)
self.assertHttpStatus(self.client.get(self._get_url('edit', instance)), 403)
# Try POST without permission
request = {
@@ -366,9 +378,9 @@ class ViewTestCases:
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_edit_object_with_permission(self):
instance = self.model.objects.first()
instance = self._get_queryset().first()
# Assign model-level permission
obj_perm = ObjectPermission(
@@ -387,11 +399,11 @@ class ViewTestCases:
'data': post_data(self.form_data),
}
self.assertHttpStatus(self.client.post(**request), 302)
self.assertInstanceEqual(self.model.objects.get(pk=instance.pk), self.form_data)
self.assertInstanceEqual(self._get_queryset().get(pk=instance.pk), self.form_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_edit_object_with_constrained_permission(self):
instance1, instance2 = self.model.objects.all()[:2]
instance1, instance2 = self._get_queryset().all()[:2]
# Assign constrained permission
obj_perm = ObjectPermission(
@@ -414,7 +426,7 @@ class ViewTestCases:
'data': post_data(self.form_data),
}
self.assertHttpStatus(self.client.post(**request), 302)
self.assertInstanceEqual(self.model.objects.get(pk=instance1.pk), self.form_data)
self.assertInstanceEqual(self._get_queryset().get(pk=instance1.pk), self.form_data)
# Try to edit a non-permitted object
request = {
@@ -427,9 +439,8 @@ class ViewTestCases:
"""
Delete a single instance.
"""
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_delete_object_without_permission(self):
instance = self.model.objects.first()
instance = self._get_queryset().first()
# Try GET without permission
with disable_warnings('django.request'):
@@ -443,9 +454,9 @@ class ViewTestCases:
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_delete_object_with_permission(self):
instance = self.model.objects.first()
instance = self._get_queryset().first()
# Assign model-level permission
obj_perm = ObjectPermission(
@@ -465,11 +476,11 @@ class ViewTestCases:
}
self.assertHttpStatus(self.client.post(**request), 302)
with self.assertRaises(ObjectDoesNotExist):
self.model.objects.get(pk=instance.pk)
self._get_queryset().get(pk=instance.pk)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_delete_object_with_constrained_permission(self):
instance1, instance2 = self.model.objects.all()[:2]
instance1, instance2 = self._get_queryset().all()[:2]
# Assign object-level permission
obj_perm = ObjectPermission(
@@ -493,7 +504,7 @@ class ViewTestCases:
}
self.assertHttpStatus(self.client.post(**request), 302)
with self.assertRaises(ObjectDoesNotExist):
self.model.objects.get(pk=instance1.pk)
self._get_queryset().get(pk=instance1.pk)
# Try to delete a non-permitted object
request = {
@@ -501,7 +512,7 @@ class ViewTestCases:
'data': post_data({'confirm': True}),
}
self.assertHttpStatus(self.client.post(**request), 404)
self.assertTrue(self.model.objects.filter(pk=instance2.pk).exists())
self.assertTrue(self._get_queryset().filter(pk=instance2.pk).exists())
class ListObjectsViewTestCase(ModelViewTestCase):
"""
@@ -543,7 +554,7 @@ class ViewTestCases:
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_list_objects_with_constrained_permission(self):
instance1, instance2 = self.model.objects.all()[:2]
instance1, instance2 = self._get_queryset().all()[:2]
# Add object-level permission
obj_perm = ObjectPermission(
@@ -588,7 +599,7 @@ class ViewTestCases:
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_create_objects_with_permission(self):
initial_count = self.model.objects.count()
initial_count = self._get_queryset().count()
request = {
'path': self._get_url('add'),
'data': post_data(self.bulk_create_data),
@@ -605,13 +616,13 @@ class ViewTestCases:
# Bulk create objects
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
self.assertEqual(initial_count + self.bulk_create_count, self.model.objects.count())
for instance in self.model.objects.order_by('-pk')[:self.bulk_create_count]:
self.assertEqual(initial_count + self.bulk_create_count, self._get_queryset().count())
for instance in self._get_queryset().order_by('-pk')[:self.bulk_create_count]:
self.assertInstanceEqual(instance, self.bulk_create_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_create_objects_with_constrained_permission(self):
initial_count = self.model.objects.count()
initial_count = self._get_queryset().count()
request = {
'path': self._get_url('add'),
'data': post_data(self.bulk_create_data),
@@ -628,7 +639,7 @@ class ViewTestCases:
# Attempt to make the request with unmet constraints
self.assertHttpStatus(self.client.post(**request), 200)
self.assertEqual(self.model.objects.count(), initial_count)
self.assertEqual(self._get_queryset().count(), initial_count)
# Update the ObjectPermission to allow creation
obj_perm.constraints = {'pk__gt': 0} # Dummy constraint to allow all
@@ -636,8 +647,8 @@ class ViewTestCases:
response = self.client.post(**request)
self.assertHttpStatus(response, 302)
self.assertEqual(initial_count + self.bulk_create_count, self.model.objects.count())
for instance in self.model.objects.order_by('-pk')[:self.bulk_create_count]:
self.assertEqual(initial_count + self.bulk_create_count, self._get_queryset().count())
for instance in self._get_queryset().order_by('-pk')[:self.bulk_create_count]:
self.assertInstanceEqual(instance, self.bulk_create_data)
class BulkImportObjectsViewTestCase(ModelViewTestCase):
@@ -651,7 +662,6 @@ class ViewTestCases:
def _get_csv_data(self):
return '\n'.join(self.csv_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_import_objects_without_permission(self):
data = {
'csv': self._get_csv_data(),
@@ -666,9 +676,9 @@ class ViewTestCases:
with disable_warnings('django.request'):
self.assertHttpStatus(response, 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_bulk_import_objects_with_permission(self):
initial_count = self.model.objects.count()
initial_count = self._get_queryset().count()
data = {
'csv': self._get_csv_data(),
}
@@ -686,11 +696,11 @@ class ViewTestCases:
# Test POST with permission
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
self.assertEqual(self.model.objects.count(), initial_count + len(self.csv_data) - 1)
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_bulk_import_objects_with_constrained_permission(self):
initial_count = self.model.objects.count()
initial_count = self._get_queryset().count()
data = {
'csv': self._get_csv_data(),
}
@@ -706,7 +716,7 @@ class ViewTestCases:
# Attempt to import non-permitted objects
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
self.assertEqual(self.model.objects.count(), initial_count)
self.assertEqual(self._get_queryset().count(), initial_count)
# Update permission constraints
obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all
@@ -714,7 +724,7 @@ class ViewTestCases:
# Import permitted objects
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
self.assertEqual(self.model.objects.count(), initial_count + len(self.csv_data) - 1)
self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1)
class BulkEditObjectsViewTestCase(ModelViewTestCase):
"""
@@ -725,9 +735,8 @@ class ViewTestCases:
"""
bulk_edit_data = {}
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_edit_objects_without_permission(self):
pk_list = self.model.objects.values_list('pk', flat=True)[:3]
pk_list = self._get_queryset().values_list('pk', flat=True)[:3]
data = {
'pk': pk_list,
'_apply': True, # Form button
@@ -741,9 +750,9 @@ class ViewTestCases:
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_bulk_edit_objects_with_permission(self):
pk_list = self.model.objects.values_list('pk', flat=True)[:3]
pk_list = self._get_queryset().values_list('pk', flat=True)[:3]
data = {
'pk': pk_list,
'_apply': True, # Form button
@@ -762,13 +771,12 @@ class ViewTestCases:
# Try POST with model-level permission
self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302)
for i, instance in enumerate(self.model.objects.filter(pk__in=pk_list)):
for i, instance in enumerate(self._get_queryset().filter(pk__in=pk_list)):
self.assertInstanceEqual(instance, self.bulk_edit_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_bulk_edit_objects_with_constrained_permission(self):
initial_instances = self.model.objects.all()[:3]
pk_list = list(self.model.objects.values_list('pk', flat=True)[:3])
pk_list = list(self._get_queryset().values_list('pk', flat=True)[:3])
data = {
'pk': pk_list,
'_apply': True, # Form button
@@ -780,7 +788,7 @@ class ViewTestCases:
# Dynamically determine a constraint that will *not* be matched by the updated objects.
attr_name = list(self.bulk_edit_data.keys())[0]
field = self.model._meta.get_field(attr_name)
value = field.value_from_object(self.model.objects.first())
value = field.value_from_object(self._get_queryset().first())
# Assign constrained permission
obj_perm = ObjectPermission(
@@ -801,7 +809,7 @@ class ViewTestCases:
# Bulk edit permitted objects
self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302)
for i, instance in enumerate(self.model.objects.filter(pk__in=pk_list)):
for i, instance in enumerate(self._get_queryset().filter(pk__in=pk_list)):
self.assertInstanceEqual(instance, self.bulk_edit_data)
class BulkDeleteObjectsViewTestCase(ModelViewTestCase):
@@ -810,7 +818,7 @@ class ViewTestCases:
"""
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_delete_objects_without_permission(self):
pk_list = self.model.objects.values_list('pk', flat=True)[:3]
pk_list = self._get_queryset().values_list('pk', flat=True)[:3]
data = {
'pk': pk_list,
'confirm': True,
@@ -827,7 +835,7 @@ class ViewTestCases:
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_delete_objects_with_permission(self):
pk_list = self.model.objects.values_list('pk', flat=True)
pk_list = self._get_queryset().values_list('pk', flat=True)
data = {
'pk': pk_list,
'confirm': True,
@@ -844,12 +852,12 @@ class ViewTestCases:
# Try POST with model-level permission
self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302)
self.assertEqual(self.model.objects.count(), 0)
self.assertEqual(self._get_queryset().count(), 0)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_delete_objects_with_constrained_permission(self):
initial_count = self.model.objects.count()
pk_list = self.model.objects.values_list('pk', flat=True)
initial_count = self._get_queryset().count()
pk_list = self._get_queryset().values_list('pk', flat=True)
data = {
'pk': pk_list,
'confirm': True,
@@ -867,7 +875,7 @@ class ViewTestCases:
# Attempt to bulk delete non-permitted objects
self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302)
self.assertEqual(self.model.objects.count(), initial_count)
self.assertEqual(self._get_queryset().count(), initial_count)
# Update permission constraints
obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all
@@ -875,10 +883,11 @@ class ViewTestCases:
# Bulk delete permitted objects
self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302)
self.assertEqual(self.model.objects.count(), 0)
self.assertEqual(self._get_queryset().count(), 0)
class PrimaryObjectViewTestCase(
GetObjectViewTestCase,
GetObjectChangelogViewTestCase,
CreateObjectViewTestCase,
EditObjectViewTestCase,
DeleteObjectViewTestCase,
@@ -893,8 +902,10 @@ class ViewTestCases:
maxDiff = None
class OrganizationalObjectViewTestCase(
GetObjectChangelogViewTestCase,
CreateObjectViewTestCase,
EditObjectViewTestCase,
DeleteObjectViewTestCase,
ListObjectsViewTestCase,
BulkImportObjectsViewTestCase,
BulkDeleteObjectsViewTestCase,
@@ -917,6 +928,8 @@ class ViewTestCases:
maxDiff = None
class DeviceComponentViewTestCase(
GetObjectViewTestCase,
GetObjectChangelogViewTestCase,
EditObjectViewTestCase,
DeleteObjectViewTestCase,
ListObjectsViewTestCase,

View File

@@ -1,4 +1,5 @@
import logging
import re
import sys
from copy import deepcopy
@@ -15,6 +16,7 @@ from django.shortcuts import get_object_or_404, redirect, render
from django.template import loader
from django.template.exceptions import TemplateDoesNotExist
from django.urls import reverse
from django.urls.exceptions import NoReverseMatch
from django.utils.decorators import method_decorator
from django.utils.html import escape
from django.utils.http import is_safe_url
@@ -27,7 +29,7 @@ from django_tables2 import RequestConfig
from extras.models import CustomField, CustomFieldValue, ExportTemplate
from extras.querysets import CustomFieldQueryset
from utilities.exceptions import AbortTransaction
from utilities.forms import BootstrapMixin, CSVDataField, TableConfigForm
from utilities.forms import BootstrapMixin, BulkRenameForm, CSVDataField, TableConfigForm, restrict_form_fields
from utilities.permissions import get_permission_for_model, resolve_permission
from utilities.utils import csv_format, prepare_cloned_fields
from .error_handlers import handle_protectederror
@@ -119,7 +121,7 @@ class ObjectPermissionRequiredMixin(AccessMixin):
return super().dispatch(request, *args, **kwargs)
class GetReturnURLMixin(object):
class GetReturnURLMixin:
"""
Provides logic for determining where a user should be redirected after processing a form.
"""
@@ -134,13 +136,21 @@ class GetReturnURLMixin(object):
return query_param
# Next, check if the object being modified (if any) has an absolute URL.
elif obj is not None and obj.pk and hasattr(obj, 'get_absolute_url'):
if obj is not None and obj.pk and hasattr(obj, 'get_absolute_url'):
return obj.get_absolute_url()
# Fall back to the default URL (if specified) for the view.
elif self.default_return_url is not None:
if self.default_return_url is not None:
return reverse(self.default_return_url)
# Attempt to dynamically resolve the list view for the object
if hasattr(self, 'queryset'):
model_opts = self.queryset.model._meta
try:
return reverse(f'{model_opts.app_label}:{model_opts.model_name}_list')
except NoReverseMatch:
pass
# If all else fails, return home. Ideally this should never happen.
return reverse('home')
@@ -160,6 +170,25 @@ class ObjectView(ObjectPermissionRequiredMixin, View):
def get_required_permission(self):
return get_permission_for_model(self.queryset.model, 'view')
def get_template_name(self):
"""
Return self.template_name if set. Otherwise, resolve the template path by model app_label and name.
"""
if hasattr(self, 'template_name'):
return self.template_name
model_opts = self.queryset.model._meta
return f'{model_opts.app_label}/{model_opts.model_name}.html'
def get(self, request, pk):
"""
Generic GET handler for accessing an object by PK
"""
instance = get_object_or_404(self.queryset, pk=pk)
return render(request, self.get_template_name(), {
'instance': instance,
})
class ObjectListView(ObjectPermissionRequiredMixin, View):
"""
@@ -366,6 +395,7 @@ class ObjectEditView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
# Parse initial data manually to avoid setting field values as lists
initial_data = {k: request.GET[k] for k in request.GET}
form = self.model_form(instance=obj, initial=initial_data)
restrict_form_fields(form, request.user)
return render(request, self.template_name, {
'obj': obj,
@@ -382,6 +412,7 @@ class ObjectEditView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
files=request.FILES,
instance=obj
)
restrict_form_fields(form, request.user)
if form.is_valid():
logger.debug("Form validation was successful")
@@ -641,6 +672,7 @@ class ObjectImportView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
# Initialize model form
data = form.cleaned_data['data']
model_form = self.model_form(data)
restrict_form_fields(model_form, request.user)
# Assign default values for any fields which were not specified. We have to do this manually because passing
# 'initial=' to the form on initialization merely sets default values for the widgets. Since widgets are not
@@ -794,6 +826,7 @@ class BulkImportView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
headers, records = form.cleaned_data['csv']
for row, data in enumerate(records, start=1):
obj_form = self.model_form(data, headers=headers)
restrict_form_fields(obj_form, request.user)
if obj_form.is_valid():
obj = self._save_obj(obj_form, request)
@@ -875,6 +908,7 @@ class BulkEditView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
if '_apply' in request.POST:
form = self.form(model, request.POST)
restrict_form_fields(form, request.user)
if form.is_valid():
logger.debug("Form validation was successful")
@@ -982,6 +1016,7 @@ class BulkEditView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
initial_data['device_type'] = request.GET.get('device_type')
form = self.form(model, initial=initial_data)
restrict_form_fields(form, request.user)
# Retrieve objects being edited
table = self.table(self.queryset.filter(pk__in=pk_list), orderable=False)
@@ -997,6 +1032,69 @@ class BulkEditView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
})
class BulkRenameView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
"""
An extendable view for renaming objects in bulk.
"""
queryset = None
template_name = 'utilities/obj_bulk_rename.html'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Create a new Form class from BulkRenameForm
class _Form(BulkRenameForm):
pk = ModelMultipleChoiceField(
queryset=self.queryset,
widget=MultipleHiddenInput()
)
self.form = _Form
def get_required_permission(self):
return get_permission_for_model(self.queryset.model, 'change')
def post(self, request):
if '_preview' in request.POST or '_apply' in request.POST:
form = self.form(request.POST, initial={'pk': request.POST.getlist('pk')})
selected_objects = self.queryset.filter(pk__in=form.initial['pk'])
if form.is_valid():
for obj in selected_objects:
find = form.cleaned_data['find']
replace = form.cleaned_data['replace']
if form.cleaned_data['use_regex']:
try:
obj.new_name = re.sub(find, replace, obj.name)
# Catch regex group reference errors
except re.error:
obj.new_name = obj.name
else:
obj.new_name = obj.name.replace(find, replace)
if '_apply' in request.POST:
for obj in selected_objects:
obj.name = obj.new_name
obj.save()
messages.success(request, "Renamed {} {}".format(
len(selected_objects),
self.queryset.model._meta.verbose_name_plural
))
return redirect(self.get_return_url(request))
else:
form = self.form(initial={'pk': request.POST.getlist('pk')})
selected_objects = self.queryset.filter(pk__in=form.initial['pk'])
return render(request, self.template_name, {
'form': form,
'obj_type_plural': self.queryset.model._meta.verbose_name_plural,
'selected_objects': selected_objects,
'return_url': self.get_return_url(request),
})
class BulkDeleteView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
"""
Delete objects in bulk.