Closes: #5278 - Remove Secrets (#6397)

* Remove Secrets

* #5278: Remove secrets javascript from netbox core

* Remove userkey references

* Fix PEP8

* Remove a few more instances of secrets.  Rebundle

* Remove Secrets

Co-authored-by: checktheroads <matt@allroads.io>
This commit is contained in:
Daniel Sheppard 2021-05-17 15:26:02 -05:00 committed by GitHub
parent dc5c765c2a
commit 744792452f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
97 changed files with 26 additions and 3720 deletions

View File

@ -194,7 +194,7 @@ To delete multiple objects at once, call `delete()` on a filtered queryset. It's
>>> Device.objects.filter(name__icontains='test').count() >>> Device.objects.filter(name__icontains='test').count()
27 27
>>> Device.objects.filter(name__icontains='test').delete() >>> Device.objects.filter(name__icontains='test').delete()
(35, {'dcim.DeviceBay': 0, 'secrets.Secret': 0, 'dcim.InterfaceConnection': 4, (35, {'dcim.DeviceBay': 0, 'dcim.InterfaceConnection': 4,
'extras.ImageAttachment': 0, 'dcim.Device': 27, 'dcim.Interface': 4, 'extras.ImageAttachment': 0, 'dcim.Device': 27, 'dcim.Interface': 4,
'dcim.ConsolePort': 0, 'dcim.PowerPort': 0}) 'dcim.ConsolePort': 0, 'dcim.PowerPort': 0})
``` ```

View File

@ -261,7 +261,7 @@ LOGGING = {
Default: False Default: False
Setting this to True will permit only authenticated users to access any part of NetBox. By default, anonymous users are permitted to access most data in NetBox (excluding secrets) but not make any changes. Setting this to True will permit only authenticated users to access any part of NetBox. By default, anonymous users are permitted to access most data in NetBox but not make any changes.
--- ---

View File

@ -1,8 +0,0 @@
# Secrets
{!docs/models/secrets/secret.md!}
{!docs/models/secrets/secretrole.md!}
---
{!docs/models/secrets/userkey.md!}

View File

@ -25,7 +25,6 @@ NetBox components are arranged into functional subsections called _apps_ (a carr
* `dcim`: Datacenter infrastructure management (sites, racks, and devices) * `dcim`: Datacenter infrastructure management (sites, racks, and devices)
* `extras`: Additional features not considered part of the core data model * `extras`: Additional features not considered part of the core data model
* `ipam`: IP address management (VRFs, prefixes, IP addresses, and VLANs) * `ipam`: IP address management (VRFs, prefixes, IP addresses, and VLANs)
* `secrets`: Encrypted storage of sensitive data (e.g. login credentials)
* `tenancy`: Tenants (such as customers) to which NetBox objects may be assigned * `tenancy`: Tenants (such as customers) to which NetBox objects may be assigned
* `users`: Authentication and user preferences * `users`: Authentication and user preferences
* `utilities`: Resources which are not user-facing (extendable classes, etc.) * `utilities`: Resources which are not user-facing (extendable classes, etc.)

View File

@ -47,7 +47,6 @@ The Django [content types](https://docs.djangoproject.com/en/stable/ref/contrib/
* [ipam.Service](../models/ipam/service.md) * [ipam.Service](../models/ipam/service.md)
* [ipam.VLAN](../models/ipam/vlan.md) * [ipam.VLAN](../models/ipam/vlan.md)
* [ipam.VRF](../models/ipam/vrf.md) * [ipam.VRF](../models/ipam/vrf.md)
* [secrets.Secret](../models/secrets/secret.md)
* [tenancy.Tenant](../models/tenancy/tenant.md) * [tenancy.Tenant](../models/tenancy/tenant.md)
* [virtualization.Cluster](../models/virtualization/cluster.md) * [virtualization.Cluster](../models/virtualization/cluster.md)
* [virtualization.VirtualMachine](../models/virtualization/virtualmachine.md) * [virtualization.VirtualMachine](../models/virtualization/virtualmachine.md)
@ -62,7 +61,6 @@ The Django [content types](https://docs.djangoproject.com/en/stable/ref/contrib/
* [ipam.RIR](../models/ipam/rir.md) * [ipam.RIR](../models/ipam/rir.md)
* [ipam.Role](../models/ipam/role.md) * [ipam.Role](../models/ipam/role.md)
* [ipam.VLANGroup](../models/ipam/vlangroup.md) * [ipam.VLANGroup](../models/ipam/vlangroup.md)
* [secrets.SecretRole](../models/secrets/secretrole.md)
* [virtualization.ClusterGroup](../models/virtualization/clustergroup.md) * [virtualization.ClusterGroup](../models/virtualization/clustergroup.md)
* [virtualization.ClusterType](../models/virtualization/clustertype.md) * [virtualization.ClusterType](../models/virtualization/clustertype.md)

View File

@ -2,7 +2,7 @@
A virtual chassis represents a set of devices which share a common control plane. A common example of this is a stack of switches which are connected and configured to operate as a single device. A virtual chassis must be assigned a name and may be assigned a domain. A virtual chassis represents a set of devices which share a common control plane. A common example of this is a stack of switches which are connected and configured to operate as a single device. A virtual chassis must be assigned a name and may be assigned a domain.
Each device in the virtual chassis is referred to as a VC member, and assigned a position and (optionally) a priority. VC member devices commonly reside within the same rack, though this is not a requirement. One of the devices may be designated as the VC master: This device will typically be assigned a name, secrets, services, and other attributes related to managing the VC. Each device in the virtual chassis is referred to as a VC member, and assigned a position and (optionally) a priority. VC member devices commonly reside within the same rack, though this is not a requirement. One of the devices may be designated as the VC master: This device will typically be assigned a name, services, and other attributes related to managing the VC.
!!! note !!! note
It's important to recognize the distinction between a virtual chassis and a chassis-based device. A virtual chassis is **not** suitable for modeling a chassis-based switch with removable line cards (such as the Juniper EX9208), as its line cards are _not_ physically autonomous devices. It's important to recognize the distinction between a virtual chassis and a chassis-based device. A virtual chassis is **not** suitable for modeling a chassis-based switch with removable line cards (such as the Juniper EX9208), as its line cards are _not_ physically autonomous devices.

View File

@ -1,5 +0,0 @@
# Secrets
A secret represents a single credential or other sensitive string of characters which must be stored securely. Each secret is assigned to a device within NetBox. The plaintext value of a secret is encrypted to a ciphertext immediately prior to storage within the database using a 256-bit AES master key. A SHA256 hash of the plaintext is also stored along with each ciphertext to validate the decrypted plaintext.
Each secret can also store an optional name parameter, which is not encrypted. This may be useful for storing user names.

View File

@ -1,9 +0,0 @@
# Secret Roles
Each secret is assigned a functional role which indicates what it is used for. Secret roles are customizable. Typical roles might include:
* Login credentials
* SNMP community strings
* RADIUS/TACACS+ keys
* IKE key strings
* Routing protocol shared secrets

View File

@ -1,35 +0,0 @@
# User Keys
Each user within NetBox can associate his or her account with an RSA public key. If activated by an administrator, this user key will contain a unique, encrypted copy of the AES master key needed to retrieve secret data.
User keys may be created by users individually, however they are of no use until they have been activated by a user who already possesses an active user key.
## Supported Key Format
Public key formats supported
- PKCS#1 RSAPublicKey* (PEM header: BEGIN RSA PUBLIC KEY)
- X.509 SubjectPublicKeyInfo** (PEM header: BEGIN PUBLIC KEY)
- **OpenSSH line format is not supported.**
Private key formats supported (unencrypted)
- PKCS#1 RSAPrivateKey** (PEM header: BEGIN RSA PRIVATE KEY)
- PKCS#8 PrivateKeyInfo* (PEM header: BEGIN PRIVATE KEY)
## Creating the First User Key
When NetBox is first installed, it contains no encryption keys. Before it can store secrets, a user (typically the superuser) must create a user key. This can be done by navigating to Profile > User Key.
To create a user key, you can either generate a new RSA key pair, or upload the public key belonging to a pair you already have. If generating a new key pair, **you must save the private key** locally before saving your new user key. Once your user key has been created, its public key will be displayed under your profile.
When the first user key is created in NetBox, a random master encryption key is generated automatically. This key is then encrypted using the public key provided and stored as part of your user key. **The master key cannot be recovered** without your private key.
Once a user key has been assigned an encrypted copy of the master key, it is considered activated and can now be used to encrypt and decrypt secrets.
## Creating Additional User Keys
Any user can create his or her user key by generating or uploading a public RSA key. However, a user key cannot be used to encrypt or decrypt secrets until it has been activated with an encrypted copy of the master key.
Only an administrator with an active user key can activate other user keys. To do so, access the NetBox admin UI and navigate to Secrets > User Keys. Select the user key(s) to be activated, and select "activate selected user keys" from the actions dropdown. You will need to provide your private key in order to decrypt the master key. A copy of the master key is then encrypted using the public key associated with the user key being activated.

View File

@ -67,7 +67,7 @@ Comprehensive, interactive documentation of all REST API endpoints is available
## Endpoint Hierarchy ## Endpoint Hierarchy
NetBox's entire REST API is housed under the API root at `https://<hostname>/api/`. The URL structure is divided at the root level by application: circuits, DCIM, extras, IPAM, plugins, secrets, tenancy, users, and virtualization. Within each application exists a separate path for each model. For example, the provider and circuit objects are located under the "circuits" application: NetBox's entire REST API is housed under the API root at `https://<hostname>/api/`. The URL structure is divided at the root level by application: circuits, DCIM, extras, IPAM, plugins, tenancy, users, and virtualization. Within each application exists a separate path for each model. For example, the provider and circuit objects are located under the "circuits" application:
* `/api/circuits/providers/` * `/api/circuits/providers/`
* `/api/circuits/circuits/` * `/api/circuits/circuits/`

View File

@ -1,172 +0,0 @@
# Working with Secrets
As with most other objects, the REST API can be used to view, create, modify, and delete secrets. However, additional steps are needed to encrypt or decrypt secret data.
## Generating a Session Key
In order to encrypt or decrypt secret data, a session key must be attached to the API request. To generate a session key, send an authenticated request to the `/api/secrets/get-session-key/` endpoint with the private RSA key which matches your [UserKey](../core-functionality/secrets.md#user-keys). The private key must be POSTed with the name `private_key`.
```no-highlight
$ curl -X POST http://netbox/api/secrets/get-session-key/ \
-H "Authorization: Token $TOKEN" \
-H "Accept: application/json; indent=4" \
--data-urlencode "private_key@<filename>"
```
```json
{
"session_key": "dyEnxlc9lnGzaOAV1dV/xqYPV63njIbdZYOgnAlGPHk="
}
```
!!! note
To read the private key from a file, use the convention above. Alternatively, the private key can be read from an environment variable using `--data-urlencode "private_key=$PRIVATE_KEY"`.
The request uses the provided private key to unlock your stored copy of the master key and generate a temporary session key, which can be attached in the `X-Session-Key` header of future API requests.
## Retrieving Secrets
A session key is not needed to retrieve unencrypted secrets: The secret is returned like any normal object with its `plaintext` field set to null.
```no-highlight
$ curl http://netbox/api/secrets/secrets/2587/ \
-H "Authorization: Token $TOKEN" \
-H "Accept: application/json; indent=4"
```
```json
{
"id": 2587,
"url": "http://netbox/api/secrets/secrets/2587/",
"device": {
"id": 1827,
"url": "http://netbox/api/dcim/devices/1827/",
"name": "MyTestDevice",
"display_name": "MyTestDevice"
},
"role": {
"id": 1,
"url": "http://netbox/api/secrets/secret-roles/1/",
"name": "Login Credentials",
"slug": "login-creds"
},
"name": "admin",
"plaintext": null,
"hash": "pbkdf2_sha256$1000$G6mMFe4FetZQ$f+0itZbAoUqW5pd8+NH8W5rdp/2QNLIBb+LGdt4OSKA=",
"tags": [],
"custom_fields": {},
"created": "2017-03-21",
"last_updated": "2017-03-21T19:28:44.265582Z"
}
```
To decrypt a secret, we must include our session key in the `X-Session-Key` header when sending the `GET` request:
```no-highlight
$ curl http://netbox/api/secrets/secrets/2587/ \
-H "Authorization: Token $TOKEN" \
-H "Accept: application/json; indent=4" \
-H "X-Session-Key: dyEnxlc9lnGzaOAV1dV/xqYPV63njIbdZYOgnAlGPHk="
```
```json
{
"id": 2587,
"url": "http://netbox/api/secrets/secrets/2587/",
"device": {
"id": 1827,
"url": "http://netbox/api/dcim/devices/1827/",
"name": "MyTestDevice",
"display_name": "MyTestDevice"
},
"role": {
"id": 1,
"url": "http://netbox/api/secrets/secret-roles/1/",
"name": "Login Credentials",
"slug": "login-creds"
},
"name": "admin",
"plaintext": "foobar",
"hash": "pbkdf2_sha256$1000$G6mMFe4FetZQ$f+0itZbAoUqW5pd8+NH8W5rdp/2QNLIBb+LGdt4OSKA=",
"tags": [],
"custom_fields": {},
"created": "2017-03-21",
"last_updated": "2017-03-21T19:28:44.265582Z"
}
```
Multiple secrets within a list can be decrypted in this manner as well:
```no-highlight
$ curl http://netbox/api/secrets/secrets/?limit=3 \
-H "Authorization: Token $TOKEN" \
-H "Accept: application/json; indent=4" \
-H "X-Session-Key: dyEnxlc9lnGzaOAV1dV/xqYPV63njIbdZYOgnAlGPHk="
```
```json
{
"count": 3482,
"next": "http://netbox/api/secrets/secrets/?limit=3&offset=3",
"previous": null,
"results": [
{
"id": 2587,
"plaintext": "foobar",
...
},
{
"id": 2588,
"plaintext": "MyP@ssw0rd!",
...
},
{
"id": 2589,
"plaintext": "AnotherSecret!",
...
},
]
}
```
## Creating and Updating Secrets
Session keys are required when creating or modifying secrets. The secret's `plaintext` attribute is set to its non-encrypted value, and NetBox uses the session key to compute and store the encrypted value.
```no-highlight
$ curl -X POST http://netbox/api/secrets/secrets/ \
-H "Content-Type: application/json" \
-H "Authorization: Token $TOKEN" \
-H "Accept: application/json; indent=4" \
-H "X-Session-Key: dyEnxlc9lnGzaOAV1dV/xqYPV63njIbdZYOgnAlGPHk=" \
--data '{"device": 1827, "role": 1, "name": "backup", "plaintext": "Drowssap1"}'
```
```json
{
"id": 6194,
"url": "http://netbox/api/secrets/secrets/9194/",
"device": {
"id": 1827,
"url": "http://netbox/api/dcim/devices/1827/",
"name": "device43",
"display_name": "device43"
},
"role": {
"id": 1,
"url": "http://netbox/api/secrets/secret-roles/1/",
"name": "Login Credentials",
"slug": "login-creds"
},
"name": "backup",
"plaintext": "Drowssap1",
"hash": "pbkdf2_sha256$1000$J9db8sI5vBrd$IK6nFXnFl+K+nR5/KY8RSDxU1skYL8G69T5N3jZxM7c=",
"tags": [],
"custom_fields": {},
"created": "2020-08-05",
"last_updated": "2020-08-05T16:51:14.990506Z"
}
```
!!! note
Don't forget to include the `Content-Type: application/json` header when making a POST or PATCH request.

View File

@ -58,7 +58,6 @@ nav:
- Service Mapping: 'core-functionality/services.md' - Service Mapping: 'core-functionality/services.md'
- Circuits: 'core-functionality/circuits.md' - Circuits: 'core-functionality/circuits.md'
- Power Tracking: 'core-functionality/power.md' - Power Tracking: 'core-functionality/power.md'
- Secrets: 'core-functionality/secrets.md'
- Tenancy: 'core-functionality/tenancy.md' - Tenancy: 'core-functionality/tenancy.md'
- Additional Features: - Additional Features:
- Caching: 'additional-features/caching.md' - Caching: 'additional-features/caching.md'
@ -85,7 +84,6 @@ nav:
- Overview: 'rest-api/overview.md' - Overview: 'rest-api/overview.md'
- Filtering: 'rest-api/filtering.md' - Filtering: 'rest-api/filtering.md'
- Authentication: 'rest-api/authentication.md' - Authentication: 'rest-api/authentication.md'
- Working with Secrets: 'rest-api/working-with-secrets.md'
- Development: - Development:
- Introduction: 'development/index.md' - Introduction: 'development/index.md'
- Getting Started: 'development/getting-started.md' - Getting Started: 'development/getting-started.md'

View File

@ -592,12 +592,6 @@ class Device(PrimaryModel, ConfigContextModel):
images = GenericRelation( images = GenericRelation(
to='extras.ImageAttachment' to='extras.ImageAttachment'
) )
secrets = GenericRelation(
to='secrets.Secret',
content_type_field='assigned_object_type',
object_id_field='assigned_object_id',
related_query_name='device'
)
objects = ConfigContextModelQuerySet.as_manager() objects = ConfigContextModelQuerySet.as_manager()

View File

@ -19,7 +19,6 @@ from extras.views import ObjectChangeLogView, ObjectConfigContextView, ObjectJou
from ipam.models import IPAddress, Prefix, Service, VLAN from ipam.models import IPAddress, Prefix, Service, VLAN
from ipam.tables import InterfaceIPAddressTable, InterfaceVLANTable from ipam.tables import InterfaceIPAddressTable, InterfaceVLANTable
from netbox.views import generic from netbox.views import generic
from secrets.models import Secret
from utilities.forms import ConfirmationForm from utilities.forms import ConfirmationForm
from utilities.paginator import EnhancedPaginator, get_paginate_count from utilities.paginator import EnhancedPaginator, get_paginate_count
from utilities.permissions import get_permission_for_model from utilities.permissions import get_permission_for_model
@ -1293,9 +1292,6 @@ class DeviceView(generic.ObjectView):
# Services # Services
services = Service.objects.restrict(request.user, 'view').filter(device=instance) services = Service.objects.restrict(request.user, 'view').filter(device=instance)
# Secrets
secrets = Secret.objects.restrict(request.user, 'view').filter(device=instance)
# Find up to ten devices in the same site with the same functional role for quick reference. # Find up to ten devices in the same site with the same functional role for quick reference.
related_devices = Device.objects.restrict(request.user, 'view').filter( related_devices = Device.objects.restrict(request.user, 'view').filter(
site=instance.site, device_role=instance.device_role site=instance.site, device_role=instance.device_role
@ -1307,7 +1303,6 @@ class DeviceView(generic.ObjectView):
return { return {
'services': services, 'services': services,
'secrets': secrets,
'vc_members': vc_members, 'vc_members': vc_members,
'related_devices': related_devices, 'related_devices': related_devices,
'active_tab': 'device', 'active_tab': 'device',

View File

@ -9,7 +9,7 @@ from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
APPS = ['circuits', 'dcim', 'extras', 'ipam', 'secrets', 'tenancy', 'users', 'virtualization'] APPS = ['circuits', 'dcim', 'extras', 'ipam', 'tenancy', 'users', 'virtualization']
BANNER_TEXT = """### NetBox interactive shell ({node}) BANNER_TEXT = """### NetBox interactive shell ({node})
### Python {python} | Django {django} | NetBox {netbox} ### Python {python} | Django {django} | NetBox {netbox}

View File

@ -52,7 +52,6 @@ class Migration(migrations.Migration):
('circuits', '0015_custom_tag_models'), ('circuits', '0015_custom_tag_models'),
('dcim', '0070_custom_tag_models'), ('dcim', '0070_custom_tag_models'),
('ipam', '0025_custom_tag_models'), ('ipam', '0025_custom_tag_models'),
('secrets', '0006_custom_tag_models'),
('tenancy', '0006_custom_tag_models'), ('tenancy', '0006_custom_tag_models'),
('virtualization', '0009_custom_tag_models'), ('virtualization', '0009_custom_tag_models'),
] ]

View File

@ -14,17 +14,17 @@ class Migration(migrations.Migration):
migrations.AlterField( migrations.AlterField(
model_name='customfield', model_name='customfield',
name='obj_type', name='obj_type',
field=models.ManyToManyField(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'circuits'), ('model__in', ['circuit', 'provider'])), models.Q(('app_label', 'dcim'), ('model__in', ['device', 'devicetype', 'powerfeed', 'rack', 'site'])), models.Q(('app_label', 'ipam'), ('model__in', ['aggregate', 'ipaddress', 'prefix', 'service', 'vlan', 'vrf'])), models.Q(('app_label', 'secrets'), ('model__in', ['secret'])), models.Q(('app_label', 'tenancy'), ('model__in', ['tenant'])), models.Q(('app_label', 'virtualization'), ('model__in', ['cluster', 'virtualmachine'])), _connector='OR')), related_name='custom_fields', to='contenttypes.ContentType'), field=models.ManyToManyField(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'circuits'), ('model__in', ['circuit', 'provider'])), models.Q(('app_label', 'dcim'), ('model__in', ['device', 'devicetype', 'powerfeed', 'rack', 'site'])), models.Q(('app_label', 'ipam'), ('model__in', ['aggregate', 'ipaddress', 'prefix', 'service', 'vlan', 'vrf'])), models.Q(('app_label', 'tenancy'), ('model__in', ['tenant'])), models.Q(('app_label', 'virtualization'), ('model__in', ['cluster', 'virtualmachine'])), _connector='OR')), related_name='custom_fields', to='contenttypes.ContentType'),
), ),
migrations.AlterField( migrations.AlterField(
model_name='customlink', model_name='customlink',
name='content_type', name='content_type',
field=models.ForeignKey(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'circuits'), ('model__in', ['circuit', 'provider'])), models.Q(('app_label', 'dcim'), ('model__in', ['cable', 'device', 'devicetype', 'powerpanel', 'powerfeed', 'rack', 'site'])), models.Q(('app_label', 'ipam'), ('model__in', ['aggregate', 'ipaddress', 'prefix', 'service', 'vlan', 'vrf'])), models.Q(('app_label', 'secrets'), ('model__in', ['secret'])), models.Q(('app_label', 'tenancy'), ('model__in', ['tenant'])), models.Q(('app_label', 'virtualization'), ('model__in', ['cluster', 'virtualmachine'])), _connector='OR')), on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType'), field=models.ForeignKey(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'circuits'), ('model__in', ['circuit', 'provider'])), models.Q(('app_label', 'dcim'), ('model__in', ['cable', 'device', 'devicetype', 'powerpanel', 'powerfeed', 'rack', 'site'])), models.Q(('app_label', 'ipam'), ('model__in', ['aggregate', 'ipaddress', 'prefix', 'service', 'vlan', 'vrf'])), models.Q(('app_label', 'tenancy'), ('model__in', ['tenant'])), models.Q(('app_label', 'virtualization'), ('model__in', ['cluster', 'virtualmachine'])), _connector='OR')), on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType'),
), ),
migrations.AlterField( migrations.AlterField(
model_name='exporttemplate', model_name='exporttemplate',
name='content_type', name='content_type',
field=models.ForeignKey(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'circuits'), ('model__in', ['circuit', 'provider'])), models.Q(('app_label', 'dcim'), ('model__in', ['cable', 'consoleport', 'device', 'devicetype', 'interface', 'inventoryitem', 'manufacturer', 'powerpanel', 'powerport', 'powerfeed', 'rack', 'rackgroup', 'region', 'site', 'virtualchassis'])), models.Q(('app_label', 'ipam'), ('model__in', ['aggregate', 'ipaddress', 'prefix', 'service', 'vlan', 'vrf'])), models.Q(('app_label', 'secrets'), ('model__in', ['secret'])), models.Q(('app_label', 'tenancy'), ('model__in', ['tenant'])), models.Q(('app_label', 'virtualization'), ('model__in', ['cluster', 'virtualmachine'])), _connector='OR')), on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType'), field=models.ForeignKey(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'circuits'), ('model__in', ['circuit', 'provider'])), models.Q(('app_label', 'dcim'), ('model__in', ['cable', 'consoleport', 'device', 'devicetype', 'interface', 'inventoryitem', 'manufacturer', 'powerpanel', 'powerport', 'powerfeed', 'rack', 'rackgroup', 'region', 'site', 'virtualchassis'])), models.Q(('app_label', 'ipam'), ('model__in', ['aggregate', 'ipaddress', 'prefix', 'service', 'vlan', 'vrf'])), models.Q(('app_label', 'tenancy'), ('model__in', ['tenant'])), models.Q(('app_label', 'virtualization'), ('model__in', ['cluster', 'virtualmachine'])), _connector='OR')), on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType'),
), ),
migrations.AlterField( migrations.AlterField(
model_name='graph', model_name='graph',
@ -34,6 +34,6 @@ class Migration(migrations.Migration):
migrations.AlterField( migrations.AlterField(
model_name='webhook', model_name='webhook',
name='obj_type', name='obj_type',
field=models.ManyToManyField(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'circuits'), ('model__in', ['circuit', 'provider'])), models.Q(('app_label', 'dcim'), ('model__in', ['cable', 'consoleport', 'consoleserverport', 'device', 'devicebay', 'devicetype', 'frontport', 'interface', 'inventoryitem', 'manufacturer', 'poweroutlet', 'powerpanel', 'powerport', 'powerfeed', 'rack', 'rearport', 'region', 'site', 'virtualchassis'])), models.Q(('app_label', 'ipam'), ('model__in', ['aggregate', 'ipaddress', 'prefix', 'service', 'vlan', 'vrf'])), models.Q(('app_label', 'secrets'), ('model__in', ['secret'])), models.Q(('app_label', 'tenancy'), ('model__in', ['tenant'])), models.Q(('app_label', 'virtualization'), ('model__in', ['cluster', 'virtualmachine'])), _connector='OR')), related_name='webhooks', to='contenttypes.ContentType'), field=models.ManyToManyField(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'circuits'), ('model__in', ['circuit', 'provider'])), models.Q(('app_label', 'dcim'), ('model__in', ['cable', 'consoleport', 'consoleserverport', 'device', 'devicebay', 'devicetype', 'frontport', 'interface', 'inventoryitem', 'manufacturer', 'poweroutlet', 'powerpanel', 'powerport', 'powerfeed', 'rack', 'rearport', 'region', 'site', 'virtualchassis'])), models.Q(('app_label', 'ipam'), ('model__in', ['aggregate', 'ipaddress', 'prefix', 'service', 'vlan', 'vrf'])), models.Q(('app_label', 'tenancy'), ('model__in', ['tenant'])), models.Q(('app_label', 'virtualization'), ('model__in', ['cluster', 'virtualmachine'])), _connector='OR')), related_name='webhooks', to='contenttypes.ContentType'),
), ),
] ]

