mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-22 20:12:00 -06:00
Implement ObjectPermissionManager
This commit is contained in:
parent
06aca2e1d5
commit
63f842c7db
@ -194,12 +194,13 @@ class SiteListView(ObjectPermissionRequiredMixin, ObjectListView):
|
||||
table = tables.SiteTable
|
||||
|
||||
|
||||
class SiteView(PermissionRequiredMixin, View):
|
||||
class SiteView(ObjectPermissionRequiredMixin, View):
|
||||
permission_required = 'dcim.view_site'
|
||||
queryset = Site.objects.prefetch_related('region', 'tenant__group')
|
||||
|
||||
def get(self, request, slug):
|
||||
|
||||
site = get_object_or_404(Site.objects.prefetch_related('region', 'tenant__group'), slug=slug)
|
||||
site = get_object_or_404(self.queryset, slug=slug)
|
||||
stats = {
|
||||
'rack_count': Rack.objects.filter(site=site).count(),
|
||||
'device_count': Device.objects.filter(site=site).count(),
|
||||
@ -219,7 +220,7 @@ class SiteView(PermissionRequiredMixin, View):
|
||||
})
|
||||
|
||||
|
||||
class SiteCreateView(PermissionRequiredMixin, ObjectEditView):
|
||||
class SiteCreateView(ObjectPermissionRequiredMixin, ObjectEditView):
|
||||
permission_required = 'dcim.add_site'
|
||||
queryset = Site.objects.all()
|
||||
model_form = forms.SiteForm
|
||||
@ -231,7 +232,7 @@ class SiteEditView(SiteCreateView):
|
||||
permission_required = 'dcim.change_site'
|
||||
|
||||
|
||||
class SiteDeleteView(PermissionRequiredMixin, ObjectDeleteView):
|
||||
class SiteDeleteView(ObjectPermissionRequiredMixin, ObjectDeleteView):
|
||||
permission_required = 'dcim.delete_site'
|
||||
queryset = Site.objects.all()
|
||||
default_return_url = 'dcim:site_list'
|
||||
|
@ -14,22 +14,14 @@ class ObjectPermissionRequiredMixin(AccessMixin):
|
||||
if self.request.user.has_perm(self.permission_required):
|
||||
return True
|
||||
|
||||
# If not, check for an object-level permission
|
||||
# If not, check for object-level permissions
|
||||
app, codename = self.permission_required.split('.')
|
||||
action, model_name = codename.split('_')
|
||||
model = self.queryset.model
|
||||
obj_permissions = ObjectPermission.objects.filter(
|
||||
Q(users=self.request.user) | Q(groups__user=self.request.user),
|
||||
model=ContentType.objects.get_for_model(model),
|
||||
**{f'can_{action}': True}
|
||||
)
|
||||
if obj_permissions:
|
||||
|
||||
attrs = ObjectPermission.objects.get_attr_constraints(self.request.user, model, action)
|
||||
if attrs:
|
||||
# Update the view's QuerySet to filter only the permitted objects
|
||||
# TODO: Do this more efficiently
|
||||
for perm in obj_permissions:
|
||||
self.queryset = self.queryset.filter(**perm.attrs)
|
||||
|
||||
self.queryset = self.queryset.filter(**attrs)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
@ -7,6 +7,7 @@ from django.contrib.postgres.fields import JSONField
|
||||
from django.core.exceptions import FieldError, ValidationError
|
||||
from django.core.validators import MinLengthValidator
|
||||
from django.db import models
|
||||
from django.db.models import Q
|
||||
from django.db.models.signals import post_save
|
||||
from django.dispatch import receiver
|
||||
from django.utils import timezone
|
||||
@ -194,6 +195,38 @@ class Token(models.Model):
|
||||
return True
|
||||
|
||||
|
||||
class ObjectPermissionManager(models.Manager):
|
||||
|
||||
def get_attr_constraints(self, user, model, action):
|
||||
"""
|
||||
Compile all ObjectPermission attributes applicable to a specific combination of user, model, and action. Returns
|
||||
a dictionary that can be passed directly to .filter() on a QuerySet.
|
||||
"""
|
||||
assert action in ['view', 'add', 'change', 'delete'], f"Invalid action: {action}"
|
||||
|
||||
qs = self.get_queryset().filter(
|
||||
Q(users=user) | Q(groups__user=user),
|
||||
model=ContentType.objects.get_for_model(model),
|
||||
**{f'can_{action}': True}
|
||||
)
|
||||
|
||||
attrs = {}
|
||||
for perm in qs:
|
||||
attrs.update(perm.attrs)
|
||||
|
||||
return attrs
|
||||
|
||||
def validate_queryset(self, queryset, user, action):
|
||||
"""
|
||||
Check that the specified user has permission to perform the specified action on all objects in the QuerySet.
|
||||
"""
|
||||
assert action in ['view', 'add', 'change', 'delete'], f"Invalid action: {action}"
|
||||
|
||||
model = queryset.model
|
||||
attrs = self.get_attr_constraints(user, model, action)
|
||||
return queryset.count() == model.objects.filter(**attrs).count()
|
||||
|
||||
|
||||
class ObjectPermission(models.Model):
|
||||
"""
|
||||
A mapping of view, add, change, and/or delete permission for users and/or groups to an arbitrary set of objects
|
||||
@ -229,6 +262,8 @@ class ObjectPermission(models.Model):
|
||||
default=False
|
||||
)
|
||||
|
||||
objects = ObjectPermissionManager()
|
||||
|
||||
class Meta:
|
||||
unique_together = ('model', 'attrs')
|
||||
|
||||
|
@ -56,21 +56,12 @@ class ObjectPermissionBackend(ModelBackend):
|
||||
if model._meta.model_name != model_name:
|
||||
raise ValueError(f"Invalid permission {perm} for model {model}")
|
||||
|
||||
# Retrieve user's permissions for this model
|
||||
# This can probably be cached
|
||||
obj_permissions = ObjectPermission.objects.filter(
|
||||
Q(users=user_obj) | Q(groups__user=user_obj),
|
||||
model=ContentType.objects.get_for_model(obj),
|
||||
**{f'can_{action}': True}
|
||||
)
|
||||
|
||||
for perm in obj_permissions:
|
||||
|
||||
# Attempt to retrieve the model from the database using the
|
||||
# attributes defined in the ObjectPermission. If we have a
|
||||
# match, assert that the user has permission.
|
||||
if model.objects.filter(pk=obj.pk, **perm.attrs).exists():
|
||||
return True
|
||||
# Attempt to retrieve the model from the database using the
|
||||
# attributes defined in the ObjectPermission. If we have a
|
||||
# match, assert that the user has permission.
|
||||
attrs = ObjectPermission.objects.get_attr_constraints(user_obj, obj, action)
|
||||
if model.objects.filter(pk=obj.pk, **attrs).exists():
|
||||
return True
|
||||
|
||||
|
||||
class RemoteUserBackend(ViewExemptModelBackend, RemoteUserBackend_):
|
||||
|
@ -4,7 +4,7 @@ from copy import deepcopy
|
||||
|
||||
from django.contrib import messages
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.exceptions import FieldDoesNotExist, ValidationError
|
||||
from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist, ValidationError
|
||||
from django.db import transaction, IntegrityError
|
||||
from django.db.models import ManyToManyField, ProtectedError
|
||||
from django.forms import Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea
|
||||
@ -23,6 +23,7 @@ from django_tables2 import RequestConfig
|
||||
|
||||
from extras.models import CustomField, CustomFieldValue, ExportTemplate
|
||||
from extras.querysets import CustomFieldQueryset
|
||||
from users.models import ObjectPermission
|
||||
from utilities.exceptions import AbortTransaction
|
||||
from utilities.forms import BootstrapMixin, CSVDataField, TableConfigForm
|
||||
from utilities.utils import csv_format, prepare_cloned_fields
|
||||
@ -262,32 +263,43 @@ class ObjectEditView(GetReturnURLMixin, View):
|
||||
if form.is_valid():
|
||||
logger.debug("Form validation was successful")
|
||||
|
||||
obj = form.save()
|
||||
msg = '{} {}'.format(
|
||||
'Created' if not form.instance.pk else 'Modified',
|
||||
self.queryset.model._meta.verbose_name
|
||||
)
|
||||
logger.info(f"{msg} {obj} (PK: {obj.pk})")
|
||||
if hasattr(obj, 'get_absolute_url'):
|
||||
msg = '{} <a href="{}">{}</a>'.format(msg, obj.get_absolute_url(), escape(obj))
|
||||
else:
|
||||
msg = '{} {}'.format(msg, escape(obj))
|
||||
messages.success(request, mark_safe(msg))
|
||||
try:
|
||||
with transaction.atomic():
|
||||
obj = form.save()
|
||||
|
||||
if '_addanother' in request.POST:
|
||||
# Check that the new object conforms with any assigned object-level permissions
|
||||
self.queryset.get(pk=obj.pk)
|
||||
|
||||
# If the object has clone_fields, pre-populate a new instance of the form
|
||||
if hasattr(obj, 'clone_fields'):
|
||||
url = '{}?{}'.format(request.path, prepare_cloned_fields(obj))
|
||||
return redirect(url)
|
||||
msg = '{} {}'.format(
|
||||
'Created' if not form.instance.pk else 'Modified',
|
||||
self.queryset.model._meta.verbose_name
|
||||
)
|
||||
logger.info(f"{msg} {obj} (PK: {obj.pk})")
|
||||
if hasattr(obj, 'get_absolute_url'):
|
||||
msg = '{} <a href="{}">{}</a>'.format(msg, obj.get_absolute_url(), escape(obj))
|
||||
else:
|
||||
msg = '{} {}'.format(msg, escape(obj))
|
||||
messages.success(request, mark_safe(msg))
|
||||
|
||||
return redirect(request.get_full_path())
|
||||
if '_addanother' in request.POST:
|
||||
|
||||
return_url = form.cleaned_data.get('return_url')
|
||||
if return_url is not None and is_safe_url(url=return_url, allowed_hosts=request.get_host()):
|
||||
return redirect(return_url)
|
||||
else:
|
||||
return redirect(self.get_return_url(request, obj))
|
||||
# If the object has clone_fields, pre-populate a new instance of the form
|
||||
if hasattr(obj, 'clone_fields'):
|
||||
url = '{}?{}'.format(request.path, prepare_cloned_fields(obj))
|
||||
return redirect(url)
|
||||
|
||||
return redirect(request.get_full_path())
|
||||
|
||||
return_url = form.cleaned_data.get('return_url')
|
||||
if return_url is not None and is_safe_url(url=return_url, allowed_hosts=request.get_host()):
|
||||
return redirect(return_url)
|
||||
else:
|
||||
return redirect(self.get_return_url(request, obj))
|
||||
|
||||
except ObjectDoesNotExist:
|
||||
logger.debug("Object save failed due to object-level permissions violation")
|
||||
# TODO: Link user to personal permissions view
|
||||
form.add_error(None, "Object save failed due to object-level permissions violation")
|
||||
|
||||
else:
|
||||
logger.debug("Form validation failed")
|
||||
|
Loading…
Reference in New Issue
Block a user