This commit is contained in:
Jeremy Stretch 2023-11-07 16:49:03 -05:00
parent 840b7d804c
commit 60fc28e37d
26 changed files with 1514 additions and 4 deletions

View File

@ -195,17 +195,30 @@ IPAM_MENU = Menu(
), ),
) )
OVERLAY_MENU = Menu( VPN_MENU = Menu(
label=_('Overlay'), label=_('VPN'),
icon_class='mdi mdi-graph-outline', icon_class='mdi mdi-graph-outline',
groups=( groups=(
MenuGroup( MenuGroup(
label='L2VPNs', label=_('Tunnels'),
items=(
get_model_item('vpn', 'tunnel', _('Tunnels')),
get_model_item('vpn', 'tunneltermination', _('Tunnel Terminations')),
),
),
MenuGroup(
label=_('L2VPNs'),
items=( items=(
get_model_item('ipam', 'l2vpn', _('L2VPNs')), get_model_item('ipam', 'l2vpn', _('L2VPNs')),
get_model_item('ipam', 'l2vpntermination', _('Terminations')), get_model_item('ipam', 'l2vpntermination', _('Terminations')),
), ),
), ),
MenuGroup(
label=_('Security'),
items=(
get_model_item('vpn', 'ipsecprofile', _('IPSec Profiles')),
),
),
), ),
) )
@ -443,7 +456,7 @@ MENUS = [
CONNECTIONS_MENU, CONNECTIONS_MENU,
WIRELESS_MENU, WIRELESS_MENU,
IPAM_MENU, IPAM_MENU,
OVERLAY_MENU, VPN_MENU,
VIRTUALIZATION_MENU, VIRTUALIZATION_MENU,
CIRCUITS_MENU, CIRCUITS_MENU,
POWER_MENU, POWER_MENU,

View File

@ -379,6 +379,7 @@ INSTALLED_APPS = [
'users', 'users',
'utilities', 'utilities',
'virtualization', 'virtualization',
'vpn',
'wireless', 'wireless',
'django_rq', # Must come after extras to allow overriding management commands 'django_rq', # Must come after extras to allow overriding management commands
'drf_spectacular', 'drf_spectacular',

View File

@ -33,6 +33,7 @@ _patterns = [
path('tenancy/', include('tenancy.urls')), path('tenancy/', include('tenancy.urls')),
path('users/', include('users.urls')), path('users/', include('users.urls')),
path('virtualization/', include('virtualization.urls')), path('virtualization/', include('virtualization.urls')),
path('vpn/', include('vpn.urls')),
path('wireless/', include('wireless.urls')), path('wireless/', include('wireless.urls')),
# Current user views # Current user views
@ -51,6 +52,7 @@ _patterns = [
path('api/tenancy/', include('tenancy.api.urls')), path('api/tenancy/', include('tenancy.api.urls')),
path('api/users/', include('users.api.urls')), path('api/users/', include('users.api.urls')),
path('api/virtualization/', include('virtualization.api.urls')), path('api/virtualization/', include('virtualization.api.urls')),
path('api/vpn/', include('vpn.api.urls')),
path('api/wireless/', include('wireless.api.urls')), path('api/wireless/', include('wireless.api.urls')),
path('api/status/', StatusView.as_view(), name='api-status'), path('api/status/', StatusView.as_view(), name='api-status'),

0
netbox/vpn/__init__.py Normal file
View File

3
netbox/vpn/admin.py Normal file
View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View File

View File

@ -0,0 +1,40 @@
from rest_framework import serializers
from netbox.api.serializers import WritableNestedSerializer
from vpn import models
__all__ = (
'NestedIPSecProfileSerializer',
'NestedTunnelSerializer',
'NestedTunnelTerminationSerializer',
)
class NestedTunnelSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='vpn-api:tunnel-detail'
)
class Meta:
model = models.Tunnel
fields = ('id', 'url', 'display', 'name')
class NestedTunnelTerminationSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='vpn-api:tunneltermination-detail'
)
class Meta:
model = models.TunnelTermination
fields = ('id', 'url', 'display')
class NestedIPSecProfileSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='vpn-api:ipsecprofile-detail'
)
class Meta:
model = models.IPSecProfile
fields = ('id', 'url', 'display', 'name')

View File

@ -0,0 +1,117 @@
from django.contrib.contenttypes.models import ContentType
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from ipam.api.nested_serializers import NestedIPAddressSerializer
from netbox.api.fields import ChoiceField, ContentTypeField
from netbox.api.serializers import NetBoxModelSerializer
from netbox.constants import NESTED_SERIALIZER_PREFIX
from tenancy.api.nested_serializers import NestedTenantSerializer
from utilities.api import get_serializer_for_model
from vpn.choices import *
from vpn.models import *
from .nested_serializers import *
__all__ = (
'IPSecProfileSerializer',
'TunnelSerializer',
'TunnelTerminationSerializer',
)
class TunnelSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='vpn-api:tunnel-detail'
)
status = ChoiceField(
choices=TunnelStatusChoices
)
encapsulation = ChoiceField(
choices=TunnelEncapsulationChoices
)
ipsec_profile = NestedIPSecProfileSerializer(
required=False,
allow_null=True
)
tenant = NestedTenantSerializer(
required=False,
allow_null=True
)
class Meta:
model = Tunnel
fields = (
'id', 'url', 'display', 'name', 'status', 'encapsulation', 'ipsec_profile', 'tenant', 'preshared_key',
'tunnel_id', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
)
class TunnelTerminationSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='vpn-api:tunneltermination-detail'
)
tunnel = NestedTunnelSerializer()
role = ChoiceField(
choices=TunnelTerminationRoleChoices
)
interface_type = ContentTypeField(
queryset=ContentType.objects.all()
)
interface = serializers.SerializerMethodField(
read_only=True
)
outside_ip = NestedIPAddressSerializer(
required=False,
allow_null=True
)
class Meta:
model = TunnelTermination
fields = (
'id', 'url', 'display', 'tunnel', 'role', 'interface_type', 'interface_id', 'interface', 'outside_ip',
'tags', 'custom_fields', 'created', 'last_updated', '_occupied',
)
@extend_schema_field(serializers.JSONField(allow_null=True))
def get_interface(self, obj):
serializer = get_serializer_for_model(obj.interface, prefix=NESTED_SERIALIZER_PREFIX)
context = {'request': self.context['request']}
return serializer(obj.assigned_object, context=context).data
class IPSecProfileSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='vpn-api:ipsecprofile-detail'
)
protocol = ChoiceField(
choices=IPSecProtocolChoices
)
ike_version = ChoiceField(
choices=IKEVersionChoices
)
phase1_encryption = ChoiceField(
choices=EncryptionChoices
)
phase1_authentication = ChoiceField(
choices=AuthenticationChoices
)
phase1_group = ChoiceField(
choices=DHGroupChoices
)
phase2_encryption = ChoiceField(
choices=EncryptionChoices
)
phase2_authentication = ChoiceField(
choices=AuthenticationChoices
)
phase2_group = ChoiceField(
choices=DHGroupChoices
)
class Meta:
model = IPSecProfile
fields = (
'id', 'url', 'display', 'name', 'protocol', 'ike_version', 'phase1_encryption', 'phase1_authentication',
'phase1_group', 'phase1_sa_lifetime', 'phase2_encryption', 'phase2_authentication', 'phase2_group',
'phase2_sa_lifetime', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
)