View File

@ -89,7 +89,6 @@ class Migration(migrations.Migration):
('dcim', '0117_custom_field_data'), ('dcim', '0117_custom_field_data'),
('extras', '0050_customfield_changes'), ('extras', '0050_customfield_changes'),
('ipam', '0038_custom_field_data'), ('ipam', '0038_custom_field_data'),
('secrets', '0010_custom_field_data'),
('tenancy', '0010_custom_field_data'), ('tenancy', '0010_custom_field_data'),
('virtualization', '0018_custom_field_data'), ('virtualization', '0018_custom_field_data'),
] ]

View File

@ -304,7 +304,6 @@ class APIRootView(APIView):
('extras', reverse('extras-api:api-root', request=request, format=format)), ('extras', reverse('extras-api:api-root', request=request, format=format)),
('ipam', reverse('ipam-api:api-root', request=request, format=format)), ('ipam', reverse('ipam-api:api-root', request=request, format=format)),
('plugins', reverse('plugins-api:api-root', request=request, format=format)), ('plugins', reverse('plugins-api:api-root', request=request, format=format)),
('secrets', reverse('secrets-api:api-root', request=request, format=format)),
('status', reverse('api-status', request=request, format=format)), ('status', reverse('api-status', request=request, format=format)),
('tenancy', reverse('tenancy-api:api-root', request=request, format=format)), ('tenancy', reverse('tenancy-api:api-root', request=request, format=format)),
('users', reverse('users-api:api-root', request=request, format=format)), ('users', reverse('users-api:api-root', request=request, format=format)),

View File

@ -150,7 +150,7 @@ INTERNAL_IPS = ('127.0.0.1', '::1')
LOGGING = {} LOGGING = {}
# Setting this to True will permit only authenticated users to access any part of NetBox. By default, anonymous users # Setting this to True will permit only authenticated users to access any part of NetBox. By default, anonymous users
# are permitted to access most data in NetBox (excluding secrets) but not make any changes. # are permitted to access most data in NetBox but not make any changes.
LOGIN_REQUIRED = False LOGIN_REQUIRED = False
# The length of time (in seconds) for which a user will remain logged into the web UI before being prompted to # The length of time (in seconds) for which a user will remain logged into the web UI before being prompted to

View File

