mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-26 18:38:38 -06:00
Add Remote Group Support to the RemoteUserAuth Backend and Middleware
This commit is contained in:
parent
19844e81d1
commit
4b1979f741
@ -2,14 +2,17 @@ import logging
|
||||
from collections import defaultdict
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.backends import ModelBackend, RemoteUserBackend as _RemoteUserBackend
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.auth.models import Group, AnonymousUser
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.db.models import Q
|
||||
|
||||
from users.models import ObjectPermission
|
||||
from utilities.permissions import permission_is_exempt, resolve_permission, resolve_permission_ct
|
||||
|
||||
UserModel = get_user_model()
|
||||
|
||||
|
||||
class ObjectPermissionBackend(ModelBackend):
|
||||
|
||||
@ -94,38 +97,131 @@ class RemoteUserBackend(_RemoteUserBackend):
|
||||
def create_unknown_user(self):
|
||||
return settings.REMOTE_AUTH_AUTO_CREATE_USER
|
||||
|
||||
def configure_user(self, request, user):
|
||||
def configure_groups(self, user, remote_groups):
|
||||
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||
|
||||
# Assign default groups to the user
|
||||
group_list = []
|
||||
for name in settings.REMOTE_AUTH_DEFAULT_GROUPS:
|
||||
for name in remote_groups:
|
||||
try:
|
||||
group_list.append(Group.objects.get(name=name))
|
||||
except Group.DoesNotExist:
|
||||
logging.error(f"Could not assign group {name} to remotely-authenticated user {user}: Group not found")
|
||||
if group_list:
|
||||
user.groups.add(*group_list)
|
||||
user.groups.set(group_list)
|
||||
logger.debug(f"Assigned groups to remotely-authenticated user {user}: {group_list}")
|
||||
else:
|
||||
user.groups.clear()
|
||||
logger.debug(f"Stripping user {user} from Groups")
|
||||
user.is_superuser = self._is_superuser(user)
|
||||
logger.debug(f"User {user} is Superuser: {user.is_superuser}")
|
||||
logger.debug(f"User {user} should be Superuser: {self._is_superuser(user)}")
|
||||
|
||||
# Assign default object permissions to the user
|
||||
permissions_list = []
|
||||
for permission_name, constraints in settings.REMOTE_AUTH_DEFAULT_PERMISSIONS.items():
|
||||
user.is_staff = self._is_staff(user)
|
||||
logger.debug(f"User {user} is Staff: {user.is_staff}")
|
||||
logger.debug(f"User {user} should be Staff: {self._is_staff(user)}")
|
||||
user.save()
|
||||
return user
|
||||
|
||||
def authenticate(self, request, remote_user, remote_groups=None):
|
||||
"""
|
||||
The username passed as ``remote_user`` is considered trusted. Return
|
||||
the ``User`` object with the given username. Create a new ``User``
|
||||
object if ``create_unknown_user`` is ``True``.
|
||||
Return None if ``create_unknown_user`` is ``False`` and a ``User``
|
||||
object with the given username is not found in the database.
|
||||
"""
|
||||
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||
logger.debug(f"trying to authenticate {remote_user} with groups {remote_groups}")
|
||||
if not remote_user:
|
||||
return
|
||||
user = None
|
||||
username = self.clean_username(remote_user)
|
||||
|
||||
# Note that this could be accomplished in one try-except clause, but
|
||||
# instead we use get_or_create when creating unknown users since it has
|
||||
# built-in safeguards for multiple threads.
|
||||
if self.create_unknown_user:
|
||||
user, created = UserModel._default_manager.get_or_create(**{
|
||||
UserModel.USERNAME_FIELD: username
|
||||
})
|
||||
if created:
|
||||
user = self.configure_user(request, user)
|
||||
else:
|
||||
try:
|
||||
object_type, action = resolve_permission_ct(permission_name)
|
||||
# TODO: Merge multiple actions into a single ObjectPermission per content type
|
||||
obj_perm = ObjectPermission(actions=[action], constraints=constraints)
|
||||
obj_perm.save()
|
||||
obj_perm.users.add(user)
|
||||
obj_perm.object_types.add(object_type)
|
||||
permissions_list.append(permission_name)
|
||||
except ValueError:
|
||||
logging.error(
|
||||
f"Invalid permission name: '{permission_name}'. Permissions must be in the form "
|
||||
"<app>.<action>_<model>. (Example: dcim.add_site)"
|
||||
)
|
||||
if permissions_list:
|
||||
logger.debug(f"Assigned permissions to remotely-authenticated user {user}: {permissions_list}")
|
||||
user = UserModel._default_manager.get_by_natural_key(username)
|
||||
except UserModel.DoesNotExist:
|
||||
pass
|
||||
if self.user_can_authenticate(user):
|
||||
if remote_groups:
|
||||
if user is not None and not isinstance(user, AnonymousUser):
|
||||
return self.configure_groups(user,remote_groups)
|
||||
else:
|
||||
return None
|
||||
|
||||
def _is_superuser(self, user):
|
||||
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||
superuser_groups = settings.REMOTE_AUTH_SUPERUSER_GROUPS
|
||||
logger.debug(f"Superuser Groups: {superuser_groups}")
|
||||
superusers = settings.REMOTE_AUTH_SUPERUSERS
|
||||
logger.debug(f"Superuser Users: {superusers}")
|
||||
user_groups = set()
|
||||
for g in user.groups.all():
|
||||
user_groups.add(g.name)
|
||||
logger.debug(f"User {user.username} is in Groups:{user_groups}")
|
||||
|
||||
result = user.username in superusers or (set(user_groups) & set(superuser_groups))
|
||||
logger.debug(f"User {user.username} in Superuser Users :{result}")
|
||||
return bool(result)
|
||||
|
||||
def _is_staff(self, user):
|
||||
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||
staff_groups = settings.REMOTE_AUTH_STAFF_GROUPS
|
||||
logger.debug(f"Superuser Groups: {staff_groups}")
|
||||
staff_users = settings.REMOTE_AUTH_STAFF_USERS
|
||||
logger.debug(f"Staff Users :{staff_users}")
|
||||
user_groups = set()
|
||||
for g in user.groups.all():
|
||||
user_groups.add(g.name)
|
||||
logger.debug(f"User {user.username} is in Groups:{user_groups}")
|
||||
result = user.username in staff_users or (set(user_groups) & set(staff_groups))
|
||||
logger.debug(f"User {user.username} in Staff Users :{result}")
|
||||
return bool(result)
|
||||
|
||||
def configure_user(self, request, user):
|
||||
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||
if not settings.REMOTE_AUTH_GROUP_SYNC_ENABLED:
|
||||
# Assign default groups to the user
|
||||
group_list = []
|
||||
for name in settings.REMOTE_AUTH_DEFAULT_GROUPS:
|
||||
try:
|
||||
group_list.append(Group.objects.get(name=name))
|
||||
except Group.DoesNotExist:
|
||||
logging.error(f"Could not assign group {name} to remotely-authenticated user {user}: Group not found")
|
||||
if group_list:
|
||||
user.groups.add(*group_list)
|
||||
logger.debug(f"Assigned groups to remotely-authenticated user {user}: {group_list}")
|
||||
|
||||
# Assign default object permissions to the user
|
||||
permissions_list = []
|
||||
for permission_name, constraints in settings.REMOTE_AUTH_DEFAULT_PERMISSIONS.items():
|
||||
try:
|
||||
object_type, action = resolve_permission_ct(permission_name)
|
||||
# TODO: Merge multiple actions into a single ObjectPermission per content type
|
||||
obj_perm = ObjectPermission(actions=[action], constraints=constraints)
|
||||
obj_perm.save()
|
||||
obj_perm.users.add(user)
|
||||
obj_perm.object_types.add(object_type)
|
||||
permissions_list.append(permission_name)
|
||||
except ValueError:
|
||||
logging.error(
|
||||
f"Invalid permission name: '{permission_name}'. Permissions must be in the form "
|
||||
"<app>.<action>_<model>. (Example: dcim.add_site)"
|
||||
)
|
||||
if permissions_list:
|
||||
logger.debug(f"Assigned permissions to remotely-authenticated user {user}: {permissions_list}")
|
||||
else:
|
||||
logger.debug(f"Skipped initial assignment of permissions and groups to remotely-authenticated user {user} as Group sync is enabled")
|
||||
|
||||
return user
|
||||
|
||||
|
@ -1,8 +1,11 @@
|
||||
import uuid
|
||||
from urllib import parse
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.middleware import RemoteUserMiddleware as RemoteUserMiddleware_
|
||||
from django.contrib import auth
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.db import ProgrammingError
|
||||
from django.http import Http404, HttpResponseRedirect
|
||||
from django.urls import reverse
|
||||
@ -45,13 +48,61 @@ class RemoteUserMiddleware(RemoteUserMiddleware_):
|
||||
return settings.REMOTE_AUTH_HEADER
|
||||
|
||||
def process_request(self, request):
|
||||
|
||||
logger = logging.getLogger('netbox.authentication.RemoteUserMiddleware')
|
||||
# Bypass middleware if remote authentication is not enabled
|
||||
if not settings.REMOTE_AUTH_ENABLED:
|
||||
return
|
||||
# AuthenticationMiddleware is required so that request.user exists.
|
||||
if not hasattr(request, 'user'):
|
||||
raise ImproperlyConfigured(
|
||||
"The Django remote user auth middleware requires the"
|
||||
" authentication middleware to be installed. Edit your"
|
||||
" MIDDLEWARE setting to insert"
|
||||
" 'django.contrib.auth.middleware.AuthenticationMiddleware'"
|
||||
" before the RemoteUserMiddleware class.")
|
||||
try:
|
||||
username = request.META[self.header]
|
||||
except KeyError:
|
||||
# If specified header doesn't exist then remove any existing
|
||||
# authenticated remote-user, or return (leaving request.user set to
|
||||
# AnonymousUser by the AuthenticationMiddleware).
|
||||
if self.force_logout_if_no_header and request.user.is_authenticated:
|
||||
self._remove_invalid_user(request)
|
||||
return
|
||||
# If the user is already authenticated and that user is the user we are
|
||||
# getting passed in the headers, then the correct user is already
|
||||
# persisted in the session and we don't need to continue.
|
||||
if request.user.is_authenticated:
|
||||
if request.user.get_username() == self.clean_username(username, request):
|
||||
return
|
||||
else:
|
||||
# An authenticated user is associated with the request, but
|
||||
# it does not match the authorized user in the header.
|
||||
self._remove_invalid_user(request)
|
||||
|
||||
return super().process_request(request)
|
||||
# We are seeing this user for the first time in this session, attempt
|
||||
# to authenticate the user.
|
||||
if settings.REMOTE_AUTH_GROUP_SYNC_ENABLED:
|
||||
logger.debug("Trying to sync Groups")
|
||||
user = auth.authenticate(request, remote_user=username, remote_groups=self._get_groups(request))
|
||||
else:
|
||||
user = auth.authenticate(request, remote_user=username)
|
||||
if user:
|
||||
# User is valid. Set request.user and persist user in the session
|
||||
# by logging the user in.
|
||||
request.user = user
|
||||
auth.login(request, user)
|
||||
|
||||
def _get_groups(self, request):
|
||||
logger = logging.getLogger('netbox.authentication.RemoteUserMiddleware')
|
||||
|
||||
groups_string = request.META.get(settings.REMOTE_AUTH_GROUP_HEADER, None)
|
||||
if groups_string:
|
||||
groups = groups_string.split(settings.REMOTE_AUTH_GROUP_SEPERATOR)
|
||||
else:
|
||||
groups = []
|
||||
logger.debug(f"Groups are {groups}")
|
||||
return groups
|
||||
|
||||
class ObjectChangeMiddleware(object):
|
||||
"""
|
||||
|
@ -109,6 +109,13 @@ REMOTE_AUTH_DEFAULT_GROUPS = getattr(configuration, 'REMOTE_AUTH_DEFAULT_GROUPS'
|
||||
REMOTE_AUTH_DEFAULT_PERMISSIONS = getattr(configuration, 'REMOTE_AUTH_DEFAULT_PERMISSIONS', {})
|
||||
REMOTE_AUTH_ENABLED = getattr(configuration, 'REMOTE_AUTH_ENABLED', False)
|
||||
REMOTE_AUTH_HEADER = getattr(configuration, 'REMOTE_AUTH_HEADER', 'HTTP_REMOTE_USER')
|
||||
REMOTE_AUTH_GROUP_HEADER = getattr(configuration, 'REMOTE_AUTH_GROUP_HEADER', 'HTTP_REMOTE_USER_GROUP')
|
||||
REMOTE_AUTH_GROUP_SYNC_ENABLED = getattr(configuration, 'REMOTE_AUTH_GROUP_SYNC_ENABLED', False)
|
||||
REMOTE_AUTH_SUPERUSER_GROUPS = getattr(configuration, 'REMOTE_AUTH_SUPERUSER_GROUPS', [])
|
||||
REMOTE_AUTH_SUPERUSERS = getattr(configuration, 'REMOTE_AUTH_SUPERUSERS', [])
|
||||
REMOTE_AUTH_STAFF_GROUPS = getattr(configuration, 'REMOTE_AUTH_STAFF_GROUPS', [])
|
||||
REMOTE_AUTH_STAFF_USERS = getattr(configuration, 'REMOTE_AUTH_STAFF_USERS', [])
|
||||
REMOTE_AUTH_GROUP_SEPERATOR = getattr(configuration, 'REMOTE_AUTH_GROUP_SEPERATOR', '|')
|
||||
RELEASE_CHECK_URL = getattr(configuration, 'RELEASE_CHECK_URL', None)
|
||||
RELEASE_CHECK_TIMEOUT = getattr(configuration, 'RELEASE_CHECK_TIMEOUT', 24 * 3600)
|
||||
REPORTS_ROOT = getattr(configuration, 'REPORTS_ROOT', os.path.join(BASE_DIR, 'reports')).rstrip('/')
|
||||
|
Loading…
Reference in New Issue
Block a user