Merge pull request #9590 from netbox-community/8233-api-token-ip

Closes #8233: Restrict API tokens by source IP
This commit is contained in:
Jeremy Stretch 2022-06-23 08:29:45 -04:00 committed by GitHub
commit f563ba7a9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 221 additions and 16 deletions

View File

@ -9,4 +9,4 @@ Each token contains a 160-bit key represented as 40 hexadecimal characters. When
By default, a token can be used to perform all actions via the API that a user would be permitted to do via the web UI. Deselecting the "write enabled" option will restrict API requests made with the token to read operations (e.g. GET) only. By default, a token can be used to perform all actions via the API that a user would be permitted to do via the web UI. Deselecting the "write enabled" option will restrict API requests made with the token to read operations (e.g. GET) only.
Additionally, a token can be set to expire at a specific time. This can be useful if an external client needs to be granted temporary access to NetBox. Additionally, a token can be set to expire at a specific time. This can be useful if an external client needs to be granted temporary access to NetBox. Tokens can also be restricted by IP range: If defined, authentication for API clients connecting from an IP address outside these ranges will fail.

View File

@ -13,6 +13,8 @@
#### PoE Interface Attributes ([#1099](https://github.com/netbox-community/netbox/issues/1099)) #### PoE Interface Attributes ([#1099](https://github.com/netbox-community/netbox/issues/1099))
#### Restrict API Tokens by Client IP ([#8233](https://github.com/netbox-community/netbox/issues/8233))
### Enhancements ### Enhancements
* [#1202](https://github.com/netbox-community/netbox/issues/1202) - Support overlapping assignment of NAT IP addresses * [#1202](https://github.com/netbox-community/netbox/issues/1202) - Support overlapping assignment of NAT IP addresses

View File

@ -1,4 +1,4 @@
from .fields import ChoiceField, ContentTypeField, SerializedPKRelatedField from .fields import *
from .routers import NetBoxRouter from .routers import NetBoxRouter
from .serializers import BulkOperationSerializer, ValidatedModelSerializer, WritableNestedSerializer from .serializers import BulkOperationSerializer, ValidatedModelSerializer, WritableNestedSerializer
@ -7,6 +7,7 @@ __all__ = (
'BulkOperationSerializer', 'BulkOperationSerializer',
'ChoiceField', 'ChoiceField',
'ContentTypeField', 'ContentTypeField',
'IPNetworkSerializer',
'NetBoxRouter', 'NetBoxRouter',
'SerializedPKRelatedField', 'SerializedPKRelatedField',
'ValidatedModelSerializer', 'ValidatedModelSerializer',

View File

@ -3,14 +3,36 @@ from rest_framework import authentication, exceptions
from rest_framework.permissions import BasePermission, DjangoObjectPermissions, SAFE_METHODS from rest_framework.permissions import BasePermission, DjangoObjectPermissions, SAFE_METHODS
from users.models import Token from users.models import Token
from utilities.request import get_client_ip
class TokenAuthentication(authentication.TokenAuthentication): class TokenAuthentication(authentication.TokenAuthentication):
""" """
A custom authentication scheme which enforces Token expiration times. A custom authentication scheme which enforces Token expiration times and source IP restrictions.
""" """
model = Token model = Token
def authenticate(self, request):
result = super().authenticate(request)
if result:
token = result[1]
# Enforce source IP restrictions (if any) set on the token
if token.allowed_ips:
client_ip = get_client_ip(request)
if client_ip is None:
raise exceptions.AuthenticationFailed(
"Client IP address could not be determined for validation. Check that the HTTP server is "
"correctly configured to pass the required header(s)."
)
if not token.validate_client_ip(client_ip):
raise exceptions.AuthenticationFailed(
f"Source IP {client_ip} is not permitted to authenticate using this token."
)
return result
def authenticate_credentials(self, key): def authenticate_credentials(self, key):
model = self.get_model() model = self.get_model()
try: try:

View File

@ -1,12 +1,18 @@
from collections import OrderedDict from collections import OrderedDict
import pytz
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from netaddr import IPNetwork
from rest_framework import serializers from rest_framework import serializers
from rest_framework.exceptions import ValidationError from rest_framework.exceptions import ValidationError
from rest_framework.relations import PrimaryKeyRelatedField, RelatedField from rest_framework.relations import PrimaryKeyRelatedField, RelatedField
__all__ = (
'ChoiceField',
'ContentTypeField',
'IPNetworkSerializer',
'SerializedPKRelatedField',
)
class ChoiceField(serializers.Field): class ChoiceField(serializers.Field):
""" """
@ -104,6 +110,17 @@ class ContentTypeField(RelatedField):
return f"{obj.app_label}.{obj.model}" return f"{obj.app_label}.{obj.model}"
class IPNetworkSerializer(serializers.Serializer):
"""
Representation of an IP network value (e.g. 192.0.2.0/24).
"""
def to_representation(self, instance):
return str(instance)
def to_internal_value(self, value):
return IPNetwork(value)
class SerializedPKRelatedField(PrimaryKeyRelatedField): class SerializedPKRelatedField(PrimaryKeyRelatedField):
""" """
Extends PrimaryKeyRelatedField to return a serialized object on read. This is useful for representing related Extends PrimaryKeyRelatedField to return a serialized object on read. This is useful for representing related

View File

@ -1,3 +1,5 @@
import datetime
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import Group, User from django.contrib.auth.models import Group, User
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
@ -8,10 +10,73 @@ from netaddr import IPNetwork
from rest_framework.test import APIClient from rest_framework.test import APIClient
from dcim.models import Site from dcim.models import Site
from ipam.choices import PrefixStatusChoices
from ipam.models import Prefix from ipam.models import Prefix
from users.models import ObjectPermission, Token from users.models import ObjectPermission, Token
from utilities.testing import TestCase from utilities.testing import TestCase
from utilities.testing.api import APITestCase
class TokenAuthenticationTestCase(APITestCase):
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_authentication(self):
url = reverse('dcim-api:site-list')
# Request without a token should return a 403
response = self.client.get(url)
self.assertEqual(response.status_code, 403)
# Valid token should return a 200
token = Token.objects.create(user=self.user)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 200)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_expiration(self):
url = reverse('dcim-api:site-list')
# Request without a non-expired token should succeed
token = Token.objects.create(user=self.user)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 200)
# Request with an expired token should fail
token.expires = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
token.save()
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 403)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_write_enabled(self):
url = reverse('dcim-api:site-list')
data = {
'name': 'Site 1',
'slug': 'site-1',
}
# Request with a write-disabled token should fail
token = Token.objects.create(user=self.user, write_enabled=False)
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 403)
# Request with a write-enabled token should succeed
token.write_enabled = True
token.save()
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 403)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_allowed_ips(self):
url = reverse('dcim-api:site-list')
# Request from a non-allowed client IP should fail
token = Token.objects.create(user=self.user, allowed_ips=['192.0.2.0/24'])
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='127.0.0.1')
self.assertEqual(response.status_code, 403)
# Request with an expired token should fail
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='192.0.2.1')
self.assertEqual(response.status_code, 200)
class ExternalAuthenticationTestCase(TestCase): class ExternalAuthenticationTestCase(TestCase):

