mirror of
https://github.com/netbox-community/netbox.git
synced 2025-08-07 16:18:16 -06:00
Updated to use AES
This commit is contained in:
parent
c42c79b147
commit
b39e158a9c
@ -12,8 +12,10 @@ from django.db import models
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_bytes, python_2_unicode_compatible
|
||||
|
||||
from dcim.models import Device
|
||||
from utilities.models import CreatedUpdatedModel
|
||||
# from dcim.models import Device
|
||||
# from utilities.models import CreatedUpdatedModel
|
||||
from netbox.dcim.models import Device
|
||||
from netbox.utilities.models import CreatedUpdatedModel
|
||||
from .exceptions import InvalidKey
|
||||
from .hashers import SecretValidationHasher
|
||||
|
||||
@ -45,16 +47,42 @@ def decrypt_master_key(master_key_cipher, private_key):
|
||||
return cipher.decrypt(master_key_cipher)
|
||||
|
||||
|
||||
def xor_keys(key_a, key_b):
|
||||
"""
|
||||
Return the binary XOR of two given keys.
|
||||
"""
|
||||
xor = XOR.new(key_a)
|
||||
return xor.encrypt(key_b)
|
||||
# def xor_keys(key_a, key_b):
|
||||
# """
|
||||
# Return the binary XOR of two given keys.
|
||||
# """
|
||||
# # encrypts "b" with "a"
|
||||
## key_b must be a bytstring
|
||||
# cipher = AES.new(key_a, AES.MODE_CBC)
|
||||
# # xor = XOR.new(key_a)
|
||||
# # return xor.encrypt(key_b)
|
||||
# return cipher
|
||||
|
||||
|
||||
def aes_encrypt_key(key, plaintext):
|
||||
# encrypt b with a
|
||||
cipher = AES.new(key, AES.MODE_EAX)
|
||||
nonce = cipher.nonce
|
||||
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
|
||||
# xor = XOR.new(key_a)
|
||||
# return xor.encrypt(key_b)
|
||||
return ciphertext, tag, nonce
|
||||
|
||||
|
||||
def aes_decrypt_key(key, cyphertext, nonce, tag):
|
||||
# decrypt b with a
|
||||
cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
|
||||
plaintext = cipher.dectypt(cyphertext)
|
||||
try:
|
||||
cipher.verify(tag)
|
||||
except ValueError:
|
||||
raise ValidationError({
|
||||
'aes decrypt': "Key incorrect or message corrupted"
|
||||
})
|
||||
return plaintext
|
||||
|
||||
|
||||
class UserKeyQuerySet(models.QuerySet):
|
||||
|
||||
def active(self):
|
||||
return self.filter(master_key_cipher__isnull=False)
|
||||
|
||||
@ -153,6 +181,7 @@ class UserKey(CreatedUpdatedModel):
|
||||
Returns True if the UserKey has been filled with a public RSA key.
|
||||
"""
|
||||
return bool(self.public_key)
|
||||
|
||||
is_filled.boolean = True
|
||||
|
||||
def is_active(self):
|
||||
@ -160,6 +189,7 @@ class UserKey(CreatedUpdatedModel):
|
||||
Returns True if the UserKey has been populated with an encrypted copy of the master key.
|
||||
"""
|
||||
return self.master_key_cipher is not None
|
||||
|
||||
is_active.boolean = True
|
||||
|
||||
def get_master_key(self, private_key):
|
||||
@ -194,6 +224,8 @@ class SessionKey(models.Model):
|
||||
created = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
key = None
|
||||
nonce = None
|
||||
tag = None
|
||||
|
||||
class Meta:
|
||||
ordering = ['userkey__user__username']
|
||||
@ -214,7 +246,8 @@ class SessionKey(models.Model):
|
||||
self.hash = make_password(self.key)
|
||||
|
||||
# Encrypt master key using the session key
|
||||
self.cipher = xor_keys(self.key, master_key)
|
||||
# self.cipher = xor_keys(self.key, master_key)
|
||||
self.cipher, self.tag, self.nonce = aes_encrypt_key(self.key, master_key)
|
||||
|
||||
super(SessionKey, self).save(*args, **kwargs)
|
||||
|
||||
@ -225,14 +258,16 @@ class SessionKey(models.Model):
|
||||
raise InvalidKey("Invalid session key")
|
||||
|
||||
# Decrypt master key using provided session key
|
||||
master_key = xor_keys(session_key, bytes(self.cipher))
|
||||
# master_key = xor_keys(session_key, bytes(self.cipher))
|
||||
master_key = aes_decrypt_key(session_key, bytes(self.cipher), self.nonce, self.tag)
|
||||
|
||||
return master_key
|
||||
|
||||
def get_session_key(self, master_key):
|
||||
|
||||
# Recover session key using the master key
|
||||
session_key = xor_keys(master_key, bytes(self.cipher))
|
||||
# session_key = xor_keys(master_key, bytes(self.cipher))
|
||||
session_key = aes_decrypt_key(master_key, bytes(self.cipher), self.nonce, self.tag)
|
||||
|
||||
# Validate the recovered session key
|
||||
if not check_password(session_key, self.hash):
|
||||
|
Loading…
Reference in New Issue
Block a user