11
netbox/vpn/api/urls.py Normal file
View File

@ -0,0 +1,11 @@
from netbox.api.routers import NetBoxRouter
from . import views
router = NetBoxRouter()
router.APIRootView = views.VPNRootView
router.register('ipsec-profiles', views.IPSecProfileViewSet)
router.register('tunnels', views.TunnelViewSet)
router.register('tunnel-terminations', views.TunnelTerminationViewSet)
app_name = 'vpn-api'
urlpatterns = router.urls

46
netbox/vpn/api/views.py Normal file
View File

@ -0,0 +1,46 @@
from rest_framework.routers import APIRootView
from netbox.api.viewsets import NetBoxModelViewSet
from utilities.utils import count_related
from vpn import filtersets
from vpn.models import *
from . import serializers
__all__ = (
'IPSecProfileViewSet',
'TunnelTerminationViewSet',
'TunnelViewSet',
'VPNRootView',
)
class VPNRootView(APIRootView):
"""
VPN API root view
"""
def get_view_name(self):
return 'VPN'
#
# Viewsets
#
class TunnelViewSet(NetBoxModelViewSet):
queryset = Tunnel.objects.prefetch_related('ipsec_profile', 'tenant').annotate(
terminations_count=count_related(TunnelTermination, 'tunnel')
)
serializer_class = serializers.TunnelSerializer
filterset_class = filtersets.TunnelFilterSet
class TunnelTerminationViewSet(NetBoxModelViewSet):
queryset = Tunnel.objects.prefetch_related('tunnel')
serializer_class = serializers.TunnelTerminationSerializer
filterset_class = filtersets.TunnelTerminationFilterSet
class IPSecProfileViewSet(NetBoxModelViewSet):
queryset = IPSecProfile.objects.all()
serializer_class = serializers.IPSecProfileSerializer
filterset_class = filtersets.IPSecProfileFilterSet

9
netbox/vpn/apps.py Normal file
View File

@ -0,0 +1,9 @@
from django.apps import AppConfig
class VPNConfig(AppConfig):
name = 'vpn'
verbose_name = 'VPN'
def ready(self):
from . import search

108
netbox/vpn/choices.py Normal file
View File

@ -0,0 +1,108 @@
from django.utils.translation import gettext_lazy as _
from utilities.choices import ChoiceSet
#
# Tunnels
#
class TunnelStatusChoices(ChoiceSet):
key = 'Tunnel.status'
STATUS_PLANNED = 'planned'
STATUS_ACTIVE = 'active'
STATUS_DISABLED = 'disabled'
CHOICES = [
(STATUS_PLANNED, _('Planned'), 'cyan'),
(STATUS_ACTIVE, _('Active'), 'green'),
(STATUS_DISABLED, _('Disabled'), 'red'),
]
class TunnelEncapsulationChoices(ChoiceSet):
ENCAP_GRE = 'gre'
ENCAP_IP_IP = 'ip-ip'
ENCAP_IPSEC = 'ipsec'
CHOICES = [
(ENCAP_IPSEC, _('IPsec')),
(ENCAP_IP_IP, _('Active')),
(ENCAP_GRE, _('Disabled')),
]
class TunnelTerminationRoleChoices(ChoiceSet):
ROLE_PEER = 'peer'
ROLE_HUB = 'hub'
ROLE_SPOKE = 'spoke'
CHOICES = [
(ROLE_PEER, _('Peer')),
(ROLE_HUB, _('Hub')),
(ROLE_SPOKE, _('Spoke')),
]
#
# IKE
#
class IPSecProtocolChoices(ChoiceSet):
PROTOCOL_ESP = 'esp'
PROTOCOL_AH = 'ah'
CHOICES = (
(PROTOCOL_ESP, 'ESP'),
(PROTOCOL_AH, 'AH'),
)
class IKEVersionChoices(ChoiceSet):
VERSION_1 = 1
VERSION_2 = 2
CHOICES = (
(VERSION_1, 'IKEv1'),
(VERSION_2, 'IKEv2'),
)
class EncryptionChoices(ChoiceSet):
ENCRYPTION_AES128 = 'aes-128'
ENCRYPTION_AES192 = 'aes-192'
ENCRYPTION_AES256 = 'aes-256'
ENCRYPTION_3DES = '3des'
CHOICES = (
(ENCRYPTION_AES128, 'AES (128-bit)'),
(ENCRYPTION_AES192, 'AES (192-bit)'),
(ENCRYPTION_AES256, 'AES (256-bit)'),
(ENCRYPTION_3DES, '3DES'),
)
class AuthenticationChoices(ChoiceSet):
AUTH_SHA1 = 'SHA-1'
AUTH_MD5 = 'MD5'
CHOICES = (
(AUTH_SHA1, 'SHA-1'),
(AUTH_MD5, 'MD5'),
)
class DHGroupChoices(ChoiceSet):
# TODO: Add all the groups & annotate their attributes
GROUP_1 = 1
GROUP_2 = 2
GROUP_5 = 5
GROUP_7 = 7
CHOICES = (
(GROUP_1, _('Group {n}').format(n=1)),
(GROUP_2, _('Group {n}').format(n=2)),
(GROUP_5, _('Group {n}').format(n=5)),
(GROUP_7, _('Group {n}').format(n=7)),
)