@ -15,9 +15,6 @@ from dcim.tables import (
from ipam.filtersets import AggregateFilterSet, IPAddressFilterSet, PrefixFilterSet, VLANFilterSet, VRFFilterSet from ipam.filtersets import AggregateFilterSet, IPAddressFilterSet, PrefixFilterSet, VLANFilterSet, VRFFilterSet
from ipam.models import Aggregate, IPAddress, Prefix, VLAN, VRF from ipam.models import Aggregate, IPAddress, Prefix, VLAN, VRF
from ipam.tables import AggregateTable, IPAddressTable, PrefixTable, VLANTable, VRFTable from ipam.tables import AggregateTable, IPAddressTable, PrefixTable, VLANTable, VRFTable
from secrets.filtersets import SecretFilterSet
from secrets.models import Secret
from secrets.tables import SecretTable
from tenancy.filtersets import TenantFilterSet from tenancy.filtersets import TenantFilterSet
from tenancy.models import Tenant from tenancy.models import Tenant
from tenancy.tables import TenantTable from tenancy.tables import TenantTable
@ -161,13 +158,6 @@ SEARCH_TYPES = OrderedDict((
'table': VLANTable, 'table': VLANTable,
'url': 'ipam:vlan_list', 'url': 'ipam:vlan_list',
}), }),
# Secrets
('secret', {
'queryset': Secret.objects.prefetch_related('role', 'device'),
'filterset': SecretFilterSet,
'table': SecretTable,
'url': 'secrets:secret_list',
}),
# Tenancy # Tenancy
('tenant', { ('tenant', {
'queryset': Tenant.objects.prefetch_related('group'), 'queryset': Tenant.objects.prefetch_related('group'),

View File

@ -25,9 +25,6 @@ OBJ_TYPE_CHOICES = (
('ipaddress', 'IP Addresses'), ('ipaddress', 'IP Addresses'),
('vlan', 'VLANs'), ('vlan', 'VLANs'),
)), )),
('Secrets', (
('secret', 'Secrets'),
)),
('Tenancy', ( ('Tenancy', (
('tenant', 'Tenants'), ('tenant', 'Tenants'),
)), )),

View File

@ -292,7 +292,6 @@ INSTALLED_APPS = [
'dcim', 'dcim',
'ipam', 'ipam',
'extras', 'extras',
'secrets',
'tenancy', 'tenancy',
'users', 'users',
'utilities', 'utilities',
@ -433,7 +432,6 @@ CACHEOPS = {
'dcim.*': {'ops': 'all'}, 'dcim.*': {'ops': 'all'},
'ipam.*': {'ops': 'all'}, 'ipam.*': {'ops': 'all'},
'extras.*': {'ops': 'all'}, 'extras.*': {'ops': 'all'},
'secrets.*': {'ops': 'all'},
'users.*': {'ops': 'all'}, 'users.*': {'ops': 'all'},
'tenancy.tenantgroup': None, # MPTT models are exempt due to raw SQL 'tenancy.tenantgroup': None, # MPTT models are exempt due to raw SQL
'tenancy.*': {'ops': 'all'}, 'tenancy.*': {'ops': 'all'},
@ -574,9 +572,6 @@ RQ_QUEUES = {
# NetBox internal settings # NetBox internal settings
# #
# Secrets
SECRETS_MIN_PUBKEY_SIZE = 2048
# Pagination # Pagination
PER_PAGE_DEFAULTS = [ PER_PAGE_DEFAULTS = [
25, 50, 100, 250, 500, 1000 25, 50, 100, 250, 500, 1000

View File

@ -41,7 +41,6 @@ _patterns = [
path('dcim/', include('dcim.urls')), path('dcim/', include('dcim.urls')),
path('extras/', include('extras.urls')), path('extras/', include('extras.urls')),
path('ipam/', include('ipam.urls')), path('ipam/', include('ipam.urls')),
path('secrets/', include('secrets.urls')),
path('tenancy/', include('tenancy.urls')), path('tenancy/', include('tenancy.urls')),
path('user/', include('users.urls')), path('user/', include('users.urls')),
path('virtualization/', include('virtualization.urls')), path('virtualization/', include('virtualization.urls')),
@ -52,7 +51,6 @@ _patterns = [
path('api/dcim/', include('dcim.api.urls')), path('api/dcim/', include('dcim.api.urls')),
path('api/extras/', include('extras.api.urls')), path('api/extras/', include('extras.api.urls')),
path('api/ipam/', include('ipam.api.urls')), path('api/ipam/', include('ipam.api.urls')),
path('api/secrets/', include('secrets.api.urls')),
path('api/tenancy/', include('tenancy.api.urls')), path('api/tenancy/', include('tenancy.api.urls')),
path('api/users/', include('users.api.urls')), path('api/users/', include('users.api.urls')),
path('api/virtualization/', include('virtualization.api.urls')), path('api/virtualization/', include('virtualization.api.urls')),

View File

@ -16,7 +16,7 @@ from packaging import version
from circuits.models import Circuit, Provider from circuits.models import Circuit, Provider
from dcim.models import ( from dcim.models import (
Cable, ConsolePort, Device, DeviceType, Interface, Location, PowerPanel, PowerFeed, PowerPort, Rack, Site, Cable, ConsolePort, Device, DeviceType, Interface, PowerPanel, PowerFeed, PowerPort, Rack, Site,
) )
from extras.choices import JobResultStatusChoices from extras.choices import JobResultStatusChoices
from extras.models import ObjectChange, JobResult from extras.models import ObjectChange, JobResult
@ -24,7 +24,6 @@ from ipam.models import Aggregate, IPAddress, Prefix, VLAN, VRF
from netbox.constants import SEARCH_MAX_RESULTS, SEARCH_TYPES from netbox.constants import SEARCH_MAX_RESULTS, SEARCH_TYPES
from netbox.forms import SearchForm from netbox.forms import SearchForm
from netbox.releases import get_latest_release from netbox.releases import get_latest_release
from secrets.models import Secret
from tenancy.models import Tenant from tenancy.models import Tenant
from virtualization.models import Cluster, VirtualMachine from virtualization.models import Cluster, VirtualMachine
@ -78,9 +77,6 @@ class HomeView(View):
("circuits.view_provider", "Providers", "Organizations that provide circuits", Provider.objects.restrict(request.user, 'view').count), ("circuits.view_provider", "Providers", "Organizations that provide circuits", Provider.objects.restrict(request.user, 'view').count),
("circuits.view_circuit", "Circuits", "Communication links for transit, transport, & other services", Circuit.objects.restrict(request.user, 'view').count), ("circuits.view_circuit", "Circuits", "Communication links for transit, transport, & other services", Circuit.objects.restrict(request.user, 'view').count),
) )
secrets = (
("secrets.view_secret", "Secrets", "Cryptographically secured data", Secret.objects.restrict(request.user, 'view').count),
)
virtualization = ( virtualization = (
("virtualization.view_cluster", "Clusters", "Clusters of physical virtual machine hosts", Cluster.objects.restrict(request.user, 'view').count), ("virtualization.view_cluster", "Clusters", "Clusters of physical virtual machine hosts", Cluster.objects.restrict(request.user, 'view').count),
("virtualization.view_virtualmachine", "Virtual Machines", "Virtual compute instances running inside clusters", VirtualMachine.objects.restrict(request.user, 'view').count), ("virtualization.view_virtualmachine", "Virtual Machines", "Virtual compute instances running inside clusters", VirtualMachine.objects.restrict(request.user, 'view').count),
@ -103,7 +99,6 @@ class HomeView(View):
("Inventory", dcim), ("Inventory", dcim),
("Connections", connections), ("Connections", connections),
("Circuits", circuits), ("Circuits", circuits),
("Secrets", secrets),
("Power", power), ("Power", power),
) )
for section_label, section_items in sections: for section_label, section_items in sections:

Binary file not shown.

Binary file not shown.

Binary file not shown.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

File diff suppressed because one or more lines are too long

Binary file not shown.

View File

@ -3,7 +3,6 @@ import { initBootstrap } from './bs';
import { initSearch } from './search'; import { initSearch } from './search';
import { initSelect } from './select'; import { initSelect } from './select';
import { initButtons } from './buttons'; import { initButtons } from './buttons';
import { initSecrets } from './secrets';
import { initColorMode } from './colorMode'; import { initColorMode } from './colorMode';
import { initMessages } from './messages'; import { initMessages } from './messages';
import { initClipboard } from './clipboard'; import { initClipboard } from './clipboard';
@ -21,7 +20,6 @@ function init() {
initDateSelector, initDateSelector,
initButtons, initButtons,
initClipboard, initClipboard,
initSecrets,
initTableConfig, initTableConfig,
]) { ]) {
init(); init();

View File

@ -1,208 +0,0 @@
import { Modal } from 'bootstrap';
import { createToast } from './bs';
import { apiGetBase, apiPostForm, getElements, isApiError, hasError } from './util';
/**
* Initialize Generate Private Key Pair Elements.
*/
function initGenerateKeyPair() {
const element = document.getElementById('new_keypair_modal') as HTMLDivElement;
const accept = document.getElementById('use_new_pubkey') as HTMLButtonElement;
// If the elements are not loaded, stop.
if (element === null || accept === null) {
return;
}
const publicElem = element.querySelector<HTMLTextAreaElement>('textarea#new_pubkey');
const privateElem = element.querySelector<HTMLTextAreaElement>('textarea#new_privkey');
/**
* Handle Generate Private Key Pair Modal opening.
*/
function handleOpen() {
// When the modal opens, set the `readonly` attribute on the textarea elements.
for (const elem of [publicElem, privateElem]) {
if (elem !== null) {
elem.setAttribute('readonly', '');
}
}
// Fetch the key pair from the API.
apiGetBase<APIKeyPair>('/api/secrets/generate-rsa-key-pair').then(data => {
if (!hasError(data)) {
// If key pair generation was successful, set the textarea elements' value to the generated
// values.
const { private_key: priv, public_key: pub } = data;
if (publicElem !== null && privateElem !== null) {
publicElem.value = pub;
privateElem.value = priv;
}
} else {
// Otherwise, show an error.
const toast = createToast('danger', 'Error', data.error);
toast.show();
}
});
}
/**
* Set the public key form field's value to the generated public key.
*/
function handleAccept() {
const publicKeyField = document.getElementById('id_public_key') as HTMLTextAreaElement;
if (publicElem !== null) {
publicKeyField.value = publicElem.value;
publicKeyField.innerText = publicElem.value;
}
}
element.addEventListener('shown.bs.modal', handleOpen);
accept.addEventListener('click', handleAccept);
}
/**
* Toggle copy/lock/unlock button visibility based on the action occurring.
* @param id Secret ID.
* @param action Lock or Unlock, so we know which buttons to display.
*/
function toggleSecretButtons(id: string, action: 'lock' | 'unlock') {
const unlockButton = document.querySelector(`button.unlock-secret[secret-id='${id}']`);
const lockButton = document.querySelector(`button.lock-secret[secret-id='${id}']`);
const copyButton = document.querySelector(`button.copy-secret[secret-id='${id}']`);
// If we're unlocking, hide the unlock button. Otherwise, show it.
if (unlockButton !== null) {
if (action === 'unlock') unlockButton.classList.add('d-none');
if (action === 'lock') unlockButton.classList.remove('d-none');
}
// If we're unlocking, show the lock button. Otherwise, hide it.
if (lockButton !== null) {
if (action === 'unlock') lockButton.classList.remove('d-none');
if (action === 'lock') lockButton.classList.add('d-none');
}
// If we're unlocking, show the copy button. Otherwise, hide it.
if (copyButton !== null) {
if (action === 'unlock') copyButton.classList.remove('d-none');
if (action === 'lock') copyButton.classList.add('d-none');
}
}
/**
* Initialize Lock & Unlock button event listeners & callbacks.
*/
function initLockUnlock() {
const privateKeyModalElem = document.getElementById('privkey_modal');
if (privateKeyModalElem === null) {
return;
}
const privateKeyModal = new Modal(privateKeyModalElem);
/**
* Unlock a secret, or prompt the user for their private key, if a session key is not available.
*
* @param id Secret ID
*/
function unlock(id: string | null) {
const target = document.getElementById(`secret_${id}`);
if (typeof id === 'string' && id !== '') {
apiGetBase<APISecret>(`/api/secrets/secrets/${id}`).then(data => {
if (!hasError(data)) {
const { plaintext } = data;
// `plaintext` is the plain text value of the secret. If it is null, it has not been
// decrypted, likely due to a mission session key.
if (target !== null && plaintext !== null) {
// If `plaintext` is not null, we have the decrypted value. Set the target element's
// inner text to the decrypted value and toggle copy/lock button visibility.
target.innerText = plaintext;
toggleSecretButtons(id, 'unlock');
} else {
// Otherwise, we do _not_ have the decrypted value and need to prompt the user for
// their private RSA key, in order to get a session key. The session key is then sent
// as a cookie in future requests.
privateKeyModal.show();
}
} else {
if (data.error.toLowerCase().includes('invalid session key')) {
// If, for some reason, a request was made but resulted in an API error that complains
// of a missing session key, prompt the user for their session key.
privateKeyModal.show();
} else {
// If we received an API error but it doesn't contain 'invalid session key', show the
// user an error message.
const toast = createToast('danger', 'Error', data.error);
toast.show();
}
}
});
}
}
/**
* Lock a secret and toggle visibility of the unlock button.
* @param id Secret ID
*/
function lock(id: string | null) {
if (typeof id === 'string' && id !== '') {
const target = document.getElementById(`secret_${id}`);
if (target !== null) {
// Obscure the inner text of the secret element.
target.innerText = '********';
}
// Toggle visibility of the copy/lock/unlock buttons.
toggleSecretButtons(id, 'lock');
}
}
for (const element of getElements<HTMLButtonElement>('button.unlock-secret')) {
element.addEventListener('click', () => unlock(element.getAttribute('secret-id')));
}
for (const element of getElements<HTMLButtonElement>('button.lock-secret')) {
element.addEventListener('click', () => lock(element.getAttribute('secret-id')));
}
}
/**
* Request a session key from the API.
* @param privateKey RSA Private Key (valid JSON string)
*/
function requestSessionKey(privateKey: string) {
apiPostForm('/api/secrets/get-session-key/', { private_key: privateKey }).then(res => {
if (!hasError(res)) {
// If the response received was not an error, show the user a success message.
const toast = createToast('success', 'Session Key Received', 'You may now unlock secrets.');
toast.show();
} else {
// Otherwise, show the user an error message.
let message = res.error;
if (isApiError(res)) {
// If the error received was a standard API error containing a Python exception message,
// append it to the error.
message += `\n${res.exception}`;
}
const toast = createToast('danger', 'Failed to Retrieve Session Key', message);
toast.show();
}
});
}
/**
* Initialize Request Session Key Elements.
*/
function initGetSessionKey() {
for (const element of getElements<HTMLButtonElement>('#request_session_key')) {
/**
* Send the user's input private key to the API to get a session key, which will be stored as
* a cookie for future requests.
*/
function handleClick() {
for (const pk of getElements<HTMLTextAreaElement>('#user_privkey')) {
requestSessionKey(pk.value);
// Clear the private key form field value.
pk.value = '';
}
}
element.addEventListener('click', handleClick);
}
}
export function initSecrets() {
for (const func of [initGenerateKeyPair, initLockUnlock, initGetSessionKey]) {
func();
}
}

View File

@ -1,11 +0,0 @@
# TODO: Rename the secrets app, probably
# Python 3.6 introduced a standard library named "secrets," which obviously conflicts with this Django app. To avoid
# renaming the app, we hotwire the components of the standard library that Django calls. (I don't like this any more
# than you do, but it works for now.) The only references to the secrets modules are in django/utils/crypto.py.
#
# First, we copy secrets.compare_digest, which comes from the hmac module:
from hmac import compare_digest
# Then, we instantiate SystemRandom and map its choice() function:
from random import SystemRandom
choice = SystemRandom().choice

View File

@ -1,61 +0,0 @@
from django.contrib import admin, messages
from django.contrib.admin.helpers import ACTION_CHECKBOX_NAME
from django.shortcuts import redirect, render
from .forms import ActivateUserKeyForm
from .models import UserKey
@admin.register(UserKey)
class UserKeyAdmin(admin.ModelAdmin):
actions = ['activate_selected']
list_display = ['user', 'is_filled', 'is_active', 'created']
fields = ['user', 'public_key', 'is_active', 'last_updated']
readonly_fields = ['user', 'is_active', 'last_updated']
def get_readonly_fields(self, request, obj=None):
# Don't allow a user to modify an existing public key directly.
if obj and obj.public_key:
return ['public_key'] + self.readonly_fields
return self.readonly_fields
def get_actions(self, request):
# Bulk deletion is disabled at the manager level, so remove the action from the admin site for this model.
actions = super().get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
if not request.user.has_perm('secrets.change_userkey'):
del actions['activate_selected']
return actions
def activate_selected(modeladmin, request, queryset):
"""
Enable bulk activation of UserKeys
"""
try:
my_userkey = UserKey.objects.get(user=request.user)
except UserKey.DoesNotExist:
messages.error(request, "You do not have an active User Key.")
return redirect('admin:secrets_userkey_changelist')
if 'activate' in request.POST:
form = ActivateUserKeyForm(request.POST)
if form.is_valid():
master_key = my_userkey.get_master_key(form.cleaned_data['secret_key'])
if master_key is not None:
for uk in form.cleaned_data['_selected_action']:
uk.activate(master_key)
return redirect('admin:secrets_userkey_changelist')
else:
messages.error(
request, "Invalid private key provided. Unable to retrieve master key.", extra_tags='error'
)
else:
form = ActivateUserKeyForm(
initial={'_selected_action': request.POST.getlist(ACTION_CHECKBOX_NAME)}
)
return render(request, 'activate_keys.html', {
'form': form,
})
activate_selected.short_description = "Activate selected user keys"

View File

@ -1,26 +0,0 @@
from rest_framework import serializers
from netbox.api import WritableNestedSerializer
from secrets.models import Secret, SecretRole
__all__ = [
'NestedSecretRoleSerializer',
'NestedSecretSerializer',
]
class NestedSecretSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='secrets-api:secret-detail')
class Meta:
model = Secret
fields = ['id', 'url', 'display', 'name']
class NestedSecretRoleSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='secrets-api:secretrole-detail')
secret_count = serializers.IntegerField(read_only=True)
class Meta:
model = SecretRole
fields = ['id', 'url', 'display', 'name', 'slug', 'secret_count']

View File

@ -1,63 +0,0 @@
from django.contrib.contenttypes.models import ContentType
from drf_yasg.utils import swagger_serializer_method
from rest_framework import serializers
from netbox.api import ContentTypeField
from netbox.api.serializers import OrganizationalModelSerializer, PrimaryModelSerializer
from secrets.constants import SECRET_ASSIGNMENT_MODELS
from secrets.models import Secret, SecretRole
from utilities.api import get_serializer_for_model
from .nested_serializers import *
#
# Secrets
#
class SecretRoleSerializer(OrganizationalModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='secrets-api:secretrole-detail')
secret_count = serializers.IntegerField(read_only=True)
class Meta:
model = SecretRole
fields = [
'id', 'url', 'display', 'name', 'slug', 'description', 'custom_fields', 'created', 'last_updated',
'secret_count',
]
class SecretSerializer(PrimaryModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='secrets-api:secret-detail')
assigned_object_type = ContentTypeField(
queryset=ContentType.objects.filter(SECRET_ASSIGNMENT_MODELS)
)
assigned_object = serializers.SerializerMethodField(read_only=True)
role = NestedSecretRoleSerializer()
plaintext = serializers.CharField()
class Meta:
model = Secret
fields = [
'id', 'url', 'display', 'assigned_object_type', 'assigned_object_id', 'assigned_object', 'role', 'name',
'plaintext', 'hash', 'tags', 'custom_fields', 'created', 'last_updated',
]
validators = []
@swagger_serializer_method(serializer_or_field=serializers.DictField)
def get_assigned_object(self, obj):
serializer = get_serializer_for_model(obj.assigned_object, prefix='Nested')
context = {'request': self.context['request']}
return serializer(obj.assigned_object, context=context).data
def validate(self, data):
# Encrypt plaintext data using the master key provided from the view context
if data.get('plaintext'):
s = Secret(plaintext=data['plaintext'])
s.encrypt(self.context['master_key'])
data['ciphertext'] = s.ciphertext
data['hash'] = s.hash
super().validate(data)
return data

View File

@ -1,17 +0,0 @@
from netbox.api import OrderedDefaultRouter
from . import views
router = OrderedDefaultRouter()
router.APIRootView = views.SecretsRootView
# Secrets
router.register('secret-roles', views.SecretRoleViewSet)
router.register('secrets', views.SecretViewSet)
# Miscellaneous
router.register('get-session-key', views.GetSessionKeyViewSet, basename='get-session-key')
router.register('generate-rsa-key-pair', views.GenerateRSAKeyPairViewSet, basename='generate-rsa-key-pair')
app_name = 'secrets-api'
urlpatterns = router.urls

View File

@ -1,223 +0,0 @@
import base64
from Crypto.PublicKey import RSA
from django.http import HttpResponseBadRequest
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.routers import APIRootView
from rest_framework.viewsets import ViewSet
from extras.api.views import CustomFieldModelViewSet
from netbox.api.views import ModelViewSet
from secrets import filtersets
from secrets.exceptions import InvalidKey
from secrets.models import Secret, SecretRole, SessionKey, UserKey
from utilities.utils import count_related
from . import serializers
ERR_USERKEY_MISSING = "No UserKey found for the current user."
ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
ERR_PRIVKEY_MISSING = "Private key was not provided."
ERR_PRIVKEY_INVALID = "Invalid private key."
class SecretsRootView(APIRootView):
"""
Secrets API root view
"""
def get_view_name(self):
return 'Secrets'
#
# Secret Roles
#
class SecretRoleViewSet(CustomFieldModelViewSet):
queryset = SecretRole.objects.annotate(
secret_count=count_related(Secret, 'role')
)
serializer_class = serializers.SecretRoleSerializer
filterset_class = filtersets.SecretRoleFilterSet
#
# Secrets
#
class SecretViewSet(ModelViewSet):
queryset = Secret.objects.prefetch_related('role', 'tags')
serializer_class = serializers.SecretSerializer
filterset_class = filtersets.SecretFilterSet
master_key = None
def get_serializer_context(self):
# Make the master key available to the serializer for encrypting plaintext values
context = super().get_serializer_context()
context['master_key'] = self.master_key
return context
def initial(self, request, *args, **kwargs):
super().initial(request, *args, **kwargs)
if request.user.is_authenticated:
# Read session key from HTTP cookie or header if it has been provided. The session key must be provided in
# order to encrypt/decrypt secrets.
if 'session_key' in request.COOKIES:
session_key = base64.b64decode(request.COOKIES['session_key'])
elif 'HTTP_X_SESSION_KEY' in request.META:
session_key = base64.b64decode(request.META['HTTP_X_SESSION_KEY'])
else:
session_key = None
# We can't encrypt secret plaintext without a session key.
if self.action in ['create', 'update'] and session_key is None:
raise ValidationError("A session key must be provided when creating or updating secrets.")
# Attempt to retrieve the master key for encryption/decryption if a session key has been provided.
if session_key is not None:
try:
sk = SessionKey.objects.get(userkey__user=request.user)
self.master_key = sk.get_master_key(session_key)
except (SessionKey.DoesNotExist, InvalidKey):
raise ValidationError("Invalid session key.")
def retrieve(self, request, *args, **kwargs):
secret = self.get_object()
# Attempt to decrypt the secret if the master key is known
if self.master_key is not None:
secret.decrypt(self.master_key)
serializer = self.get_serializer(secret)
return Response(serializer.data)
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)
if page is not None:
# Attempt to decrypt all secrets if the master key is known
if self.master_key is not None:
secrets = []
for secret in page:
secret.decrypt(self.master_key)
secrets.append(secret)
serializer = self.get_serializer(secrets, many=True)
else:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class GetSessionKeyViewSet(ViewSet):
"""
Retrieve a temporary session key to use for encrypting and decrypting secrets via the API. The user's private RSA
key is POSTed with the name `private_key`. An example:
curl -v -X POST -H "Authorization: Token <token>" -H "Accept: application/json; indent=4" \\
--data-urlencode "private_key@<filename>" https://netbox/api/secrets/get-session-key/
This request will yield a base64-encoded session key to be included in an `X-Session-Key` header in future requests:
{
"session_key": "+8t4SI6XikgVmB5+/urhozx9O5qCQANyOk1MNe6taRf="
}
This endpoint accepts one optional parameter: `preserve_key`. If True and a session key exists, the existing session
key will be returned instead of a new one.
"""
permission_classes = [IsAuthenticated]
def create(self, request):
# Read private key
private_key = request.POST.get('private_key', None)
if private_key is None:
return HttpResponseBadRequest(ERR_PRIVKEY_MISSING)
# Validate user key
try:
user_key = UserKey.objects.get(user=request.user)
except UserKey.DoesNotExist:
return HttpResponseBadRequest(ERR_USERKEY_MISSING)
if not user_key.is_active():
return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
# Validate private key
master_key = user_key.get_master_key(private_key)
if master_key is None:
return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
try:
current_session_key = SessionKey.objects.get(userkey__user_id=request.user.pk)
except SessionKey.DoesNotExist:
current_session_key = None
if current_session_key and request.GET.get('preserve_key', False):
# Retrieve the existing session key
key = current_session_key.get_session_key(master_key)
else:
# Create a new SessionKey
SessionKey.objects.filter(userkey__user=request.user).delete()
sk = SessionKey(userkey=user_key)
sk.save(master_key=master_key)
key = sk.key
# Encode the key using base64. (b64decode() returns a bytestring under Python 3.)
encoded_key = base64.b64encode(key).decode()
# Craft the response
response = Response({
'session_key': encoded_key,
})
# If token authentication is not in use, assign the session key as a cookie
if request.auth is None:
response.set_cookie('session_key', value=encoded_key)
return response
class GenerateRSAKeyPairViewSet(ViewSet):
"""
This endpoint can be used to generate a new RSA key pair. The keys are returned in PEM format.
{
"public_key": "<public key>",
"private_key": "<private key>"
}
"""
permission_classes = [IsAuthenticated]
def list(self, request):
# Determine what size key to generate
key_size = request.GET.get('key_size', 2048)
if key_size not in range(2048, 4097, 256):
key_size = 2048
# Export RSA private and public keys in PEM format
key = RSA.generate(key_size)
private_key = key.exportKey('PEM')
public_key = key.publickey().exportKey('PEM')
return Response({
'private_key': private_key,
'public_key': public_key,
})

View File

@ -1,5 +0,0 @@
from django.apps import AppConfig
class SecretsConfig(AppConfig):
name = 'secrets'

View File

@ -1,13 +0,0 @@
from django.db.models import Q
#
# Secrets
#
SECRET_ASSIGNMENT_MODELS = Q(
Q(app_label='dcim', model='device') |
Q(app_label='virtualization', model='virtualmachine')
)
SECRET_PLAINTEXT_MAX_LENGTH = 65535

View File

@ -1,5 +0,0 @@
class InvalidKey(Exception):
"""
Raised when a provided key is invalid.
"""
pass

View File

@ -1,73 +0,0 @@
import django_filters
from django.db.models import Q
from dcim.models import Device
from extras.filters import TagFilter
from netbox.filtersets import OrganizationalModelFilterSet, PrimaryModelFilterSet
from virtualization.models import VirtualMachine
from .models import Secret, SecretRole
__all__ = (
'SecretFilterSet',
'SecretRoleFilterSet',
)
class SecretRoleFilterSet(OrganizationalModelFilterSet):
class Meta:
model = SecretRole
fields = ['id', 'name', 'slug']
class SecretFilterSet(PrimaryModelFilterSet):
q = django_filters.CharFilter(
method='search',
label='Search',
)
role_id = django_filters.ModelMultipleChoiceFilter(
queryset=SecretRole.objects.all(),
label='Role (ID)',
)
role = django_filters.ModelMultipleChoiceFilter(
field_name='role__slug',
queryset=SecretRole.objects.all(),
to_field_name='slug',
label='Role (slug)',
)
device = django_filters.ModelMultipleChoiceFilter(
field_name='device__name',
queryset=Device.objects.all(),
to_field_name='name',
label='Device (name)',
)
device_id = django_filters.ModelMultipleChoiceFilter(
field_name='device',
queryset=Device.objects.all(),
label='Device (ID)',
)
virtual_machine = django_filters.ModelMultipleChoiceFilter(
field_name='virtual_machine__name',
queryset=VirtualMachine.objects.all(),
to_field_name='name',
label='Virtual machine (name)',
)
virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
field_name='virtual_machine',
queryset=VirtualMachine.objects.all(),
label='Virtual machine (ID)',
)
tag = TagFilter()
class Meta:
model = Secret
fields = ['id', 'name']
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(name__icontains=value) |
Q(device__name__icontains=value)
)

