diff --git a/netbox/secrets/api/views.py b/netbox/secrets/api/views.py index e34302286..25663c503 100644 --- a/netbox/secrets/api/views.py +++ b/netbox/secrets/api/views.py @@ -1,11 +1,9 @@ +import base64 from Crypto.PublicKey import RSA -from django.shortcuts import get_object_or_404 +from django.http import HttpResponseBadRequest -from rest_framework import generics -from rest_framework import status from rest_framework.authentication import BasicAuthentication, SessionAuthentication -from rest_framework.exceptions import PermissionDenied from rest_framework.permissions import IsAuthenticated from rest_framework.renderers import JSONRenderer from rest_framework.response import Response @@ -39,8 +37,8 @@ class SecretRoleViewSet(ModelViewSet): # Secrets # -# TODO: Need to implement custom create() and update() methods to handle secret encryption, and custom list() and -# retrieve() methods to handle decryption. +# TODO: Need to implement custom create() and update() methods to handle secret encryption. +# TODO: Figure out a better way of transmitting the private key class SecretViewSet(WritableSerializerMixin, ModelViewSet): queryset = Secret.objects.select_related( 'device__primary_ip4', 'device__primary_ip6', 'role', @@ -56,97 +54,73 @@ class SecretViewSet(WritableSerializerMixin, ModelViewSet): authentication_classes = [BasicAuthentication, SessionAuthentication] permission_classes = [IsAuthenticated] + # The user's private key must be sent as a header (X-Private-Key). To overcome limitations around sending line + # breaks in a header field, the key must be base64-encoded and stripped of all whitespace. This is a temporary + # kludge until a more elegant approach can be devised. + def _get_private_key(self, request): + private_key_b64 = request.META.get('HTTP_X_PRIVATE_KEY', None) + if private_key_b64 is None: + return None + # TODO: Handle invalid encoding + return base64.b64decode(private_key_b64) -# TODO: Delete -class SecretListView(generics.GenericAPIView): - """ - List secrets (filterable). If a private key is POSTed, attempt to decrypt each Secret. - """ - queryset = Secret.objects.select_related('device__primary_ip4', 'device__primary_ip6', 'role')\ - .prefetch_related('role__users', 'role__groups') - serializer_class = serializers.SecretSerializer - filter_class = SecretFilter - renderer_classes = [FormlessBrowsableAPIRenderer, JSONRenderer, FreeRADIUSClientsRenderer] - permission_classes = [IsAuthenticated] + def retrieve(self, request, *args, **kwargs): + private_key = self._get_private_key(request) + secret = self.get_object() - def get(self, request, private_key=None): - queryset = self.filter_queryset(self.get_queryset()) + # Attempt to unlock the Secret if a private key was provided + if private_key is not None: - # Attempt to decrypt each Secret if a private key was provided. - if private_key: try: - uk = UserKey.objects.get(user=request.user) + user_key = UserKey.objects.get(user=request.user) except UserKey.DoesNotExist: - return Response( - {'error': ERR_USERKEY_MISSING}, - status=status.HTTP_400_BAD_REQUEST - ) - if not uk.is_active(): - return Response( - {'error': ERR_USERKEY_INACTIVE}, - status=status.HTTP_400_BAD_REQUEST - ) - master_key = uk.get_master_key(private_key) - if master_key is not None: - for s in queryset: - if s.decryptable_by(request.user): - s.decrypt(master_key) - else: - return Response( - {'error': ERR_PRIVKEY_INVALID}, - status=status.HTTP_400_BAD_REQUEST - ) + return HttpResponseBadRequest(ERR_USERKEY_MISSING) + if not user_key.is_active(): + return HttpResponseBadRequest(ERR_USERKEY_INACTIVE) - serializer = self.get_serializer(queryset, many=True) - return Response(serializer.data) - - def post(self, request): - return self.get(request, private_key=request.POST.get('private_key')) - - -# TODO: Delete -class SecretDetailView(generics.GenericAPIView): - """ - Retrieve a single Secret. If a private key is POSTed, attempt to decrypt the Secret. - """ - queryset = Secret.objects.select_related('device__primary_ip4', 'device__primary_ip6', 'role')\ - .prefetch_related('role__users', 'role__groups') - serializer_class = serializers.SecretSerializer - renderer_classes = [FormlessBrowsableAPIRenderer, JSONRenderer, FreeRADIUSClientsRenderer] - permission_classes = [IsAuthenticated] - - def get(self, request, pk, private_key=None): - secret = get_object_or_404(Secret, pk=pk) - - # Attempt to decrypt the Secret if a private key was provided. - if private_key: - try: - uk = UserKey.objects.get(user=request.user) - except UserKey.DoesNotExist: - return Response( - {'error': ERR_USERKEY_MISSING}, - status=status.HTTP_400_BAD_REQUEST - ) - if not uk.is_active(): - return Response( - {'error': ERR_USERKEY_INACTIVE}, - status=status.HTTP_400_BAD_REQUEST - ) - if not secret.decryptable_by(request.user): - raise PermissionDenied(detail="You do not have permission to decrypt this secret.") - master_key = uk.get_master_key(private_key) + master_key = user_key.get_master_key(private_key) if master_key is None: - return Response( - {'error': ERR_PRIVKEY_INVALID}, - status=status.HTTP_400_BAD_REQUEST - ) + return HttpResponseBadRequest(ERR_PRIVKEY_INVALID) + secret.decrypt(master_key) serializer = self.get_serializer(secret) return Response(serializer.data) - def post(self, request, pk): - return self.get(request, pk, private_key=request.POST.get('private_key')) + def list(self, request, *args, **kwargs): + private_key = self._get_private_key(request) + queryset = self.filter_queryset(self.get_queryset()) + + # Attempt to unlock the Secrets if a private key was provided + master_key = None + if private_key is not None: + + try: + user_key = UserKey.objects.get(user=request.user) + except UserKey.DoesNotExist: + return HttpResponseBadRequest(ERR_USERKEY_MISSING) + if not user_key.is_active(): + return HttpResponseBadRequest(ERR_USERKEY_INACTIVE) + + master_key = user_key.get_master_key(private_key) + if master_key is None: + return HttpResponseBadRequest(ERR_PRIVKEY_INVALID) + + # Pagination + page = self.paginate_queryset(queryset) + if page is not None: + secrets = [] + if master_key is not None: + for secret in page: + secret.decrypt(master_key) + secrets.append(secret) + serializer = self.get_serializer(secrets, many=True) + else: + serializer = self.get_serializer(page, many=True) + return self.get_paginated_response(serializer.data) + + serializer = self.get_serializer(queryset, many=True) + return Response(serializer.data) class RSAKeyGeneratorView(APIView):