137
netbox/vpn/filtersets.py Normal file
View File

@ -0,0 +1,137 @@
import django_filters
from django.db.models import Q
from django.utils.translation import gettext as _
from dcim.models import Interface
from ipam.models import IPAddress
from netbox.filtersets import NetBoxModelFilterSet
from tenancy.filtersets import TenancyFilterSet
from virtualization.models import VMInterface
from .choices import *
from .models import *
__all__ = (
'IPSecProfileFilterSet',
'TunnelFilterSet',
'TunnelTerminationFilterSet',
)
class TunnelFilterSet(NetBoxModelFilterSet, TenancyFilterSet):
status = django_filters.MultipleChoiceFilter(
choices=TunnelStatusChoices
)
encapsulation = django_filters.MultipleChoiceFilter(
choices=TunnelEncapsulationChoices
)
ipsec_profile_id = django_filters.ModelMultipleChoiceFilter(
queryset=IPSecProfile.objects.all(),
label=_('IPSec profile (ID)'),
)
ipsec_profile = django_filters.ModelMultipleChoiceFilter(
field_name='ipsec_profile__name',
queryset=IPSecProfile.objects.all(),
to_field_name='name',
label=_('IPSec profile (name)'),
)
class Meta:
model = Tunnel
fields = ['id', 'name', 'preshared_key', 'tunnel_id']
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(name__icontains=value) |
Q(description__icontains=value) |
Q(comments__icontains=value)
)
class TunnelTerminationFilterSet(NetBoxModelFilterSet):
tunnel_id = django_filters.ModelMultipleChoiceFilter(
field_name='tunnel',
queryset=Tunnel.objects.all(),
label=_('Tunnel (ID)'),
)
tunnel = django_filters.ModelMultipleChoiceFilter(
field_name='tunnel__name',
queryset=IPSecProfile.objects.all(),
to_field_name='name',
label=_('Tunnel (name)'),
)
role = django_filters.MultipleChoiceFilter(
choices=TunnelTerminationRoleChoices
)
# interface = django_filters.ModelMultipleChoiceFilter(
# field_name='interface__name',
# queryset=Interface.objects.all(),
# to_field_name='name',
# label=_('Interface (name)'),
# )
# interface_id = django_filters.ModelMultipleChoiceFilter(
# field_name='interface',
# queryset=Interface.objects.all(),
# label=_('Interface (ID)'),
# )
# vminterface = django_filters.ModelMultipleChoiceFilter(
# field_name='interface__name',
# queryset=VMInterface.objects.all(),
# to_field_name='name',
# label=_('VM interface (name)'),
# )
# vminterface_id = django_filters.ModelMultipleChoiceFilter(
# field_name='vminterface',
# queryset=VMInterface.objects.all(),
# label=_('VM interface (ID)'),
# )
outside_ip_id = django_filters.ModelMultipleChoiceFilter(
field_name='outside_ip',
queryset=IPAddress.objects.all(),
label=_('Outside IP (ID)'),
)
class Meta:
model = TunnelTermination
fields = ['id']
class IPSecProfileFilterSet(NetBoxModelFilterSet):
protocol = django_filters.MultipleChoiceFilter(
choices=IPSecProtocolChoices
)
ike_version = django_filters.MultipleChoiceFilter(
choices=IKEVersionChoices
)
phase1_encryption = django_filters.MultipleChoiceFilter(
choices=EncryptionChoices
)
phase1_authentication = django_filters.MultipleChoiceFilter(
choices=AuthenticationChoices
)
phase1_group = django_filters.MultipleChoiceFilter(
choices=DHGroupChoices
)
phase2_encryption = django_filters.MultipleChoiceFilter(
choices=EncryptionChoices
)
phase2_authentication = django_filters.MultipleChoiceFilter(
choices=AuthenticationChoices
)
phase2_group = django_filters.MultipleChoiceFilter(
choices=DHGroupChoices
)
class Meta:
model = IPSecProfile
fields = ['id', 'name', 'phase1_sa_lifetime', 'phase2_sa_lifetime']
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(name__icontains=value) |
Q(description__icontains=value) |
Q(comments__icontains=value)
)

View File

@ -0,0 +1,4 @@
from .bulk_edit import *
from .bulk_import import *
from .filtersets import *
from .model_forms import *

View File