View File

@ -22,11 +22,11 @@
</div> </div>
<div class="card-body"> <div class="card-body">
<div class="row"> <div class="row">
<div class="col col-md-4"> <div class="col col-md-3">
<small class="text-muted">Created</small><br /> <small class="text-muted">Created</small><br />
{{ token.created|annotated_date }} {{ token.created|annotated_date }}
</div> </div>
<div class="col col-md-4"> <div class="col col-md-3">
<small class="text-muted">Expires</small><br /> <small class="text-muted">Expires</small><br />
{% if token.expires %} {% if token.expires %}
{{ token.expires|annotated_date }} {{ token.expires|annotated_date }}
@ -34,7 +34,7 @@
<span>Never</span> <span>Never</span>
{% endif %} {% endif %}
</div> </div>
<div class="col col-md-4"> <div class="col col-md-3">
<small class="text-muted">Create/Edit/Delete Operations</small><br /> <small class="text-muted">Create/Edit/Delete Operations</small><br />
{% if token.write_enabled %} {% if token.write_enabled %}
<span class="badge bg-success">Enabled</span> <span class="badge bg-success">Enabled</span>
@ -42,7 +42,14 @@
<span class="badge bg-danger">Disabled</span> <span class="badge bg-danger">Disabled</span>
{% endif %} {% endif %}
</div> </div>
</div> <div class="col col-md-3">
<small class="text-muted">Allowed Source IPs</small><br />
{% if token.allowed_ips %}
{{ token.allowed_ips|join:', ' }}
{% else %}
<span>Any</span>
{% endif %}
</div> </div>
{% if token.description %} {% if token.description %}
<br /><span>{{ token.description }}</span> <br /><span>{{ token.description }}</span>
{% endif %} {% endif %}

