Merge pull request #9631 from netbox-community/8157-l2vpn

Closes: #8157 - Add L2VPN support
This commit is contained in:
Jeremy Stretch 2022-07-06 13:14:21 -04:00 committed by GitHub
commit 5fd3eb82cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1545 additions and 8 deletions

View File

@ -26,3 +26,8 @@
---
{!models/ipam/asn.md!}
---
{!models/ipam/l2vpn.md!}
{!models/ipam/l2vpntermination.md!}

View File

@ -45,6 +45,8 @@ The Django [content types](https://docs.djangoproject.com/en/stable/ref/contrib/
* [ipam.FHRPGroup](../models/ipam/fhrpgroup.md)
* [ipam.IPAddress](../models/ipam/ipaddress.md)
* [ipam.IPRange](../models/ipam/iprange.md)
* [ipam.L2VPN](../models/ipam/l2vpn.md)
* [ipam.L2VPNTermination](../models/ipam/l2vpntermination.md)
* [ipam.Prefix](../models/ipam/prefix.md)
* [ipam.RouteTarget](../models/ipam/routetarget.md)
* [ipam.Service](../models/ipam/service.md)

21
docs/models/ipam/l2vpn.md Normal file
View File

@ -0,0 +1,21 @@
# L2VPN
A L2VPN object is NetBox is a representation of a layer 2 bridge technology such as VXLAN, VPLS or EPL. Each L2VPN can be identified by name as well as an optional unique identifier (VNI would be an example).
Each L2VPN instance must have one of the following type associated with it:
* VPLS
* VPWS
* EPL
* EVPL
* EP-LAN
* EVP-LAN
* EP-TREE
* EVP-TREE
* VXLAN
* VXLAN EVPN
* MPLS-EVPN
* PBB-EVPN
!!!note
Choosing VPWS, EPL, EP-LAN, EP-TREE will result in only being able to add 2 terminations to a given L2VPN.

View File

@ -0,0 +1,15 @@
# L2VPN Termination
A L2VPN Termination is the termination point of a L2VPN. Certain types of L2VPN's may only have 2 termination points (point-to-point) while others may have many terminations (multipoint).
Each termination consists of a L2VPN it is a member of as well as the connected endpoint which can be an interface or a VLAN.
The following types of L2VPN's are considered point-to-point:
* VPWS
* EPL
* EP-LAN
* EP-TREE
!!!note
Choosing any of the above types of L2VPN's will result in only being able to add 2 terminations to a given L2VPN.

View File

@ -10,6 +10,7 @@ from dcim.constants import *
from dcim.models import *
from ipam.api.nested_serializers import (
NestedASNSerializer, NestedIPAddressSerializer, NestedVLANSerializer, NestedVRFSerializer,
NestedL2VPNTerminationSerializer,
)
from ipam.models import ASN, VLAN
from netbox.api import ChoiceField, ContentTypeField, SerializedPKRelatedField
@ -823,6 +824,7 @@ class InterfaceSerializer(NetBoxModelSerializer, LinkTerminationSerializer, Conn
many=True
)
vrf = NestedVRFSerializer(required=False, allow_null=True)
l2vpn_termination = NestedL2VPNTerminationSerializer(read_only=True)
cable = NestedCableSerializer(read_only=True)
wireless_link = NestedWirelessLinkSerializer(read_only=True)
wireless_lans = SerializedPKRelatedField(
@ -841,7 +843,7 @@ class InterfaceSerializer(NetBoxModelSerializer, LinkTerminationSerializer, Conn
'mtu', 'mac_address', 'speed', 'duplex', 'wwn', 'mgmt_only', 'description', 'mode', 'rf_role', 'rf_channel',
'poe_mode', 'poe_type', 'rf_channel_frequency', 'rf_channel_width', 'tx_power', 'untagged_vlan',
'tagged_vlans', 'mark_connected', 'cable', 'wireless_link', 'link_peer', 'link_peer_type', 'wireless_lans',
'vrf', 'connected_endpoint', 'connected_endpoint_type', 'connected_endpoint_reachable', 'tags',
'vrf', 'l2vpn_termination', 'connected_endpoint', 'connected_endpoint_type', 'connected_endpoint_reachable', 'tags',
'custom_fields', 'created', 'last_updated', 'count_ipaddresses', 'count_fhrp_groups', '_occupied',
]

View File

@ -649,6 +649,12 @@ class Interface(ModularComponentModel, BaseInterface, LinkTermination, PathEndpo
object_id_field='interface_id',
related_query_name='+'
)
l2vpn_terminations = GenericRelation(
to='ipam.L2VPNTermination',
content_type_field='assigned_object_type',
object_id_field='assigned_object_id',
related_query_name='interface',
)
clone_fields = ['device', 'parent', 'bridge', 'lag', 'type', 'mgmt_only', 'poe_mode', 'poe_type']
@ -823,6 +829,10 @@ class Interface(ModularComponentModel, BaseInterface, LinkTermination, PathEndpo
def link(self):
return self.cable or self.wireless_link
@property
def l2vpn_termination(self):
return self.l2vpn_terminations.first()
#
# Pass-through ports

View File

@ -1,6 +1,7 @@
from rest_framework import serializers
from ipam import models
from ipam.models.l2vpn import L2VPNTermination, L2VPN
from netbox.api import WritableNestedSerializer
__all__ = [
@ -10,6 +11,8 @@ __all__ = [
'NestedFHRPGroupAssignmentSerializer',
'NestedIPAddressSerializer',
'NestedIPRangeSerializer',
'NestedL2VPNSerializer',
'NestedL2VPNTerminationSerializer',
'NestedPrefixSerializer',
'NestedRIRSerializer',
'NestedRoleSerializer',
@ -190,3 +193,28 @@ class NestedServiceSerializer(WritableNestedSerializer):
class Meta:
model = models.Service
fields = ['id', 'url', 'display', 'name', 'protocol', 'ports']
#
# L2VPN
#
class NestedL2VPNSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:l2vpn-detail')
class Meta:
model = L2VPN
fields = [
'id', 'url', 'display', 'identifier', 'name', 'slug', 'type'
]
class NestedL2VPNTerminationSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:l2vpntermination-detail')
l2vpn = NestedL2VPNSerializer()
class Meta:
model = L2VPNTermination
fields = [
'id', 'url', 'display', 'l2vpn'
]

View File

@ -19,6 +19,9 @@ from .nested_serializers import *
#
# ASNs
#
from .nested_serializers import NestedL2VPNSerializer
from ..models.l2vpn import L2VPNTermination, L2VPN
class ASNSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:asn-detail')
@ -204,13 +207,14 @@ class VLANSerializer(NetBoxModelSerializer):
tenant = NestedTenantSerializer(required=False, allow_null=True)
status = ChoiceField(choices=VLANStatusChoices, required=False)
role = NestedRoleSerializer(required=False, allow_null=True)
l2vpn_termination = NestedL2VPNTerminationSerializer(read_only=True)
prefix_count = serializers.IntegerField(read_only=True)
class Meta:
model = VLAN
fields = [
'id', 'url', 'display', 'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description', 'tags',
'custom_fields', 'created', 'last_updated', 'prefix_count',
'id', 'url', 'display', 'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description',
'l2vpn_termination', 'tags', 'custom_fields', 'created', 'last_updated', 'prefix_count',
]
@ -433,3 +437,54 @@ class ServiceSerializer(NetBoxModelSerializer):
'id', 'url', 'display', 'device', 'virtual_machine', 'name', 'ports', 'protocol', 'ipaddresses',
'description', 'tags', 'custom_fields', 'created', 'last_updated',
]
#
# L2VPN
#
class L2VPNSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:l2vpn-detail')
type = ChoiceField(choices=L2VPNTypeChoices, required=False)
import_targets = SerializedPKRelatedField(
queryset=RouteTarget.objects.all(),
serializer=NestedRouteTargetSerializer,
required=False,
many=True
)
export_targets = SerializedPKRelatedField(
queryset=RouteTarget.objects.all(),
serializer=NestedRouteTargetSerializer,
required=False,
many=True
)
tenant = NestedTenantSerializer(required=False, allow_null=True)
class Meta:
model = L2VPN
fields = [
'id', 'url', 'display', 'identifier', 'name', 'slug', 'type', 'import_targets', 'export_targets',
'description', 'tenant', 'tags', 'custom_fields', 'created', 'last_updated'
]
class L2VPNTerminationSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:l2vpntermination-detail')
l2vpn = NestedL2VPNSerializer()
assigned_object_type = ContentTypeField(
queryset=ContentType.objects.all()
)
assigned_object = serializers.SerializerMethodField(read_only=True)
class Meta:
model = L2VPNTermination
fields = [
'id', 'url', 'display', 'l2vpn', 'assigned_object_type', 'assigned_object_id',
'assigned_object', 'tags', 'custom_fields', 'created', 'last_updated'
]
@swagger_serializer_method(serializer_or_field=serializers.DictField)
def get_assigned_object(self, instance):
serializer = get_serializer_for_model(instance.assigned_object, prefix='Nested')
context = {'request': self.context['request']}
return serializer(instance.assigned_object, context=context).data

View File

@ -45,6 +45,10 @@ router.register('vlans', views.VLANViewSet)
router.register('service-templates', views.ServiceTemplateViewSet)
router.register('services', views.ServiceViewSet)
# L2VPN
router.register('l2vpns', views.L2VPNViewSet)
router.register('l2vpn-terminations', views.L2VPNTerminationViewSet)
app_name = 'ipam-api'
urlpatterns = [

View File

@ -18,6 +18,7 @@ from netbox.config import get_config
from utilities.constants import ADVISORY_LOCK_KEYS
from utilities.utils import count_related
from . import serializers
from ipam.models import L2VPN, L2VPNTermination
class IPAMRootView(APIRootView):
@ -157,6 +158,18 @@ class ServiceViewSet(NetBoxModelViewSet):
filterset_class = filtersets.ServiceFilterSet
class L2VPNViewSet(NetBoxModelViewSet):
queryset = L2VPN.objects.prefetch_related('import_targets', 'export_targets', 'tenant', 'tags')
serializer_class = serializers.L2VPNSerializer
filterset_class = filtersets.L2VPNFilterSet
class L2VPNTerminationViewSet(NetBoxModelViewSet):
queryset = L2VPNTermination.objects.prefetch_related('assigned_object')
serializer_class = serializers.L2VPNTerminationSerializer
filterset_class = filtersets.L2VPNTerminationFilterSet
#
# Views
#

View File

@ -170,3 +170,52 @@ class ServiceProtocolChoices(ChoiceSet):
(PROTOCOL_UDP, 'UDP'),
(PROTOCOL_SCTP, 'SCTP'),
)
class L2VPNTypeChoices(ChoiceSet):
TYPE_VPLS = 'vpls'
TYPE_VPWS = 'vpws'
TYPE_EPL = 'epl'
TYPE_EVPL = 'evpl'
TYPE_EPLAN = 'ep-lan'
TYPE_EVPLAN = 'evp-lan'
TYPE_EPTREE = 'ep-tree'
TYPE_EVPTREE = 'evp-tree'
TYPE_VXLAN = 'vxlan'
TYPE_VXLAN_EVPN = 'vxlan-evpn'
TYPE_MPLS_EVPN = 'mpls-evpn'
TYPE_PBB_EVPN = 'pbb-evpn'
CHOICES = (
('VPLS', (
(TYPE_VPWS, 'VPWS'),
(TYPE_VPLS, 'VPLS'),
)),
('VXLAN', (
(TYPE_VXLAN, 'VXLAN'),
(TYPE_VXLAN_EVPN, 'VXLAN-EVPN'),
)),
('L2VPN E-VPN', (
(TYPE_MPLS_EVPN, 'MPLS EVPN'),
(TYPE_PBB_EVPN, 'PBB EVPN'),
)),
('E-Line', (
(TYPE_EPL, 'EPL'),
(TYPE_EVPL, 'EVPL'),
)),
('E-LAN', (
(TYPE_EPLAN, 'Ethernet Private LAN'),
(TYPE_EVPLAN, 'Ethernet Virtual Private LAN'),
)),
('E-Tree', (
(TYPE_EPTREE, 'Ethernet Private Tree'),
(TYPE_EVPTREE, 'Ethernet Virtual Private Tree'),
)),
)
P2P = (
TYPE_VPWS,
TYPE_EPL,
TYPE_EPLAN,
TYPE_EPTREE
)

View File

@ -90,3 +90,9 @@ VLANGROUP_SCOPE_TYPES = (
# 16-bit port number
SERVICE_PORT_MIN = 1
SERVICE_PORT_MAX = 65535
L2VPN_ASSIGNMENT_MODELS = Q(
Q(app_label='dcim', model='interface') |
Q(app_label='ipam', model='vlan') |
Q(app_label='virtualization', model='vminterface')
)

View File

@ -23,6 +23,8 @@ __all__ = (
'FHRPGroupFilterSet',
'IPAddressFilterSet',
'IPRangeFilterSet',
'L2VPNFilterSet',
'L2VPNTerminationFilterSet',
'PrefixFilterSet',
'RIRFilterSet',
'RoleFilterSet',
@ -922,3 +924,113 @@ class ServiceFilterSet(NetBoxModelFilterSet):
return queryset
qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
return queryset.filter(qs_filter)
#
# L2VPN
#
class L2VPNFilterSet(NetBoxModelFilterSet, TenancyFilterSet):
import_target_id = django_filters.ModelMultipleChoiceFilter(
field_name='import_targets',
queryset=RouteTarget.objects.all(),
label='Import target',
)
import_target = django_filters.ModelMultipleChoiceFilter(
field_name='import_targets__name',
queryset=RouteTarget.objects.all(),
to_field_name='name',
label='Import target (name)',
)
export_target_id = django_filters.ModelMultipleChoiceFilter(
field_name='export_targets',
queryset=RouteTarget.objects.all(),
label='Export target',
)
export_target = django_filters.ModelMultipleChoiceFilter(
field_name='export_targets__name',
queryset=RouteTarget.objects.all(),
to_field_name='name',
label='Export target (name)',
)
class Meta:
model = L2VPN
fields = ['id', 'identifier', 'name', 'type', 'description']
def search(self, queryset, name, value):
if not value.strip():
return queryset
qs_filter = Q(identifier=value) | Q(name__icontains=value) | Q(description__icontains=value)
return queryset.filter(qs_filter)
class L2VPNTerminationFilterSet(NetBoxModelFilterSet):
l2vpn_id = django_filters.ModelMultipleChoiceFilter(
queryset=L2VPN.objects.all(),
label='L2VPN (ID)',
)
l2vpn = django_filters.ModelMultipleChoiceFilter(
field_name='l2vpn__name',
queryset=L2VPN.objects.all(),
to_field_name='name',
label='L2VPN (name)',
)
device = MultiValueCharFilter(
method='filter_device',
field_name='name',
label='Device (name)',
)
device_id = MultiValueNumberFilter(
method='filter_device',
field_name='pk',
label='Device (ID)',
)
interface = django_filters.ModelMultipleChoiceFilter(
field_name='interface__name',
queryset=Interface.objects.all(),
to_field_name='name',
label='Interface (name)',
)
interface_id = django_filters.ModelMultipleChoiceFilter(
field_name='interface',
queryset=Interface.objects.all(),
label='Interface (ID)',
)
vlan = django_filters.ModelMultipleChoiceFilter(
field_name='vlan__name',
queryset=VLAN.objects.all(),
to_field_name='name',
label='VLAN (name)',
)
vlan_vid = django_filters.NumberFilter(
field_name='vlan__vid',
label='VLAN number (1-4094)',
)
vlan_id = django_filters.ModelMultipleChoiceFilter(
field_name='vlan',
queryset=VLAN.objects.all(),
label='VLAN (ID)',
)
class Meta:
model = L2VPNTermination
fields = ['id', ]
def search(self, queryset, name, value):
if not value.strip():
return queryset
qs_filter = Q(l2vpn__name__icontains=value)
return queryset.filter(qs_filter)
def filter_device(self, queryset, name, value):
devices = Device.objects.filter(**{'{}__in'.format(name): value})
if not devices.exists():
return queryset.none()
interface_ids = []
for device in devices:
interface_ids.extend(device.vc_interfaces().values_list('id', flat=True))
return queryset.filter(
interface__in=interface_ids
)

View File

@ -18,6 +18,8 @@ __all__ = (
'FHRPGroupBulkEditForm',
'IPAddressBulkEditForm',
'IPRangeBulkEditForm',
'L2VPNBulkEditForm',
'L2VPNTerminationBulkEditForm',
'PrefixBulkEditForm',
'RIRBulkEditForm',
'RoleBulkEditForm',
@ -440,3 +442,24 @@ class ServiceTemplateBulkEditForm(NetBoxModelBulkEditForm):
class ServiceBulkEditForm(ServiceTemplateBulkEditForm):
model = Service
class L2VPNBulkEditForm(NetBoxModelBulkEditForm):
tenant = DynamicModelChoiceField(
queryset=Tenant.objects.all(),
required=False
)
description = forms.CharField(
max_length=100,
required=False
)
model = L2VPN
fieldsets = (
(None, ('tenant', 'description')),
)
nullable_fields = ('tenant', 'description',)
class L2VPNTerminationBulkEditForm(NetBoxModelBulkEditForm):
model = L2VPN

View File

@ -1,5 +1,6 @@
from django import forms
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from dcim.models import Device, Interface, Site
from ipam.choices import *
@ -16,6 +17,8 @@ __all__ = (
'FHRPGroupCSVForm',
'IPAddressCSVForm',
'IPRangeCSVForm',
'L2VPNCSVForm',
'L2VPNTerminationCSVForm',
'PrefixCSVForm',
'RIRCSVForm',
'RoleCSVForm',
@ -425,3 +428,83 @@ class ServiceCSVForm(NetBoxModelCSVForm):
class Meta:
model = Service
fields = ('device', 'virtual_machine', 'name', 'protocol', 'ports', 'description')
class L2VPNCSVForm(NetBoxModelCSVForm):
tenant = CSVModelChoiceField(
queryset=Tenant.objects.all(),
required=False,
to_field_name='name',
)
type = CSVChoiceField(
choices=L2VPNTypeChoices,
help_text='IP protocol'
)
class Meta:
model = L2VPN
fields = ('identifier', 'name', 'slug', 'type', 'description')
class L2VPNTerminationCSVForm(NetBoxModelCSVForm):
l2vpn = CSVModelChoiceField(
queryset=L2VPN.objects.all(),
required=True,
to_field_name='name',
label='L2VPN',
)
device = CSVModelChoiceField(
queryset=Device.objects.all(),
required=False,
to_field_name='name',
help_text='Parent device (for interface)'
)
virtual_machine = CSVModelChoiceField(
queryset=VirtualMachine.objects.all(),
required=False,
to_field_name='name',
help_text='Parent virtual machine (for interface)'
)
interface = CSVModelChoiceField(
queryset=Interface.objects.none(), # Can also refer to VMInterface
required=False,
to_field_name='name',
help_text='Assigned interface (device or VM)'
)
vlan = CSVModelChoiceField(
queryset=VLAN.objects.all(),
required=False,
to_field_name='name',
help_text='Assigned VLAN'
)
class Meta:
model = L2VPNTermination
fields = ('l2vpn', 'device', 'virtual_machine', 'interface', 'vlan')
def __init__(self, data=None, *args, **kwargs):
super().__init__(data, *args, **kwargs)
if data:
# Limit interface queryset by device or VM
if data.get('device'):
self.fields['interface'].queryset = Interface.objects.filter(
**{f"device__{self.fields['device'].to_field_name}": data['device']}
)
elif data.get('virtual_machine'):
self.fields['interface'].queryset = VMInterface.objects.filter(
**{f"virtual_machine__{self.fields['virtual_machine'].to_field_name}": data['virtual_machine']}
)
def clean(self):
super().clean()
if self.cleaned_data.get('device') and self.cleaned_data.get('virtual_machine'):
raise ValidationError('Cannot import device and VM interface terminations simultaneously.')
if not (self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')):
raise ValidationError('Each termination must specify either an interface or a VLAN.')
if self.cleaned_data.get('interface') and self.cleaned_data.get('vlan'):
raise ValidationError('Cannot assign both an interface and a VLAN.')
self.instance.assigned_object = self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')

View File

@ -20,6 +20,8 @@ __all__ = (
'FHRPGroupFilterForm',
'IPAddressFilterForm',
'IPRangeFilterForm',
'L2VPNFilterForm',
'L2VPNTerminationFilterForm',
'PrefixFilterForm',
'RIRFilterForm',
'RoleFilterForm',
@ -475,3 +477,30 @@ class ServiceTemplateFilterForm(NetBoxModelFilterSetForm):
class ServiceFilterForm(ServiceTemplateFilterForm):
model = Service
class L2VPNFilterForm(TenancyFilterForm, NetBoxModelFilterSetForm):
model = L2VPN
fieldsets = (
(None, ('type', )),
('Tenant', ('tenant_group_id', 'tenant_id')),
)
type = forms.ChoiceField(
choices=add_blank_choice(L2VPNTypeChoices),
required=False,
widget=StaticSelect()
)
class L2VPNTerminationFilterForm(NetBoxModelFilterSetForm):
model = L2VPNTermination
fieldsets = (
(None, ('l2vpn', )),
)
l2vpn = DynamicModelChoiceField(
queryset=L2VPN.objects.all(),
required=True,
query_params={},
label='L2VPN',
fetch_trigger='open'
)

View File

@ -1,5 +1,6 @@
from django import forms
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from dcim.models import Device, Interface, Location, Rack, Region, Site, SiteGroup
from extras.models import Tag
@ -7,9 +8,9 @@ from ipam.choices import *
from ipam.constants import *
from ipam.formfields import IPNetworkFormField
from ipam.models import *
from ipam.models import ASN
from netbox.forms import NetBoxModelForm
from tenancy.forms import TenancyForm
from tenancy.models import Tenant
from utilities.exceptions import PermissionsViolation
from utilities.forms import (
add_blank_choice, BootstrapMixin, ContentTypeChoiceField, DatePicker, DynamicModelChoiceField,
@ -26,6 +27,8 @@ __all__ = (
'IPAddressBulkAddForm',
'IPAddressForm',
'IPRangeForm',
'L2VPNForm',
'L2VPNTerminationForm',
'PrefixForm',
'RIRForm',
'RoleForm',
@ -861,3 +864,110 @@ class ServiceCreateForm(ServiceForm):
self.cleaned_data['description'] = service_template.description
elif not all(self.cleaned_data[f] for f in ('name', 'protocol', 'ports')):
raise forms.ValidationError("Must specify name, protocol, and port(s) if not using a service template.")
#
# L2VPN
#
class L2VPNForm(TenancyForm, NetBoxModelForm):
slug = SlugField()
import_targets = DynamicModelMultipleChoiceField(
queryset=RouteTarget.objects.all(),
required=False
)
export_targets = DynamicModelMultipleChoiceField(
queryset=RouteTarget.objects.all(),
required=False
)
fieldsets = (
('L2VPN', ('name', 'slug', 'type', 'identifier', 'description', 'tags')),
('Route Targets', ('import_targets', 'export_targets')),
('Tenancy', ('tenant_group', 'tenant')),
)
class Meta:
model = L2VPN
fields = (
'name', 'slug', 'type', 'identifier', 'description', 'import_targets', 'export_targets', 'tenant', 'tags'
)
widgets = {
'type': StaticSelect(),
}
class L2VPNTerminationForm(NetBoxModelForm):
l2vpn = DynamicModelChoiceField(
queryset=L2VPN.objects.all(),
required=True,
query_params={},
label='L2VPN',
fetch_trigger='open'
)
device = DynamicModelChoiceField(
queryset=Device.objects.all(),
required=False,
query_params={}
)
vlan = DynamicModelChoiceField(
queryset=VLAN.objects.all(),
required=False,
query_params={
'available_on_device': '$device'
}
)
interface = DynamicModelChoiceField(
queryset=Interface.objects.all(),
required=False,
query_params={
'device_id': '$device'
}
)
virtual_machine = DynamicModelChoiceField(
queryset=VirtualMachine.objects.all(),
required=False,
query_params={}
)
vminterface = DynamicModelChoiceField(
queryset=VMInterface.objects.all(),
required=False,
query_params={
'virtual_machine_id': '$virtual_machine'
}
)
class Meta:
model = L2VPNTermination
fields = ('l2vpn', )
def __init__(self, *args, **kwargs):
instance = kwargs.get('instance')
initial = kwargs.get('initial', {}).copy()
if instance:
if type(instance.assigned_object) is Interface:
initial['device'] = instance.assigned_object.parent
initial['interface'] = instance.assigned_object
elif type(instance.assigned_object) is VLAN:
initial['vlan'] = instance.assigned_object
elif type(instance.assigned_object) is VMInterface:
initial['vminterface'] = instance.assigned_object
kwargs['initial'] = initial
super().__init__(*args, **kwargs)
def clean(self):
super().clean()
interface = self.cleaned_data.get('interface')
vminterface = self.cleaned_data.get('vminterface')
vlan = self.cleaned_data.get('vlan')
if not (interface or vminterface or vlan):
raise ValidationError('A termination must specify an interface or VLAN.')
if len([x for x in (interface, vminterface, vlan) if x]) > 1:
raise ValidationError('A termination can only have one terminating object (an interface or VLAN).')
self.instance.assigned_object = interface or vminterface or vlan

View File

@ -17,6 +17,12 @@ class IPAMQuery(graphene.ObjectType):
ip_range = ObjectField(IPRangeType)
ip_range_list = ObjectListField(IPRangeType)
l2vpn = ObjectField(L2VPNType)
l2vpn_list = ObjectListField(L2VPNType)
l2vpn_termination = ObjectField(L2VPNTerminationType)
l2vpn_termination_list = ObjectListField(L2VPNTerminationType)
prefix = ObjectField(PrefixType)
prefix_list = ObjectListField(PrefixType)

View File

@ -11,6 +11,8 @@ __all__ = (
'FHRPGroupAssignmentType',
'IPAddressType',
'IPRangeType',
'L2VPNType',
'L2VPNTerminationType',
'PrefixType',
'RIRType',
'RoleType',
@ -151,3 +153,17 @@ class VRFType(NetBoxObjectType):
model = models.VRF
fields = '__all__'
filterset_class = filtersets.VRFFilterSet
class L2VPNType(NetBoxObjectType):
class Meta:
model = models.L2VPN
fields = '__all__'
filtersets_class = filtersets.L2VPNFilterSet
class L2VPNTerminationType(NetBoxObjectType):
class Meta:
model = models.L2VPNTermination
fields = '__all__'
filtersets_class = filtersets.L2VPNTerminationFilterSet

View File

@ -0,0 +1,62 @@
# Generated by Django 4.0.5 on 2022-07-06 16:51
import django.core.serializers.json
from django.db import migrations, models
import django.db.models.deletion
import taggit.managers
class Migration(migrations.Migration):
dependencies = [
('extras', '0076_configcontext_locations'),
('contenttypes', '0002_remove_content_type_name'),
('tenancy', '0007_contact_link'),
('ipam', '0058_ipaddress_nat_inside_nonunique'),
]
operations = [
migrations.CreateModel(
name='L2VPN',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False)),
('created', models.DateTimeField(auto_now_add=True, null=True)),
('last_updated', models.DateTimeField(auto_now=True, null=True)),
('custom_field_data', models.JSONField(blank=True, default=dict, encoder=django.core.serializers.json.DjangoJSONEncoder)),
('name', models.CharField(max_length=100, unique=True)),
('slug', models.SlugField()),
('type', models.CharField(max_length=50)),
('identifier', models.BigIntegerField(blank=True, null=True, unique=True)),
('description', models.TextField(blank=True, null=True)),
('export_targets', models.ManyToManyField(blank=True, related_name='exporting_l2vpns', to='ipam.routetarget')),
('import_targets', models.ManyToManyField(blank=True, related_name='importing_l2vpns', to='ipam.routetarget')),
('tags', taggit.managers.TaggableManager(through='extras.TaggedItem', to='extras.Tag')),
('tenant', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='l2vpns', to='tenancy.tenant')),
],
options={
'verbose_name': 'L2VPN',
'ordering': ('identifier', 'name'),
},
),
migrations.CreateModel(
name='L2VPNTermination',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False)),
('created', models.DateTimeField(auto_now_add=True, null=True)),
('last_updated', models.DateTimeField(auto_now=True, null=True)),
('custom_field_data', models.JSONField(blank=True, default=dict, encoder=django.core.serializers.json.DjangoJSONEncoder)),
('assigned_object_id', models.PositiveBigIntegerField()),
('assigned_object_type', models.ForeignKey(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'dcim'), ('model', 'interface')), models.Q(('app_label', 'ipam'), ('model', 'vlan')), models.Q(('app_label', 'virtualization'), ('model', 'vminterface')), _connector='OR')), on_delete=django.db.models.deletion.PROTECT, related_name='+', to='contenttypes.contenttype')),
('l2vpn', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='terminations', to='ipam.l2vpn')),
('tags', taggit.managers.TaggableManager(through='extras.TaggedItem', to='extras.Tag')),
],
options={
'verbose_name': 'L2VPN Termination',
'ordering': ('l2vpn',),
},
),
migrations.AddConstraint(
model_name='l2vpntermination',
constraint=models.UniqueConstraint(fields=('assigned_object_type', 'assigned_object_id'), name='ipam_l2vpntermination_assigned_object'),
),
]

View File

@ -2,6 +2,7 @@
from .fhrp import *
from .vrfs import *
from .ip import *
from .l2vpn import *
from .services import *
from .vlans import *
@ -12,6 +13,8 @@ __all__ = (
'IPRange',
'FHRPGroup',
'FHRPGroupAssignment',
'L2VPN',
'L2VPNTermination',
'Prefix',
'RIR',
'Role',

112
netbox/ipam/models/l2vpn.py Normal file
View File

@ -0,0 +1,112 @@
from django.contrib.contenttypes.fields import GenericRelation, GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from ipam.choices import L2VPNTypeChoices
from ipam.constants import L2VPN_ASSIGNMENT_MODELS
from netbox.models import NetBoxModel
class L2VPN(NetBoxModel):
name = models.CharField(
max_length=100,
unique=True
)
slug = models.SlugField()
type = models.CharField(max_length=50, choices=L2VPNTypeChoices)
identifier = models.BigIntegerField(
null=True,
blank=True,
unique=True
)
import_targets = models.ManyToManyField(
to='ipam.RouteTarget',
related_name='importing_l2vpns',
blank=True,
)
export_targets = models.ManyToManyField(
to='ipam.RouteTarget',
related_name='exporting_l2vpns',
blank=True
)
description = models.TextField(null=True, blank=True)
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='l2vpns',
blank=True,
null=True
)
contacts = GenericRelation(
to='tenancy.ContactAssignment'
)
class Meta:
ordering = ('identifier', 'name')
verbose_name = 'L2VPN'
def __str__(self):
if self.identifier:
return f'{self.name} ({self.identifier})'
return f'{self.name}'
def get_absolute_url(self):
return reverse('ipam:l2vpn', args=[self.pk])
class L2VPNTermination(NetBoxModel):
l2vpn = models.ForeignKey(
to='ipam.L2VPN',
on_delete=models.CASCADE,
related_name='terminations'
)
assigned_object_type = models.ForeignKey(
to=ContentType,
limit_choices_to=L2VPN_ASSIGNMENT_MODELS,
on_delete=models.PROTECT,
related_name='+'
)
assigned_object_id = models.PositiveBigIntegerField()
assigned_object = GenericForeignKey(
ct_field='assigned_object_type',
fk_field='assigned_object_id'
)
class Meta:
ordering = ('l2vpn',)
verbose_name = 'L2VPN Termination'
constraints = (
models.UniqueConstraint(
fields=('assigned_object_type', 'assigned_object_id'),
name='ipam_l2vpntermination_assigned_object'
),
)
def __str__(self):
if self.pk is not None:
return f'{self.assigned_object} <> {self.l2vpn}'
return super().__str__()
def get_absolute_url(self):
return reverse('ipam:l2vpntermination', args=[self.pk])
def clean(self):
# Only check is assigned_object is set. Required otherwise we have an Integrity Error thrown.
if self.assigned_object:
obj_id = self.assigned_object.pk
obj_type = ContentType.objects.get_for_model(self.assigned_object)
if L2VPNTermination.objects.filter(assigned_object_id=obj_id, assigned_object_type=obj_type).\
exclude(pk=self.pk).count() > 0:
raise ValidationError(f'L2VPN Termination already assigned ({self.assigned_object})')
# Only check if L2VPN is set and is of type P2P
if self.l2vpn and self.l2vpn.type in L2VPNTypeChoices.P2P:
terminations_count = L2VPNTermination.objects.filter(l2vpn=self.l2vpn).exclude(pk=self.pk).count()
if terminations_count >= 2:
l2vpn_type = self.l2vpn.get_type_display()
raise ValidationError(
f'{l2vpn_type} L2VPNs cannot have more than two terminations; found {terminations_count} already '
f'defined.'
)

View File

@ -1,4 +1,4 @@
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
@ -8,6 +8,7 @@ from django.urls import reverse
from dcim.models import Interface
from ipam.choices import *
from ipam.constants import *
from ipam.models import L2VPNTermination
from ipam.querysets import VLANQuerySet
from netbox.models import OrganizationalModel, NetBoxModel
from virtualization.models import VMInterface
@ -173,6 +174,13 @@ class VLAN(NetBoxModel):
blank=True
)
l2vpn_terminations = GenericRelation(
to='ipam.L2VPNTermination',
content_type_field='assigned_object_type',
object_id_field='assigned_object_id',
related_query_name='vlan'
)
objects = VLANQuerySet.as_manager()
clone_fields = [
@ -227,3 +235,7 @@ class VLAN(NetBoxModel):
Q(untagged_vlan_id=self.pk) |
Q(tagged_vlans=self.pk)
).distinct()
@property
def l2vpn_termination(self):
return self.l2vpn_terminations.first()