@ -0,0 +1,140 @@
from django import forms
from django.utils.translation import gettext_lazy as _
from netbox.forms import NetBoxModelBulkEditForm
from tenancy.models import Tenant
from utilities.forms import add_blank_choice
from utilities.forms.fields import CommentField, DynamicModelChoiceField, DynamicModelMultipleChoiceField
from vpn.choices import *
from vpn.models import *
__all__ = (
'IPSecProfileBulkEditForm',
'TunnelBulkEditForm',
'TunnelTerminationBulkEditForm',
)
class TunnelBulkEditForm(NetBoxModelBulkEditForm):
status = forms.ChoiceField(
label=_('Status'),
choices=add_blank_choice(TunnelStatusChoices),
required=False
)
encapsulation = forms.ChoiceField(
label=_('Encapsulation'),
choices=add_blank_choice(TunnelEncapsulationChoices),
required=False
)
ipsec_profile = DynamicModelMultipleChoiceField(
queryset=IPSecProfile.objects.all(),
label=_('IPSec profile'),
required=False
)
preshared_key = forms.CharField(
label=_('Pre-shared key'),
required=False
)
tenant = DynamicModelChoiceField(
label=_('Tenant'),
queryset=Tenant.objects.all(),
required=False
)
description = forms.CharField(
label=_('Description'),
max_length=200,
required=False
)
tunnel_id = forms.IntegerField(
label=_('Tunnel ID'),
required=False
)
comments = CommentField()
model = Tunnel
fieldsets = (
(_('Tunnel'), ('status', 'encapsulation', 'tunnel_id', 'description')),
(_('Security'), ('ipsec_profile', 'preshared_key')),
(_('Tenancy'), ('tenant',)),
)
nullable_fields = (
'ipsec_profile', 'preshared_key', 'tunnel_id', 'tenant', 'description', 'comments',
)
class TunnelTerminationBulkEditForm(NetBoxModelBulkEditForm):
role = forms.ChoiceField(
label=_('Role'),
choices=add_blank_choice(TunnelTerminationRoleChoices),
required=False
)
model = TunnelTermination
fieldsets = (
(None, ('role',)),
)
class IPSecProfileBulkEditForm(NetBoxModelBulkEditForm):
protocol = forms.ChoiceField(
label=_('Protocol'),
choices=add_blank_choice(IPSecProtocolChoices),
required=False
)
ike_version = forms.ChoiceField(
label=_('IKE version'),
choices=add_blank_choice(IKEVersionChoices),
required=False
)
description = forms.CharField(
label=_('Description'),
max_length=200,
required=False
)
phase1_encryption = forms.ChoiceField(
label=_('Encryption'),
choices=add_blank_choice(EncryptionChoices),
required=False
)
phase1_authentication = forms.ChoiceField(
label=_('Authentication'),
choices=add_blank_choice(AuthenticationChoices),
required=False
)
phase1_group = forms.ChoiceField(
label=_('Group'),
choices=add_blank_choice(DHGroupChoices),
required=False
)
phase1_sa_lifetime = forms.IntegerField(
required=False
)
phase2_encryption = forms.ChoiceField(
label=_('Encryption'),
choices=add_blank_choice(EncryptionChoices),
required=False
)
phase2_authentication = forms.ChoiceField(
label=_('Authentication'),
choices=add_blank_choice(AuthenticationChoices),
required=False
)
phase2_group = forms.ChoiceField(
label=_('Group'),
choices=add_blank_choice(DHGroupChoices),
required=False
)
phase2_sa_lifetime = forms.IntegerField(
required=False
)
comments = CommentField()
model = IPSecProfile
fieldsets = (
(_('Profile'), ('protocol', 'ike_version', 'description')),
(_('Phase 1 Parameters'), ('phase1_encryption', 'phase1_authentication', 'phase1_group', 'phase1_sa_lifetime')),
(_('Phase 2 Parameters'), ('phase2_encryption', 'phase2_authentication', 'phase2_group', 'phase2_sa_lifetime')),
)
nullable_fields = (
'description', 'phase1_sa_lifetime', 'phase2_sa_lifetime', 'comments',
)

View File

@ -0,0 +1,153 @@
from django.utils.translation import gettext_lazy as _
from dcim.models import Device, Interface
from ipam.models import IPAddress
from netbox.forms import NetBoxModelImportForm
from tenancy.models import Tenant
from utilities.forms.fields import CSVChoiceField, CSVModelChoiceField
from virtualization.models import VirtualMachine, VMInterface
from vpn.choices import *
from vpn.models import *
__all__ = (
'IPSecProfileImportForm',
'TunnelImportForm',
'TunnelTerminationImportForm',
)
class TunnelImportForm(NetBoxModelImportForm):
status = CSVChoiceField(
label=_('Status'),
choices=TunnelStatusChoices,
help_text=_('Operational status')
)
encapsulation = CSVChoiceField(
label=_('Encapsulation'),
choices=TunnelEncapsulationChoices,
help_text=_('Tunnel encapsulation')
)
ipsec_profile = CSVModelChoiceField(
label=_('IPSec profile'),
queryset=IPSecProfile.objects.all(),
to_field_name='name'
)
tenant = CSVModelChoiceField(
label=_('Tenant'),
queryset=Tenant.objects.all(),
required=False,
to_field_name='name',
help_text=_('Assigned tenant')
)
class Meta:
model = Tunnel
fields = (
'name', 'status', 'encapsulation', 'ipsec_profile', 'tenant', 'preshared_key', 'tunnel_id', 'description',
'comments', 'tags',
)
class TunnelTerminationImportForm(NetBoxModelImportForm):
tunnel = CSVModelChoiceField(
label=_('Tunnel'),
queryset=Tunnel.objects.all(),
to_field_name='name'
)
role = CSVChoiceField(
label=_('Role'),
choices=TunnelTerminationRoleChoices,
help_text=_('Operational role')
)
device = CSVModelChoiceField(
label=_('Device'),
queryset=Device.objects.all(),
required=False,
to_field_name='name',
help_text=_('Parent device of assigned interface')
)
virtual_machine = CSVModelChoiceField(
label=_('Virtual machine'),
queryset=VirtualMachine.objects.all(),
required=False,
to_field_name='name',
help_text=_('Parent VM of assigned interface')
)
interface = CSVModelChoiceField(
label=_('Interface'),
queryset=Interface.objects.none(), # Can also refer to VMInterface
required=False,
to_field_name='name',
help_text=_('Assigned interface')
)
outside_ip = CSVModelChoiceField(
label=_('Outside IP'),
queryset=IPAddress.objects.all(),
to_field_name='name'
)
class Meta:
model = TunnelTermination
fields = (
'tunnel', 'role', 'outside_ip', 'tags',
)
def __init__(self, data=None, *args, **kwargs):
super().__init__(data, *args, **kwargs)
if data:
# Limit interface queryset by assigned device/VM
if data.get('device'):
self.fields['interface'].queryset = Interface.objects.filter(
**{f"device__{self.fields['device'].to_field_name}": data['device']}
)
elif data.get('virtual_machine'):
self.fields['interface'].queryset = VMInterface.objects.filter(
**{f"virtual_machine__{self.fields['virtual_machine'].to_field_name}": data['virtual_machine']}
)
class IPSecProfileImportForm(NetBoxModelImportForm):
protocol = CSVChoiceField(
label=_('Protocol'),
choices=IPSecProtocolChoices,
help_text=_('IPSec protocol')
)
ike_version = CSVChoiceField(
label=_('IKE version'),
choices=IKEVersionChoices,
help_text=_('IKE version')
)
phase1_encryption = CSVChoiceField(
label=_('Phase 1 Encryption'),
choices=EncryptionChoices
)
phase1_authentication = CSVChoiceField(
label=_('Phase 1 Authentication'),
choices=AuthenticationChoices
)
phase1_group = CSVChoiceField(
label=_('Phase 1 Group'),
choices=DHGroupChoices
)
phase2_encryption = CSVChoiceField(
label=_('Phase 2 Encryption'),
choices=EncryptionChoices
)
phase2_authentication = CSVChoiceField(
label=_('Phase 2 Authentication'),
choices=AuthenticationChoices
)
phase2_group = CSVChoiceField(
label=_('Phase 2 Group'),
choices=DHGroupChoices
)
class Meta:
model = IPSecProfile
fields = (
'name', 'protocol', 'ike_version', 'phase1_encryption', 'phase1_authentication', 'phase1_group',
'phase1_sa_lifetime', 'phase1_encryption', 'phase1_authentication', 'phase1_group', 'phase1_sa_lifetime',
'description', 'comments', 'tags',
)

