Closes #16137: Remove is_staff boolean from User model (#20306)
Some checks are pending
CI / build (20.x, 3.12) (push) Waiting to run
CI / build (20.x, 3.13) (push) Waiting to run
CodeQL / Analyze (${{ matrix.language }}) (none, actions) (push) Waiting to run
CodeQL / Analyze (${{ matrix.language }}) (none, javascript-typescript) (push) Waiting to run
CodeQL / Analyze (${{ matrix.language }}) (none, python) (push) Waiting to run

* Closes #16137: Remove is_staff boolean from User model

* Remove default is_staff value from UserManager.create_user()

* Restore staff_only on MenuItem

* Introduce IsSuperuser API permission to replace IsAdminUser

* Update and improve RQ task API view tests

* Remove is_staff attribute assignment from RemoteUserBackend
This commit is contained in:
Jeremy Stretch 2025-09-10 16:51:59 -04:00 committed by GitHub
parent d95eaa7ba2
commit c0e4d1c1e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 220 additions and 129 deletions

View File

@ -2,7 +2,7 @@
## Local Authentication
Local user accounts and groups can be created in NetBox under the "Authentication" section in the "Admin" menu. This section is available only to users with the "staff" permission enabled.
Local user accounts and groups can be created in NetBox under the "Authentication" section in the "Admin" menu.
At a minimum, each user account must have a username and password set. User accounts may also denote a first name, last name, and email address. [Permissions](../permissions.md) may also be assigned to individual users and/or groups as needed.

View File

@ -127,19 +127,3 @@ The list of groups that promote an remote User to Superuser on Login. If group i
Default: `[]` (Empty list)
The list of users that get promoted to Superuser on Login. If user isn't present in list on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
---
## REMOTE_AUTH_STAFF_GROUPS
Default: `[]` (Empty list)
The list of groups that promote an remote User to Staff on Login. If group isn't present on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
---
## REMOTE_AUTH_STAFF_USERS
Default: `[]` (Empty list)
The list of users that get promoted to Staff on Login. If user isn't present in list on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )

View File

