Add a token prefix
Some checks failed
CI / build (20.x, 3.12) (push) Has been cancelled
CI / build (20.x, 3.13) (push) Has been cancelled

This commit is contained in:
Jeremy Stretch 2025-10-06 17:04:10 -04:00
parent 82db8a9c02
commit c63e60a62b
10 changed files with 41 additions and 45 deletions

View File

@ -213929,7 +213929,7 @@
}, },
"mark_utilized": { "mark_utilized": {
"type": "boolean", "type": "boolean",
"description": "Report space as 100% utilized" "description": "Report space as fully utilized"
} }
}, },
"required": [ "required": [
@ -214038,7 +214038,7 @@
}, },
"mark_utilized": { "mark_utilized": {
"type": "boolean", "type": "boolean",
"description": "Report space as 100% utilized" "description": "Report space as fully utilized"
} }
}, },
"required": [ "required": [
@ -231032,7 +231032,7 @@
}, },
"mark_utilized": { "mark_utilized": {
"type": "boolean", "type": "boolean",
"description": "Report space as 100% utilized" "description": "Report space as fully utilized"
} }
} }
}, },
@ -251418,7 +251418,7 @@
}, },
"mark_utilized": { "mark_utilized": {
"type": "boolean", "type": "boolean",
"description": "Report space as 100% utilized" "description": "Report space as fully utilized"
} }
}, },
"required": [ "required": [

View File

@ -8,7 +8,7 @@ NetBox's REST API, powered by the [Django REST Framework](https://www.django-res
```no-highlight ```no-highlight
curl -s -X POST \ curl -s -X POST \
-H "Authorization: Token $TOKEN" \ -H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
http://netbox/api/ipam/prefixes/ \ http://netbox/api/ipam/prefixes/ \
--data '{"prefix": "192.0.2.0/24", "site": {"name": "Branch 12"}}' --data '{"prefix": "192.0.2.0/24", "site": {"name": "Branch 12"}}'

View File

@ -682,13 +682,13 @@ It is possible to provision authentication tokens for other users via the REST A
### Authenticating to the API ### Authenticating to the API
An authentication token is included with a request in its `Authorization` header. The format of the header value depends on the version of token in use. v2 tokens use the following form, concatenating the token's key and plaintext value with a period: An authentication token is included with a request in its `Authorization` header. The format of the header value depends on the version of token in use. v2 tokens use the following form, concatenating the token's prefix (`nbt_`) and key with its plaintext value, separated by a period:
``` ```
Authorization: Bearer <key>.<token> Authorization: Bearer nbt_<key>.<token>
``` ```
v1 tokens use the prefix `Token` rather than `Bearer`, and include only the token plaintext. (v1 tokens do not have a key.) Legacy v1 tokens use the prefix `Token` rather than `Bearer`, and include only the token plaintext. (v1 tokens do not have a key.)
``` ```
Authorization: Token <token> Authorization: Token <token>
@ -697,7 +697,7 @@ Authorization: Token <token>
Below is an example REST API request utilizing a v2 token. Below is an example REST API request utilizing a v2 token.
``` ```
$ curl -H "Authorization: Bearer <key>.<token>" \ $ curl -H "Authorization: Bearer nbt_4F9DAouzURLb.zjebxBPzICiPbWz0Wtx0fTL7bCKXKGTYhNzkgC2S" \
-H "Accept: application/json; indent=4" \ -H "Accept: application/json; indent=4" \
https://netbox/api/dcim/sites/ https://netbox/api/dcim/sites/
{ {

View File

@ -8,6 +8,7 @@ from rq.job import Job as RQ_Job, JobStatus
from rq.registry import FailedJobRegistry, StartedJobRegistry from rq.registry import FailedJobRegistry, StartedJobRegistry
from rest_framework import status from rest_framework import status
from users.constants import TOKEN_PREFIX
from users.models import Token, User from users.models import Token, User
from utilities.testing import APITestCase, APIViewTestCases, TestCase from utilities.testing import APITestCase, APIViewTestCases, TestCase
from utilities.testing.utils import disable_logging from utilities.testing.utils import disable_logging
@ -136,7 +137,7 @@ class BackgroundTaskTestCase(TestCase):
# Create the test user and assign permissions # Create the test user and assign permissions
self.user = User.objects.create_user(username='testuser', is_active=True) self.user = User.objects.create_user(username='testuser', is_active=True)
self.token = Token.objects.create(user=self.user) self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Bearer {self.token.key}.{self.token.token}'} self.header = {'HTTP_AUTHORIZATION': f'Bearer {TOKEN_PREFIX}{self.token.key}.{self.token.token}'}
# Clear all queues prior to running each test # Clear all queues prior to running each test
get_queue('default').connection.flushall() get_queue('default').connection.flushall()

View File

@ -8,6 +8,7 @@ from rest_framework.authentication import BaseAuthentication, get_authorization_
from rest_framework.permissions import BasePermission, DjangoObjectPermissions, SAFE_METHODS from rest_framework.permissions import BasePermission, DjangoObjectPermissions, SAFE_METHODS
from netbox.config import get_config from netbox.config import get_config
from users.constants import TOKEN_PREFIX
from users.models import Token from users.models import Token
from utilities.request import get_client_ip from utilities.request import get_client_ip
@ -22,40 +23,30 @@ class TokenAuthentication(BaseAuthentication):
model = Token model = Token
def authenticate(self, request): def authenticate(self, request):
# Ignore; Authorization header is not present # Authorization header is not present; ignore
if not (auth := get_authorization_header(request).split()): if not (auth := get_authorization_header(request).split()):
return return
# Unrecognized header; ignore
# Infer token version from Token/Bearer keyword in HTTP header if auth[0].lower() not in (V1_KEYWORD.lower().encode(), V2_KEYWORD.lower().encode()):
if auth[0].lower() == V1_KEYWORD.lower().encode():
version = 1
elif auth[0].lower() == V2_KEYWORD.lower().encode():
version = 2
else:
# Ignore; unrecognized header value
return return
# Check for extraneous token content
# Extract token from authorization header. This should be in one of the following two forms:
# * Authorization: Token <token> (v1)
# * Authorization: Bearer <key>.<token> (v2)
if len(auth) != 2: if len(auth) != 2:
if version == 1: raise exceptions.AuthenticationFailed(
raise exceptions.AuthenticationFailed( 'Invalid authorization header: Must be in the form "Bearer <key>.<token>" or "Token <token>"'
'Invalid authorization header: Must be in the form "Token <token>"' )
)
else:
raise exceptions.AuthenticationFailed(
'Invalid authorization header: Must be in the form "Bearer <key>.<token>"'
)
# Extract the key (if v2) & token plaintext from the auth header # Extract the key (if v2) & token plaintext from the auth header
try: try:
auth_value = auth[1].decode() auth_value = auth[1].decode()
except UnicodeError: except UnicodeError:
raise exceptions.AuthenticationFailed("Invalid authorization header: Token contains invalid characters") raise exceptions.AuthenticationFailed("Invalid authorization header: Token contains invalid characters")
# Infer token version from presence or absence of prefix
version = 2 if auth_value.startswith(TOKEN_PREFIX) else 1
if version == 1: if version == 1:
key, plaintext = None, auth_value key, plaintext = None, auth_value
else: else:
auth_value = auth_value.removeprefix(TOKEN_PREFIX)
try: try:
key, plaintext = auth_value.split('.', 1) key, plaintext = auth_value.split('.', 1)
except ValueError: except ValueError:

View File

@ -8,6 +8,7 @@ from rest_framework.test import APIClient
from core.models import ObjectType from core.models import ObjectType
from dcim.models import Rack, Site from dcim.models import Rack, Site
from users.constants import TOKEN_PREFIX
from users.models import Group, ObjectPermission, Token, User from users.models import Group, ObjectPermission, Token, User
from utilities.testing import TestCase from utilities.testing import TestCase
from utilities.testing.api import APITestCase from utilities.testing.api import APITestCase
@ -49,7 +50,7 @@ class TokenAuthenticationTestCase(APITestCase):
token = Token.objects.create(version=2, user=self.user) token = Token.objects.create(version=2, user=self.user)
# Valid token should return a 200 # Valid token should return a 200
header = f'Bearer {token.key}.{token.token}' header = f'Bearer {TOKEN_PREFIX}{token.key}.{token.token}'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header) response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 200, response.data) self.assertEqual(response.status_code, 200, response.data)
@ -60,7 +61,7 @@ class TokenAuthenticationTestCase(APITestCase):
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v2_token_invalid(self): def test_v2_token_invalid(self):
# Invalid token should return a 403 # Invalid token should return a 403
header = 'Bearer XXXXXXXXXX.XXXXXXXXXX' header = f'Bearer {TOKEN_PREFIX}XXXXXX.XXXXXXXXXX'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header) response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
self.assertEqual(response.data['detail'], "Invalid v2 token") self.assertEqual(response.data['detail'], "Invalid v2 token")
@ -77,7 +78,7 @@ class TokenAuthenticationTestCase(APITestCase):
# Request with a non-expired token should succeed # Request with a non-expired token should succeed
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.token}') response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.token}')
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}') response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}')
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
# Request with an expired token should fail # Request with an expired token should fail
@ -88,7 +89,7 @@ class TokenAuthenticationTestCase(APITestCase):
token2.save() token2.save()
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.key}') response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.key}')
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {token2.key}') response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}')
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
@ -111,7 +112,7 @@ class TokenAuthenticationTestCase(APITestCase):
token2 = Token.objects.create(version=2, user=self.user, write_enabled=False) token2 = Token.objects.create(version=2, user=self.user, write_enabled=False)
token1_header = f'Token {token1.token}' token1_header = f'Token {token1.token}'
token2_header = f'Bearer {token2.key}.{token2.token}' token2_header = f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}'
# GET request with a write-disabled token should succeed # GET request with a write-disabled token should succeed
response = self.client.get(url, HTTP_AUTHORIZATION=token1_header) response = self.client.get(url, HTTP_AUTHORIZATION=token1_header)
@ -152,7 +153,7 @@ class TokenAuthenticationTestCase(APITestCase):
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
response = self.client.get( response = self.client.get(
url, url,
HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}', HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}',
REMOTE_ADDR='127.0.0.1' REMOTE_ADDR='127.0.0.1'
) )
self.assertEqual(response.status_code, 403) self.assertEqual(response.status_code, 403)
@ -166,7 +167,7 @@ class TokenAuthenticationTestCase(APITestCase):
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
response = self.client.get( response = self.client.get(
url, url,
HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}', HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}',
REMOTE_ADDR='192.0.2.1' REMOTE_ADDR='192.0.2.1'
) )
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
@ -519,7 +520,7 @@ class ObjectPermissionAPIViewTestCase(TestCase):
""" """
self.user = User.objects.create(username='testuser') self.user = User.objects.create(username='testuser')
self.token = Token.objects.create(user=self.user) self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Bearer {self.token.key}.{self.token.token}'} self.header = {'HTTP_AUTHORIZATION': f'Bearer {TOKEN_PREFIX}{self.token.key}.{self.token.token}'}
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object(self): def test_get_object(self):