View File

@ -1,5 +1,6 @@
from .fhrp import *
from .ip import *
from .l2vpn import *
from .services import *
from .vlans import *
from .vrfs import *

View File

@ -0,0 +1,57 @@
import django_tables2 as tables
from ipam.models import *
from ipam.models.l2vpn import L2VPN, L2VPNTermination
from netbox.tables import NetBoxTable, columns
__all__ = (
'L2VPNTable',
'L2VPNTerminationTable',
)
L2VPN_TARGETS = """
{% for rt in value.all %}
<a href="{{ rt.get_absolute_url }}">{{ rt }}</a>{% if not forloop.last %}<br />{% endif %}
{% endfor %}
"""
class L2VPNTable(NetBoxTable):
pk = columns.ToggleColumn()
name = tables.Column(
linkify=True
)
import_targets = columns.TemplateColumn(
template_code=L2VPN_TARGETS,
orderable=False
)
export_targets = columns.TemplateColumn(
template_code=L2VPN_TARGETS,
orderable=False
)
class Meta(NetBoxTable.Meta):
model = L2VPN
fields = ('pk', 'name', 'slug', 'type', 'description', 'import_targets', 'export_targets', 'tenant', 'actions')
default_columns = ('pk', 'name', 'type', 'description', 'actions')
class L2VPNTerminationTable(NetBoxTable):
pk = columns.ToggleColumn()
l2vpn = tables.Column(
verbose_name='L2VPN',
linkify=True
)
assigned_object_type = columns.ContentTypeColumn(
verbose_name='Object Type'
)
assigned_object = tables.Column(
verbose_name='Assigned Object',
linkify=True,
orderable=False
)
class Meta(NetBoxTable.Meta):
model = L2VPNTermination
fields = ('pk', 'l2vpn', 'assigned_object_type', 'assigned_object', 'actions')
default_columns = ('pk', 'l2vpn', 'assigned_object_type', 'assigned_object', 'actions')