@ -121,7 +121,6 @@ AUTH_LDAP_MIRROR_GROUPS = True
# Define special user types using groups. Exercise great caution when assigning superuser status.
AUTH_LDAP_USER_FLAGS_BY_GROUP = {
"is_active": "cn=active,ou=groups,dc=example,dc=com",
"is_staff": "cn=staff,ou=groups,dc=example,dc=com",
"is_superuser": "cn=superuser,ou=groups,dc=example,dc=com"
}
@ -134,7 +133,6 @@ AUTH_LDAP_CACHE_TIMEOUT = 3600
```
* `is_active` - All users must be mapped to at least this group to enable authentication. Without this, users cannot log in.
* `is_staff` - Users mapped to this group are enabled for access to the administration tools; this is the equivalent of checking the "staff status" box on a manually created user. This doesn't grant any specific permissions.
* `is_superuser` - Users mapped to this group will be granted superuser status. Superusers are implicitly granted all permissions.
!!! warning
@ -248,7 +246,6 @@ AUTH_LDAP_MIRROR_GROUPS = True
# Define special user types using groups. Exercise great caution when assigning superuser status.
AUTH_LDAP_USER_FLAGS_BY_GROUP = {
"is_active": "cn=active,ou=groups,dc=example,dc=com",
"is_staff": "cn=staff,ou=groups,dc=example,dc=com",
"is_superuser": "cn=superuser,ou=groups,dc=example,dc=com"
}

View File

@ -64,14 +64,17 @@ item1 = PluginMenuItem(
A `PluginMenuItem` has the following attributes:
| Attribute | Required | Description |
|-----------------|----------|----------------------------------------------------------------------------------------------------------|
| `link` | Yes | Name of the URL path to which this menu item links |
| `link_text` | Yes | The text presented to the user |
| `permissions` | - | A list of permissions required to display this link |
| `auth_required` | - | Display only for authenticated users |
| `staff_only` | - | Display only for users who have `is_staff` set to true (any specified permissions will also be required) |
| `buttons` | - | An iterable of PluginMenuButton instances to include |
| Attribute | Required | Description |
|-----------------|----------|------------------------------------------------------|
| `link` | Yes | Name of the URL path to which this menu item links |
| `link_text` | Yes | The text presented to the user |
| `permissions` | - | A list of permissions required to display this link |
| `auth_required` | - | Display only for authenticated users |
| `staff_only` | - | Display only for superusers |
| `buttons` | - | An iterable of PluginMenuButton instances to include |
!!! note "Changed in NetBox v4.5"
In releases prior to NetBox v4.5, `staff_only` restricted display of a menu item to only users with `is_staff` set to True. In NetBox v4.5, the `is_staff` flag was removed from the user model. Menu items with `staff_only` set to True are now displayed only for superusers.
## Menu Buttons

View File

@ -9,7 +9,6 @@ from drf_spectacular.utils import extend_schema
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response
from rest_framework.routers import APIRootView
from rest_framework.viewsets import ReadOnlyModelViewSet
@ -24,6 +23,7 @@ from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.pagination import LimitOffsetListPagination
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
from utilities.api import IsSuperuser
from . import serializers
@ -99,7 +99,7 @@ class BaseRQViewSet(viewsets.ViewSet):
"""
Base class for RQ view sets. Provides a list() method. Subclasses must implement get_data().
"""
permission_classes = [IsAdminUser]
permission_classes = [IsSuperuser]
serializer_class = None
def get_data(self):

View File

@ -134,10 +134,7 @@ class BackgroundTaskTestCase(TestCase):
Create a user and token for API calls.
"""
# Create the test user and assign permissions
self.user = User.objects.create_user(username='testuser')
self.user.is_staff = True
self.user.is_active = True
self.user.save()
self.user = User.objects.create_user(username='testuser', is_active=True)
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
@ -150,13 +147,11 @@ class BackgroundTaskTestCase(TestCase):
url = reverse('core-api:rqqueue-list')
# Attempt to load view without permission
self.user.is_staff = False
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_staff = True
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 200)
@ -165,7 +160,16 @@ class BackgroundTaskTestCase(TestCase):
self.assertIn('low', str(response.content))
def test_background_queue(self):
response = self.client.get(reverse('core-api:rqqueue-detail', args=['default']), **self.header)
url = reverse('core-api:rqqueue-detail', args=['default'])
# Attempt to load view without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn('default', str(response.content))
self.assertIn('oldest_job_timestamp', str(response.content))
@ -174,8 +178,16 @@ class BackgroundTaskTestCase(TestCase):
def test_background_task_list(self):
queue = get_queue('default')
queue.enqueue(self.dummy_job_default)
url = reverse('core-api:rqtask-list')
response = self.client.get(reverse('core-api:rqtask-list'), **self.header)
# Attempt to load view without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn('origin', str(response.content))
self.assertIn('core.tests.test_api.BackgroundTaskTestCase.dummy_job_default()', str(response.content))
@ -183,8 +195,16 @@ class BackgroundTaskTestCase(TestCase):
def test_background_task(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
url = reverse('core-api:rqtask-detail', args=[job.id])
response = self.client.get(reverse('core-api:rqtask-detail', args=[job.id]), **self.header)
# Attempt to load view without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(job.id), str(response.content))
self.assertIn('origin', str(response.content))
@ -194,45 +214,65 @@ class BackgroundTaskTestCase(TestCase):
def test_background_task_delete(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
url = reverse('core-api:rqtask-delete', args=[job.id])
response = self.client.post(reverse('core-api:rqtask-delete', args=[job.id]), **self.header)
# Attempt to load view without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_superuser = True
self.user.save()
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 200)
self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
queue = get_queue('default')
self.assertNotIn(job.id, queue.job_ids)
def test_background_task_requeue(self):
queue = get_queue('default')
# Enqueue & run a job that will fail
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_failing)
worker = get_worker('default')
with disable_logging():
worker.work(burst=True)
self.assertTrue(job.is_failed)
url = reverse('core-api:rqtask-requeue', args=[job.id])
# Attempt to requeue the job without permission
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 403)
# Re-enqueue the failed job and check that its status has been reset
response = self.client.post(reverse('core-api:rqtask-requeue', args=[job.id]), **self.header)
self.user.is_superuser = True
self.user.save()
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 200)
job = RQ_Job.fetch(job.id, queue.connection)
self.assertFalse(job.is_failed)
def test_background_task_enqueue(self):
queue = get_queue('default')
# Enqueue some jobs that each depends on its predecessor
queue = get_queue('default')
job = previous_job = None
for _ in range(0, 3):
job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
previous_job = job
url = reverse('core-api:rqtask-enqueue', args=[job.id])
# Check that the last job to be enqueued has a status of deferred
self.assertIsNotNone(job)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
self.assertIsNone(job.enqueued_at)
# Attempt to force-enqueue the job without permission
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 403)
# Force-enqueue the deferred job
response = self.client.post(reverse('core-api:rqtask-enqueue', args=[job.id]), **self.header)
self.user.is_superuser = True
self.user.save()
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 200)
# Check that job's status is updated correctly
@ -242,19 +282,27 @@ class BackgroundTaskTestCase(TestCase):
def test_background_task_stop(self):
queue = get_queue('default')
worker = get_worker('default')
job = queue.enqueue(self.dummy_job_default)
worker.prepare_job_execution(job)
url = reverse('core-api:rqtask-stop', args=[job.id])
self.assertEqual(job.get_status(), JobStatus.STARTED)
response = self.client.post(reverse('core-api:rqtask-stop', args=[job.id]), **self.header)
# Attempt to stop the task without permission
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 403)
# Stop the task
self.user.is_superuser = True
self.user.save()
response = self.client.post(url, **self.header)
self.assertEqual(response.status_code, 200)
with disable_logging():
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(started_job_registry), 0)
# Verify that the task was cancelled
canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(canceled_job_registry), 1)
self.assertIn(job.id, canceled_job_registry)
@ -262,19 +310,34 @@ class BackgroundTaskTestCase(TestCase):
def test_worker_list(self):
worker1 = get_worker('default', name=uuid.uuid4().hex)
worker1.register_birth()
worker2 = get_worker('high')
worker2.register_birth()
url = reverse('core-api:rqworker-list')
response = self.client.get(reverse('core-api:rqworker-list'), **self.header)
# Attempt to fetch the worker list without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Fetch the worker list
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(worker1.name), str(response.content))
def test_worker(self):
worker1 = get_worker('default', name=uuid.uuid4().hex)
worker1.register_birth()
url = reverse('core-api:rqworker-detail', args=[worker1.name])
response = self.client.get(reverse('core-api:rqworker-detail', args=[worker1.name]), **self.header)
# Attempt to fetch a worker without permission
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Fetch the worker
self.user.is_superuser = True
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(worker1.name), str(response.content))
self.assertIn('birth_date', str(response.content))

View File

@ -158,7 +158,7 @@ class BackgroundTaskTestCase(TestCase):
def setUp(self):
super().setUp()
self.user.is_staff = True
self.user.is_superuser = True
self.user.is_active = True
self.user.save()
@ -171,13 +171,13 @@ class BackgroundTaskTestCase(TestCase):
url = reverse('core:background_queue_list')
# Attempt to load view without permission
self.user.is_staff = False
self.user.is_superuser = False
self.user.save()
response = self.client.get(url)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_staff = True
self.user.is_superuser = True
self.user.save()
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
@ -356,7 +356,7 @@ class SystemTestCase(TestCase):
def setUp(self):
super().setUp()
self.user.is_staff = True
self.user.is_superuser = True
self.user.save()
def test_system_view_default(self):

View File

@ -366,7 +366,7 @@ class ConfigRevisionRestoreView(ContentTypePermissionRequiredMixin, View):
class BaseRQView(UserPassesTestMixin, View):
def test_func(self):
return self.request.user.is_staff
return self.request.user.is_superuser
class BackgroundQueueListView(TableMixin, BaseRQView):
@ -549,7 +549,7 @@ class WorkerView(BaseRQView):
class SystemView(UserPassesTestMixin, View):
def test_func(self):
return self.request.user.is_staff
return self.request.user.is_superuser
def get(self, request):
@ -632,7 +632,7 @@ class BasePluginView(UserPassesTestMixin, View):
CACHE_KEY_CATALOG_ERROR = 'plugins-catalog-error'
def test_func(self):
return self.request.user.is_staff
return self.request.user.is_superuser
def get_cached_plugins(self, request):
catalog_plugins = {}

View File

@ -184,14 +184,13 @@ class RemoteUserBackend(_RemoteUserBackend):
else:
user.groups.clear()
logger.debug(f"Stripping user {user} from Groups")
# Evaluate superuser status
user.is_superuser = self._is_superuser(user)
logger.debug(f"User {user} is Superuser: {user.is_superuser}")
logger.debug(
f"User {user} should be Superuser: {self._is_superuser(user)}")
user.is_staff = self._is_staff(user)
logger.debug(f"User {user} is Staff: {user.is_staff}")
logger.debug(f"User {user} should be Staff: {self._is_staff(user)}")
user.save()
return user
@ -251,19 +250,8 @@ class RemoteUserBackend(_RemoteUserBackend):
return bool(result)
def _is_staff(self, user):
logger = logging.getLogger('netbox.auth.RemoteUserBackend')
staff_groups = settings.REMOTE_AUTH_STAFF_GROUPS
logger.debug(f"Superuser Groups: {staff_groups}")
staff_users = settings.REMOTE_AUTH_STAFF_USERS
logger.debug(f"Staff Users :{staff_users}")
user_groups = set()
for g in user.groups.all():
user_groups.add(g.name)
logger.debug(f"User {user.username} is in Groups:{user_groups}")
result = user.username in staff_users or (
set(user_groups) & set(staff_groups))
logger.debug(f"User {user.username} in Staff Users :{result}")
return bool(result)
# Retain for pre-v4.5 compatibility
return user.is_superuser
def configure_user(self, request, user):
logger = logging.getLogger('netbox.auth.RemoteUserBackend')

View File

@ -3,12 +3,12 @@ from collections import OrderedDict
from django.apps import apps
from django.urls.exceptions import NoReverseMatch
from drf_spectacular.utils import extend_schema
from rest_framework import permissions
from rest_framework.response import Response
from rest_framework.reverse import reverse
from rest_framework.views import APIView
from netbox.registry import registry
from utilities.api import IsSuperuser
@extend_schema(exclude=True)
@ -16,7 +16,7 @@ class InstalledPluginsAPIView(APIView):
"""
API view for listing all installed plugins
"""
permission_classes = [permissions.IsAdminUser]
permission_classes = [IsSuperuser]
_ignore_model_permissions = True
schema = None

View File

@ -161,8 +161,6 @@ REMOTE_AUTH_SUPERUSERS = getattr(configuration, 'REMOTE_AUTH_SUPERUSERS', [])
REMOTE_AUTH_USER_EMAIL = getattr(configuration, 'REMOTE_AUTH_USER_EMAIL', 'HTTP_REMOTE_USER_EMAIL')
REMOTE_AUTH_USER_FIRST_NAME = getattr(configuration, 'REMOTE_AUTH_USER_FIRST_NAME', 'HTTP_REMOTE_USER_FIRST_NAME')
REMOTE_AUTH_USER_LAST_NAME = getattr(configuration, 'REMOTE_AUTH_USER_LAST_NAME', 'HTTP_REMOTE_USER_LAST_NAME')
REMOTE_AUTH_STAFF_GROUPS = getattr(configuration, 'REMOTE_AUTH_STAFF_GROUPS', [])
REMOTE_AUTH_STAFF_USERS = getattr(configuration, 'REMOTE_AUTH_STAFF_USERS', [])
# Required by extras/migrations/0109_script_models.py
REPORTS_ROOT = getattr(configuration, 'REPORTS_ROOT', os.path.join(BASE_DIR, 'reports')).rstrip('/')
RQ_DEFAULT_TIMEOUT = getattr(configuration, 'RQ_DEFAULT_TIMEOUT', 300)

View File

@ -47,9 +47,9 @@ class HomeView(ConditionalLoginRequiredMixin, View):
))
dashboard = get_default_dashboard(config=DEFAULT_DASHBOARD).get_layout()
# Check whether a new release is available. (Only for staff/superusers.)
# Check whether a new release is available. (Only for superusers.)
new_release = None
if request.user.is_staff or request.user.is_superuser:
if request.user.is_superuser:
latest_release = cache.get('latest_release')
if latest_release:
release_version, release_url = latest_release

View File

@ -39,10 +39,6 @@
<th scope="row">{% trans "Superuser" %}</th>
<td>{% checkmark request.user.is_superuser %}</td>
</tr>
<tr>
<th scope="row">{% trans "Staff" %}</th>
<td>{% checkmark request.user.is_staff %}</td>
</tr>
</table>
</div>
</div>

View File

@ -27,8 +27,6 @@
<div class="mt-1 small text-secondary">
{% if request.user.is_superuser %}
{% trans "Admin" %}
{% elif request.user.is_staff %}
{% trans "Staff" %}
{% else %}
{% trans "User" %}
{% endif %}

View File

@ -37,7 +37,7 @@
path. Refer to <a href="{{ docs_url }}">the installation documentation</a> for further guidance.
{% endblocktrans %}
<ul>
{% if request.user.is_staff or request.user.is_superuser %}
{% if request.user.is_superuser %}
<li><code>STATIC_ROOT: <strong>{{ settings.STATIC_ROOT }}</strong></code></li>
{% endif %}
<li><code>STATIC_URL: <strong>{{ settings.STATIC_URL }}</strong></code></li>

View File

@ -35,10 +35,6 @@
<th scope="row">{% trans "Active" %}</th>
<td>{% checkmark object.is_active %}</td>
</tr>
<tr>
<th scope="row">{% trans "Staff" %}</th>
<td>{% checkmark object.is_staff %}</td>
</tr>
<tr>
<th scope="row">{% trans "Superuser" %}</th>
<td>{% checkmark object.is_superuser %}</td>

View File

@ -52,7 +52,7 @@ class UserSerializer(ValidatedModelSerializer):
model = User
fields = (
'id', 'url', 'display_url', 'display', 'username', 'password', 'first_name', 'last_name', 'email',
'is_staff', 'is_active', 'date_joined', 'last_login', 'groups', 'permissions',
'is_active', 'date_joined', 'last_login', 'groups', 'permissions',
)
brief_fields = ('id', 'url', 'display', 'username')
extra_kwargs = {

View File

@ -81,7 +81,7 @@ class UserFilterSet(BaseFilterSet):
class Meta:
model = User
fields = (
'id', 'username', 'first_name', 'last_name', 'email', 'date_joined', 'last_login', 'is_staff', 'is_active',
'id', 'username', 'first_name', 'last_name', 'email', 'date_joined', 'last_login', 'is_active',
'is_superuser',
)

View File

@ -37,11 +37,6 @@ class UserBulkEditForm(BulkEditForm):
widget=BulkEditNullBooleanSelect,
label=_('Active')
)
is_staff = forms.NullBooleanField(
required=False,
widget=BulkEditNullBooleanSelect,
label=_('Staff status')
)
is_superuser = forms.NullBooleanField(
required=False,
widget=BulkEditNullBooleanSelect,
@ -50,7 +45,7 @@ class UserBulkEditForm(BulkEditForm):
model = User
fieldsets = (
FieldSet('first_name', 'last_name', 'is_active', 'is_staff', 'is_superuser'),
FieldSet('first_name', 'last_name', 'is_active', 'is_superuser'),
)
nullable_fields = ('first_name', 'last_name')

View File

@ -23,8 +23,7 @@ class UserImportForm(CSVModelForm):
class Meta:
model = User
fields = (
'username', 'first_name', 'last_name', 'email', 'password', 'is_staff',
'is_active', 'is_superuser'
'username', 'first_name', 'last_name', 'email', 'password', 'is_active', 'is_superuser'
)
def save(self, *args, **kwargs):

View File

@ -29,7 +29,7 @@ class UserFilterForm(NetBoxModelFilterSetForm):
fieldsets = (
FieldSet('q', 'filter_id',),
FieldSet('group_id', name=_('Group')),
FieldSet('is_active', 'is_staff', 'is_superuser', name=_('Status')),
FieldSet('is_active', 'is_superuser', name=_('Status')),
)
group_id = DynamicModelMultipleChoiceField(
queryset=Group.objects.all(),
@ -43,13 +43,6 @@ class UserFilterForm(NetBoxModelFilterSetForm):
),
label=_('Is Active'),
)
is_staff = forms.NullBooleanField(
required=False,
widget=forms.Select(
choices=BOOLEAN_WITH_BLANK_CHOICES
),
label=_('Is Staff'),
)
is_superuser = forms.NullBooleanField(
required=False,
widget=forms.Select(

View File

@ -192,7 +192,7 @@ class UserForm(forms.ModelForm):
fieldsets = (
FieldSet('username', 'password', 'confirm_password', 'first_name', 'last_name', 'email', name=_('User')),
FieldSet('groups', name=_('Groups')),
FieldSet('is_active', 'is_staff', 'is_superuser', name=_('Status')),
FieldSet('is_active', 'is_superuser', name=_('Status')),
FieldSet('object_permissions', name=_('Permissions')),
)
@ -200,7 +200,7 @@ class UserForm(forms.ModelForm):
model = User
fields = [
'username', 'first_name', 'last_name', 'email', 'groups', 'object_permissions',
'is_active', 'is_staff', 'is_superuser',
'is_active', 'is_superuser',
]
def __init__(self, *args, **kwargs):

View File

@ -27,7 +27,6 @@ class UserFilter(BaseObjectTypeFilterMixin):
last_name: FilterLookup[str] | None = strawberry_django.filter_field()
email: FilterLookup[str] | None = strawberry_django.filter_field()
is_superuser: FilterLookup[bool] | None = strawberry_django.filter_field()
is_staff: FilterLookup[bool] | None = strawberry_django.filter_field()
is_active: FilterLookup[bool] | None = strawberry_django.filter_field()
date_joined: DatetimeFilterLookup[datetime] | None = strawberry_django.filter_field()
last_login: DatetimeFilterLookup[datetime] | None = strawberry_django.filter_field()

View File

@ -25,7 +25,7 @@ class GroupType(BaseObjectType):
@strawberry_django.type(
User,
fields=[
'id', 'username', 'first_name', 'last_name', 'email', 'is_staff', 'is_active', 'date_joined', 'groups',
'id', 'username', 'first_name', 'last_name', 'email', 'is_active', 'date_joined', 'groups',
],
filters=UserFilter,
pagination=True

View File

@ -0,0 +1,15 @@
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('users', '0011_concrete_objecttype'),
]
operations = [
migrations.RemoveField(
model_name='user',
name='is_staff',
),
]

View File

@ -1,12 +1,16 @@
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.models import (
AbstractUser,
GroupManager as DjangoGroupManager,
Permission,
PermissionsMixin,
UserManager as DjangoUserManager
)
from django.contrib.auth.validators import UnicodeUsernameValidator
from django.core.exceptions import ValidationError
from django.core.mail import send_mail
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from utilities.querysets import RestrictedQuerySet
@ -68,10 +72,48 @@ class Group(models.Model):
class UserManager(DjangoUserManager.from_queryset(RestrictedQuerySet)):
pass
def create_user(self, username, email=None, password=None, **extra_fields):
extra_fields.setdefault("is_superuser", False)
return self._create_user(username, email, password, **extra_fields)
class User(AbstractUser):
class User(AbstractBaseUser, PermissionsMixin):
username = models.CharField(
_("username"),
max_length=150,
unique=True,
help_text=_("Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only."),
validators=[UnicodeUsernameValidator()],
error_messages={
"unique": _("A user with that username already exists."),
},
)
first_name = models.CharField(
_("first name"),
max_length=150,
blank=True,
)
last_name = models.CharField(
_("last name"),
max_length=150,
blank=True,
)
email = models.EmailField(
_("email address"),
blank=True,
)
is_active = models.BooleanField(
_("active"),
default=True,
help_text=_(
"Designates whether this user should be treated as active. Unselect this instead of deleting accounts."
),
)
date_joined = models.DateTimeField(
_("date joined"),
default=timezone.now,
)
groups = models.ManyToManyField(
to='users.Group',
verbose_name=_('groups'),
@ -87,6 +129,11 @@ class User(AbstractUser):
objects = UserManager()
# Ensure compatibility with Django's stock User model
EMAIL_FIELD = "email"
USERNAME_FIELD = "username"
REQUIRED_FIELDS = ["email"]
class Meta:
ordering = ('username',)
verbose_name = _('user')
@ -98,7 +145,25 @@ class User(AbstractUser):
def clean(self):
super().clean()
# Normalize email address
self.email = self.__class__.objects.normalize_email(self.email)
# Check for any existing Users with names that differ only in case
model = self._meta.model
if model.objects.exclude(pk=self.pk).filter(username__iexact=self.username).exists():
raise ValidationError(_("A user with this username already exists."))
def get_full_name(self):
"""
Return the first_name plus the last_name, with a space in between.
"""
full_name = "%s %s" % (self.first_name, self.last_name)
return full_name.strip()
def get_short_name(self):
"""Return the short name for the user."""
return self.first_name
def email_user(self, subject, message, from_email=None, **kwargs):
"""Send an email to this user."""
send_mail(subject, message, from_email, [self.email], **kwargs)

View File

@ -38,9 +38,6 @@ class UserTable(NetBoxTable):
is_active = columns.BooleanColumn(
verbose_name=_('Is Active'),
)
is_staff = columns.BooleanColumn(
verbose_name=_('Is Staff'),
)
is_superuser = columns.BooleanColumn(
verbose_name=_('Is Superuser'),
)
@ -51,8 +48,8 @@ class UserTable(NetBoxTable):
class Meta(NetBoxTable.Meta):
model = User
fields = (
'pk', 'id', 'username', 'first_name', 'last_name', 'email', 'groups', 'is_active', 'is_staff',
'is_superuser', 'last_login',
'pk', 'id', 'username', 'first_name', 'last_name', 'email', 'groups', 'is_active', 'is_superuser',
'last_login',
)
default_columns = ('pk', 'username', 'first_name', 'last_name', 'email', 'is_active')

View File

@ -30,7 +30,6 @@ class UserTestCase(TestCase, BaseFilterSetTests):
first_name='Hank',
last_name='Hill',
email='hank@stricklandpropane.com',
is_staff=True,
is_superuser=True
),
User(
@ -98,10 +97,6 @@ class UserTestCase(TestCase, BaseFilterSetTests):
params = {'is_active': True}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_is_staff(self):
params = {'is_staff': True}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_is_superuser(self):
params = {'is_superuser': True}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)

View File

@ -6,6 +6,7 @@ from django.db.models.fields.related import ManyToOneRel, RelatedField
from django.urls import reverse
from django.utils.module_loading import import_string
from django.utils.translation import gettext_lazy as _
from rest_framework.permissions import BasePermission
from rest_framework.serializers import Serializer
from rest_framework.views import get_view_name as drf_get_view_name
@ -16,6 +17,7 @@ from .query import count_related, dict_to_filter_params
from .string import title
__all__ = (
'IsSuperuser',
'get_annotations_for_serializer',
'get_graphql_type_for_model',
'get_prefetches_for_serializer',
@ -27,6 +29,14 @@ __all__ = (
)
class IsSuperuser(BasePermission):
"""
Allows access only to superusers.
"""
def has_permission(self, request, view):
return bool(request.user and request.user.is_superuser)
def get_serializer_for_model(model, prefix=''):
"""
Return the appropriate REST API serializer for the given model.

View File

@ -30,7 +30,7 @@ def nav(context):
continue
if not user.has_perms(item.permissions):
continue
if item.staff_only and not user.is_staff:
if item.staff_only and not user.is_superuser:
continue
buttons = [
button for button in item.buttons if user.has_perms(button.permissions)