View File

@ -11,6 +11,7 @@ OBJECTPERMISSION_OBJECT_TYPES = Q(
CONSTRAINT_TOKEN_USER = '$user' CONSTRAINT_TOKEN_USER = '$user'
# API tokens # API tokens
TOKEN_KEY_LENGTH = 16 TOKEN_PREFIX = 'nbt_' # Used for v2 tokens only
TOKEN_KEY_LENGTH = 12
TOKEN_DEFAULT_LENGTH = 40 TOKEN_DEFAULT_LENGTH = 40
TOKEN_CHARSET = string.ascii_letters + string.digits TOKEN_CHARSET = string.ascii_letters + string.digits

View File

@ -56,10 +56,10 @@ class Migration(migrations.Migration):
name='key', name='key',
field=models.CharField( field=models.CharField(
blank=True, blank=True,
max_length=16, max_length=12,
null=True, null=True,
unique=True, unique=True,
validators=[django.core.validators.MinLengthValidator(16)] validators=[django.core.validators.MinLengthValidator(12)]
), ),
), ),
migrations.AddField( migrations.AddField(

View File

@ -15,7 +15,7 @@ from netaddr import IPNetwork
from ipam.fields import IPNetworkField from ipam.fields import IPNetworkField
from users.choices import TokenVersionChoices from users.choices import TokenVersionChoices
from users.constants import TOKEN_CHARSET, TOKEN_DEFAULT_LENGTH, TOKEN_KEY_LENGTH from users.constants import TOKEN_CHARSET, TOKEN_DEFAULT_LENGTH, TOKEN_KEY_LENGTH, TOKEN_PREFIX
from users.utils import get_current_pepper from users.utils import get_current_pepper
from utilities.querysets import RestrictedQuerySet from utilities.querysets import RestrictedQuerySet
@ -235,6 +235,7 @@ class Token(models.Model):
if self.v1: if self.v1:
return token == self.token return token == self.token
if self.v2: if self.v2:
token = token.removeprefix(TOKEN_PREFIX)
try: try:
pepper = settings.API_TOKEN_PEPPERS[self.pepper_id] pepper = settings.API_TOKEN_PEPPERS[self.pepper_id]
except KeyError: except KeyError:

View File

@ -17,6 +17,7 @@ from core.choices import ObjectChangeActionChoices
from core.models import ObjectChange, ObjectType from core.models import ObjectChange, ObjectType
from ipam.graphql.types import IPAddressFamilyType from ipam.graphql.types import IPAddressFamilyType
from netbox.models.features import ChangeLoggingMixin from netbox.models.features import ChangeLoggingMixin
from users.constants import TOKEN_PREFIX
from users.models import ObjectPermission, Token, User from users.models import ObjectPermission, Token, User
from utilities.api import get_graphql_type_for_model from utilities.api import get_graphql_type_for_model
from .base import ModelTestCase from .base import ModelTestCase
@ -50,7 +51,7 @@ class APITestCase(ModelTestCase):
self.user = User.objects.create_user(username='testuser') self.user = User.objects.create_user(username='testuser')
self.add_permissions(*self.user_permissions) self.add_permissions(*self.user_permissions)
self.token = Token.objects.create(user=self.user) self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Bearer {self.token.key}.{self.token.token}'} self.header = {'HTTP_AUTHORIZATION': f'Bearer {TOKEN_PREFIX}{self.token.key}.{self.token.token}'}
def _get_view_namespace(self): def _get_view_namespace(self):
return f'{self.view_namespace or self.model._meta.app_label}-api' return f'{self.view_namespace or self.model._meta.app_label}-api'