View File

@ -0,0 +1,124 @@
from django import forms
from django.utils.translation import gettext as _
from netbox.forms import NetBoxModelFilterSetForm
from utilities.forms.fields import DynamicModelMultipleChoiceField, TagFilterField
from vpn.choices import *
from vpn.models import *
__all__ = (
'IPSecProfileFilterForm',
'TunnelFilterForm',
'TunnelTerminationFilterForm',
)
class TunnelFilterForm(NetBoxModelFilterSetForm):
model = Tunnel
fieldsets = (
(None, ('q', 'filter_id', 'tag')),
(_('Tunnel'), ('status', 'encapsulation', 'tunnel_id')),
(_('Security'), ('ipsec_profile_id', 'preshared_key')),
(_('Tenancy'), ('tenant',)),
)
status = forms.MultipleChoiceField(
label=_('Status'),
choices=TunnelStatusChoices,
required=False
)
encapsulation = forms.MultipleChoiceField(
label=_('Encapsulation'),
choices=TunnelEncapsulationChoices,
required=False
)
ipsec_profile_id = DynamicModelMultipleChoiceField(
queryset=IPSecProfile.objects.all(),
required=False,
label=_('IPSec profile')
)
tag = TagFilterField(model)
class TunnelTerminationFilterForm(NetBoxModelFilterSetForm):
model = TunnelTermination
fieldsets = (
(None, ('q', 'filter_id', 'tag')),
(_('Termination'), ('tunnel_id', 'role')),
)
tunnel_id = DynamicModelMultipleChoiceField(
queryset=Tunnel.objects.all(),
required=False,
label=_('Tunnel')
)
role = forms.MultipleChoiceField(
label=_('Role'),
choices=TunnelTerminationRoleChoices,
required=False
)
tag = TagFilterField(model)
class IPSecProfileFilterForm(NetBoxModelFilterSetForm):
model = IPSecProfile
fieldsets = (
(None, ('q', 'filter_id', 'tag')),
(_('Profile'), ('protocol', 'ike_version')),
(_('Phase 1 Parameters'), ('phase1_encryption', 'phase1_authentication', 'phase1_group', 'phase1_sa_lifetime')),
(_('Phase 2 Parameters'), ('phase2_encryption', 'phase2_authentication', 'phase2_group', 'phase2_sa_lifetime')),
)
protocol = forms.MultipleChoiceField(
label=_('Protocol'),
choices=IPSecProtocolChoices,
required=False
)
ike_version = forms.MultipleChoiceField(
label=_('IKE version'),
choices=IKEVersionChoices,
required=False
)
ipsec_profile_id = DynamicModelMultipleChoiceField(
queryset=IPSecProfile.objects.all(),
required=False,
label=_('IPSec profile')
)
phase1_encryption = forms.MultipleChoiceField(
label=_('Encryption'),
choices=EncryptionChoices,
required=False
)
phase1_authentication = forms.MultipleChoiceField(
label=_('Authentication'),
choices=AuthenticationChoices,
required=False
)
phase1_group = forms.MultipleChoiceField(
label=_('Group'),
choices=DHGroupChoices,
required=False
)
phase1_sa_lifetime = forms.IntegerField(
required=False,
min_value=0,
label=_('SA lifetime')
)
phase2_encryption = forms.MultipleChoiceField(
label=_('Encryption'),
choices=EncryptionChoices,
required=False
)
phase2_authentication = forms.MultipleChoiceField(
label=_('Authentication'),
choices=AuthenticationChoices,
required=False
)
phase2_group = forms.MultipleChoiceField(
label=_('Group'),
choices=DHGroupChoices,
required=False
)
phase2_sa_lifetime = forms.IntegerField(
required=False,
min_value=0,
label=_('SA lifetime')
)
tag = TagFilterField(model)

View File