View File

@ -914,3 +914,98 @@ class ServiceTest(APIViewTestCases.APIViewTestCase):
'ports': [6],
},
]
class L2VPNTest(APIViewTestCases.APIViewTestCase):
model = L2VPN
brief_fields = ['display', 'id', 'identifier', 'name', 'slug', 'type', 'url']
create_data = [
{
'name': 'L2VPN 4',
'slug': 'l2vpn-4',
'type': 'vxlan',
'identifier': 33343344
},
{
'name': 'L2VPN 5',
'slug': 'l2vpn-5',
'type': 'vxlan',
'identifier': 33343345
},
{
'name': 'L2VPN 6',
'slug': 'l2vpn-6',
'type': 'vpws',
'identifier': 33343346
},
]
bulk_update_data = {
'description': 'New description',
}
@classmethod
def setUpTestData(cls):
l2vpns = (
L2VPN(name='L2VPN 1', slug='l2vpn-1', type='vxlan', identifier=650001),
L2VPN(name='L2VPN 2', slug='l2vpn-2', type='vpws', identifier=650002),
L2VPN(name='L2VPN 3', slug='l2vpn-3', type='vpls'), # No RD
)
L2VPN.objects.bulk_create(l2vpns)
class L2VPNTerminationTest(APIViewTestCases.APIViewTestCase):
model = L2VPNTermination
brief_fields = ['display', 'id', 'l2vpn', 'url']
@classmethod
def setUpTestData(cls):
vlans = (
VLAN(name='VLAN 1', vid=651),
VLAN(name='VLAN 2', vid=652),
VLAN(name='VLAN 3', vid=653),
VLAN(name='VLAN 4', vid=654),
VLAN(name='VLAN 5', vid=655),
VLAN(name='VLAN 6', vid=656),
VLAN(name='VLAN 7', vid=657)
)
VLAN.objects.bulk_create(vlans)
l2vpns = (
L2VPN(name='L2VPN 1', type='vxlan', identifier=650001),
L2VPN(name='L2VPN 2', type='vpws', identifier=650002),
L2VPN(name='L2VPN 3', type='vpls'), # No RD
)
L2VPN.objects.bulk_create(l2vpns)
l2vpnterminations = (
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[0]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[1]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[2])
)
L2VPNTermination.objects.bulk_create(l2vpnterminations)
cls.create_data = [
{
'l2vpn': l2vpns[0].pk,
'assigned_object_type': 'ipam.vlan',
'assigned_object_id': vlans[3].pk,
},
{
'l2vpn': l2vpns[0].pk,
'assigned_object_type': 'ipam.vlan',
'assigned_object_id': vlans[4].pk,
},
{
'l2vpn': l2vpns[0].pk,
'assigned_object_type': 'ipam.vlan',
'assigned_object_id': vlans[5].pk,
},
]
cls.bulk_update_data = {
'l2vpn': l2vpns[2].pk
}