View File

@ -58,9 +58,13 @@ class UserAdmin(UserAdmin_):
class TokenAdmin(admin.ModelAdmin): class TokenAdmin(admin.ModelAdmin):
form = forms.TokenAdminForm form = forms.TokenAdminForm
list_display = [ list_display = [
'key', 'user', 'created', 'expires', 'write_enabled', 'description' 'key', 'user', 'created', 'expires', 'write_enabled', 'description', 'list_allowed_ips'
] ]
def list_allowed_ips(self, obj):
return obj.allowed_ips or 'Any'
list_allowed_ips.short_description = "Allowed IPs"
# #
# Permissions # Permissions

View File

@ -51,7 +51,7 @@ class TokenAdminForm(forms.ModelForm):
class Meta: class Meta:
fields = [ fields = [
'user', 'key', 'write_enabled', 'expires', 'description' 'user', 'key', 'write_enabled', 'expires', 'description', 'allowed_ips'
] ]
model = Token model = Token

View File

@ -2,7 +2,7 @@ from django.contrib.auth.models import Group, User
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from rest_framework import serializers from rest_framework import serializers
from netbox.api import ContentTypeField, SerializedPKRelatedField, ValidatedModelSerializer from netbox.api import ContentTypeField, IPNetworkSerializer, SerializedPKRelatedField, ValidatedModelSerializer
from users.models import ObjectPermission, Token from users.models import ObjectPermission, Token
from .nested_serializers import * from .nested_serializers import *
@ -64,10 +64,19 @@ class TokenSerializer(ValidatedModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='users-api:token-detail') url = serializers.HyperlinkedIdentityField(view_name='users-api:token-detail')
key = serializers.CharField(min_length=40, max_length=40, allow_blank=True, required=False) key = serializers.CharField(min_length=40, max_length=40, allow_blank=True, required=False)
user = NestedUserSerializer() user = NestedUserSerializer()
allowed_ips = serializers.ListField(
child=IPNetworkSerializer(),
required=False,
allow_empty=True,
default=[]
)
class Meta: class Meta:
model = Token model = Token
fields = ('id', 'url', 'display', 'user', 'created', 'expires', 'key', 'write_enabled', 'description') fields = (
'id', 'url', 'display', 'user', 'created', 'expires', 'key', 'write_enabled', 'description',
'allowed_ips',
)
def to_internal_value(self, data): def to_internal_value(self, data):
if 'key' not in data: if 'key' not in data:

View File