@ -0,0 +1,96 @@
from django import forms
from django.utils.translation import gettext_lazy as _
from dcim.models import Interface
from netbox.forms import NetBoxModelForm
from tenancy.forms import TenancyForm
from utilities.forms.fields import CommentField, DynamicModelChoiceField
from virtualization.models import VMInterface
from vpn.models import *
__all__ = (
'IPSecProfileForm',
'TunnelForm',
'TunnelTerminationForm',
)
class TunnelForm(TenancyForm, NetBoxModelForm):
ipsec_profile = DynamicModelChoiceField(
queryset=IPSecProfile.objects.all()
)
comments = CommentField()
fieldsets = (
(_('Tunnel'), ('name', 'status', 'encapsulation', 'description', 'tunnel_id', 'tags')),
(_('Security'), ('ipsec_profile', 'preshared_key')),
(_('Tenancy'), ('tenant_group', 'tenant')),
)
class Meta:
model = Tunnel
fields = [
'name', 'status', 'encapsulation', 'description', 'tunnel_id', 'ipsec_profile', 'preshared_key',
'tenant_group', 'tenant', 'comments', 'tags',
]
class TunnelTerminationForm(NetBoxModelForm):
tunnel = DynamicModelChoiceField(
queryset=Tunnel.objects.all()
)
interface = DynamicModelChoiceField(
label=_('Interface'),
queryset=Interface.objects.all(),
required=False,
selector=True,
)
vminterface = DynamicModelChoiceField(
queryset=VMInterface.objects.all(),
required=False,
selector=True,
label=_('Interface'),
)
class Meta:
model = TunnelTermination
fields = [
'tunnel', 'role', 'outside_ip', 'tags',
]
def __init__(self, *args, **kwargs):
# Initialize helper selectors
initial = kwargs.get('initial', {}).copy()
if instance := kwargs.get('instance'):
if type(instance.interface) is Interface:
initial['interface'] = instance.interface
elif type(instance.interface) is VMInterface:
initial['vminterface'] = instance.interface
kwargs['initial'] = initial
super().__init__(*args, **kwargs)
def clean(self):
super().clean()
# Handle interface assignment
self.instance.interface = self.cleaned_data['interface'] or self.cleaned_data['interface'] or None
class IPSecProfileForm(NetBoxModelForm):
comments = CommentField()
fieldsets = (
(_('Profile'), ('name', 'protocol', 'ike_version', 'description', 'tags')),
(_('Phase 1 Parameters'), ('phase1_encryption', 'phase1_authentication', 'phase1_group', 'phase1_sa_lifetime')),
(_('Phase 2 Parameters'), ('phase2_encryption', 'phase2_authentication', 'phase2_group', 'phase2_sa_lifetime')),
)
class Meta:
model = IPSecProfile
fields = [
'name', 'protocol', 'ike_version', 'phase1_encryption', 'phase1_authentication', 'phase1_group',
'phase1_sa_lifetime', 'phase2_encryption', 'phase2_authentication', 'phase2_group', 'phase2_sa_lifetime',
'description', 'comments', 'tags',
]

View File

View File

@ -0,0 +1,2 @@
from .crypto import *
from .tunnels import *

View File

@ -0,0 +1,86 @@
from django.db import models
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from netbox.models import PrimaryModel
from vpn.choices import *
__all__ = (
'IPSecProfile',
)
class IPSecProfile(PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True
)
protocol = models.CharField(
verbose_name=_('protocol'),
choices=IPSecProtocolChoices
)
ike_version = models.PositiveSmallIntegerField(
verbose_name=_('IKE version'),
choices=IKEVersionChoices,
default=IKEVersionChoices.VERSION_2
)
# Phase 1 parameters
phase1_encryption = models.CharField(
verbose_name=_('phase 1 encryption'),
choices=EncryptionChoices
)
phase1_authentication = models.CharField(
verbose_name=_('phase 1 authentication'),
choices=AuthenticationChoices
)
phase1_group = models.PositiveSmallIntegerField(
verbose_name=_('phase 1 group'),
choices=DHGroupChoices,
help_text=_('Diffie-Hellman group')
)
phase1_sa_lifetime = models.PositiveSmallIntegerField(
verbose_name=_('phase 1 SA lifetime'),
blank=True,
null=True,
help_text=_('Security association lifetime (in seconds)')
)
# Phase 2 parameters
phase2_encryption = models.CharField(
verbose_name=_('phase 2 encryption'),
choices=EncryptionChoices
)
phase2_authentication = models.CharField(
verbose_name=_('phase 2 authentication'),
choices=AuthenticationChoices
)
phase2_group = models.PositiveSmallIntegerField(
verbose_name=_('phase 2 group'),
choices=DHGroupChoices,
help_text=_('Diffie-Hellman group')
)
phase2_sa_lifetime = models.PositiveSmallIntegerField(
verbose_name=_('phase 2 SA lifetime'),
blank=True,
null=True,
help_text=_('Security association lifetime (in seconds)')
)
# TODO: Add PFS group?
clone_fields = (
'protocol', 'ike_version', 'phase1_encryption', 'phase1_authentication', 'phase1_group', 'phase1_as_lifetime',
'phase2_encryption', 'phase2_authentication', 'phase2_group', 'phase2_as_lifetime',
)
class Meta:
ordering = ('name',)
verbose_name = _('tunnel')
verbose_name_plural = _('tunnels')
def __str__(self):
return self.name
def get_absolute_url(self):
return reverse('vpn:ipsecprofile', args=[self.pk])

View File

@ -0,0 +1,115 @@
from django.contrib.contenttypes.fields import GenericForeignKey
from django.db import models
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from netbox.models import ChangeLoggedModel, PrimaryModel
from netbox.models.features import CustomFieldsMixin, CustomLinksMixin, TagsMixin
from vpn.choices import *
__all__ = (
'Tunnel',
'TunnelTermination',
)
class Tunnel(PrimaryModel):
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True
)
status = models.CharField(
verbose_name=_('status'),
max_length=50,
choices=TunnelStatusChoices,
default=TunnelStatusChoices.STATUS_ACTIVE
)
encapsulation = models.CharField(
verbose_name=_('encapsulation'),
max_length=50,
choices=TunnelEncapsulationChoices
)
ipsec_profile = models.ForeignKey(
to='vpn.IPSecProfile',
on_delete=models.PROTECT,
related_name='tunnels',
blank=True,
null=True
)
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='tunnels',
blank=True,
null=True
)
preshared_key = models.TextField(
verbose_name=_('pre-shared key'),
blank=True
)
tunnel_id = models.PositiveBigIntegerField(
verbose_name=_('tunnel ID'),
blank=True
)
clone_fields = (
'status', 'encapsulation', 'ipsec_profile', 'tenant',
)
class Meta:
ordering = ('name',)
verbose_name = _('tunnel')
verbose_name_plural = _('tunnels')
def __str__(self):
return self.name
def get_absolute_url(self):
return reverse('vpn:tunnel', args=[self.pk])
def get_status_color(self):
return TunnelStatusChoices.colors.get(self.status)
class TunnelTermination(CustomFieldsMixin, CustomLinksMixin, TagsMixin, ChangeLoggedModel):
tunnel = models.ForeignKey(
to='vpn.Tunnel',
on_delete=models.CASCADE,
related_name='terminations'
)
role = models.CharField(
verbose_name=_('role'),
max_length=50,
choices=TunnelTerminationRoleChoices,
default=TunnelTerminationRoleChoices.ROLE_PEER
)
interface_type = models.ForeignKey(
to='contenttypes.ContentType',
on_delete=models.PROTECT,
related_name='+'
)
interface_id = models.PositiveBigIntegerField(
blank=True,
null=True
)
interface = GenericForeignKey(
ct_field='interface_type',
fk_field='interface_id'
)
outside_ip = models.OneToOneField(
to='ipam.IPAddress',
on_delete=models.PROTECT,
related_name='tunnel_termination'
)
class Meta:
ordering = ('tunnel', 'pk')
verbose_name = _('tunnel termination')
verbose_name_plural = _('tunnel terminations')
def __str__(self):
return f'{self.tunnel}: Termination {self.pk}'
def get_absolute_url(self):
return self.tunnel.get_absolute_url()