View File

@ -1,285 +0,0 @@
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from django import forms
from django.utils.translation import gettext as _
from dcim.models import Device
from extras.forms import (
AddRemoveTagsForm, CustomFieldBulkEditForm, CustomFieldFilterForm, CustomFieldModelForm, CustomFieldModelCSVForm,
)
from extras.models import Tag
from utilities.forms import (
BootstrapMixin, CSVModelChoiceField, DynamicModelChoiceField, DynamicModelMultipleChoiceField,
SlugField, TagFilterField,
)
from virtualization.models import VirtualMachine
from .constants import *
from .models import Secret, SecretRole, UserKey
def validate_rsa_key(key, is_secret=True):
"""
Validate the format and type of an RSA key.
"""
if key.startswith('ssh-rsa '):
raise forms.ValidationError("OpenSSH line format is not supported. Please ensure that your public is in PEM (base64) format.")
try:
key = RSA.importKey(key)
except ValueError:
raise forms.ValidationError("Invalid RSA key. Please ensure that your key is in PEM (base64) format.")
except Exception as e:
raise forms.ValidationError("Invalid key detected: {}".format(e))
if is_secret and not key.has_private():
raise forms.ValidationError("This looks like a public key. Please provide your private RSA key.")
elif not is_secret and key.has_private():
raise forms.ValidationError("This looks like a private key. Please provide your public RSA key.")
try:
PKCS1_OAEP.new(key)
except Exception:
raise forms.ValidationError("Error validating RSA key. Please ensure that your key supports PKCS#1 OAEP.")
#
# Secret roles
#
class SecretRoleForm(BootstrapMixin, CustomFieldModelForm):
slug = SlugField()
class Meta:
model = SecretRole
fields = ('name', 'slug', 'description')
class SecretRoleCSVForm(CustomFieldModelCSVForm):
slug = SlugField()
class Meta:
model = SecretRole
fields = SecretRole.csv_headers
class SecretRoleBulkEditForm(BootstrapMixin, CustomFieldBulkEditForm):
pk = forms.ModelMultipleChoiceField(
queryset=SecretRole.objects.all(),
widget=forms.MultipleHiddenInput
)
description = forms.CharField(
max_length=200,
required=False
)
class Meta:
nullable_fields = ['description']
#
# Secrets
#
class SecretForm(BootstrapMixin, CustomFieldModelForm):
device = DynamicModelChoiceField(
queryset=Device.objects.all(),
required=False
)
virtual_machine = DynamicModelChoiceField(
queryset=VirtualMachine.objects.all(),
required=False
)
plaintext = forms.CharField(
max_length=SECRET_PLAINTEXT_MAX_LENGTH,
required=False,
label='Plaintext',
widget=forms.PasswordInput(
attrs={
'class': 'requires-session-key',
}
)
)
plaintext2 = forms.CharField(
max_length=SECRET_PLAINTEXT_MAX_LENGTH,
required=False,
label='Plaintext (verify)',
widget=forms.PasswordInput()
)
role = DynamicModelChoiceField(
queryset=SecretRole.objects.all()
)
tags = DynamicModelMultipleChoiceField(
queryset=Tag.objects.all(),
required=False
)
class Meta:
model = Secret
fields = [
'device', 'virtual_machine', 'role', 'name', 'plaintext', 'plaintext2', 'tags',
]
def __init__(self, *args, **kwargs):
# Initialize helper selectors
instance = kwargs.get('instance')
initial = kwargs.get('initial', {}).copy()
if instance:
if type(instance.assigned_object) is Device:
initial['device'] = instance.assigned_object
elif type(instance.assigned_object) is VirtualMachine:
initial['virtual_machine'] = instance.assigned_object
kwargs['initial'] = initial
super().__init__(*args, **kwargs)
# A plaintext value is required when creating a new Secret
if not self.instance.pk:
self.fields['plaintext'].required = True
def clean(self):
super().clean()
if not self.cleaned_data['device'] and not self.cleaned_data['virtual_machine']:
raise forms.ValidationError("Secrets must be assigned to a device or virtual machine.")
if self.cleaned_data['device'] and self.cleaned_data['virtual_machine']:
raise forms.ValidationError("Cannot select both a device and virtual machine for secret assignment.")
# Verify that the provided plaintext values match
if self.cleaned_data['plaintext'] != self.cleaned_data['plaintext2']:
raise forms.ValidationError({
'plaintext2': "The two given plaintext values do not match. Please check your input."
})
def save(self, *args, **kwargs):
# Set assigned object
self.instance.assigned_object = self.cleaned_data.get('device') or self.cleaned_data.get('virtual_machine')
return super().save(*args, **kwargs)
class SecretCSVForm(CustomFieldModelCSVForm):
role = CSVModelChoiceField(
queryset=SecretRole.objects.all(),
to_field_name='name',
help_text='Assigned role'
)
device = CSVModelChoiceField(
queryset=Device.objects.all(),
required=False,
to_field_name='name',
help_text='Assigned device'
)
virtual_machine = CSVModelChoiceField(
queryset=VirtualMachine.objects.all(),
required=False,
to_field_name='name',
help_text='Assigned VM'
)
plaintext = forms.CharField(
help_text='Plaintext secret data'
)
class Meta:
model = Secret
fields = ['role', 'name', 'plaintext', 'device', 'virtual_machine']
help_texts = {
'name': 'Name or username',
}
def clean(self):
super().clean()
device = self.cleaned_data.get('device')
virtual_machine = self.cleaned_data.get('virtual_machine')
# Validate device OR VM is assigned
if not device and not virtual_machine:
raise forms.ValidationError("Secret must be assigned to a device or a virtual machine")
if device and virtual_machine:
raise forms.ValidationError("Secret cannot be assigned to both a device and a virtual machine")
def save(self, *args, **kwargs):
# Set device/VM assignment
self.instance.assigned_object = self.cleaned_data['device'] or self.cleaned_data['virtual_machine']
s = super().save(*args, **kwargs)
# Set plaintext on instance
s.plaintext = str(self.cleaned_data['plaintext'])
return s
class SecretBulkEditForm(BootstrapMixin, AddRemoveTagsForm, CustomFieldBulkEditForm):
pk = forms.ModelMultipleChoiceField(
queryset=Secret.objects.all(),
widget=forms.MultipleHiddenInput()
)
role = DynamicModelChoiceField(
queryset=SecretRole.objects.all(),
required=False
)
name = forms.CharField(
max_length=100,
required=False
)
class Meta:
nullable_fields = [
'name',
]
class SecretFilterForm(BootstrapMixin, CustomFieldFilterForm):
model = Secret
q = forms.CharField(
required=False,
label=_('Search')
)
role_id = DynamicModelMultipleChoiceField(
queryset=SecretRole.objects.all(),
required=False,
label=_('Role')
)
tag = TagFilterField(model)
#
# UserKeys
#
class UserKeyForm(BootstrapMixin, forms.ModelForm):
class Meta:
model = UserKey
fields = ['public_key']
help_texts = {
'public_key': "Enter your public RSA key. Keep the private one with you; you'll need it for decryption. "
"Please note that passphrase-protected keys are not supported.",
}
labels = {
'public_key': ''
}
def clean_public_key(self):
key = self.cleaned_data['public_key']
# Validate the RSA key format.
validate_rsa_key(key, is_secret=False)
return key
class ActivateUserKeyForm(forms.Form):
_selected_action = forms.ModelMultipleChoiceField(
queryset=UserKey.objects.all(),
label='User Keys'
)
secret_key = forms.CharField(
widget=forms.Textarea(
attrs={
'class': 'vLargeTextField',
}
),
label='Your private key'
)

View File

@ -1,9 +0,0 @@
from django.contrib.auth.hashers import PBKDF2PasswordHasher
class SecretValidationHasher(PBKDF2PasswordHasher):
"""
We're using Django's stock SHA256 hasher with a low iteration count to avoid introducing excessive delay when
retrieving a large number of Secrets (the plaintext of each Secret is validated against its hash upon decryption).
"""
iterations = 1000

View File

@ -1,70 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.7 on 2016-06-22 18:21
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('dcim', '0002_auto_20160622_1821'),
('auth', '0007_alter_validators_add_error_messages'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='Secret',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateField(auto_now_add=True)),
('last_updated', models.DateTimeField(auto_now=True)),
('name', models.CharField(blank=True, max_length=100)),
('ciphertext', models.BinaryField(max_length=65568)),
('hash', models.CharField(editable=False, max_length=128)),
('device', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='secrets', to='dcim.Device')),
],
options={
'ordering': ['device', 'role', 'name'],
},
),
migrations.CreateModel(
name='SecretRole',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=50, unique=True)),
('slug', models.SlugField(unique=True)),
('groups', models.ManyToManyField(blank=True, related_name='secretroles', to='auth.Group')),
('users', models.ManyToManyField(blank=True, related_name='secretroles', to=settings.AUTH_USER_MODEL)),
],
options={
'ordering': ['name'],
},
),
migrations.CreateModel(
name='UserKey',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateField(auto_now_add=True)),
('last_updated', models.DateTimeField(auto_now=True)),
('public_key', models.TextField(verbose_name=b'RSA public key')),
('master_key_cipher', models.BinaryField(blank=True, max_length=512, null=True)),
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='user_key', to=settings.AUTH_USER_MODEL, verbose_name=b'User')),
],
options={
'ordering': ['user__username'],
},
),
migrations.AddField(
model_name='secret',
name='role',
field=models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='secrets', to='secrets.SecretRole'),
),
migrations.AlterUniqueTogether(
name='secret',
unique_together=set([('device', 'role', 'name')]),
),
]

View File

@ -1,37 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-04-27 15:26
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('secrets', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='SessionKey',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('cipher', models.BinaryField(max_length=512)),
('hash', models.CharField(editable=False, max_length=128)),
('created', models.DateTimeField(auto_now_add=True)),
],
options={
'ordering': ['userkey__user__username'],
},
),
migrations.AlterField(
model_name='userkey',
name='user',
field=models.OneToOneField(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='user_key', to=settings.AUTH_USER_MODEL),
),
migrations.AddField(
model_name='sessionkey',
name='userkey',
field=models.OneToOneField(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='session_key', to='secrets.UserKey'),
),
]

View File

@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-05-24 15:34
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('secrets', '0002_userkey_add_session_key'),
]
operations = [
migrations.AlterField(
model_name='userkey',
name='public_key',
field=models.TextField(verbose_name='RSA public key'),
),
]

View File

@ -1,20 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.12 on 2018-05-22 19:04
from django.db import migrations
import taggit.managers
class Migration(migrations.Migration):
dependencies = [
('taggit', '0002_auto_20150616_2121'),
('secrets', '0003_unicode_literals'),
]
operations = [
migrations.AddField(
model_name='secret',
name='tags',
field=taggit.managers.TaggableManager(help_text='A comma-separated list of tags.', through='taggit.TaggedItem', to='taggit.Tag', verbose_name='Tags'),
),
]

View File

@ -1,33 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.12 on 2018-06-13 17:29
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('secrets', '0004_tags'),
]
operations = [
migrations.AddField(
model_name='secretrole',
name='created',
field=models.DateField(auto_now_add=True, null=True),
),
migrations.AddField(
model_name='secretrole',
name='last_updated',
field=models.DateTimeField(auto_now=True, null=True),
),
migrations.AlterField(
model_name='secret',
name='created',
field=models.DateField(auto_now_add=True, null=True),
),
migrations.AlterField(
model_name='secret',
name='last_updated',
field=models.DateTimeField(auto_now=True, null=True),
),
]

View File

@ -1,20 +0,0 @@
# Generated by Django 2.1.4 on 2019-02-20 06:56
from django.db import migrations
import taggit.managers
class Migration(migrations.Migration):
dependencies = [
('secrets', '0005_change_logging'),
('extras', '0019_tag_taggeditem'),
]
operations = [
migrations.AlterField(
model_name='secret',
name='tags',
field=taggit.managers.TaggableManager(through='extras.TaggedItem', to='extras.Tag'),
),
]

View File

@ -1,18 +0,0 @@
# Generated by Django 2.2.6 on 2019-12-10 18:00
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('secrets', '0006_custom_tag_models'),
]
operations = [
migrations.AddField(
model_name='secretrole',
name='description',
field=models.CharField(blank=True, max_length=100),
),
]

View File

@ -1,18 +0,0 @@
# Generated by Django 3.0.3 on 2020-03-13 20:27
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('secrets', '0007_secretrole_description'),
]
operations = [
migrations.AlterField(
model_name='secretrole',
name='description',
field=models.CharField(blank=True, max_length=200),
),
]

View File

@ -1,20 +0,0 @@
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('secrets', '0008_standardize_description'),
('users', '0009_replicate_permissions'),
]
operations = [
migrations.RemoveField(
model_name='secretrole',
name='groups',
),
migrations.RemoveField(
model_name='secretrole',
name='users',
),
]

View File

@ -1,17 +0,0 @@
import django.core.serializers.json
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('secrets', '0009_secretrole_drop_users_groups'),
]
operations = [
migrations.AddField(
model_name='secret',
name='custom_field_data',
field=models.JSONField(blank=True, default=dict, encoder=django.core.serializers.json.DjangoJSONEncoder),
),
]

View File

@ -1,67 +0,0 @@
from django.db import migrations, models
import django.db.models.deletion
def device_to_generic_assignment(apps, schema_editor):
ContentType = apps.get_model('contenttypes', 'ContentType')
Device = apps.get_model('dcim', 'Device')
Secret = apps.get_model('secrets', 'Secret')
device_ct = ContentType.objects.get_for_model(Device)
Secret.objects.update(assigned_object_type=device_ct, assigned_object_id=models.F('device_id'))
class Migration(migrations.Migration):
dependencies = [
('contenttypes', '0002_remove_content_type_name'),
('secrets', '0010_custom_field_data'),
]
operations = [
migrations.AlterModelOptions(
name='secret',
options={'ordering': ('role', 'name', 'pk')},
),
# Add assigned_object type & ID fields
migrations.AddField(
model_name='secret',
name='assigned_object_id',
field=models.PositiveIntegerField(blank=True, null=True),
preserve_default=False,
),
migrations.AddField(
model_name='secret',
name='assigned_object_type',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, to='contenttypes.contenttype'),
preserve_default=False,
),
migrations.AlterUniqueTogether(
name='secret',
unique_together={('assigned_object_type', 'assigned_object_id', 'role', 'name')},
),
# Copy device assignments and delete device ForeignKey
migrations.RunPython(
code=device_to_generic_assignment,
reverse_code=migrations.RunPython.noop
),
migrations.RemoveField(
model_name='secret',
name='device',
),
# Remove blank/null from assigned_object fields
migrations.AlterField(
model_name='secret',
name='assigned_object_id',
field=models.PositiveIntegerField(),
),
migrations.AlterField(
model_name='secret',
name='assigned_object_type',
field=models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='contenttypes.contenttype'),
),
]

View File

@ -1,23 +0,0 @@
# Generated by Django 3.1 on 2020-10-15 19:33
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('secrets', '0011_secret_generic_assignments'),
]
operations = [
migrations.AlterField(
model_name='secretrole',
name='name',
field=models.CharField(max_length=100, unique=True),
),
migrations.AlterField(
model_name='secretrole',
name='slug',
field=models.SlugField(max_length=100, unique=True),
),
]

View File