@ -1,7 +1,9 @@
from django import forms from django import forms
from django.contrib.auth.forms import AuthenticationForm, PasswordChangeForm as DjangoPasswordChangeForm from django.contrib.auth.forms import AuthenticationForm, PasswordChangeForm as DjangoPasswordChangeForm
from django.contrib.postgres.forms import SimpleArrayField
from django.utils.html import mark_safe from django.utils.html import mark_safe
from ipam.formfields import IPNetworkFormField
from netbox.preferences import PREFERENCES from netbox.preferences import PREFERENCES
from utilities.forms import BootstrapMixin, DateTimePicker, StaticSelect from utilities.forms import BootstrapMixin, DateTimePicker, StaticSelect
from utilities.utils import flatten_dict from utilities.utils import flatten_dict
@ -99,11 +101,18 @@ class TokenForm(BootstrapMixin, forms.ModelForm):
required=False, required=False,
help_text="If no key is provided, one will be generated automatically." help_text="If no key is provided, one will be generated automatically."
) )
allowed_ips = SimpleArrayField(
base_field=IPNetworkFormField(),
required=False,
label='Allowed IPs',
help_text='Allowed IPv4/IPv6 networks from where the token can be used. Leave blank for no restrictions. '
'Ex: "10.1.1.0/24, 192.168.10.16/32, 2001:DB8:1::/64"',
)
class Meta: class Meta:
model = Token model = Token
fields = [ fields = [
'key', 'write_enabled', 'expires', 'description', 'key', 'write_enabled', 'expires', 'description', 'allowed_ips',
] ]
widgets = { widgets = {
'expires': DateTimePicker(), 'expires': DateTimePicker(),

View File

@ -0,0 +1,20 @@
# Generated by Django 3.2.12 on 2022-04-19 12:37
import django.contrib.postgres.fields
from django.db import migrations
import ipam.fields
class Migration(migrations.Migration):
dependencies = [
('users', '0002_standardize_id_fields'),
]
operations = [
migrations.AddField(
model_name='token',
name='allowed_ips',
field=django.contrib.postgres.fields.ArrayField(base_field=ipam.fields.IPNetworkField(), blank=True, null=True, size=None),
),
]

View File

@ -9,13 +9,14 @@ from django.db import models
from django.db.models.signals import post_save from django.db.models.signals import post_save
from django.dispatch import receiver from django.dispatch import receiver
from django.utils import timezone from django.utils import timezone
from netaddr import IPNetwork
from ipam.fields import IPNetworkField
from netbox.config import get_config from netbox.config import get_config
from utilities.querysets import RestrictedQuerySet from utilities.querysets import RestrictedQuerySet
from utilities.utils import flatten_dict from utilities.utils import flatten_dict
from .constants import * from .constants import *
__all__ = ( __all__ = (
'ObjectPermission', 'ObjectPermission',
'Token', 'Token',
@ -216,6 +217,14 @@ class Token(models.Model):
max_length=200, max_length=200,
blank=True blank=True
) )
allowed_ips = ArrayField(
base_field=IPNetworkField(),
blank=True,
null=True,
verbose_name='Allowed IPs',
help_text='Allowed IPv4/IPv6 networks from where the token can be used. Leave blank for no restrictions. '
'Ex: "10.1.1.0/24, 192.168.10.16/32, 2001:DB8:1::/64"',
)
class Meta: class Meta:
pass pass
@ -240,6 +249,19 @@ class Token(models.Model):
return False return False
return True return True
def validate_client_ip(self, client_ip):
"""
Validate the API client IP address against the source IP restrictions (if any) set on the token.
"""
if not self.allowed_ips:
return True
for ip_network in self.allowed_ips:
if client_ip in IPNetwork(ip_network):
return True
return False
# #
# Permissions # Permissions

View File

@ -0,0 +1,27 @@
from netaddr import IPAddress
__all__ = (
'get_client_ip',
)
def get_client_ip(request, additional_headers=()):
"""
Return the client (source) IP address of the given request.
"""
HTTP_HEADERS = (
'HTTP_X_REAL_IP',
'HTTP_X_FORWARDED_FOR',
'REMOTE_ADDR',
*additional_headers
)
for header in HTTP_HEADERS:
if header in request.META:
client_ip = request.META[header].split(',')[0]
try:
return IPAddress(client_ip)
except ValueError:
raise ValueError(f"Invalid IP address set for {header}: {client_ip}")
# Could not determine the client IP address from request headers
return None