View File

@ -1463,3 +1463,100 @@ class ServiceTestCase(TestCase, ChangeLoggedFilterSetTests):
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'virtual_machine': [vms[0].name, vms[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class L2VPNTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = L2VPN.objects.all()
filterset = L2VPNFilterSet
@classmethod
def setUpTestData(cls):
l2vpns = (
L2VPN(name='L2VPN 1', type='vxlan', identifier=650001),
L2VPN(name='L2VPN 2', type='vpws', identifier=650002),
L2VPN(name='L2VPN 3', type='vpls'), # No RD
)
L2VPN.objects.bulk_create(l2vpns)
class L2VPNTerminationTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = L2VPNTermination.objects.all()
filterset = L2VPNTerminationFilterSet
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1')
device_type = DeviceType.objects.create(model='Device Type 1', manufacturer=manufacturer)
device_role = DeviceRole.objects.create(name='Switch')
device = Device.objects.create(
name='Device 1',
site=site,
device_type=device_type,
device_role=device_role,
status='active'
)
interfaces = (
Interface(name='Interface 1', device=device, type='1000baset'),
Interface(name='Interface 2', device=device, type='1000baset'),
Interface(name='Interface 3', device=device, type='1000baset'),
Interface(name='Interface 4', device=device, type='1000baset'),
Interface(name='Interface 5', device=device, type='1000baset'),
Interface(name='Interface 6', device=device, type='1000baset')
)
Interface.objects.bulk_create(interfaces)
vlans = (
VLAN(name='VLAN 1', vid=651),
VLAN(name='VLAN 2', vid=652),
VLAN(name='VLAN 3', vid=653),
VLAN(name='VLAN 4', vid=654),
VLAN(name='VLAN 5', vid=655)
)
VLAN.objects.bulk_create(vlans)
l2vpns = (
L2VPN(name='L2VPN 1', type='vxlan', identifier=650001),
L2VPN(name='L2VPN 2', type='vpws', identifier=650002),
L2VPN(name='L2VPN 3', type='vpls'), # No RD,
)
L2VPN.objects.bulk_create(l2vpns)
l2vpnterminations = (
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[0]),
L2VPNTermination(l2vpn=l2vpns[1], assigned_object=vlans[1]),
L2VPNTermination(l2vpn=l2vpns[2], assigned_object=vlans[2]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=interfaces[0]),
L2VPNTermination(l2vpn=l2vpns[1], assigned_object=interfaces[1]),
L2VPNTermination(l2vpn=l2vpns[2], assigned_object=interfaces[2]),
)
L2VPNTermination.objects.bulk_create(l2vpnterminations)
def test_l2vpns(self):
l2vpns = L2VPN.objects.all()[:2]
params = {'l2vpn_id': [l2vpns[0].pk, l2vpns[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'l2vpn': ['L2VPN 1', 'L2VPN 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_interfaces(self):
interfaces = Interface.objects.all()[:2]
params = {'interface_id': [interfaces[0].pk, interfaces[1].pk]}
qs = self.filterset(params, self.queryset).qs
results = qs.all()
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'interface': ['Interface 1', 'Interface 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_vlans(self):
vlans = VLAN.objects.all()[:2]
params = {'vlan_id': [vlans[0].pk, vlans[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'vlan': ['VLAN 1', 'VLAN 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)

View File

@ -2,8 +2,9 @@ from netaddr import IPNetwork, IPSet
from django.core.exceptions import ValidationError
from django.test import TestCase, override_settings
from dcim.models import Interface, Device, DeviceRole, DeviceType, Manufacturer, Site
from ipam.choices import IPAddressRoleChoices, PrefixStatusChoices
from ipam.models import Aggregate, IPAddress, IPRange, Prefix, RIR, VLAN, VLANGroup, VRF
from ipam.models import Aggregate, IPAddress, IPRange, Prefix, RIR, VLAN, VLANGroup, VRF, L2VPN, L2VPNTermination
class TestAggregate(TestCase):
@ -538,3 +539,76 @@ class TestVLANGroup(TestCase):
VLAN.objects.create(name='VLAN 104', vid=104, group=vlangroup)
self.assertEqual(vlangroup.get_next_available_vid(), 105)
class TestL2VPNTermination(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1')
device_type = DeviceType.objects.create(model='Device Type 1', manufacturer=manufacturer)
device_role = DeviceRole.objects.create(name='Switch')
device = Device.objects.create(
name='Device 1',
site=site,
device_type=device_type,
device_role=device_role,
status='active'
)
interfaces = (
Interface(name='Interface 1', device=device, type='1000baset'),
Interface(name='Interface 2', device=device, type='1000baset'),
Interface(name='Interface 3', device=device, type='1000baset'),
Interface(name='Interface 4', device=device, type='1000baset'),
Interface(name='Interface 5', device=device, type='1000baset'),
)
Interface.objects.bulk_create(interfaces)
vlans = (
VLAN(name='VLAN 1', vid=651),
VLAN(name='VLAN 2', vid=652),
VLAN(name='VLAN 3', vid=653),
VLAN(name='VLAN 4', vid=654),
VLAN(name='VLAN 5', vid=655),
VLAN(name='VLAN 6', vid=656),
VLAN(name='VLAN 7', vid=657)
)
VLAN.objects.bulk_create(vlans)
l2vpns = (
L2VPN(name='L2VPN 1', type='vxlan', identifier=650001),
L2VPN(name='L2VPN 2', type='vpws', identifier=650002),
L2VPN(name='L2VPN 3', type='vpls'), # No RD
)
L2VPN.objects.bulk_create(l2vpns)
l2vpnterminations = (
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[0]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[1]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[2])
)
L2VPNTermination.objects.bulk_create(l2vpnterminations)
def test_duplicate_interface_terminations(self):
device = Device.objects.first()
interface = Interface.objects.filter(device=device).first()
l2vpn = L2VPN.objects.first()
L2VPNTermination.objects.create(l2vpn=l2vpn, assigned_object=interface)
duplicate = L2VPNTermination(l2vpn=l2vpn, assigned_object=interface)
self.assertRaises(ValidationError, duplicate.clean)
def test_duplicate_vlan_terminations(self):
vlan = Interface.objects.first()
l2vpn = L2VPN.objects.first()
L2VPNTermination.objects.create(l2vpn=l2vpn, assigned_object=vlan)
duplicate = L2VPNTermination(l2vpn=l2vpn, assigned_object=vlan)
self.assertRaises(ValidationError, duplicate.clean)

View File

@ -1,14 +1,18 @@
import datetime
from django.contrib.contenttypes.models import ContentType
from django.test import override_settings
from django.urls import reverse
from netaddr import IPNetwork
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site, Interface
from extras.choices import ObjectChangeActionChoices
from extras.models import ObjectChange
from ipam.choices import *
from ipam.models import *
from tenancy.models import Tenant
from utilities.testing import ViewTestCases, create_tags
from users.models import ObjectPermission
from utilities.testing import ViewTestCases, create_tags, post_data
class ASNTestCase(ViewTestCases.PrimaryObjectViewTestCase):
@ -746,3 +750,133 @@ class ServiceTestCase(ViewTestCases.PrimaryObjectViewTestCase):
self.assertEqual(instance.protocol, service_template.protocol)
self.assertEqual(instance.ports, service_template.ports)
self.assertEqual(instance.description, service_template.description)
class L2VPNTestCase(ViewTestCases.PrimaryObjectViewTestCase):
model = L2VPN
csv_data = (
'name,slug,type,identifier',
'L2VPN 5,l2vpn-5,vxlan,456',
'L2VPN 6,l2vpn-6,vxlan,444',
)
bulk_edit_data = {
'description': 'New Description',
}
@classmethod
def setUpTestData(cls):
rts = (
RouteTarget(name='64534:123'),
RouteTarget(name='64534:321')
)
RouteTarget.objects.bulk_create(rts)
l2vpns = (
L2VPN(name='L2VPN 1', slug='l2vpn-1', type='vxlan', identifier='650001'),
L2VPN(name='L2VPN 2', slug='l2vpn-2', type='vxlan', identifier='650002'),
L2VPN(name='L2VPN 3', slug='l2vpn-3', type='vxlan', identifier='650003')
)
L2VPN.objects.bulk_create(l2vpns)
cls.form_data = {
'name': 'L2VPN 8',
'slug': 'l2vpn-8',
'type': 'vxlan',
'identifier': 123,
'description': 'Description',
'import_targets': [rts[0].pk],
'export_targets': [rts[1].pk]
}
print(cls.form_data)
class L2VPNTerminationTestCase(
ViewTestCases.GetObjectViewTestCase,
ViewTestCases.GetObjectChangelogViewTestCase,
ViewTestCases.CreateObjectViewTestCase,
ViewTestCases.EditObjectViewTestCase,
ViewTestCases.DeleteObjectViewTestCase,
ViewTestCases.ListObjectsViewTestCase,
ViewTestCases.BulkImportObjectsViewTestCase,
ViewTestCases.BulkDeleteObjectsViewTestCase,
):
model = L2VPNTermination
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1')
device_type = DeviceType.objects.create(model='Device Type 1', manufacturer=manufacturer)
device_role = DeviceRole.objects.create(name='Switch')
device = Device.objects.create(
name='Device 1',
site=site,
device_type=device_type,
device_role=device_role,
status='active'
)
interface = Interface.objects.create(name='Interface 1', device=device, type='1000baset')
l2vpn = L2VPN.objects.create(name='L2VPN 1', type='vxlan', identifier=650001)
l2vpn_vlans = L2VPN.objects.create(name='L2VPN 2', type='vxlan', identifier=650002)
vlans = (
VLAN(name='Vlan 1', vid=1001),
VLAN(name='Vlan 2', vid=1002),
VLAN(name='Vlan 3', vid=1003),
VLAN(name='Vlan 4', vid=1004),
VLAN(name='Vlan 5', vid=1005),
VLAN(name='Vlan 6', vid=1006)
)
VLAN.objects.bulk_create(vlans)
terminations = (
L2VPNTermination(l2vpn=l2vpn, assigned_object=vlans[0]),
L2VPNTermination(l2vpn=l2vpn, assigned_object=vlans[1]),
L2VPNTermination(l2vpn=l2vpn, assigned_object=vlans[2])
)
L2VPNTermination.objects.bulk_create(terminations)
cls.form_data = {
'l2vpn': l2vpn.pk,
'device': device.pk,
'interface': interface.pk,
}
cls.csv_data = (
"l2vpn,vlan",
"L2VPN 2,Vlan 4",
"L2VPN 2,Vlan 5",
"L2VPN 2,Vlan 6",
)
cls.bulk_edit_data = {}
#
# Custom assertions
#
def assertInstanceEqual(self, instance, data, exclude=None, api=False):
"""
Override parent
"""
if exclude is None:
exclude = []
fields = [k for k in data.keys() if k not in exclude]
model_dict = self.model_to_dict(instance, fields=fields, api=api)
# Omit any dictionary keys which are not instance attributes or have been excluded
relevant_data = {
k: v for k, v in data.items() if hasattr(instance, k) and k not in exclude
}
# Handle relations on the model
for k, v in model_dict.items():
if isinstance(v, object) and hasattr(v, 'first'):
model_dict[k] = v.first().pk
self.assertDictEqual(model_dict, relevant_data)

View File

@ -186,4 +186,26 @@ urlpatterns = [
path('services/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='service_changelog', kwargs={'model': Service}),
path('services/<int:pk>/journal/', ObjectJournalView.as_view(), name='service_journal', kwargs={'model': Service}),
# L2VPN
path('l2vpns/', views.L2VPNListView.as_view(), name='l2vpn_list'),
path('l2vpns/add/', views.L2VPNEditView.as_view(), name='l2vpn_add'),
path('l2vpns/import/', views.L2VPNBulkImportView.as_view(), name='l2vpn_import'),
path('l2vpns/edit/', views.L2VPNBulkEditView.as_view(), name='l2vpn_bulk_edit'),
path('l2vpns/delete/', views.L2VPNBulkDeleteView.as_view(), name='l2vpn_bulk_delete'),
path('l2vpns/<int:pk>/', views.L2VPNView.as_view(), name='l2vpn'),
path('l2vpns/<int:pk>/edit/', views.L2VPNEditView.as_view(), name='l2vpn_edit'),
path('l2vpns/<int:pk>/delete/', views.L2VPNDeleteView.as_view(), name='l2vpn_delete'),
path('l2vpns/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='l2vpn_changelog', kwargs={'model': L2VPN}),
path('l2vpns/<int:pk>/journal/', ObjectJournalView.as_view(), name='l2vpn_journal', kwargs={'model': L2VPN}),
path('l2vpn-terminations/', views.L2VPNTerminationListView.as_view(), name='l2vpntermination_list'),
path('l2vpn-terminations/add/', views.L2VPNTerminationEditView.as_view(), name='l2vpntermination_add'),
path('l2vpn-terminations/import/', views.L2VPNTerminationBulkImportView.as_view(), name='l2vpntermination_import'),
path('l2vpn-terminations/edit/', views.L2VPNTerminationBulkEditView.as_view(), name='l2vpntermination_bulk_edit'),
path('l2vpn-terminations/delete/', views.L2VPNTerminationBulkDeleteView.as_view(), name='l2vpntermination_bulk_delete'),
path('l2vpn-terminations/<int:pk>/', views.L2VPNTerminationView.as_view(), name='l2vpntermination'),
path('l2vpn-terminations/<int:pk>/edit/', views.L2VPNTerminationEditView.as_view(), name='l2vpntermination_edit'),
path('l2vpn-terminations/<int:pk>/delete/', views.L2VPNTerminationDeleteView.as_view(), name='l2vpntermination_delete'),
path('l2vpn-terminations/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='l2vpntermination_changelog', kwargs={'model': L2VPNTermination}),
path('l2vpn-terminations/<int:pk>/journal/', ObjectJournalView.as_view(), name='l2vpntermination_journal', kwargs={'model': L2VPNTermination}),
]

View File

@ -17,6 +17,7 @@ from . import filtersets, forms, tables
from .constants import *
from .models import *
from .models import ASN
from .tables.l2vpn import L2VPNTable, L2VPNTerminationTable
from .utils import add_requested_prefixes, add_available_ipaddresses, add_available_vlans
@ -1150,3 +1151,105 @@ class ServiceBulkDeleteView(generic.BulkDeleteView):
queryset = Service.objects.prefetch_related('device', 'virtual_machine')
filterset = filtersets.ServiceFilterSet
table = tables.ServiceTable
# L2VPN
class L2VPNListView(generic.ObjectListView):
queryset = L2VPN.objects.all()
table = L2VPNTable
filterset = filtersets.L2VPNFilterSet
filterset_form = forms.L2VPNFilterForm
class L2VPNView(generic.ObjectView):
queryset = L2VPN.objects.all()
def get_extra_context(self, request, instance):
terminations = L2VPNTermination.objects.restrict(request.user, 'view').filter(l2vpn=instance)
terminations_table = tables.L2VPNTerminationTable(terminations, user=request.user, exclude=('l2vpn', ))
terminations_table.configure(request)
import_targets_table = tables.RouteTargetTable(
instance.import_targets.prefetch_related('tenant'),
orderable=False
)
export_targets_table = tables.RouteTargetTable(
instance.export_targets.prefetch_related('tenant'),
orderable=False
)
return {
'terminations_table': terminations_table,
'import_targets_table': import_targets_table,
'export_targets_table': export_targets_table,
}
class L2VPNEditView(generic.ObjectEditView):
queryset = L2VPN.objects.all()
form = forms.L2VPNForm
class L2VPNDeleteView(generic.ObjectDeleteView):
queryset = L2VPN.objects.all()
class L2VPNBulkImportView(generic.BulkImportView):
queryset = L2VPN.objects.all()
model_form = forms.L2VPNCSVForm
table = tables.L2VPNTable
class L2VPNBulkEditView(generic.BulkEditView):
queryset = L2VPN.objects.all()
filterset = filtersets.L2VPNFilterSet
table = tables.L2VPNTable
form = forms.L2VPNBulkEditForm
class L2VPNBulkDeleteView(generic.BulkDeleteView):
queryset = L2VPN.objects.all()
filterset = filtersets.L2VPNFilterSet
table = tables.L2VPNTable
class L2VPNTerminationListView(generic.ObjectListView):
queryset = L2VPNTermination.objects.all()
table = L2VPNTerminationTable
filterset = filtersets.L2VPNTerminationFilterSet
filterset_form = forms.L2VPNTerminationFilterForm
class L2VPNTerminationView(generic.ObjectView):
queryset = L2VPNTermination.objects.all()
class L2VPNTerminationEditView(generic.ObjectEditView):
queryset = L2VPNTermination.objects.all()
form = forms.L2VPNTerminationForm
template_name = 'ipam/l2vpntermination_edit.html'
class L2VPNTerminationDeleteView(generic.ObjectDeleteView):
queryset = L2VPNTermination.objects.all()
class L2VPNTerminationBulkImportView(generic.BulkImportView):
queryset = L2VPNTermination.objects.all()
model_form = forms.L2VPNTerminationCSVForm
table = tables.L2VPNTerminationTable
class L2VPNTerminationBulkEditView(generic.BulkEditView):
queryset = L2VPNTermination.objects.all()
filterset = filtersets.L2VPNTerminationFilterSet
table = tables.L2VPNTerminationTable
form = forms.L2VPNTerminationBulkEditForm
class L2VPNTerminationBulkDeleteView(generic.BulkDeleteView):
queryset = L2VPNTermination.objects.all()
filterset = filtersets.L2VPNTerminationFilterSet
table = tables.L2VPNTerminationTable

View File

@ -260,6 +260,13 @@ IPAM_MENU = Menu(
get_model_item('ipam', 'vlangroup', 'VLAN Groups'),
),
),
MenuGroup(
label='L2VPNs',
items=(
get_model_item('ipam', 'l2vpn', 'L2VPNs'),
get_model_item('ipam', 'l2vpntermination', 'Terminations'),
),
),
MenuGroup(
label='Other',
items=(

View File

@ -104,6 +104,10 @@
<th scope="row">LAG</th>
<td>{{ object.lag|linkify|placeholder }}</td>
</tr>
<tr>
<th scope="row">L2VPN</th>
<td>{{ object.l2vpn_termination.l2vpn|linkify|placeholder }}</td>
</tr>
</table>
</div>
</div>

View File

@ -0,0 +1,81 @@
{% extends 'generic/object.html' %}
{% load helpers %}
{% load plugins %}
{% load render_table from django_tables2 %}
{% block content %}
<div class="row mb-3">
<div class="col col-md-6">
<div class="card">
<h5 class="card-header">
L2VPN Attributes
</h5>
<div class="card-body">
<table class="table table-hover attr-table
<tr>
<th scope="row">Name</th>
<td>{{ object.name|placeholder }}</td>
</tr>
<tr>
<th scope="row">Slug</th>
<td>{{ object.slug|placeholder }}</td>
</tr>
<tr>
<th scope="row">Identifier</th>
<td>{{ object.identifier|placeholder }}</td>
</tr>
<tr>
<th scope="row">Type</th>
<td>{{ object.get_type_display }}</td>
</tr>
<tr>
<th scope="row">Description</th>
<td>{{ object.description|placeholder }}</td>
</tr>
<tr>
<th scope="row">Tenant</th>
<td>{{ object.tenant|placeholder }}</td>
</tr>
</table>
</div>
</div>
{% include 'inc/panels/contacts.html' %}
{% plugin_left_page object %}
</div>
<div class="col col-md-6">
{% include 'inc/panels/tags.html' with tags=object.tags.all url='circuits:circuit_list' %}
{% include 'inc/panels/custom_fields.html' %}
{% plugin_right_page object %}
</div>
</div>
<div class="row mb-3">
<div class="col col-md-6">
{% include 'inc/panel_table.html' with table=import_targets_table heading="Import Route Targets" %}
</div>
<div class="col col-md-6">
{% include 'inc/panel_table.html' with table=export_targets_table heading="Export Route Targets" %}
</div>
</div>
<div class="row mb-3">
<div class="col col-md-12">
<div class="card">
<h5 class="card-header">Terminations</h5>
<div class="card-body">
{% render_table terminations_table 'inc/table.html' %}
</div>
{% if perms.ipam.add_l2vpntermination %}
<div class="card-footer text-end noprint">
<a href="{% url 'ipam:l2vpntermination_add' %}?l2vpn={{ object.pk }}&return_url={{ object.get_absolute_url }}" class="btn btn-primary btn-sm">
<i class="mdi mdi-plus-thick" aria-hidden="true"></i> Add a Termination
</a>
</div>
{% endif %}
</div>
</div>
</div>
<div class="row mb-3">
<div class="col col-md-12">
{% plugin_full_width_page object %}
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,31 @@
{% extends 'generic/object.html' %}
{% load helpers %}
{% block content %}
<div class="row">
<div class="col col-md-6">
<div class="card">
<h5 class="card-header">
L2VPN Attributes
</h5>
<div class="card-body">
<table class="table table-hover">
<tr>
<th scope="row">L2vPN</th>
<td>{{ object.l2vpn.name|placeholder }}</td>
</tr>
<tr>
<th scope="row">Assigned Object</th>
<td>{{ object.assigned_object.name|placeholder }}</td>
</tr>
</table>
</div>
</div>
</div>
<div class="col col-md-6">
{% include 'inc/panels/custom_fields.html' %}
{% include 'inc/panels/tags.html' with tags=object.tags.all url='ipam:l2vpntermination_list' %}
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,49 @@
{% extends 'generic/object_edit.html' %}
{% load helpers %}
{% load form_helpers %}
{% block form %}
<div class="field-group my-5">
<div class="row mb-2">
<h5 class="offset-sm-3">L2VPN Termination</h5>
</div>
{% render_field form.l2vpn %}
<div class="row mb-3">
<div class="offset-sm-3">
<ul class="nav nav-pills" role="tablist">
<li role="presentation" class="nav-item">
<button role="tab" type="button" id="vlan_tab" data-bs-toggle="tab" aria-controls="vlan" data-bs-target="#vlan" class="nav-link {% if not form.initial.interface or form.initial.vminterface %}active{% endif %}">
VLAN
</button>
</li>
<li role="presentation" class="nav-item">
<button role="tab" type="button" id="interface_tab" data-bs-toggle="tab" aria-controls="interface" data-bs-target="#interface" class="nav-link {% if form.initial.interface %}active{% endif %}">
Interface
</button>
</li>
<li role="presentation" class="nav-item">
<button role="tab" type="button" id="vminterface_tab" data-bs-toggle="tab" aria-controls="vminterface" data-bs-target="#vminterface" class="nav-link {% if form.initial.vminterface %}active{% endif %}">
VM Interface
</button>
</li>
</ul>
</div>
</div>
<div class="row mb-3">
<div class="tab-content p-0 border-0">
<div class="tab-pane {% if not form.initial.interface or form.initial.vminterface %}active{% endif %}" id="vlan" role="tabpanel" aria-labeled-by="vlan_tab">
{% render_field form.device %}
{% render_field form.vlan %}
</div>
<div class="tab-pane {% if form.initial.interface %}active{% endif %}" id="interface" role="tabpanel" aria-labeled-by="interface_tab">
{% render_field form.device %}
{% render_field form.interface %}
</div>
<div class="tab-pane {% if form.initial.vminterface %}active{% endif %}" id="vminterface" role="tabpanel" aria-labeled-by="vminterface_tab">
{% render_field form.virtual_machine %}
{% render_field form.vminterface %}
</div>
</div>
</div>
</div>
{% endblock %}

View File

@ -64,6 +64,10 @@
<th scope="row">Description</th>
<td>{{ object.description|placeholder }}</td>
</tr>
<tr>
<th scope="row">L2VPN</th>
<td>{{ object.l2vpn_termination.l2vpn|linkify|placeholder }}</td>
</tr>
</table>
</div>
</div>