@ -1,37 +0,0 @@
import django.core.serializers.json
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('secrets', '0012_standardize_name_length'),
]
operations = [
migrations.AddField(
model_name='secretrole',
name='custom_field_data',
field=models.JSONField(blank=True, default=dict, encoder=django.core.serializers.json.DjangoJSONEncoder),
),
migrations.AlterField(
model_name='secret',
name='id',
field=models.BigAutoField(primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='secretrole',
name='id',
field=models.BigAutoField(primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='sessionkey',
name='id',
field=models.BigAutoField(primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='userkey',
name='id',
field=models.BigAutoField(primary_key=True, serialize=False),
),
]

View File

@ -1,424 +0,0 @@
import os
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Util import strxor
from django.conf import settings
from django.contrib.auth.hashers import make_password, check_password
from django.contrib.auth.models import Group, User
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from django.utils.encoding import force_bytes
from extras.utils import extras_features
from netbox.models import BigIDModel, OrganizationalModel, PrimaryModel
from utilities.querysets import RestrictedQuerySet
from .exceptions import InvalidKey
from .hashers import SecretValidationHasher
from .querysets import UserKeyQuerySet
from .utils import encrypt_master_key, decrypt_master_key, generate_random_key
__all__ = (
'Secret',
'SecretRole',
'SessionKey',
'UserKey',
)
class UserKey(BigIDModel):
"""
A UserKey stores a user's personal RSA (public) encryption key, which is used to generate their unique encrypted
copy of the master encryption key. The encrypted instance of the master key can be decrypted only with the user's
matching (private) decryption key.
"""
created = models.DateField(
auto_now_add=True
)
last_updated = models.DateTimeField(
auto_now=True
)
user = models.OneToOneField(
to=User,
on_delete=models.CASCADE,
related_name='user_key',
editable=False
)
public_key = models.TextField(
verbose_name='RSA public key'
)
master_key_cipher = models.BinaryField(
max_length=512,
blank=True,
null=True,
editable=False
)
objects = UserKeyQuerySet.as_manager()
class Meta:
ordering = ['user__username']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Store the initial public_key and master_key_cipher to check for changes on save().
self.__initial_public_key = self.public_key
self.__initial_master_key_cipher = self.master_key_cipher
def __str__(self):
return self.user.username
def clean(self):
super().clean()
if self.public_key:
# Validate the public key format
try:
pubkey = RSA.import_key(self.public_key)
except ValueError:
raise ValidationError({
'public_key': "Invalid RSA key format."
})
except Exception:
raise ValidationError("Something went wrong while trying to save your key. Please ensure that you're "
"uploading a valid RSA public key in PEM format (no SSH/PGP).")
# Validate the public key length
pubkey_length = pubkey.size_in_bits()
if pubkey_length < settings.SECRETS_MIN_PUBKEY_SIZE:
raise ValidationError({
'public_key': "Insufficient key length. Keys must be at least {} bits long.".format(
settings.SECRETS_MIN_PUBKEY_SIZE
)
})
# We can't use keys bigger than our master_key_cipher field can hold
if pubkey_length > 4096:
raise ValidationError({
'public_key': "Public key size ({}) is too large. Maximum key size is 4096 bits.".format(
pubkey_length
)
})
def save(self, *args, **kwargs):
# Check whether public_key has been modified. If so, nullify the initial master_key_cipher.
if self.__initial_master_key_cipher and self.public_key != self.__initial_public_key:
self.master_key_cipher = None
# If no other active UserKeys exist, generate a new master key and use it to activate this UserKey.
if self.is_filled() and not self.is_active() and not UserKey.objects.active().count():
master_key = generate_random_key()
self.master_key_cipher = encrypt_master_key(master_key, self.public_key)
super().save(*args, **kwargs)
def delete(self, *args, **kwargs):
# If Secrets exist and this is the last active UserKey, prevent its deletion. Deleting the last UserKey will
# result in the master key being destroyed and rendering all Secrets inaccessible.
if Secret.objects.count() and [uk.pk for uk in UserKey.objects.active()] == [self.pk]:
raise Exception("Cannot delete the last active UserKey when Secrets exist! This would render all secrets "
"inaccessible.")
super().delete(*args, **kwargs)
def is_filled(self):
"""
Returns True if the UserKey has been filled with a public RSA key.
"""
return bool(self.public_key)
is_filled.boolean = True
def is_active(self):
"""
Returns True if the UserKey has been populated with an encrypted copy of the master key.
"""
return self.master_key_cipher is not None
is_active.boolean = True
def get_master_key(self, private_key):
"""
Given the User's private key, return the encrypted master key.
"""
if not self.is_active:
raise ValueError("Unable to retrieve master key: UserKey is inactive.")
try:
return decrypt_master_key(force_bytes(self.master_key_cipher), private_key)
except ValueError:
return None
def activate(self, master_key):
"""
Activate the UserKey by saving an encrypted copy of the master key to the database.
"""
if not self.public_key:
raise Exception("Cannot activate UserKey: Its public key must be filled first.")
self.master_key_cipher = encrypt_master_key(master_key, self.public_key)
self.save()
class SessionKey(BigIDModel):
"""
A SessionKey stores a User's temporary key to be used for the encryption and decryption of secrets.
"""
userkey = models.OneToOneField(
to='secrets.UserKey',
on_delete=models.CASCADE,
related_name='session_key',
editable=False
)
cipher = models.BinaryField(
max_length=512,
editable=False
)
hash = models.CharField(
max_length=128,
editable=False
)
created = models.DateTimeField(
auto_now_add=True
)
key = None
class Meta:
ordering = ['userkey__user__username']
def __str__(self):
return self.userkey.user.username
def save(self, master_key=None, *args, **kwargs):
if master_key is None:
raise Exception("The master key must be provided to save a session key.")
# Generate a random 256-bit session key if one is not already defined
if self.key is None:
self.key = generate_random_key()
# Generate SHA256 hash using Django's built-in password hashing mechanism
self.hash = make_password(self.key)
# Encrypt master key using the session key
self.cipher = strxor.strxor(self.key, master_key)
super().save(*args, **kwargs)
def get_master_key(self, session_key):
# Validate the provided session key
if not check_password(session_key, self.hash):
raise InvalidKey("Invalid session key")
# Decrypt master key using provided session key
master_key = strxor.strxor(session_key, bytes(self.cipher))
return master_key
def get_session_key(self, master_key):
# Recover session key using the master key
session_key = strxor.strxor(master_key, bytes(self.cipher))
# Validate the recovered session key
if not check_password(session_key, self.hash):
raise InvalidKey("Invalid master key")
return session_key
@extras_features('custom_fields', 'custom_links', 'export_templates', 'webhooks')
class SecretRole(OrganizationalModel):
"""
A SecretRole represents an arbitrary functional classification of Secrets. For example, a user might define roles
such as "Login Credentials" or "SNMP Communities."
"""
name = models.CharField(
max_length=100,
unique=True
)
slug = models.SlugField(
max_length=100,
unique=True
)
description = models.CharField(
max_length=200,
blank=True,
)
objects = RestrictedQuerySet.as_manager()
csv_headers = ['name', 'slug', 'description']
class Meta:
ordering = ['name']
def __str__(self):
return self.name
def get_absolute_url(self):
return reverse('secrets:secretrole', args=[self.pk])
def to_csv(self):
return (
self.name,
self.slug,
self.description,
)
@extras_features('custom_fields', 'custom_links', 'export_templates', 'webhooks')
class Secret(PrimaryModel):
"""
A Secret stores an AES256-encrypted copy of sensitive data, such as passwords or secret keys. An irreversible
SHA-256 hash is stored along with the ciphertext for validation upon decryption. Each Secret is assigned to exactly
one NetBox object, and objects may have multiple Secrets associated with them. A name can optionally be defined
along with the ciphertext; this string is stored as plain text in the database.
A Secret can be up to 65,535 bytes (64KB - 1B) in length. Each secret string will be padded with random data to
a minimum of 64 bytes during encryption in order to protect short strings from ciphertext analysis.
"""
assigned_object_type = models.ForeignKey(
to=ContentType,
on_delete=models.PROTECT
)
assigned_object_id = models.PositiveIntegerField()
assigned_object = GenericForeignKey(
ct_field='assigned_object_type',
fk_field='assigned_object_id'
)
role = models.ForeignKey(
to='secrets.SecretRole',
on_delete=models.PROTECT,
related_name='secrets'
)
name = models.CharField(
max_length=100,
blank=True
)
ciphertext = models.BinaryField(
max_length=65568, # 128-bit IV + 16-bit pad length + 65535B secret + 15B padding
editable=False
)
hash = models.CharField(
max_length=128,
editable=False
)
objects = RestrictedQuerySet.as_manager()
plaintext = None
csv_headers = ['assigned_object_type', 'assigned_object_id', 'role', 'name', 'plaintext']
class Meta:
ordering = ('role', 'name', 'pk')
unique_together = ('assigned_object_type', 'assigned_object_id', 'role', 'name')
def __init__(self, *args, **kwargs):
self.plaintext = kwargs.pop('plaintext', None)
super().__init__(*args, **kwargs)
def __str__(self):
return self.name or 'Secret'
def get_absolute_url(self):
return reverse('secrets:secret', args=[self.pk])
def to_csv(self):
return (
f'{self.assigned_object_type.app_label}.{self.assigned_object_type.model}',
self.assigned_object_id,
self.role,
self.name,
self.plaintext or '',
)
def _pad(self, s):
"""
Prepend the length of the plaintext (2B) and pad with garbage to a multiple of 16B (minimum of 64B).
+--+--------+-------------------------------------------+
|LL|MySecret|xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx|
+--+--------+-------------------------------------------+
"""
s = s.encode('utf8')
if len(s) > 65535:
raise ValueError("Maximum plaintext size is 65535 bytes.")
# Minimum ciphertext size is 64 bytes to conceal the length of short secrets.
if len(s) <= 62:
pad_length = 62 - len(s)
elif (len(s) + 2) % 16:
pad_length = 16 - ((len(s) + 2) % 16)
else:
pad_length = 0
header = bytes([len(s) >> 8]) + bytes([len(s) % 256])
return header + s + os.urandom(pad_length)
def _unpad(self, s):
"""
Consume the first two bytes of s as a plaintext length indicator and return only that many bytes as the
plaintext.
"""
if isinstance(s[0], str):
plaintext_length = (ord(s[0]) << 8) + ord(s[1])
else:
plaintext_length = (s[0] << 8) + s[1]
return s[2:plaintext_length + 2].decode('utf8')
def encrypt(self, secret_key):
"""
Generate a random initialization vector (IV) for AES. Pad the plaintext to the AES block size (16 bytes) and
encrypt. Prepend the IV for use in decryption. Finally, record the SHA256 hash of the plaintext for validation
upon decryption.
"""
if self.plaintext is None:
raise Exception("Must unlock or set plaintext before locking.")
# Pad and encrypt plaintext
iv = os.urandom(16)
aes = AES.new(secret_key, AES.MODE_CFB, iv)
self.ciphertext = iv + aes.encrypt(self._pad(self.plaintext))
# Generate SHA256 using Django's built-in password hashing mechanism
self.hash = make_password(self.plaintext, hasher=SecretValidationHasher())
self.plaintext = None
def decrypt(self, secret_key):
"""
Consume the first 16 bytes of self.ciphertext as the AES initialization vector (IV). The remainder is decrypted
using the IV and the provided secret key. Padding is then removed to reveal the plaintext. Finally, validate the
decrypted plaintext value against the stored hash.
"""
if self.plaintext is not None:
return
if not self.ciphertext:
raise Exception("Must define ciphertext before unlocking.")
# Decrypt ciphertext and remove padding
iv = bytes(self.ciphertext[0:16])
ciphertext = bytes(self.ciphertext[16:])
aes = AES.new(secret_key, AES.MODE_CFB, iv)
plaintext = self._unpad(aes.decrypt(ciphertext))
# Verify decrypted plaintext against hash
if not self.validate(plaintext):
raise ValueError("Invalid key or ciphertext!")
self.plaintext = plaintext
def validate(self, plaintext):
"""
Validate that a given plaintext matches the stored hash.
"""
if not self.hash:
raise Exception("Hash has not been generated for this secret.")
return check_password(plaintext, self.hash, preferred=SecretValidationHasher())

View File

@ -1,11 +0,0 @@
from django.db.models import QuerySet
class UserKeyQuerySet(QuerySet):
def active(self):
return self.filter(master_key_cipher__isnull=False)
def delete(self):
# Disable bulk deletion to avoid accidentally wiping out all copies of the master key.
raise Exception("Bulk deletion has been disabled.")

View File

@ -1,52 +0,0 @@
import django_tables2 as tables
from utilities.tables import BaseTable, ButtonsColumn, LinkedCountColumn, TagColumn, ToggleColumn
from .models import SecretRole, Secret
#
# Secret roles
#
class SecretRoleTable(BaseTable):
pk = ToggleColumn()
name = tables.Column(
linkify=True
)
secret_count = LinkedCountColumn(
viewname='secrets:secret_list',
url_params={'role_id': 'pk'},
verbose_name='Secrets'
)
actions = ButtonsColumn(SecretRole)
class Meta(BaseTable.Meta):
model = SecretRole
fields = ('pk', 'name', 'secret_count', 'description', 'slug', 'actions')
default_columns = ('pk', 'name', 'secret_count', 'description', 'actions')
#
# Secrets
#
class SecretTable(BaseTable):
pk = ToggleColumn()
id = tables.Column( # Provides a link to the secret
linkify=True
)
assigned_object = tables.Column(
linkify=True,
verbose_name='Assigned object'
)
role = tables.Column(
linkify=True
)
tags = TagColumn(
url_name='secrets:secret_list'
)
class Meta(BaseTable.Meta):
model = Secret
fields = ('pk', 'id', 'assigned_object', 'role', 'name', 'last_updated', 'hash', 'tags')
default_columns = ('pk', 'id', 'assigned_object', 'role', 'name', 'last_updated')

View File

@ -1,12 +0,0 @@
{% extends "admin/base_site.html" %}
{% block content %}
<form action="." method="post">
{% csrf_token %}
{{ form.as_p }}
<input type="hidden" name="action" value="activate_selected" />
<input type="submit" name="activate" value="Activate User Key(s)" />
</form>
{% endblock %}

View File

@ -1,40 +0,0 @@
# Dummy RSA key pair for testing use only
PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA97wPWxpq5cClRu8Ssq609ZLfyx6E8ln/v/PdFZ7fxxmA4k+z
1Q/Rn9/897PWy+1x2ZKlHjmaw1z7dS3PlGqdd453d1eY95xYVbFrIHs7yJy8lcDR
2criwGEI68VP1FwcOkkwhicjtQZQS5fkkBIbRjA2wmt2PVT26YbOX2qCMItV1+me
o/Ogh+uI1oNePJ8VYuGXbGNggf1qMY8fGhhhGY2b4PKuSTcsYjbg8adOGzFL9RXL
I1X4PHNCzD/Y1vdM3jJXv+luk3TU+JIbzJeN5ZEEz+sIdlMPCAACaZAY/t9Kd/Lx
Hr0o4K/6gqkZIukxFCK6sN53gibAXfaKc4xlqQIDAQABAoIBAQC4pDQVxNTTtQf6
nImlH83EEto1++M+9pFFsi6fxLApJvsGsjzomke1Dy7uN93qVGk8rq3enzSYU58f
sSs8BVKkH00vZ9ydAKxeAkREC1V9qkRsoTBHUY47sJcDkyZyssxfLNm7w0Q70h7a
mLVEJBqr75eAxLN19vOpDk6Wkz3Bi0Dj27HLeme3hH5jLVQIIswWZnUDP3r/sdM/
WA2GjoycPbug0r1FVZnxkFCrQ5yMfH3VzKBelj7356+5sc/TUXedDFN/DV2b90Ll
+au7EEXecFYZwmX3SX2hpe6IWEpUW3B0fvm+Ipm8h7x68i7J0oi9EUXW2+UQYfOx
dDLxTLvhAoGBAPtJJox4XcpzipSAzKxyV8K9ikUZCG2wJU7VHyZ5zpSXwl/husls
brAzHQcnWayhxxuWeiQ6pLnRFPFXjlOH2FZqHXSLnfpDaymEksDPvo9GqRE3Q+F+
lDRn72H1NLIj3Y3t5SwWRB34Dhy+gd5Ht9L3dCTH8cYvJGnmS4sH/z0NAoGBAPxh
2rhS1B0S9mqqvpduUPxqUIWaztXaHC6ZikloOFcgVMdh9MRrpa2sa+bqcygyqrbH
GZIIeGcWpmzeitWgSUNLMSIpdl/VoBSvZUMggdJyOHXayo/EhfFddGHdkfz0B0GW
LzH8ow4JcYdhkTl4+xQstXJNVRJyw5ezFy35FHwNAoGAGZzjKP470R7lyS03r3wY
Jelb5p8elM+XfemLO0i/HbY6QbuoZk9/GMac9tWz9jynJtC3smmn0KjXEaJzB2CZ
VHWMewygFZo5mgnBS5XhPoldQjv310wnnw/Y/osXy/CL7KOK8Gt0lflqttNUOWvl
+MLwO6+FnUXA2Gp42Lr/8SECgYANf2pEK2HewDHfmIwi6yp3pXPzAUmIlGanc1y6
+lDxD/CYzTta+erdc/g9XFKWVsdciR9r+Pn/gW2bKve/3xer+qyBCDilfXZXRN4k
jeuDhspQO0hUEg2b0AS2azQwlBiDQHX7tWg/CvBAbk5nBXpgJNf7aflfyDV/untF
4SlgTQKBgGmcyU02lyM6ogGbzWqSsHgR1ZhYyTV9DekQx9GysLG1wT2QzgjxOw4K
5PnVkOXr/ORqt+vJsYrtqBZQihmPPREKEwr2n8BRw0364z02wjvP04hDBHp4S5Ej
PQeC5qErboVGMMpM2SamqGEfr+HJ/uRF6mEmm+xjI57aOvAwPW0B
-----END RSA PRIVATE KEY-----"""
PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA97wPWxpq5cClRu8Ssq60
9ZLfyx6E8ln/v/PdFZ7fxxmA4k+z1Q/Rn9/897PWy+1x2ZKlHjmaw1z7dS3PlGqd
d453d1eY95xYVbFrIHs7yJy8lcDR2criwGEI68VP1FwcOkkwhicjtQZQS5fkkBIb
RjA2wmt2PVT26YbOX2qCMItV1+meo/Ogh+uI1oNePJ8VYuGXbGNggf1qMY8fGhhh
GY2b4PKuSTcsYjbg8adOGzFL9RXLI1X4PHNCzD/Y1vdM3jJXv+luk3TU+JIbzJeN
5ZEEz+sIdlMPCAACaZAY/t9Kd/LxHr0o4K/6gqkZIukxFCK6sN53gibAXfaKc4xl
qQIDAQAB
-----END PUBLIC KEY-----"""
SSH_PUBLIC_KEY = """ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCy2yMGnuvmM5CnFG8CsohfUYobXU7+pz/RJtvUUnARAY11Ybc3cn0tvzn4aPxclX8+514n6R7jJCZuVGJXXapqZDq2l+PLmgLhyBJxE9qq7rbp4EAJiUP0inDyf8qFzSKT7Rm8cjHvY3v2GI32JUXuWACA23t5YPUqVglkjfdVX8VHJh6fMQrQ4O3CKKh2x0S82UHH7SaYH0HqOknPgyRQ+ZQorUU25IpzJPesk29nN3DYqfY+VQsKJOLglWvoapaZiu+wK/7ovXqYXNuhfAwlkjbCRKjwix1kZjtDS44US1//BCaT7AeuwMpFLI44v/VajoxTfE0h74Mxl48mNt7Qme4lbXxH8yMa6HNfDp4vjnxPE1CWuSrFo4G+HI1rc22qSmw9e67qIGRbcI7/cIFpeBvnfCCgWrqWZ6ZzdAZJCnu7/aWn00+VG+54GFmJ+3R2xhWcu+Uzn+o1aWROtUuzq0qR6zdXME3A0Oud2uQrQAiAGFdWpfvcOEbD+tlPNDk= test"""

View File

@ -1,169 +0,0 @@
import base64
from django.urls import reverse
from rest_framework import status
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from secrets.models import Secret, SecretRole, SessionKey, UserKey
from utilities.testing import APITestCase, APIViewTestCases
from .constants import PRIVATE_KEY, PUBLIC_KEY
class AppTest(APITestCase):
def test_root(self):
url = reverse('secrets-api:api-root')
response = self.client.get('{}?format=api'.format(url), **self.header)
self.assertEqual(response.status_code, 200)
class SecretRoleTest(APIViewTestCases.APIViewTestCase):
model = SecretRole
brief_fields = ['display', 'id', 'name', 'secret_count', 'slug', 'url']
create_data = [
{
'name': 'Secret Role 4',
'slug': 'secret-role-4',
},
{
'name': 'Secret Role 5',
'slug': 'secret-role-5',
},
{
'name': 'Secret Role 6',
'slug': 'secret-role-6',
},
]
bulk_update_data = {
'description': 'New description',
}
@classmethod
def setUpTestData(cls):
secret_roles = (
SecretRole(name='Secret Role 1', slug='secret-role-1'),
SecretRole(name='Secret Role 2', slug='secret-role-2'),
SecretRole(name='Secret Role 3', slug='secret-role-3'),
)
SecretRole.objects.bulk_create(secret_roles)
class SecretTest(APIViewTestCases.APIViewTestCase):
model = Secret
brief_fields = ['display', 'id', 'name', 'url']
def setUp(self):
super().setUp()
# Create a UserKey for the test user
userkey = UserKey(user=self.user, public_key=PUBLIC_KEY)
userkey.save()
# Create a SessionKey for the user
self.master_key = userkey.get_master_key(PRIVATE_KEY)
session_key = SessionKey(userkey=userkey)
session_key.save(self.master_key)
# Append the session key to the test client's request header
self.header['HTTP_X_SESSION_KEY'] = base64.b64encode(session_key.key)
site = Site.objects.create(name='Site 1', slug='site-1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
devicetype = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
devicerole = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
device = Device.objects.create(name='Device 1', site=site, device_type=devicetype, device_role=devicerole)
secret_roles = (
SecretRole(name='Secret Role 1', slug='secret-role-1'),
SecretRole(name='Secret Role 2', slug='secret-role-2'),
)
SecretRole.objects.bulk_create(secret_roles)
secrets = (
Secret(assigned_object=device, role=secret_roles[0], name='Secret 1', plaintext='ABC'),
Secret(assigned_object=device, role=secret_roles[0], name='Secret 2', plaintext='DEF'),
Secret(assigned_object=device, role=secret_roles[0], name='Secret 3', plaintext='GHI'),
)
for secret in secrets:
secret.encrypt(self.master_key)
secret.save()
self.create_data = [
{
'assigned_object_type': 'dcim.device',
'assigned_object_id': device.pk,
'role': secret_roles[1].pk,
'name': 'Secret 4',
'plaintext': 'JKL',
},
{
'assigned_object_type': 'dcim.device',
'assigned_object_id': device.pk,
'role': secret_roles[1].pk,
'name': 'Secret 5',
'plaintext': 'MNO',
},
{
'assigned_object_type': 'dcim.device',
'assigned_object_id': device.pk,
'role': secret_roles[1].pk,
'name': 'Secret 6',
'plaintext': 'PQR',
},
]
self.bulk_update_data = {
'role': secret_roles[1].pk,
}
def prepare_instance(self, instance):
# Unlock the plaintext prior to evaluation of the instance
instance.decrypt(self.master_key)
return instance
class GetSessionKeyTest(APITestCase):
def setUp(self):
super().setUp()
userkey = UserKey(user=self.user, public_key=PUBLIC_KEY)
userkey.save()
master_key = userkey.get_master_key(PRIVATE_KEY)
self.session_key = SessionKey(userkey=userkey)
self.session_key.save(master_key)
self.header = {
'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key),
}
def test_get_session_key(self):
encoded_session_key = base64.b64encode(self.session_key.key).decode()
url = reverse('secrets-api:get-session-key-list')
data = {
'private_key': PRIVATE_KEY,
}
response = self.client.post(url, data, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertIsNotNone(response.data.get('session_key'))
self.assertNotEqual(response.data.get('session_key'), encoded_session_key)
def test_get_session_key_preserved(self):
encoded_session_key = base64.b64encode(self.session_key.key).decode()
url = reverse('secrets-api:get-session-key-list') + '?preserve_key=True'
data = {
'private_key': PRIVATE_KEY,
}
response = self.client.post(url, data, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data.get('session_key'), encoded_session_key)

View File

@ -1,103 +0,0 @@
from django.test import TestCase
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from secrets.filtersets import *
from secrets.models import Secret, SecretRole
from utilities.testing import ChangeLoggedFilterSetTests
from virtualization.models import Cluster, ClusterType, VirtualMachine
class SecretRoleTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = SecretRole.objects.all()
filterset = SecretRoleFilterSet
@classmethod
def setUpTestData(cls):
roles = (
SecretRole(name='Secret Role 1', slug='secret-role-1'),
SecretRole(name='Secret Role 2', slug='secret-role-2'),
SecretRole(name='Secret Role 3', slug='secret-role-3'),
)
SecretRole.objects.bulk_create(roles)
def test_name(self):
params = {'name': ['Secret Role 1', 'Secret Role 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['secret-role-1', 'secret-role-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class SecretTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = Secret.objects.all()
filterset = SecretFilterSet
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site-1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(device_type=device_type, name='Device 1', site=site, device_role=device_role),
Device(device_type=device_type, name='Device 2', site=site, device_role=device_role),
Device(device_type=device_type, name='Device 3', site=site, device_role=device_role),
)
Device.objects.bulk_create(devices)
cluster_type = ClusterType.objects.create(name='Cluster Type 1', slug='cluster-type-1')
cluster = Cluster.objects.create(name='Cluster 1', type=cluster_type)
virtual_machines = (
VirtualMachine(name='Virtual Machine 1', cluster=cluster),
VirtualMachine(name='Virtual Machine 2', cluster=cluster),
VirtualMachine(name='Virtual Machine 3', cluster=cluster),
)
VirtualMachine.objects.bulk_create(virtual_machines)
roles = (
SecretRole(name='Secret Role 1', slug='secret-role-1'),
SecretRole(name='Secret Role 2', slug='secret-role-2'),
SecretRole(name='Secret Role 3', slug='secret-role-3'),
)
SecretRole.objects.bulk_create(roles)
secrets = (
Secret(assigned_object=devices[0], role=roles[0], name='Secret 1', plaintext='SECRET DATA'),
Secret(assigned_object=devices[1], role=roles[1], name='Secret 2', plaintext='SECRET DATA'),
Secret(assigned_object=devices[2], role=roles[2], name='Secret 3', plaintext='SECRET DATA'),
Secret(assigned_object=virtual_machines[0], role=roles[0], name='Secret 4', plaintext='SECRET DATA'),
Secret(assigned_object=virtual_machines[1], role=roles[1], name='Secret 5', plaintext='SECRET DATA'),
Secret(assigned_object=virtual_machines[2], role=roles[2], name='Secret 6', plaintext='SECRET DATA'),
)
# Must call save() to encrypt Secrets
for s in secrets:
s.save()
def test_name(self):
params = {'name': ['Secret 1', 'Secret 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_role(self):
roles = SecretRole.objects.all()[:2]
params = {'role_id': [roles[0].pk, roles[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'role': [roles[0].slug, roles[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'device': [devices[0].name, devices[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_virtual_machine(self):
virtual_machines = VirtualMachine.objects.all()[:2]
params = {'virtual_machine_id': [virtual_machines[0].pk, virtual_machines[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'virtual_machine': [virtual_machines[0].name, virtual_machines[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)

View File

@ -1,32 +0,0 @@
from django.test import TestCase
from secrets.forms import UserKeyForm
from secrets.models import UserKey
from utilities.testing import create_test_user
from .constants import PUBLIC_KEY, SSH_PUBLIC_KEY
class UserKeyFormTestCase(TestCase):
def setUp(self):
user = create_test_user(
permissions=[
'secrets.view_secretrole',
'secrets.add_secretrole',
]
)
self.userkey = UserKey(user=user)
def test_upload_rsakey(self):
form = UserKeyForm(
data={'public_key': PUBLIC_KEY},
instance=self.userkey,
)
self.assertTrue(form.is_valid())
self.assertTrue(form.save())
def test_upload_sshkey(self):
form = UserKeyForm(
data={'public_key': SSH_PUBLIC_KEY},
instance=self.userkey,
)
self.assertFalse(form.is_valid())

View File

@ -1,169 +0,0 @@
import string
from Crypto.PublicKey import RSA
from django.conf import settings
from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.test import TestCase
from secrets.hashers import SecretValidationHasher
from secrets.models import Secret, UserKey
from secrets.utils import encrypt_master_key, decrypt_master_key, generate_random_key
class UserKeyTestCase(TestCase):
def setUp(self):
self.TEST_KEYS = {}
key_size = getattr(settings, 'SECRETS_MIN_PUBKEY_SIZE', 2048)
for username in ['alice', 'bob']:
User.objects.create_user(username=username, password=username)
key = RSA.generate(key_size)
self.TEST_KEYS['{}_public'.format(username)] = key.publickey().exportKey('PEM')
self.TEST_KEYS['{}_private'.format(username)] = key.exportKey('PEM')
def test_01_fill(self):
"""
Validate the filling of a UserKey with public key material.
"""
alice_uk = UserKey(user=User.objects.get(username='alice'))
self.assertFalse(alice_uk.is_filled(), "UserKey with empty public_key is_filled() did not return False")
alice_uk.public_key = self.TEST_KEYS['alice_public']
self.assertTrue(alice_uk.is_filled(), "UserKey with public key is_filled() did not return True")
def test_02_activate(self):
"""
Validate the activation of a UserKey.
"""
master_key = generate_random_key()
alice_uk = UserKey(user=User.objects.get(username='alice'), public_key=self.TEST_KEYS['alice_public'])
self.assertFalse(alice_uk.is_active(), "Inactive UserKey is_active() did not return False")
alice_uk.activate(master_key)
self.assertTrue(alice_uk.is_active(), "ActiveUserKey is_active() did not return True")
def test_03_key_sizes(self):
"""
Ensure that RSA keys which are too small or too large are rejected.
"""
rsa = RSA.generate(getattr(settings, 'SECRETS_MIN_PUBKEY_SIZE', 2048) - 256)
small_key = rsa.publickey().exportKey('PEM')
try:
UserKey(public_key=small_key).clean()
self.fail("UserKey.clean() did not fail with an undersized RSA key")
except ValidationError:
pass
rsa = RSA.generate(4096 + 256) # Max size is 4096 (enforced by master_key_cipher field size)
big_key = rsa.publickey().exportKey('PEM')
try:
UserKey(public_key=big_key).clean()
self.fail("UserKey.clean() did not fail with an oversized RSA key")
except ValidationError:
pass
def test_04_master_key_retrieval(self):
"""
Test the decryption of a master key using the user's private key.
"""
master_key = generate_random_key()
alice_uk = UserKey(user=User.objects.get(username='alice'), public_key=self.TEST_KEYS['alice_public'])
alice_uk.activate(master_key)
retrieved_master_key = alice_uk.get_master_key(self.TEST_KEYS['alice_private'])
self.assertEqual(master_key, retrieved_master_key, "Master key retrieval failed with correct private key")
def test_05_invalid_private_key(self):
"""
Ensure that an exception is raised when attempting to retrieve a secret key using an invalid private key.
"""
secret_key = generate_random_key()
secret_key_cipher = encrypt_master_key(secret_key, self.TEST_KEYS['alice_public'])
try:
decrypted_secret_key = decrypt_master_key(secret_key_cipher, self.TEST_KEYS['bob_private'])
self.fail("Decrypting secret key from Alice's UserKey using Bob's private key did not fail")
except ValueError:
pass
class SecretTestCase(TestCase):
@classmethod
def setUpTestData(cls):
# Generate a random key for encryption/decryption of secrets
cls.secret_key = generate_random_key()
def test_01_encrypt_decrypt(self):
"""
Test basic encryption and decryption functionality using a random master key.
"""
plaintext = string.printable * 2
s = Secret(plaintext=plaintext)
s.encrypt(self.secret_key)
# Ensure plaintext is deleted upon encryption
self.assertIsNone(s.plaintext, "Plaintext must be None after encrypting.")
# Enforce minimum ciphertext length
self.assertGreaterEqual(len(s.ciphertext), 80, "Ciphertext must be at least 80 bytes (16B IV + 64B+ ciphertext")
# Ensure proper hashing algorithm is used
hasher, iterations, salt, sha256 = s.hash.split('$')
self.assertEqual(hasher, 'pbkdf2_sha256', "Hashing algorithm has been modified to: {}".format(hasher))
self.assertGreaterEqual(int(iterations), SecretValidationHasher.iterations, "Insufficient iteration count ({}) for hash".format(iterations))
self.assertGreaterEqual(len(salt), 12, "Hash salt is too short ({} chars)".format(len(salt)))
# Test hash validation
self.assertTrue(s.validate(plaintext), "Plaintext does not validate against the generated hash")
self.assertFalse(s.validate(""), "Empty plaintext validated against hash")
self.assertFalse(s.validate("Invalid plaintext"), "Invalid plaintext validated against hash")
# Test decryption
s.decrypt(self.secret_key)
self.assertEqual(plaintext, s.plaintext, "Decrypting Secret returned incorrect plaintext")
def test_02_ciphertext_uniqueness(self):
"""
Generate 50 Secrets using the same plaintext and check for duplicate IVs or payloads.
"""
plaintext = "1234567890abcdef"
ivs = []
ciphertexts = []
for i in range(1, 51):
s = Secret(plaintext=plaintext)
s.encrypt(self.secret_key)
ivs.append(s.ciphertext[0:16])
ciphertexts.append(s.ciphertext[16:32])
duplicate_ivs = [i for i, x in enumerate(ivs) if ivs.count(x) > 1]
self.assertEqual(duplicate_ivs, [], "One or more duplicate IVs found!")
duplicate_ciphertexts = [i for i, x in enumerate(ciphertexts) if ciphertexts.count(x) > 1]
self.assertEqual(duplicate_ciphertexts, [], "One or more duplicate ciphertexts (first blocks) found!")
def test_minimum_length(self):
"""
Test enforcement of the minimum length for ciphertexts.
"""
plaintext = 'A' # One-byte plaintext
secret = Secret(plaintext=plaintext)
secret.encrypt(self.secret_key)
# 16B IV + 2B length + 1B secret + 61B padding = 80 bytes
self.assertEqual(len(secret.ciphertext), 80)
self.assertIsNone(secret.plaintext)
secret.decrypt(self.secret_key)
self.assertEqual(secret.plaintext, plaintext)
def test_maximum_length(self):
"""
Test encrypting a plaintext value of the maximum length.
"""
plaintext = '0123456789abcdef' * 4096
plaintext = plaintext[:65535] # 65,535 chars
secret = Secret(plaintext=plaintext)
secret.encrypt(self.secret_key)
# 16B IV + 2B length + 65535B secret + 15B padding = 65568 bytes
self.assertEqual(len(secret.ciphertext), 65568)
self.assertIsNone(secret.plaintext)
secret.decrypt(self.secret_key)
self.assertEqual(secret.plaintext, plaintext)

View File

@ -1,123 +0,0 @@
import base64
from django.test import override_settings
from django.urls import reverse
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from secrets.models import Secret, SecretRole, SessionKey, UserKey
from utilities.testing import ViewTestCases
from .constants import PRIVATE_KEY, PUBLIC_KEY
class SecretRoleTestCase(ViewTestCases.OrganizationalObjectViewTestCase):
model = SecretRole
@classmethod
def setUpTestData(cls):
SecretRole.objects.bulk_create([
SecretRole(name='Secret Role 1', slug='secret-role-1'),
SecretRole(name='Secret Role 2', slug='secret-role-2'),
SecretRole(name='Secret Role 3', slug='secret-role-3'),
])
cls.form_data = {
'name': 'Secret Role X',
'slug': 'secret-role-x',
'description': 'A secret role',
}
cls.csv_data = (
"name,slug",
"Secret Role 4,secret-role-4",
"Secret Role 5,secret-role-5",
"Secret Role 6,secret-role-6",
)
cls.bulk_edit_data = {
'description': 'New description',
}
# TODO: Change base class to PrimaryObjectViewTestCase
class SecretTestCase(
ViewTestCases.GetObjectViewTestCase,
ViewTestCases.GetObjectChangelogViewTestCase,
ViewTestCases.DeleteObjectViewTestCase,
ViewTestCases.ListObjectsViewTestCase,
ViewTestCases.BulkEditObjectsViewTestCase,
ViewTestCases.BulkDeleteObjectsViewTestCase
):
model = Secret
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site-1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
devicetype = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
devicerole = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', site=site, device_type=devicetype, device_role=devicerole),
Device(name='Device 2', site=site, device_type=devicetype, device_role=devicerole),
Device(name='Device 3', site=site, device_type=devicetype, device_role=devicerole),
)
Device.objects.bulk_create(devices)
secretroles = (
SecretRole(name='Secret Role 1', slug='secret-role-1'),
SecretRole(name='Secret Role 2', slug='secret-role-2'),
)
SecretRole.objects.bulk_create(secretroles)
# Create one secret per device to allow bulk-editing of names (which must be unique per device/role)
Secret.objects.bulk_create((
Secret(assigned_object=devices[0], role=secretroles[0], name='Secret 1', ciphertext=b'1234567890'),
Secret(assigned_object=devices[1], role=secretroles[0], name='Secret 2', ciphertext=b'1234567890'),
Secret(assigned_object=devices[2], role=secretroles[0], name='Secret 3', ciphertext=b'1234567890'),
))
cls.form_data = {
'assigned_object_type': 'dcim.device',
'assigned_object_id': devices[1].pk,
'role': secretroles[1].pk,
'name': 'Secret X',
}
cls.bulk_edit_data = {
'role': secretroles[1].pk,
'name': 'New name',
}
def setUp(self):
super().setUp()
# Set up a master key for the test user
userkey = UserKey(user=self.user, public_key=PUBLIC_KEY)
userkey.save()
master_key = userkey.get_master_key(PRIVATE_KEY)
self.session_key = SessionKey(userkey=userkey)
self.session_key.save(master_key)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_import_objects(self):
self.add_permissions('secrets.add_secret')
device = Device.objects.get(name='Device 1')
csv_data = (
"device,role,name,plaintext",
f"{device.name},Secret Role 1,Secret 4,abcdefghij",
f"{device.name},Secret Role 1,Secret 5,abcdefghij",
f"{device.name},Secret Role 1,Secret 6,abcdefghij",
)
# Set the session_key cookie on the request
session_key = base64.b64encode(self.session_key.key).decode('utf-8')
self.client.cookies['session_key'] = session_key
response = self.client.post(reverse('secrets:secret_import'), {'csv': '\n'.join(csv_data)})
self.assertHttpStatus(response, 200)
self.assertEqual(Secret.objects.count(), 6)

View File

@ -1,33 +0,0 @@
from django.urls import path
from extras.views import ObjectChangeLogView, ObjectJournalView
from . import views
from .models import Secret, SecretRole
app_name = 'secrets'
urlpatterns = [
# Secret roles
path('secret-roles/', views.SecretRoleListView.as_view(), name='secretrole_list'),
path('secret-roles/add/', views.SecretRoleEditView.as_view(), name='secretrole_add'),
path('secret-roles/import/', views.SecretRoleBulkImportView.as_view(), name='secretrole_import'),
path('secret-roles/edit/', views.SecretRoleBulkEditView.as_view(), name='secretrole_bulk_edit'),
path('secret-roles/delete/', views.SecretRoleBulkDeleteView.as_view(), name='secretrole_bulk_delete'),
path('secret-roles/<int:pk>/', views.SecretRoleView.as_view(), name='secretrole'),
path('secret-roles/<int:pk>/edit/', views.SecretRoleEditView.as_view(), name='secretrole_edit'),
path('secret-roles/<int:pk>/delete/', views.SecretRoleDeleteView.as_view(), name='secretrole_delete'),
path('secret-roles/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='secretrole_changelog', kwargs={'model': SecretRole}),
# Secrets
path('secrets/', views.SecretListView.as_view(), name='secret_list'),
path('secrets/add/', views.SecretEditView.as_view(), name='secret_add'),
path('secrets/import/', views.SecretBulkImportView.as_view(), name='secret_import'),
path('secrets/edit/', views.SecretBulkEditView.as_view(), name='secret_bulk_edit'),
path('secrets/delete/', views.SecretBulkDeleteView.as_view(), name='secret_bulk_delete'),
path('secrets/<int:pk>/', views.SecretView.as_view(), name='secret'),
path('secrets/<int:pk>/edit/', views.SecretEditView.as_view(), name='secret_edit'),
path('secrets/<int:pk>/delete/', views.SecretDeleteView.as_view(), name='secret_delete'),
path('secrets/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='secret_changelog', kwargs={'model': Secret}),
path('secrets/<int:pk>/journal/', ObjectJournalView.as_view(), name='secret_journal', kwargs={'model': Secret}),
]

View File

@ -1,31 +0,0 @@
import os
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
def generate_random_key(bits=256):
"""
Generate a random encryption key. Sizes is given in bits and must be in increments of 32.
"""
if bits % 32:
raise Exception("Invalid key size ({}). Key sizes must be in increments of 32 bits.".format(bits))
return os.urandom(int(bits / 8))
def encrypt_master_key(master_key, public_key):
"""
Encrypt a secret key with the provided public RSA key.
"""
key = RSA.importKey(public_key)
cipher = PKCS1_OAEP.new(key)
return cipher.encrypt(master_key)
def decrypt_master_key(master_key_cipher, private_key):
"""
Decrypt a secret key with the provided private RSA key.
"""
key = RSA.importKey(private_key)
cipher = PKCS1_OAEP.new(key)
return cipher.decrypt(master_key_cipher)

View File

@ -1,231 +0,0 @@
import base64
import logging
from django.contrib import messages
from django.shortcuts import redirect, render
from django.utils.html import escape
from django.utils.safestring import mark_safe
from netbox.views import generic
from utilities.tables import paginate_table
from utilities.utils import count_related
from . import filtersets, forms, tables
from .models import SecretRole, Secret, SessionKey, UserKey
def get_session_key(request):
"""
Extract and decode the session key sent with a request. Returns None if no session key was provided.
"""
session_key = request.COOKIES.get('session_key', None)
if session_key is not None:
return base64.b64decode(session_key)
return session_key
#
# Secret roles
#
class SecretRoleListView(generic.ObjectListView):
queryset = SecretRole.objects.annotate(
secret_count=count_related(Secret, 'role')
)
table = tables.SecretRoleTable
class SecretRoleView(generic.ObjectView):
queryset = SecretRole.objects.all()
def get_extra_context(self, request, instance):
secrets = Secret.objects.restrict(request.user, 'view').filter(
role=instance
)
secrets_table = tables.SecretTable(secrets)
secrets_table.columns.hide('role')
paginate_table(secrets_table, request)
return {
'secrets_table': secrets_table,
}
class SecretRoleEditView(generic.ObjectEditView):
queryset = SecretRole.objects.all()
model_form = forms.SecretRoleForm
class SecretRoleDeleteView(generic.ObjectDeleteView):
queryset = SecretRole.objects.all()
class SecretRoleBulkImportView(generic.BulkImportView):
queryset = SecretRole.objects.all()
model_form = forms.SecretRoleCSVForm
table = tables.SecretRoleTable
class SecretRoleBulkEditView(generic.BulkEditView):
queryset = SecretRole.objects.annotate(
secret_count=count_related(Secret, 'role')
)
filterset = filtersets.SecretRoleFilterSet
table = tables.SecretRoleTable
form = forms.SecretRoleBulkEditForm
class SecretRoleBulkDeleteView(generic.BulkDeleteView):
queryset = SecretRole.objects.annotate(
secret_count=count_related(Secret, 'role')
)
table = tables.SecretRoleTable
#
# Secrets
#
class SecretListView(generic.ObjectListView):
queryset = Secret.objects.all()
filterset = filtersets.SecretFilterSet
filterset_form = forms.SecretFilterForm
table = tables.SecretTable
action_buttons = ('add', 'import', 'export')
class SecretView(generic.ObjectView):
queryset = Secret.objects.all()
class SecretEditView(generic.ObjectEditView):
queryset = Secret.objects.all()
model_form = forms.SecretForm
template_name = 'secrets/secret_edit.html'
def dispatch(self, request, *args, **kwargs):
# Check that the user has a valid UserKey
try:
uk = UserKey.objects.get(user=request.user)
except UserKey.DoesNotExist:
messages.warning(request, "This operation requires an active user key, but you don't have one.")
return redirect('user:userkey')
if not uk.is_active():
messages.warning(request, "This operation is not available. Your user key has not been activated.")
return redirect('user:userkey')
return super().dispatch(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
logger = logging.getLogger('netbox.views.ObjectEditView')
session_key = get_session_key(request)
secret = self.get_object(kwargs)
form = self.model_form(request.POST, instance=secret)
if form.is_valid():
logger.debug("Form validation was successful")
secret = form.save(commit=False)
# We must have a session key in order to set the plaintext of a Secret
if form.cleaned_data['plaintext'] and session_key is None:
logger.debug("Unable to proceed: No session key was provided with the request")
form.add_error(None, "No session key was provided with the request. Unable to encrypt secret data.")
elif form.cleaned_data['plaintext']:
master_key = None
try:
sk = SessionKey.objects.get(userkey__user=request.user)
master_key = sk.get_master_key(session_key)
except SessionKey.DoesNotExist:
logger.debug("Unable to proceed: User has no session key assigned")
form.add_error(None, "No session key found for this user.")
if master_key is not None:
logger.debug("Successfully resolved master key for encryption")
secret.plaintext = str(form.cleaned_data['plaintext'])
secret.encrypt(master_key)
secret.save()
form.save_m2m()
msg = '{} secret'.format('Created' if not form.instance.pk else 'Modified')
logger.info(f"{msg} {secret} (PK: {secret.pk})")
msg = f'{msg} <a href="{secret.get_absolute_url()}">{escape(secret)}</a>'
messages.success(request, mark_safe(msg))
return redirect(self.get_return_url(request, secret))
else:
logger.debug("Form validation failed")
return render(request, self.template_name, {
'obj': secret,
'obj_type': self.queryset.model._meta.verbose_name,
'form': form,
'return_url': self.get_return_url(request, secret),
})
class SecretDeleteView(generic.ObjectDeleteView):
queryset = Secret.objects.all()
class SecretBulkImportView(generic.BulkImportView):
queryset = Secret.objects.all()
model_form = forms.SecretCSVForm
table = tables.SecretTable
template_name = 'secrets/secret_import.html'
widget_attrs = {'class': 'requires-session-key'}
master_key = None
def _save_obj(self, obj_form, request):
"""
Encrypt each object before saving it to the database.
"""
obj = obj_form.save(commit=False)
obj.encrypt(self.master_key)
obj.save()
return obj
def post(self, request):
# Grab the session key from cookies.
session_key = request.COOKIES.get('session_key')
if session_key:
# Attempt to derive the master key using the provided session key.
try:
sk = SessionKey.objects.get(userkey__user=request.user)
self.master_key = sk.get_master_key(base64.b64decode(session_key))
except SessionKey.DoesNotExist:
messages.error(request, "No session key found for this user.")
if self.master_key is not None:
return super().post(request)
else:
messages.error(request, "Invalid private key! Unable to encrypt secret data.")
else:
messages.error(request, "No session key was provided with the request. Unable to encrypt secret data.")
return render(request, self.template_name, {
'form': self._import_form(request.POST),
'fields': self.model_form().fields,
'obj_type': self.model_form._meta.model._meta.verbose_name,
'return_url': self.get_return_url(request),
})
class SecretBulkEditView(generic.BulkEditView):
queryset = Secret.objects.prefetch_related('role')
filterset = filtersets.SecretFilterSet
table = tables.SecretTable
form = forms.SecretBulkEditForm
class SecretBulkDeleteView(generic.BulkDeleteView):
queryset = Secret.objects.prefetch_related('role')
filterset = filtersets.SecretFilterSet
table = tables.SecretTable

View File

@ -274,23 +274,6 @@
</div> </div>
</div> </div>
{% endif %} {% endif %}
{% if perms.secrets.view_secret %}
<div class="card">
<h5 class="card-header">
Secrets
</h5>
<div class="card-body">
{% include 'secrets/inc/assigned_secrets.html' %}
</div>
{% if perms.secrets.add_secret %}
<div class="card-footer text-end noprint">
<a href="{% url 'secrets:secret_add' %}?device={{ object.pk }}&return_url={{ object.get_absolute_url }}" class="btn btn-sm btn-primary">
<i class="mdi mdi-plus-thick"></i> Add Secret
</a>
</div>
{% endif %}
</div>
{% endif %}
<div class="card"> <div class="card">
<h5 class="card-header"> <h5 class="card-header">
Services Services
@ -377,5 +360,4 @@
</div> </div>
</div> </div>
</div> </div>
{% include 'secrets/inc/private_key_modal.html' %}
{% endblock %} {% endblock %}

View File

@ -1,29 +0,0 @@
{% if secrets %}
<form id="secret_form">
{% csrf_token %}
</form>
<table class="table table-hover">
{% for secret in secrets %}
<tr>
<td><a href="{% url 'secrets:secret' pk=secret.pk %}">{{ secret.role }}</a></td>
<td>{{ secret.name }}</td>
<td id="secret_{{ secret.pk }}">********</td>
<td class="text-end noprint">
<button class="btn btn-sm btn-success unlock-secret" secret-id="{{ secret.pk }}">
<i class="mdi mdi-lock"></i> Unlock
</button>
<button class="btn btn-sm btn-outline-dark copy-secret collapse" secret-id="{{ secret.pk }}" data-clipboard-target="#secret_{{ secret.pk }}">
<i class="mdi mdi-content-copy"></i> Copy
</button>
<button class="btn btn-sm btn-danger lock-secret collapse" secret-id="{{ secret.pk }}">
<i class="mdi mdi-lock-open"></i> Lock
</button>
</td>
</tr>
{% endfor %}
</table>
{% else %}
<div class="text-muted">
None found
</div>
{% endif %}

View File

@ -1,27 +0,0 @@
<div class="modal fade" id="privkey_modal" tabindex="-1" role="dialog">
<div class="modal-dialog modal-md" role="document">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title" id="privkey_modal_title">
<span class="mdi mdi-lock" aria-hidden="true"></span>
Enter Private RSA Key
</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
</div>
<div class="modal-body">
<p class="small text-muted">
You do not have an active session key. To request one, please provide your private RSA key below.
Once retrieved, your session key will be saved for future requests.
</p>
<div class="form-group">
<textarea class="form-control font-monospace" id="user_privkey" style="height: 300px;"></textarea>
</div>
</div>
<div class="modal-footer float-end">
<button id="request_session_key" class="btn btn-primary" data-bs-dismiss="modal">
Request Session Key
</button>
</div>
</div>
</div>
</div>

View File

@ -1,82 +0,0 @@
{% extends 'generic/object.html' %}
{% load buttons %}
{% load helpers %}
{% load static %}
{% load plugins %}
{% block breadcrumbs %}
<li class="breadcrumb-item"><a href="{% url 'secrets:secret_list' %}">Secrets</a></li>
<li class="breadcrumb-item"><a href="{% url 'secrets:secret_list' %}?role_id={{ object.role.pk }}">{{ object.role }}</a></li>
<li class="breadcrumb-item"><a href="{{ object.assigned_object.get_absolute_url }}">{{ object.assigned_object }}</a></li>
<li class="breadcrumb-item">{{ object }}</li>
{% endblock %}
{% block content %}
<div class="row">
<div class="col col-md-6">
<div class="card">
<h5 class="card-header">
Secret Attributes
</h5>
<div class="card-body">
<table class="table table-hover">
<tr>
<th scope="row">Assigned Object</th>
<td>
<a href="{{ object.assigned_object.get_absolute_url }}">{{ object.assigned_object }}</a>
</td>
</tr>
<tr>
<th scope="row">Role</th>
<td>
<a href="{{ object.role.get_absolute_url }}">{{ object.role }}</a>
</td>
</tr>
<tr>
<th scope="row">Name</th>
<td>{{ object.name|placeholder }}</td>
</tr>
</table>
</div>
</div>
{% include 'inc/custom_fields_panel.html' %}
{% plugin_left_page object %}
</div>
<div class="col col-md-6">
<div class="card">
<h5 class="card-header">
Secret Data
</h5>
<div class="card-body">
<form id="secret_form">
{% csrf_token %}
</form>
<div class="row">
<div class="col col-md-2">Secret</div>
<div class="col col-md-6"><code id="secret_{{ object.pk }}">********</code></div>
<div class="col col-md-4 text-end noprint">
<button class="btn btn-sm btn-success unlock-secret" secret-id="{{ object.pk }}">
<i class="mdi mdi-lock"></i> Unlock
</button>
<button class="btn btn-sm btn-outline-dark copy-secret d-none" secret-id="{{ object.pk }}" data-clipboard-target="#secret_{{ object.pk }}">
<i class="mdi mdi-content-copy"></i> Copy
</button>
<button class="btn btn-sm btn-danger lock-secret d-none" secret-id="{{ object.pk }}">
<i class="mdi mdi-lock-open"></i> Lock
</button>
</div>
</div>
</div>
</div>
{% include 'extras/inc/tags_panel.html' with tags=object.tags.all url='secrets:secret_list' %}
{% plugin_right_page object %}
</div>
</div>
<div class="row">
<div class="col col-md-12">
{% plugin_full_width_page object %}
</div>
</div>
{% include 'secrets/inc/private_key_modal.html' %}
{% endblock %}

View File

@ -1,8 +0,0 @@
{% extends 'utilities/confirmation_form.html' %}
{% load form_helpers %}
{% block title %}Delete secret {{ secret }}?{% endblock %}
{% block message %}
<p>Are you sure you want to delete secret {{ secret }}?</p>
{% endblock %}

View File

@ -1,88 +0,0 @@
{% extends 'generic/object_edit.html' %}
{% load static %}
{% load form_helpers %}
{% block title %}{% if obj.pk %}Editing {{ obj }}{% else %}Add a Secret{% endif %}{% endblock %}
{% block form %}
{% render_errors form %}
{{ form.private_key }}
<div class="field-group">
<h4>Secret Assignment</h4>
<ul class="nav nav-tabs mb-3" role="tablist">
<li class="nav-item" role="presentation">
<button
role="tab"
type="button"
id="device_tab"
data-bs-toggle="tab"
class="nav-link{% if not vm_tab_active %} active{% endif %}"
data-bs-target="#device"
aria-controls="device"
>
Device
</button>
</li>
<li class="nav-item" role="presentation">
<button
role="tab"
type="button"
id="vm_tab"
data-bs-toggle="tab"
class="nav-link{% if vm_tab_active %} active{% endif %}"
data-bs-target="#virtualmachine"
aria-controls="virtualmachine"
>
Virtual Machine
</button>
</li>
</ul>
{% with vm_tab_active=form.initial.virtual_machine %}
<div class="tab-content">
<div class="tab-pane{% if not vm_tab_active %} active{% endif %}" id="device">
{% render_field form.device %}
</div>
<div class="tab-pane{% if vm_tab_active %} active{% endif %}" id="virtualmachine">
{% render_field form.virtual_machine %}
</div>
</div>
{% endwith %}
{% render_field form.role %}
{% render_field form.name %}
{% render_field form.userkeys %}
{% render_field form.tags %}
</div>
<div class="field-group">
<h4>Secret Data</h4>
{% if obj.pk %}
<div class="form-floating mb-3">
<input class="form-control" value="********" id="secret_{{ obj.pk }}" />
<label class="required">Current Plain Text</label>
</div>
<div class="col col-md-2 text-end">
<button class="btn btn-sm btn-success unlock-secret" data-secret-id="{{ obj.pk }}">
<i class="mdi mdi-lock"></i> Unlock
</button>
<button class="btn btn-sm, btn-danger lock-secret collapse" data-secret-id="{{ obj.pk }}">
<i class="mdi mdi-lock-open"></i> Lock
</button>
</div>
{% endif %}
{% render_field form.plaintext %}
{% render_field form.plaintext2 %}
</div>
{% if form.custom_fields %}
<div class="card">
<h5 class="card-header">Custom Fields</h5>
<div class="card-body">
{% render_custom_fields form %}
</div>
</div>
{% endif %}
{% include 'secrets/inc/private_key_modal.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'generic/object_bulk_import.html' %}
{% load static %}
{% block content %}
{{ block.super }}
{% include 'secrets/inc/private_key_modal.html' %}
{% endblock %}

View File

@ -1,64 +0,0 @@
{% extends 'generic/object.html' %}
{% load helpers %}
{% load plugins %}
{% block breadcrumbs %}
<li class="breadcrumb-item"><a href="{% url 'secrets:secretrole_list' %}">Secret Roles</a></li>
<li class="breadcrumb-item">{{ object }}</li>
{% endblock %}
{% block content %}
<div class="row mb-3">
<div class="col col-md-6">
<div class="card">
<h5 class="card-header">
Secret Role
</h5>
<div class="card-body">
<table class="table table-hover attr-table">
<tr>
<th scope="row">Name</th>
<td>{{ object.name }}</td>
</tr>
<tr>
<th scope="row">Description</th>
<td>{{ object.description|placeholder }}</td>
</tr>
<tr>
<th scope="row">Secrets</th>
<td>
<a href="{% url 'secrets:secret_list' %}?role_id={{ object.pk }}">{{ secrets_table.rows|length }}</a>
</td>
</tr>
</table>
</div>
</div>
{% plugin_left_page object %}
</div>
<div class="col col-md-6">
{% include 'inc/custom_fields_panel.html' %}
{% plugin_right_page object %}
</div>
</div>
<div class="row">
<div class="col col-md-12">
<div class="card">
<h5 class="card-header">
Secrets
</h5>
<div class="card-body">
{% include 'inc/table.html' with table=secrets_table %}
</div>
{% if perms.secrets.add_secret %}
<div class="card-footer text-end noprint">
<a href="{% url 'secrets:secret_add' %}?role={{ object.pk }}" class="btn btn-sm btn-primary">
<span class="mdi mdi-plus-thick" aria-hidden="true"></span> Add Secret
</a>
</div>
{% endif %}
</div>
{% include 'inc/paginator.html' with paginator=secrets_table.paginator page=secrets_table.page %}
{% plugin_full_width_page object %}
</div>
</div>
{% endblock %}

View File

@ -12,7 +12,6 @@
<a class="nav-link nav-item text-start{% if active_tab == 'change-password' %} active{% endif %}" href="{% url 'user:change_password' %}">Change Password</a> <a class="nav-link nav-item text-start{% if active_tab == 'change-password' %} active{% endif %}" href="{% url 'user:change_password' %}">Change Password</a>
{% endif %} {% endif %}
<a class="nav-link nav-item text-start{% if active_tab == 'api-tokens' %} active{% endif %}" href="{% url 'user:token_list' %}">API Tokens</a> <a class="nav-link nav-item text-start{% if active_tab == 'api-tokens' %} active{% endif %}" href="{% url 'user:token_list' %}">API Tokens</a>
<a class="nav-link nav-item text-start{% if active_tab == 'userkey' %} active{% endif %}" href="{% url 'user:userkey' %}">User Key</a>
</nav> </nav>
</div> </div>
<div class="col-sm-9 col-md-8 px-4"> <div class="col-sm-9 col-md-8 px-4">

View File

@ -1,5 +0,0 @@
{% extends 'generic/object_delete.html' %}
{% block message %}
<p>Are you sure you want to delete your session key?</p>
{% endblock %}

View File

@ -1,53 +0,0 @@
{% extends 'users/base.html' %}
{% block title %}User Key{% endblock %}
{% block usercontent %}
{% if object %}
<div class="float-end noprint">
<a href="{% url 'user:userkey_edit' %}" class="btn btn-warning">
<span class="mdi mdi-pencil" aria-hidden="true"></span>
Edit User Key
</a>
</div>
<h4>
Your User Key is
{% if object.is_active %}
<span class="badge bg-success">Active</span>
{% else %}
<span class="badge bg-danger">Inactive</span>
{% endif %}
</h4>
<p>
<small class="text-muted">Created {{ object.created }} &middot; Updated <span title="{{ object.last_updated }}">{{ object.last_updated|timesince }}</span> ago</small>
</p>
{% if not object.is_active %}
<div class="alert alert-warning" role="alert">
<i class="mdi mdi-alert"></i>
Your user key is inactive. Ask an administrator to enable it for you.
</div>
{% endif %}
<pre class="copyable">{{ object.public_key }}</pre>
<hr />
{% if object.session_key %}
<div class="float-end noprint">
<a href="{% url 'user:sessionkey_delete' %}" class="btn btn-danger">
<span class="mdi mdi-trash-can-outline" aria-hidden="true"></span>
Delete Session Key
</a>
</div>
<h4>Session Key: <span class="badge bg-success">Active</span></h4>
<small class="text-muted">Created {{ object.session_key.created }}</small>
{% else %}
<h4>No Active Session Key</h4>
{% endif %}
{% else %}
<p>You don't have a user key on file.</p>
<p>
<a href="{% url 'user:userkey_edit' %}" class="btn btn-primary">
<span class="mdi mdi-plus-thick" aria-hidden="true"></span>
Create a User Key
</a>
</p>
{% endif %}
{% endblock %}

View File

@ -1,55 +0,0 @@
{% extends 'users/base.html' %}
{% load static %}
{% load form_helpers %}
{% block title %}User Key{% endblock %}
{% block usercontent %}
{% if object.is_active %}
<div class="alert alert-danger" role="alert">
<strong>Warning:</strong> Changing your public key will require your user key to be re-activated by another
user. You will be unable to retrieve any secrets until your key has been reactivated.
</div>
{% endif %}
<form action="." method="post" class="form">
{% csrf_token %}
<div class="field-group">
{% render_field form.public_key %}
</div>
<div class="row my-3">
<div class="col-4 text-start">
<button type="button" class="btn btn-info" id="generate_keypair" data-bs-toggle="modal" data-bs-target="#new_keypair_modal">Generate a New Key Pair</button>
</div>
<div class="col-8 text-end">
<a href="{% url 'user:userkey' %}" class="btn btn-outline-danger">Cancel</a>
<button type="submit" name="_update" class="btn btn-primary">Save</button>
</div>
</div>
</form>
<div class="modal fade" id="new_keypair_modal" tabindex="-1" role="dialog">
<div class="modal-dialog modal-lg" role="document">
<div class="modal-content">
<div class="modal-header">
<h4 class="modal-title" id="new_keypair_modal_title">
New RSA Key Pair
</h4>
<button type="button" class="btn btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
</div>
<div class="modal-body">
<div class="field-group">
<h5>New Public Key</h5>
<textarea class="form-control" rows="10" id="new_pubkey" style="height: 250px;font-family:var(--bs-font-monospace);"></textarea>
</div>
<div class="field-group">
<h5>New Private Key</h5>
<textarea class="form-control" rows="10" id="new_privkey" style="height: 250px;font-family:var(--bs-font-monospace);"></textarea>
</div>
</div>
<div class="modal-footer text-center">
<button type="button" class="btn btn-danger" id="use_new_pubkey" data-bs-dismiss="modal">I Saved My New Private Key</button>
</div>
</div>
</div>
</div>
{% endblock %}

View File

@ -161,23 +161,6 @@
</table> </table>
</div> </div>
</div> </div>
{% if perms.secrets.view_secret %}
<div class="card">
<h5 class="card-header">
Secrets
</h5>
<div class="card-body">
{% include 'secrets/inc/assigned_secrets.html' %}
</div>
{% if perms.secrets.add_secret %}
<div class="card-footer text-end noprint">
<a href="{% url 'secrets:secret_add' %}?virtual_machine={{ object.pk }}&return_url={{ object.get_absolute_url }}" class="btn btn-sm btn-primary">
<span class="mdi mdi-plus-thick" aria-hidden="true"></span> Add Secret
</a>
</div>
{% endif %}
</div>
{% endif %}
<div class="card"> <div class="card">
<h5 class="card-header"> <h5 class="card-header">
Services Services
@ -209,5 +192,4 @@
{% plugin_full_width_page object %} {% plugin_full_width_page object %}
</div> </div>
</div> </div>
{% include 'secrets/inc/private_key_modal.html' %}
{% endblock %} {% endblock %}

View File

@ -10,7 +10,6 @@ def replicate_permissions(apps, schema_editor):
""" """
Permission = apps.get_model('auth', 'Permission') Permission = apps.get_model('auth', 'Permission')
ObjectPermission = apps.get_model('users', 'ObjectPermission') ObjectPermission = apps.get_model('users', 'ObjectPermission')
SecretRole = apps.get_model('secrets', 'SecretRole')
# TODO: Optimize this iteration so that ObjectPermissions with identical sets of users and groups # TODO: Optimize this iteration so that ObjectPermissions with identical sets of users and groups
# are combined into a single ObjectPermission instance. # are combined into a single ObjectPermission instance.
@ -27,37 +26,18 @@ def replicate_permissions(apps, schema_editor):
if perm.group_set.exists() or perm.user_set.exists(): if perm.group_set.exists() or perm.user_set.exists():
# Handle replication of SecretRole user/group assignments for Secrets # Handle replication of SecretRole user/group assignments for Secrets
if perm.codename == 'view_secret': obj_perm = ObjectPermission(
for secretrole in SecretRole.objects.prefetch_related('users', 'groups'): # Copy name from original Permission object
obj_perm = ObjectPermission( name=f'{perm.content_type.app_label}.{perm.codename}'[:100],
name=f'{perm.content_type.app_label}.{perm.codename} ({secretrole.name})'[:100], actions=[action]
actions=[action], )
constraints={'role__name': secretrole.name} obj_perm.save()
) obj_perm.object_types.add(perm.content_type)
obj_perm.save()
obj_perm.object_types.add(perm.content_type)
# Assign only users/groups who both a) are assigned to the SecretRole and b) have the view_secret
# permission
obj_perm.groups.add(
*list(secretrole.groups.filter(permissions=perm))
)
obj_perm.users.add(*list(secretrole.users.filter(
Q(user_permissions=perm) | Q(groups__permissions=perm)
)))
else: if perm.group_set.exists():
obj_perm = ObjectPermission( obj_perm.groups.add(*list(perm.group_set.all()))
# Copy name from original Permission object if perm.user_set.exists():
name=f'{perm.content_type.app_label}.{perm.codename}'[:100], obj_perm.users.add(*list(perm.user_set.all()))
actions=[action]
)
obj_perm.save()
obj_perm.object_types.add(perm.content_type)
if perm.group_set.exists():
obj_perm.groups.add(*list(perm.group_set.all()))
if perm.user_set.exists():
obj_perm.users.add(*list(perm.user_set.all()))
class Migration(migrations.Migration): class Migration(migrations.Migration):

View File

@ -12,8 +12,5 @@ urlpatterns = [
path('api-tokens/add/', views.TokenEditView.as_view(), name='token_add'), path('api-tokens/add/', views.TokenEditView.as_view(), name='token_add'),
path('api-tokens/<int:pk>/edit/', views.TokenEditView.as_view(), name='token_edit'), path('api-tokens/<int:pk>/edit/', views.TokenEditView.as_view(), name='token_edit'),
path('api-tokens/<int:pk>/delete/', views.TokenDeleteView.as_view(), name='token_delete'), path('api-tokens/<int:pk>/delete/', views.TokenDeleteView.as_view(), name='token_delete'),
path('user-key/', views.UserKeyView.as_view(), name='userkey'),
path('user-key/edit/', views.UserKeyEditView.as_view(), name='userkey_edit'),
path('session-key/delete/', views.SessionKeyDeleteView.as_view(), name='sessionkey_delete'),
] ]

View File

@ -14,8 +14,6 @@ from django.utils.http import is_safe_url
from django.views.decorators.debug import sensitive_post_parameters from django.views.decorators.debug import sensitive_post_parameters
from django.views.generic import View from django.views.generic import View
from secrets.forms import UserKeyForm
from secrets.models import SessionKey, UserKey
from utilities.forms import ConfirmationForm from utilities.forms import ConfirmationForm
from .forms import LoginForm, PasswordChangeForm, TokenForm from .forms import LoginForm, PasswordChangeForm, TokenForm
from .models import Token from .models import Token
@ -184,93 +182,6 @@ class ChangePasswordView(LoginRequiredMixin, View):
}) })
class UserKeyView(LoginRequiredMixin, View):
template_name = 'users/userkey.html'
def get(self, request):
try:
userkey = UserKey.objects.get(user=request.user)
except UserKey.DoesNotExist:
userkey = None
return render(request, self.template_name, {
'object': userkey,
'active_tab': 'userkey',
})
class UserKeyEditView(LoginRequiredMixin, View):
template_name = 'users/userkey_edit.html'
def dispatch(self, request, *args, **kwargs):
try:
self.userkey = UserKey.objects.get(user=request.user)
except UserKey.DoesNotExist:
self.userkey = UserKey(user=request.user)
return super().dispatch(request, *args, **kwargs)
def get(self, request):
form = UserKeyForm(instance=self.userkey)
return render(request, self.template_name, {
'object': self.userkey,
'form': form,
'active_tab': 'userkey',
})
def post(self, request):
form = UserKeyForm(data=request.POST, instance=self.userkey)
if form.is_valid():
uk = form.save(commit=False)
uk.user = request.user
uk.save()
messages.success(request, "Your user key has been saved.")
return redirect('user:userkey')
return render(request, self.template_name, {
'userkey': self.userkey,
'form': form,
'active_tab': 'userkey',
})
class SessionKeyDeleteView(LoginRequiredMixin, View):
def get(self, request):
sessionkey = get_object_or_404(SessionKey, userkey__user=request.user)
form = ConfirmationForm()
return render(request, 'users/sessionkey_delete.html', {
'obj_type': sessionkey._meta.verbose_name,
'form': form,
'return_url': reverse('user:userkey'),
})
def post(self, request):
sessionkey = get_object_or_404(SessionKey, userkey__user=request.user)
form = ConfirmationForm(request.POST)
if form.is_valid():
# Delete session key
sessionkey.delete()
messages.success(request, "Session key deleted")
# Delete cookie
response = redirect('user:userkey')
response.delete_cookie('session_key')
return response
return render(request, 'users/sessionkey_delete.html', {
'obj_type': sessionkey._meta.verbose_name,
'form': form,
'return_url': reverse('user:userkey'),
})
# #
# API tokens # API tokens
# #

View File

@ -220,19 +220,6 @@ POWER_MENU = Menu(
), ),
) )
SECRETS_MENU = Menu(
label="Secrets",
groups=(
MenuGroup(
label="Secrets",
items=(
MenuItem(label="Secrets", url="secrets:secret_list"),
MenuItem(label="Secret Roles", url="secrets:secretrole_list"),
),
),
),
)
OTHER_MENU = Menu( OTHER_MENU = Menu(
label="Other", label="Other",
groups=( groups=(
@ -261,7 +248,6 @@ MENUS = (
VIRTUALIZATION_MENU, VIRTUALIZATION_MENU,
CIRCUITS_MENU, CIRCUITS_MENU,
POWER_MENU, POWER_MENU,
SECRETS_MENU,
OTHER_MENU, OTHER_MENU,
) )

View File

@ -284,12 +284,6 @@ class VirtualMachine(PrimaryModel, ConfigContextModel):
comments = models.TextField( comments = models.TextField(
blank=True blank=True
) )
secrets = GenericRelation(
to='secrets.Secret',
content_type_field='assigned_object_type',
object_id_field='assigned_object_id',
related_query_name='virtual_machine'
)
objects = ConfigContextModelQuerySet.as_manager() objects = ConfigContextModelQuerySet.as_manager()

View File

@ -10,7 +10,6 @@ from extras.views import ObjectConfigContextView
from ipam.models import IPAddress, Service from ipam.models import IPAddress, Service
from ipam.tables import InterfaceIPAddressTable, InterfaceVLANTable from ipam.tables import InterfaceIPAddressTable, InterfaceVLANTable
from netbox.views import generic from netbox.views import generic
from secrets.models import Secret
from utilities.tables import paginate_table from utilities.tables import paginate_table
from utilities.utils import count_related from utilities.utils import count_related
from . import filtersets, forms, tables from . import filtersets, forms, tables
@ -332,13 +331,9 @@ class VirtualMachineView(generic.ObjectView):
Prefetch('ipaddresses', queryset=IPAddress.objects.restrict(request.user)) Prefetch('ipaddresses', queryset=IPAddress.objects.restrict(request.user))
) )
# Secrets
secrets = Secret.objects.restrict(request.user, 'view').filter(virtual_machine=instance)
return { return {
'vminterface_table': vminterface_table, 'vminterface_table': vminterface_table,
'services': services, 'services': services,
'secrets': secrets,
} }