0
netbox/vpn/search.py Normal file
View File

124
netbox/vpn/tables.py Normal file
View File

@ -0,0 +1,124 @@
import django_tables2 as tables
from django.utils.translation import gettext_lazy as _
from django_tables2.utils import Accessor
from tenancy.tables import TenancyColumnsMixin
from netbox.tables import NetBoxTable, columns
from vpn.models import *
__all__ = (
'IPSecProfileTable',
'TunnelTable',
'TunnelTerminationTable',
)
class TunnelTable(TenancyColumnsMixin, NetBoxTable):
name = tables.Column(
verbose_name=_('Name'),
linkify=True
)
status = columns.ChoiceFieldColumn(
verbose_name=_('Status')
)
encapsulation = columns.ChoiceFieldColumn(
verbose_name=_('Encapsulation')
)
ipsec_profile = tables.Column(
verbose_name=_('IPSec profile'),
linkify=True
)
terminations_count = columns.LinkedCountColumn(
accessor=Accessor('count_terminations'),
viewname='vpn:tunneltermination_list',
url_params={'tunnel_id': 'pk'},
verbose_name=_('Terminations')
)
comments = columns.MarkdownColumn(
verbose_name=_('Comments'),
)
tags = columns.TagColumn(
url_name='vpn:tunnel_list'
)
class Meta(NetBoxTable.Meta):
model = Tunnel
fields = (
'pk', 'id', 'name', 'status', 'encapsulation', 'ipsec_profile', 'tenant', 'tenant_group', 'preshared_key',
'tunnel_id', 'termination_count', 'description', 'comments', 'tags', 'created', 'last_updated',
)
default_columns = ('pk', 'name', 'status', 'encapsulation', 'tenant', 'termination_count')
class TunnelTerminationTable(TenancyColumnsMixin, NetBoxTable):
tunnel = tables.Column(
verbose_name=_('Tunnel'),
linkify=True
)
role = columns.ChoiceFieldColumn(
verbose_name=_('Role')
)
interface = tables.Column(
verbose_name=_('Interface'),
linkify=True
)
outside_ip = tables.Column(
verbose_name=_('Outside IP'),
linkify=True
)
tags = columns.TagColumn(
url_name='vpn:tunneltermination_list'
)
class Meta(NetBoxTable.Meta):
model = TunnelTermination
fields = (
'pk', 'id', 'tunnel', 'role', 'interface', 'outside_ip', 'tags', 'created', 'last_updated',
)
default_columns = ('pk', 'tunnel', 'role', 'interface', 'outside_ip')
class IPSecProfileTable(TenancyColumnsMixin, NetBoxTable):
name = tables.Column(
verbose_name=_('Name'),
linkify=True
)
protocol = columns.ChoiceFieldColumn(
verbose_name=_('Protocol')
)
ike_version = columns.ChoiceFieldColumn(
verbose_name=_('IKE Version')
)
phase1_encryption = columns.ChoiceFieldColumn(
verbose_name=_('Phase 1 Encryption')
)
phase1_authentication = columns.ChoiceFieldColumn(
verbose_name=_('Phase 1 Authentication')
)
phase1_group = columns.ChoiceFieldColumn(
verbose_name=_('Phase 1 Group')
)
phase2_encryption = columns.ChoiceFieldColumn(
verbose_name=_('Phase 2 Encryption')
)
phase2_authentication = columns.ChoiceFieldColumn(
verbose_name=_('Phase 2 Authentication')
)
phase2_group = columns.ChoiceFieldColumn(
verbose_name=_('Phase 2 Group')
)
comments = columns.MarkdownColumn(
verbose_name=_('Comments'),
)
tags = columns.TagColumn(
url_name='vpn:tunnel_list'
)
class Meta(NetBoxTable.Meta):
model = IPSecProfile
fields = (
'pk', 'id', 'name', 'protocol', 'ike_version', 'phase1_encryption', 'phase1_authentication', 'phase1_group',
'phase1_sa_lifetime', 'phase2_encryption', 'phase2_authentication', 'phase2_group', 'phase1_sa_lifetime',
'description', 'comments', 'tags', 'created', 'last_updated',
)
default_columns = ('pk', 'name', 'protocol', 'ike_version', 'description')

33
netbox/vpn/urls.py Normal file
View File

@ -0,0 +1,33 @@
from django.urls import include, path
from utilities.urls import get_model_urls
from . import views
app_name = 'vpn'
urlpatterns = [
# Tunnels
path('tunnels/', views.TunnelListView.as_view(), name='tunnel_list'),
path('tunnels/add/', views.TunnelEditView.as_view(), name='tunnel_add'),
path('tunnels/import/', views.TunnelBulkImportView.as_view(), name='tunnel_import'),
path('tunnels/edit/', views.TunnelBulkEditView.as_view(), name='tunnel_bulk_edit'),
path('tunnels/delete/', views.TunnelBulkDeleteView.as_view(), name='tunnel_bulk_delete'),
path('tunnels/<int:pk>/', include(get_model_urls('vpn', 'tunnel'))),
# Tunnel terminations
path('tunnel-terminations/', views.TunnelTerminationListView.as_view(), name='tunneltermination_list'),
path('tunnel-terminations/add/', views.TunnelTerminationEditView.as_view(), name='tunneltermination_add'),
path('tunnel-terminations/import/', views.TunnelTerminationBulkImportView.as_view(), name='tunneltermination_import'),
path('tunnel-terminations/edit/', views.TunnelTerminationBulkEditView.as_view(), name='tunneltermination_bulk_edit'),
path('tunnel-terminations/delete/', views.TunnelTerminationBulkDeleteView.as_view(), name='tunneltermination_bulk_delete'),
path('tunnel-terminations/<int:pk>/', include(get_model_urls('vpn', 'tunneltermination'))),
# IPSec profiles
path('ipsec-profiles/', views.IPSecProfileListView.as_view(), name='ipsecprofile_list'),
path('ipsec-profiles/add/', views.IPSecProfileEditView.as_view(), name='ipsecprofile_add'),
path('ipsec-profiles/import/', views.IPSecProfileBulkImportView.as_view(), name='ipsecprofile_import'),
path('ipsec-profiles/edit/', views.IPSecProfileBulkEditView.as_view(), name='ipsecprofile_bulk_edit'),
path('ipsec-profiles/delete/', views.IPSecProfileBulkDeleteView.as_view(), name='ipsecprofile_bulk_delete'),
path('ipsec-profiles/<int:pk>/', include(get_model_urls('vpn', 'ipsecprofile'))),
]

146
netbox/vpn/views.py Normal file
View File

@ -0,0 +1,146 @@
from netbox.views import generic
from utilities.utils import count_related
from utilities.views import register_model_view
from . import filtersets, forms, tables
from .models import *
#
# Tunnels
#
class TunnelListView(generic.ObjectListView):
queryset = Tunnel.objects.annotate(
count_terminations=count_related(TunnelTermination, 'tunnel')
)
filterset = filtersets.TunnelFilterSet
filterset_form = forms.TunnelFilterForm
table = tables.TunnelTable
@register_model_view(Tunnel)
class TunnelView(generic.ObjectView):
queryset = Tunnel.objects.all()
@register_model_view(Tunnel, 'edit')
class TunnelEditView(generic.ObjectEditView):
queryset = Tunnel.objects.all()
form = forms.TunnelForm
@register_model_view(Tunnel, 'delete')
class TunnelDeleteView(generic.ObjectDeleteView):
queryset = Tunnel.objects.all()
class TunnelBulkImportView(generic.BulkImportView):
queryset = Tunnel.objects.all()
model_form = forms.TunnelImportForm
class TunnelBulkEditView(generic.BulkEditView):
queryset = Tunnel.objects.annotate(
count_terminations=count_related(TunnelTermination, 'tunnel')
)
filterset = filtersets.TunnelFilterSet
table = tables.TunnelTable
form = forms.TunnelBulkEditForm
class TunnelBulkDeleteView(generic.BulkDeleteView):
queryset = Tunnel.objects.annotate(
count_terminations=count_related(TunnelTermination, 'tunnel')
)
filterset = filtersets.TunnelFilterSet
table = tables.TunnelTable
#
# Tunnel terminations
#
class TunnelTerminationListView(generic.ObjectListView):
queryset = TunnelTermination.objects.all()
filterset = filtersets.TunnelTerminationFilterSet
filterset_form = forms.TunnelTerminationFilterForm
table = tables.TunnelTerminationTable
@register_model_view(TunnelTermination)
class TunnelTerminationView(generic.ObjectView):
queryset = TunnelTermination.objects.all()
@register_model_view(TunnelTermination, 'edit')
class TunnelTerminationEditView(generic.ObjectEditView):
queryset = TunnelTermination.objects.all()
form = forms.TunnelTerminationForm
@register_model_view(TunnelTermination, 'delete')
class TunnelTerminationDeleteView(generic.ObjectDeleteView):
queryset = TunnelTermination.objects.all()
class TunnelTerminationBulkImportView(generic.BulkImportView):
queryset = TunnelTermination.objects.all()
model_form = forms.TunnelTerminationImportForm
class TunnelTerminationBulkEditView(generic.BulkEditView):
queryset = TunnelTermination.objects.all()
filterset = filtersets.TunnelTerminationFilterSet
table = tables.TunnelTerminationTable
form = forms.TunnelTerminationBulkEditForm
class TunnelTerminationBulkDeleteView(generic.BulkDeleteView):
queryset = TunnelTermination.objects.all()
filterset = filtersets.TunnelTerminationFilterSet
table = tables.TunnelTerminationTable
#
# IPSec profiles
#
class IPSecProfileListView(generic.ObjectListView):
queryset = IPSecProfile.objects.all()
filterset = filtersets.IPSecProfileFilterSet
filterset_form = forms.IPSecProfileFilterForm
table = tables.IPSecProfileTable
@register_model_view(IPSecProfile)
class IPSecProfileView(generic.ObjectView):
queryset = IPSecProfile.objects.all()
@register_model_view(IPSecProfile, 'edit')
class IPSecProfileEditView(generic.ObjectEditView):
queryset = IPSecProfile.objects.all()
form = forms.IPSecProfileForm
@register_model_view(IPSecProfile, 'delete')
class IPSecProfileDeleteView(generic.ObjectDeleteView):
queryset = IPSecProfile.objects.all()
class IPSecProfileBulkImportView(generic.BulkImportView):
queryset = IPSecProfile.objects.all()
model_form = forms.IPSecProfileImportForm
class IPSecProfileBulkEditView(generic.BulkEditView):
queryset = IPSecProfile.objects.all()
filterset = filtersets.IPSecProfileFilterSet
table = tables.IPSecProfileTable
form = forms.IPSecProfileBulkEditForm
class IPSecProfileBulkDeleteView(generic.BulkDeleteView):
queryset = IPSecProfile.objects.all()
filterset = filtersets.IPSecProfileFilterSet
table = tables.IPSecProfileTable