#6454 fix merge conflicts

This commit is contained in:
Arthur
2022-08-18 15:31:54 -07:00
458 changed files with 14069 additions and 6039 deletions

View File

@@ -1,7 +1,8 @@
from rest_framework import serializers
from ipam import models
from netbox.api import WritableNestedSerializer
from ipam.models.l2vpn import L2VPNTermination, L2VPN
from netbox.api.serializers import WritableNestedSerializer
__all__ = [
'NestedAggregateSerializer',
@@ -10,6 +11,8 @@ __all__ = [
'NestedFHRPGroupAssignmentSerializer',
'NestedIPAddressSerializer',
'NestedIPRangeSerializer',
'NestedL2VPNSerializer',
'NestedL2VPNTerminationSerializer',
'NestedPrefixSerializer',
'NestedRIRSerializer',
'NestedRoleSerializer',
@@ -190,3 +193,28 @@ class NestedServiceSerializer(WritableNestedSerializer):
class Meta:
model = models.Service
fields = ['id', 'url', 'display', 'name', 'protocol', 'ports']
#
# L2VPN
#
class NestedL2VPNSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:l2vpn-detail')
class Meta:
model = L2VPN
fields = [
'id', 'url', 'display', 'identifier', 'name', 'slug', 'type'
]
class NestedL2VPNTerminationSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:l2vpntermination-detail')
l2vpn = NestedL2VPNSerializer()
class Meta:
model = L2VPNTermination
fields = [
'id', 'url', 'display', 'l2vpn'
]

View File

@@ -1,5 +1,3 @@
from collections import OrderedDict
from django.contrib.contenttypes.models import ContentType
from drf_yasg.utils import swagger_serializer_method
from rest_framework import serializers
@@ -8,7 +6,7 @@ from dcim.api.nested_serializers import NestedDeviceSerializer, NestedSiteSerial
from ipam.choices import *
from ipam.constants import IPADDRESS_ASSIGNMENT_MODELS, VLANGROUP_SCOPE_TYPES
from ipam.models import *
from netbox.api import ChoiceField, ContentTypeField, SerializedPKRelatedField
from netbox.api.fields import ChoiceField, ContentTypeField, SerializedPKRelatedField
from netbox.api.serializers import NetBoxModelSerializer
from netbox.constants import NESTED_SERIALIZER_PREFIX
from tenancy.api.nested_serializers import NestedTenantSerializer
@@ -20,6 +18,9 @@ from .nested_serializers import *
#
# ASNs
#
from .nested_serializers import NestedL2VPNSerializer
from ..models.l2vpn import L2VPNTermination, L2VPN
class ASNSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:asn-detail')
@@ -205,13 +206,14 @@ class VLANSerializer(NetBoxModelSerializer):
tenant = NestedTenantSerializer(required=False, allow_null=True)
status = ChoiceField(choices=VLANStatusChoices, required=False)
role = NestedRoleSerializer(required=False, allow_null=True)
l2vpn_termination = NestedL2VPNTerminationSerializer(read_only=True)
prefix_count = serializers.IntegerField(read_only=True)
class Meta:
model = VLAN
fields = [
'id', 'url', 'display', 'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description', 'tags',
'custom_fields', 'created', 'last_updated', 'prefix_count',
'id', 'url', 'display', 'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description',
'l2vpn_termination', 'tags', 'custom_fields', 'created', 'last_updated', 'prefix_count',
]
@@ -223,13 +225,13 @@ class AvailableVLANSerializer(serializers.Serializer):
group = NestedVLANGroupSerializer(read_only=True)
def to_representation(self, instance):
return OrderedDict([
('vid', instance),
('group', NestedVLANGroupSerializer(
return {
'vid': instance,
'group': NestedVLANGroupSerializer(
self.context['group'],
context={'request': self.context['request']}
).data),
])
).data,
}
class CreateAvailableVLANSerializer(NetBoxModelSerializer):
@@ -314,11 +316,11 @@ class AvailablePrefixSerializer(serializers.Serializer):
vrf = NestedVRFSerializer(self.context['vrf'], context={'request': self.context['request']}).data
else:
vrf = None
return OrderedDict([
('family', instance.version),
('prefix', str(instance)),
('vrf', vrf),
])
return {
'family': instance.version,
'prefix': str(instance),
'vrf': vrf,
}
#
@@ -361,7 +363,7 @@ class IPAddressSerializer(NetBoxModelSerializer):
)
assigned_object = serializers.SerializerMethodField(read_only=True)
nat_inside = NestedIPAddressSerializer(required=False, allow_null=True)
nat_outside = NestedIPAddressSerializer(required=False, read_only=True)
nat_outside = NestedIPAddressSerializer(many=True, read_only=True)
class Meta:
model = IPAddress
@@ -370,7 +372,6 @@ class IPAddressSerializer(NetBoxModelSerializer):
'assigned_object_id', 'assigned_object', 'nat_inside', 'nat_outside', 'dns_name', 'description', 'tags',
'custom_fields', 'created', 'last_updated',
]
read_only_fields = ['family', 'nat_outside']
@swagger_serializer_method(serializer_or_field=serializers.DictField)
def get_assigned_object(self, obj):
@@ -394,11 +395,11 @@ class AvailableIPSerializer(serializers.Serializer):
vrf = NestedVRFSerializer(self.context['vrf'], context={'request': self.context['request']}).data
else:
vrf = None
return OrderedDict([
('family', self.context['parent'].family),
('address', f"{instance}/{self.context['parent'].mask_length}"),
('vrf', vrf),
])
return {
'family': self.context['parent'].family,
'address': f"{instance}/{self.context['parent'].mask_length}",
'vrf': vrf,
}
#
@@ -435,3 +436,54 @@ class ServiceSerializer(NetBoxModelSerializer):
'id', 'url', 'display', 'device', 'virtual_machine', 'name', 'ports', 'protocol', 'ipaddresses',
'description', 'tags', 'custom_fields', 'created', 'last_updated',
]
#
# L2VPN
#
class L2VPNSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:l2vpn-detail')
type = ChoiceField(choices=L2VPNTypeChoices, required=False)
import_targets = SerializedPKRelatedField(
queryset=RouteTarget.objects.all(),
serializer=NestedRouteTargetSerializer,
required=False,
many=True
)
export_targets = SerializedPKRelatedField(
queryset=RouteTarget.objects.all(),
serializer=NestedRouteTargetSerializer,
required=False,
many=True
)
tenant = NestedTenantSerializer(required=False, allow_null=True)
class Meta:
model = L2VPN
fields = [
'id', 'url', 'display', 'identifier', 'name', 'slug', 'type', 'import_targets', 'export_targets',
'description', 'tenant', 'tags', 'custom_fields', 'created', 'last_updated'
]
class L2VPNTerminationSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:l2vpntermination-detail')
l2vpn = NestedL2VPNSerializer()
assigned_object_type = ContentTypeField(
queryset=ContentType.objects.all()
)
assigned_object = serializers.SerializerMethodField(read_only=True)
class Meta:
model = L2VPNTermination
fields = [
'id', 'url', 'display', 'l2vpn', 'assigned_object_type', 'assigned_object_id',
'assigned_object', 'tags', 'custom_fields', 'created', 'last_updated'
]
@swagger_serializer_method(serializer_or_field=serializers.DictField)
def get_assigned_object(self, instance):
serializer = get_serializer_for_model(instance.assigned_object, prefix=NESTED_SERIALIZER_PREFIX)
context = {'request': self.context['request']}
return serializer(instance.assigned_object, context=context).data

View File

@@ -1,7 +1,6 @@
from django.urls import path
from netbox.api import NetBoxRouter
from ipam.models import IPRange, Prefix
from netbox.api.routers import NetBoxRouter
from . import views
@@ -45,6 +44,10 @@ router.register('vlans', views.VLANViewSet)
router.register('service-templates', views.ServiceTemplateViewSet)
router.register('services', views.ServiceViewSet)
# L2VPN
router.register('l2vpns', views.L2VPNViewSet)
router.register('l2vpn-terminations', views.L2VPNTerminationViewSet)
app_name = 'ipam-api'
urlpatterns = [

View File

@@ -18,6 +18,7 @@ from netbox.config import get_config
from utilities.constants import ADVISORY_LOCK_KEYS
from utilities.utils import count_related
from . import serializers
from ipam.models import L2VPN, L2VPNTermination
class IPAMRootView(APIRootView):
@@ -157,6 +158,18 @@ class ServiceViewSet(NetBoxModelViewSet):
filterset_class = filtersets.ServiceFilterSet
class L2VPNViewSet(NetBoxModelViewSet):
queryset = L2VPN.objects.prefetch_related('import_targets', 'export_targets', 'tenant', 'tags')
serializer_class = serializers.L2VPNSerializer
filterset_class = filtersets.L2VPNFilterSet
class L2VPNTerminationViewSet(NetBoxModelViewSet):
queryset = L2VPNTermination.objects.prefetch_related('assigned_object')
serializer_class = serializers.L2VPNTerminationSerializer
filterset_class = filtersets.L2VPNTerminationFilterSet
#
# Views
#

View File

@@ -170,3 +170,52 @@ class ServiceProtocolChoices(ChoiceSet):
(PROTOCOL_UDP, 'UDP'),
(PROTOCOL_SCTP, 'SCTP'),
)
class L2VPNTypeChoices(ChoiceSet):
TYPE_VPLS = 'vpls'
TYPE_VPWS = 'vpws'
TYPE_EPL = 'epl'
TYPE_EVPL = 'evpl'
TYPE_EPLAN = 'ep-lan'
TYPE_EVPLAN = 'evp-lan'
TYPE_EPTREE = 'ep-tree'
TYPE_EVPTREE = 'evp-tree'
TYPE_VXLAN = 'vxlan'
TYPE_VXLAN_EVPN = 'vxlan-evpn'
TYPE_MPLS_EVPN = 'mpls-evpn'
TYPE_PBB_EVPN = 'pbb-evpn'
CHOICES = (
('VPLS', (
(TYPE_VPWS, 'VPWS'),
(TYPE_VPLS, 'VPLS'),
)),
('VXLAN', (
(TYPE_VXLAN, 'VXLAN'),
(TYPE_VXLAN_EVPN, 'VXLAN-EVPN'),
)),
('L2VPN E-VPN', (
(TYPE_MPLS_EVPN, 'MPLS EVPN'),
(TYPE_PBB_EVPN, 'PBB EVPN'),
)),
('E-Line', (
(TYPE_EPL, 'EPL'),
(TYPE_EVPL, 'EVPL'),
)),
('E-LAN', (
(TYPE_EPLAN, 'Ethernet Private LAN'),
(TYPE_EVPLAN, 'Ethernet Virtual Private LAN'),
)),
('E-Tree', (
(TYPE_EPTREE, 'Ethernet Private Tree'),
(TYPE_EVPTREE, 'Ethernet Virtual Private Tree'),
)),
)
P2P = (
TYPE_VPWS,
TYPE_EPL,
TYPE_EPLAN,
TYPE_EPTREE
)

View File

@@ -90,3 +90,9 @@ VLANGROUP_SCOPE_TYPES = (
# 16-bit port number
SERVICE_PORT_MIN = 1
SERVICE_PORT_MAX = 65535
L2VPN_ASSIGNMENT_MODELS = Q(
Q(app_label='dcim', model='interface') |
Q(app_label='ipam', model='vlan') |
Q(app_label='virtualization', model='vminterface')
)

View File

@@ -23,6 +23,8 @@ __all__ = (
'FHRPGroupFilterSet',
'IPAddressFilterSet',
'IPRangeFilterSet',
'L2VPNFilterSet',
'L2VPNTerminationFilterSet',
'PrefixFilterSet',
'RIRFilterSet',
'RoleFilterSet',
@@ -922,3 +924,169 @@ class ServiceFilterSet(NetBoxModelFilterSet):
return queryset
qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
return queryset.filter(qs_filter)
#
# L2VPN
#
class L2VPNFilterSet(NetBoxModelFilterSet, TenancyFilterSet):
type = django_filters.MultipleChoiceFilter(
choices=L2VPNTypeChoices,
null_value=None
)
import_target_id = django_filters.ModelMultipleChoiceFilter(
field_name='import_targets',
queryset=RouteTarget.objects.all(),
label='Import target',
)
import_target = django_filters.ModelMultipleChoiceFilter(
field_name='import_targets__name',
queryset=RouteTarget.objects.all(),
to_field_name='name',
label='Import target (name)',
)
export_target_id = django_filters.ModelMultipleChoiceFilter(
field_name='export_targets',
queryset=RouteTarget.objects.all(),
label='Export target',
)
export_target = django_filters.ModelMultipleChoiceFilter(
field_name='export_targets__name',
queryset=RouteTarget.objects.all(),
to_field_name='name',
label='Export target (name)',
)
class Meta:
model = L2VPN
fields = ['id', 'identifier', 'name', 'type', 'description']
def search(self, queryset, name, value):
if not value.strip():
return queryset
qs_filter = Q(identifier=value) | Q(name__icontains=value) | Q(description__icontains=value)
return queryset.filter(qs_filter)
class L2VPNTerminationFilterSet(NetBoxModelFilterSet):
l2vpn_id = django_filters.ModelMultipleChoiceFilter(
queryset=L2VPN.objects.all(),
label='L2VPN (ID)',
)
l2vpn = django_filters.ModelMultipleChoiceFilter(
field_name='l2vpn__slug',
queryset=L2VPN.objects.all(),
to_field_name='slug',
label='L2VPN (slug)',
)
region = MultiValueCharFilter(
method='filter_region',
field_name='slug',
label='Region (slug)',
)
region_id = MultiValueNumberFilter(
method='filter_region',
field_name='pk',
label='Region (ID)',
)
site = MultiValueCharFilter(
method='filter_site',
field_name='slug',
label='Site (slug)',
)
site_id = MultiValueNumberFilter(
method='filter_site',
field_name='pk',
label='Site (ID)',
)
device = django_filters.ModelMultipleChoiceFilter(
field_name='interface__device__name',
queryset=Device.objects.all(),
to_field_name='name',
label='Device (name)',
)
device_id = django_filters.ModelMultipleChoiceFilter(
field_name='interface__device',
queryset=Device.objects.all(),
label='Device (ID)',
)
virtual_machine = django_filters.ModelMultipleChoiceFilter(
field_name='vminterface__virtual_machine__name',
queryset=VirtualMachine.objects.all(),
to_field_name='name',
label='Virtual machine (name)',
)
virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
field_name='vminterface__virtual_machine',
queryset=VirtualMachine.objects.all(),
label='Virtual machine (ID)',
)
interface = django_filters.ModelMultipleChoiceFilter(
field_name='interface__name',
queryset=Interface.objects.all(),
to_field_name='name',
label='Interface (name)',
)
interface_id = django_filters.ModelMultipleChoiceFilter(
field_name='interface',
queryset=Interface.objects.all(),
label='Interface (ID)',
)
vminterface = django_filters.ModelMultipleChoiceFilter(
field_name='vminterface__name',
queryset=VMInterface.objects.all(),
to_field_name='name',
label='VM interface (name)',
)
vminterface_id = django_filters.ModelMultipleChoiceFilter(
field_name='vminterface',
queryset=VMInterface.objects.all(),
label='VM Interface (ID)',
)
vlan = django_filters.ModelMultipleChoiceFilter(
field_name='vlan__name',
queryset=VLAN.objects.all(),
to_field_name='name',
label='VLAN (name)',
)
vlan_vid = django_filters.NumberFilter(
field_name='vlan__vid',
label='VLAN number (1-4094)',
)
vlan_id = django_filters.ModelMultipleChoiceFilter(
field_name='vlan',
queryset=VLAN.objects.all(),
label='VLAN (ID)',
)
assigned_object_type = ContentTypeFilter()
class Meta:
model = L2VPNTermination
fields = ('id', 'assigned_object_type_id')
def search(self, queryset, name, value):
if not value.strip():
return queryset
qs_filter = Q(l2vpn__name__icontains=value)
return queryset.filter(qs_filter)
def filter_site(self, queryset, name, value):
qs = queryset.filter(
Q(
Q(**{'vlan__site__{}__in'.format(name): value}) |
Q(**{'interface__device__site__{}__in'.format(name): value}) |
Q(**{'vminterface__virtual_machine__site__{}__in'.format(name): value})
)
)
return qs
def filter_region(self, queryset, name, value):
qs = queryset.filter(
Q(
Q(**{'vlan__site__region__{}__in'.format(name): value}) |
Q(**{'interface__device__site__region__{}__in'.format(name): value}) |
Q(**{'vminterface__virtual_machine__site__region__{}__in'.format(name): value})
)
)
return qs

View File

@@ -8,7 +8,7 @@ from ipam.models import ASN
from netbox.forms import NetBoxModelBulkEditForm
from tenancy.models import Tenant
from utilities.forms import (
add_blank_choice, BulkEditNullBooleanSelect, DatePicker, DynamicModelChoiceField, NumericArrayField, StaticSelect,
add_blank_choice, BulkEditNullBooleanSelect, DynamicModelChoiceField, NumericArrayField, StaticSelect,
DynamicModelMultipleChoiceField,
)
@@ -18,6 +18,8 @@ __all__ = (
'FHRPGroupBulkEditForm',
'IPAddressBulkEditForm',
'IPRangeBulkEditForm',
'L2VPNBulkEditForm',
'L2VPNTerminationBulkEditForm',
'PrefixBulkEditForm',
'RIRBulkEditForm',
'RoleBulkEditForm',
@@ -440,3 +442,29 @@ class ServiceTemplateBulkEditForm(NetBoxModelBulkEditForm):
class ServiceBulkEditForm(ServiceTemplateBulkEditForm):
model = Service
class L2VPNBulkEditForm(NetBoxModelBulkEditForm):
type = forms.ChoiceField(
choices=add_blank_choice(L2VPNTypeChoices),
required=False,
widget=StaticSelect()
)
tenant = DynamicModelChoiceField(
queryset=Tenant.objects.all(),
required=False
)
description = forms.CharField(
max_length=100,
required=False
)
model = L2VPN
fieldsets = (
(None, ('type', 'description', 'tenant')),
)
nullable_fields = ('tenant', 'description',)
class L2VPNTerminationBulkEditForm(NetBoxModelBulkEditForm):
model = L2VPN

View File

@@ -1,5 +1,6 @@
from django import forms
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from dcim.models import Device, Interface, Site
from ipam.choices import *
@@ -16,6 +17,8 @@ __all__ = (
'FHRPGroupCSVForm',
'IPAddressCSVForm',
'IPRangeCSVForm',
'L2VPNCSVForm',
'L2VPNTerminationCSVForm',
'PrefixCSVForm',
'RIRCSVForm',
'RoleCSVForm',
@@ -425,3 +428,83 @@ class ServiceCSVForm(NetBoxModelCSVForm):
class Meta:
model = Service
fields = ('device', 'virtual_machine', 'name', 'protocol', 'ports', 'description')
class L2VPNCSVForm(NetBoxModelCSVForm):
tenant = CSVModelChoiceField(
queryset=Tenant.objects.all(),
required=False,
to_field_name='name',
)
type = CSVChoiceField(
choices=L2VPNTypeChoices,
help_text='L2VPN type'
)
class Meta:
model = L2VPN
fields = ('identifier', 'name', 'slug', 'type', 'description')
class L2VPNTerminationCSVForm(NetBoxModelCSVForm):
l2vpn = CSVModelChoiceField(
queryset=L2VPN.objects.all(),
required=True,
to_field_name='name',
label='L2VPN',
)
device = CSVModelChoiceField(
queryset=Device.objects.all(),
required=False,
to_field_name='name',
help_text='Parent device (for interface)'
)
virtual_machine = CSVModelChoiceField(
queryset=VirtualMachine.objects.all(),
required=False,
to_field_name='name',
help_text='Parent virtual machine (for interface)'
)
interface = CSVModelChoiceField(
queryset=Interface.objects.none(), # Can also refer to VMInterface
required=False,
to_field_name='name',
help_text='Assigned interface (device or VM)'
)
vlan = CSVModelChoiceField(
queryset=VLAN.objects.all(),
required=False,
to_field_name='name',
help_text='Assigned VLAN'
)
class Meta:
model = L2VPNTermination
fields = ('l2vpn', 'device', 'virtual_machine', 'interface', 'vlan')
def __init__(self, data=None, *args, **kwargs):
super().__init__(data, *args, **kwargs)
if data:
# Limit interface queryset by device or VM
if data.get('device'):
self.fields['interface'].queryset = Interface.objects.filter(
**{f"device__{self.fields['device'].to_field_name}": data['device']}
)
elif data.get('virtual_machine'):
self.fields['interface'].queryset = VMInterface.objects.filter(
**{f"virtual_machine__{self.fields['virtual_machine'].to_field_name}": data['virtual_machine']}
)
def clean(self):
super().clean()
if self.cleaned_data.get('device') and self.cleaned_data.get('virtual_machine'):
raise ValidationError('Cannot import device and VM interface terminations simultaneously.')
if not (self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')):
raise ValidationError('Each termination must specify either an interface or a VLAN.')
if self.cleaned_data.get('interface') and self.cleaned_data.get('vlan'):
raise ValidationError('Cannot assign both an interface and a VLAN.')
self.instance.assigned_object = self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')

View File

@@ -1,18 +1,19 @@
from django import forms
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
from django.utils.translation import gettext as _
from dcim.models import Location, Rack, Region, Site, SiteGroup, Device
from virtualization.models import VirtualMachine
from ipam.choices import *
from ipam.constants import *
from ipam.models import *
from ipam.models import ASN
from netbox.forms import NetBoxModelFilterSetForm
from tenancy.forms import TenancyFilterForm
from utilities.forms import (
add_blank_choice, DynamicModelChoiceField, DynamicModelMultipleChoiceField, MultipleChoiceField, StaticSelect,
TagFilterField, BOOLEAN_WITH_BLANK_CHOICES,
add_blank_choice, ContentTypeMultipleChoiceField, DynamicModelChoiceField, DynamicModelMultipleChoiceField,
MultipleChoiceField, StaticSelect, TagFilterField, BOOLEAN_WITH_BLANK_CHOICES, APISelectMultiple,
)
from virtualization.models import VirtualMachine
__all__ = (
'AggregateFilterForm',
@@ -20,6 +21,8 @@ __all__ = (
'FHRPGroupFilterForm',
'IPAddressFilterForm',
'IPRangeFilterForm',
'L2VPNFilterForm',
'L2VPNTerminationFilterForm',
'PrefixFilterForm',
'RIRFilterForm',
'RoleFilterForm',
@@ -475,3 +478,88 @@ class ServiceTemplateFilterForm(NetBoxModelFilterSetForm):
class ServiceFilterForm(ServiceTemplateFilterForm):
model = Service
class L2VPNFilterForm(TenancyFilterForm, NetBoxModelFilterSetForm):
model = L2VPN
fieldsets = (
(None, ('q', 'tag')),
('Attributes', ('type', 'import_target_id', 'export_target_id')),
('Tenant', ('tenant_group_id', 'tenant_id')),
)
type = forms.ChoiceField(
choices=add_blank_choice(L2VPNTypeChoices),
required=False,
widget=StaticSelect()
)
import_target_id = DynamicModelMultipleChoiceField(
queryset=RouteTarget.objects.all(),
required=False,
label=_('Import targets')
)
export_target_id = DynamicModelMultipleChoiceField(
queryset=RouteTarget.objects.all(),
required=False,
label=_('Export targets')
)
tag = TagFilterField(model)
class L2VPNTerminationFilterForm(NetBoxModelFilterSetForm):
model = L2VPNTermination
fieldsets = (
(None, ('l2vpn_id', )),
('Assigned Object', ('assigned_object_type_id', 'region_id', 'site_id', 'device_id', 'virtual_machine_id', 'vlan_id')),
)
l2vpn_id = DynamicModelChoiceField(
queryset=L2VPN.objects.all(),
required=False,
label='L2VPN'
)
assigned_object_type_id = ContentTypeMultipleChoiceField(
queryset=ContentType.objects.filter(L2VPN_ASSIGNMENT_MODELS),
required=False,
label=_('Assigned Object Type'),
limit_choices_to=L2VPN_ASSIGNMENT_MODELS
)
region_id = DynamicModelMultipleChoiceField(
queryset=Region.objects.all(),
required=False,
label=_('Region')
)
site_id = DynamicModelMultipleChoiceField(
queryset=Site.objects.all(),
required=False,
null_option='None',
query_params={
'region_id': '$region_id'
},
label=_('Site')
)
device_id = DynamicModelMultipleChoiceField(
queryset=Device.objects.all(),
required=False,
null_option='None',
query_params={
'site_id': '$site_id'
},
label=_('Device')
)
vlan_id = DynamicModelMultipleChoiceField(
queryset=VLAN.objects.all(),
required=False,
null_option='None',
query_params={
'site_id': '$site_id'
},
label=_('VLAN')
)
virtual_machine_id = DynamicModelMultipleChoiceField(
queryset=VirtualMachine.objects.all(),
required=False,
null_option='None',
query_params={
'site_id': '$site_id'
},
label=_('Virtual Machine')
)

View File

@@ -1,5 +1,6 @@
from django import forms
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from dcim.models import Device, Interface, Location, Rack, Region, Site, SiteGroup
from extras.models import Tag
@@ -7,9 +8,9 @@ from ipam.choices import *
from ipam.constants import *
from ipam.formfields import IPNetworkFormField
from ipam.models import *
from ipam.models import ASN
from netbox.forms import NetBoxModelForm
from tenancy.forms import TenancyForm
from tenancy.models import Tenant
from utilities.exceptions import PermissionsViolation
from utilities.forms import (
add_blank_choice, BootstrapMixin, ContentTypeChoiceField, DatePicker, DynamicModelChoiceField,
@@ -26,6 +27,8 @@ __all__ = (
'IPAddressBulkAddForm',
'IPAddressForm',
'IPRangeForm',
'L2VPNForm',
'L2VPNTerminationForm',
'PrefixForm',
'RIRForm',
'RoleForm',
@@ -861,3 +864,118 @@ class ServiceCreateForm(ServiceForm):
self.cleaned_data['description'] = service_template.description
elif not all(self.cleaned_data[f] for f in ('name', 'protocol', 'ports')):
raise forms.ValidationError("Must specify name, protocol, and port(s) if not using a service template.")
#
# L2VPN
#
class L2VPNForm(TenancyForm, NetBoxModelForm):
slug = SlugField()
import_targets = DynamicModelMultipleChoiceField(
queryset=RouteTarget.objects.all(),
required=False
)
export_targets = DynamicModelMultipleChoiceField(
queryset=RouteTarget.objects.all(),
required=False
)
fieldsets = (
('L2VPN', ('name', 'slug', 'type', 'identifier', 'description', 'tags')),
('Route Targets', ('import_targets', 'export_targets')),
('Tenancy', ('tenant_group', 'tenant')),
)
class Meta:
model = L2VPN
fields = (
'name', 'slug', 'type', 'identifier', 'description', 'import_targets', 'export_targets', 'tenant', 'tags'
)
widgets = {
'type': StaticSelect(),
}
class L2VPNTerminationForm(NetBoxModelForm):
l2vpn = DynamicModelChoiceField(
queryset=L2VPN.objects.all(),
required=True,
query_params={},
label='L2VPN',
fetch_trigger='open'
)
device_vlan = DynamicModelChoiceField(
queryset=Device.objects.all(),
label="Available on Device",
required=False,
query_params={}
)
vlan = DynamicModelChoiceField(
queryset=VLAN.objects.all(),
required=False,
query_params={
'available_on_device': '$device_vlan'
},
label='VLAN'
)
device = DynamicModelChoiceField(
queryset=Device.objects.all(),
required=False,
query_params={}
)
interface = DynamicModelChoiceField(
queryset=Interface.objects.all(),
required=False,
query_params={
'device_id': '$device'
}
)
virtual_machine = DynamicModelChoiceField(
queryset=VirtualMachine.objects.all(),
required=False,
query_params={}
)
vminterface = DynamicModelChoiceField(
queryset=VMInterface.objects.all(),
required=False,
query_params={
'virtual_machine_id': '$virtual_machine'
},
label='Interface'
)
class Meta:
model = L2VPNTermination
fields = ('l2vpn', )
def __init__(self, *args, **kwargs):
instance = kwargs.get('instance')
initial = kwargs.get('initial', {}).copy()
if instance:
if type(instance.assigned_object) is Interface:
initial['device'] = instance.assigned_object.parent
initial['interface'] = instance.assigned_object
elif type(instance.assigned_object) is VLAN:
initial['vlan'] = instance.assigned_object
elif type(instance.assigned_object) is VMInterface:
initial['vminterface'] = instance.assigned_object
kwargs['initial'] = initial
super().__init__(*args, **kwargs)
def clean(self):
super().clean()
interface = self.cleaned_data.get('interface')
vminterface = self.cleaned_data.get('vminterface')
vlan = self.cleaned_data.get('vlan')
if not (interface or vminterface or vlan):
raise ValidationError('A termination must specify an interface or VLAN.')
if len([x for x in (interface, vminterface, vlan) if x]) > 1:
raise ValidationError('A termination can only have one terminating object (an interface or VLAN).')
self.instance.assigned_object = interface or vminterface or vlan

View File

@@ -17,6 +17,12 @@ class IPAMQuery(graphene.ObjectType):
ip_range = ObjectField(IPRangeType)
ip_range_list = ObjectListField(IPRangeType)
l2vpn = ObjectField(L2VPNType)
l2vpn_list = ObjectListField(L2VPNType)
l2vpn_termination = ObjectField(L2VPNTerminationType)
l2vpn_termination_list = ObjectListField(L2VPNTerminationType)
prefix = ObjectField(PrefixType)
prefix_list = ObjectListField(PrefixType)

View File

@@ -11,6 +11,8 @@ __all__ = (
'FHRPGroupAssignmentType',
'IPAddressType',
'IPRangeType',
'L2VPNType',
'L2VPNTerminationType',
'PrefixType',
'RIRType',
'RoleType',
@@ -151,3 +153,17 @@ class VRFType(NetBoxObjectType):
model = models.VRF
fields = '__all__'
filterset_class = filtersets.VRFFilterSet
class L2VPNType(NetBoxObjectType):
class Meta:
model = models.L2VPN
fields = '__all__'
filtersets_class = filtersets.L2VPNFilterSet
class L2VPNTerminationType(NetBoxObjectType):
class Meta:
model = models.L2VPNTermination
fields = '__all__'
filtersets_class = filtersets.L2VPNTerminationFilterSet

View File

@@ -0,0 +1,17 @@
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('ipam', '0057_created_datetimefield'),
]
operations = [
migrations.AlterField(
model_name='ipaddress',
name='nat_inside',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='nat_outside', to='ipam.ipaddress'),
),
]

View File

@@ -0,0 +1,60 @@
import django.core.serializers.json
from django.db import migrations, models
import django.db.models.deletion
import taggit.managers
class Migration(migrations.Migration):
dependencies = [
('extras', '0075_configcontext_locations'),
('contenttypes', '0002_remove_content_type_name'),
('tenancy', '0007_contact_link'),
('ipam', '0058_ipaddress_nat_inside_nonunique'),
]
operations = [
migrations.CreateModel(
name='L2VPN',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False)),
('created', models.DateTimeField(auto_now_add=True, null=True)),
('last_updated', models.DateTimeField(auto_now=True, null=True)),
('custom_field_data', models.JSONField(blank=True, default=dict, encoder=django.core.serializers.json.DjangoJSONEncoder)),
('name', models.CharField(max_length=100, unique=True)),
('slug', models.SlugField()),
('type', models.CharField(max_length=50)),
('identifier', models.BigIntegerField(blank=True, null=True)),
('description', models.CharField(blank=True, max_length=200)),
('export_targets', models.ManyToManyField(blank=True, related_name='exporting_l2vpns', to='ipam.routetarget')),
('import_targets', models.ManyToManyField(blank=True, related_name='importing_l2vpns', to='ipam.routetarget')),
('tags', taggit.managers.TaggableManager(through='extras.TaggedItem', to='extras.Tag')),
('tenant', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='l2vpns', to='tenancy.tenant')),
],
options={
'verbose_name': 'L2VPN',
'ordering': ('name', 'identifier'),
},
),
migrations.CreateModel(
name='L2VPNTermination',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False)),
('created', models.DateTimeField(auto_now_add=True, null=True)),
('last_updated', models.DateTimeField(auto_now=True, null=True)),
('custom_field_data', models.JSONField(blank=True, default=dict, encoder=django.core.serializers.json.DjangoJSONEncoder)),
('assigned_object_id', models.PositiveBigIntegerField()),
('assigned_object_type', models.ForeignKey(limit_choices_to=models.Q(models.Q(models.Q(('app_label', 'dcim'), ('model', 'interface')), models.Q(('app_label', 'ipam'), ('model', 'vlan')), models.Q(('app_label', 'virtualization'), ('model', 'vminterface')), _connector='OR')), on_delete=django.db.models.deletion.PROTECT, related_name='+', to='contenttypes.contenttype')),
('l2vpn', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='terminations', to='ipam.l2vpn')),
('tags', taggit.managers.TaggableManager(through='extras.TaggedItem', to='extras.Tag')),
],
options={
'verbose_name': 'L2VPN termination',
'ordering': ('l2vpn',),
},
),
migrations.AddConstraint(
model_name='l2vpntermination',
constraint=models.UniqueConstraint(fields=('assigned_object_type', 'assigned_object_id'), name='ipam_l2vpntermination_assigned_object'),
),
]

View File

@@ -2,6 +2,7 @@
from .fhrp import *
from .vrfs import *
from .ip import *
from .l2vpn import *
from .services import *
from .vlans import *
@@ -12,6 +13,8 @@ __all__ = (
'IPRange',
'FHRPGroup',
'FHRPGroupAssignment',
'L2VPN',
'L2VPNTermination',
'Prefix',
'RIR',
'Role',

View File

@@ -48,7 +48,7 @@ class FHRPGroup(NetBoxModel):
related_query_name='fhrpgroup'
)
clone_fields = ('protocol', 'auth_type', 'auth_key')
clone_fields = ('protocol', 'auth_type', 'auth_key', 'description')
class Meta:
ordering = ['protocol', 'group_id', 'pk']

View File

@@ -179,9 +179,9 @@ class Aggregate(GetAvailablePrefixesMixin, NetBoxModel):
blank=True
)
clone_fields = [
clone_fields = (
'rir', 'tenant', 'date_added', 'description',
]
)
class Meta:
ordering = ('prefix', 'pk') # prefix may be non-unique
@@ -368,9 +368,9 @@ class Prefix(GetAvailablePrefixesMixin, NetBoxModel):
objects = PrefixQuerySet.as_manager()
clone_fields = [
clone_fields = (
'site', 'vrf', 'tenant', 'vlan', 'status', 'role', 'is_pool', 'mark_utilized', 'description',
]
)
class Meta:
ordering = (F('vrf').asc(nulls_first=True), 'prefix', 'pk') # (vrf, prefix) may be non-unique
@@ -616,9 +616,9 @@ class IPRange(NetBoxModel):
blank=True
)
clone_fields = [
clone_fields = (
'vrf', 'tenant', 'status', 'role', 'description',
]
)
class Meta:
ordering = (F('vrf').asc(nulls_first=True), 'start_address', 'pk') # (vrf, start_address) may be non-unique
@@ -821,7 +821,7 @@ class IPAddress(NetBoxModel):
ct_field='assigned_object_type',
fk_field='assigned_object_id'
)
nat_inside = models.OneToOneField(
nat_inside = models.ForeignKey(
to='self',
on_delete=models.SET_NULL,
related_name='nat_outside',
@@ -844,9 +844,9 @@ class IPAddress(NetBoxModel):
objects = IPAddressManager()
clone_fields = [
'vrf', 'tenant', 'status', 'role', 'description',
]
clone_fields = (
'vrf', 'tenant', 'status', 'role', 'dns_name', 'description',
)
class Meta:
ordering = ('address', 'pk') # address may be non-unique
@@ -865,6 +865,25 @@ class IPAddress(NetBoxModel):
address__net_host=str(self.address.ip)
).exclude(pk=self.pk)
def get_next_available_ip(self):
"""
Return the next available IP address within this IP's network (if any)
"""
if self.address and self.address.broadcast:
start_ip = self.address.ip + 1
end_ip = self.address.broadcast - 1
if start_ip <= end_ip:
available_ips = netaddr.IPSet(netaddr.IPRange(start_ip, end_ip))
available_ips -= netaddr.IPSet([
address.ip for address in IPAddress.objects.filter(
vrf=self.vrf,
address__gt=self.address,
address__net_contained_or_equal=self.address.cidr
).values_list('address', flat=True)
])
if available_ips:
return next(iter(available_ips))
def clean(self):
super().clean()
@@ -915,6 +934,15 @@ class IPAddress(NetBoxModel):
super().save(*args, **kwargs)
def clone(self):
attrs = super().clone()
# Populate the address field with the next available IP (if any)
if next_available_ip := self.get_next_available_ip():
attrs['address'] = f'{next_available_ip}/{self.address.prefixlen}'
return attrs
def to_objectchange(self, action):
objectchange = super().to_objectchange(action)
objectchange.related_object = self.assigned_object

141
netbox/ipam/models/l2vpn.py Normal file
View File

@@ -0,0 +1,141 @@
from django.contrib.contenttypes.fields import GenericRelation, GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from ipam.choices import L2VPNTypeChoices
from ipam.constants import L2VPN_ASSIGNMENT_MODELS
from netbox.models import NetBoxModel
__all__ = (
'L2VPN',
'L2VPNTermination',
)
class L2VPN(NetBoxModel):
name = models.CharField(
max_length=100,
unique=True
)
slug = models.SlugField()
type = models.CharField(
max_length=50,
choices=L2VPNTypeChoices
)
identifier = models.BigIntegerField(
null=True,
blank=True
)
import_targets = models.ManyToManyField(
to='ipam.RouteTarget',
related_name='importing_l2vpns',
blank=True,
)
export_targets = models.ManyToManyField(
to='ipam.RouteTarget',
related_name='exporting_l2vpns',
blank=True
)
description = models.CharField(
max_length=200,
blank=True
)
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='l2vpns',
blank=True,
null=True
)
contacts = GenericRelation(
to='tenancy.ContactAssignment'
)
clone_fields = ('type',)
class Meta:
ordering = ('name', 'identifier')
verbose_name = 'L2VPN'
def __str__(self):
if self.identifier:
return f'{self.name} ({self.identifier})'
return f'{self.name}'
def get_absolute_url(self):
return reverse('ipam:l2vpn', args=[self.pk])
class L2VPNTermination(NetBoxModel):
l2vpn = models.ForeignKey(
to='ipam.L2VPN',
on_delete=models.CASCADE,
related_name='terminations'
)
assigned_object_type = models.ForeignKey(
to=ContentType,
limit_choices_to=L2VPN_ASSIGNMENT_MODELS,
on_delete=models.PROTECT,
related_name='+'
)
assigned_object_id = models.PositiveBigIntegerField()
assigned_object = GenericForeignKey(
ct_field='assigned_object_type',
fk_field='assigned_object_id'
)
clone_fields = ('l2vpn',)
class Meta:
ordering = ('l2vpn',)
verbose_name = 'L2VPN termination'
constraints = (
models.UniqueConstraint(
fields=('assigned_object_type', 'assigned_object_id'),
name='ipam_l2vpntermination_assigned_object'
),
)
def __str__(self):
if self.pk is not None:
return f'{self.assigned_object} <> {self.l2vpn}'
return super().__str__()
def get_absolute_url(self):
return reverse('ipam:l2vpntermination', args=[self.pk])
def clean(self):
# Only check is assigned_object is set. Required otherwise we have an Integrity Error thrown.
if self.assigned_object:
obj_id = self.assigned_object.pk
obj_type = ContentType.objects.get_for_model(self.assigned_object)
if L2VPNTermination.objects.filter(assigned_object_id=obj_id, assigned_object_type=obj_type).\
exclude(pk=self.pk).count() > 0:
raise ValidationError(f'L2VPN Termination already assigned ({self.assigned_object})')
# Only check if L2VPN is set and is of type P2P
if hasattr(self, 'l2vpn') and self.l2vpn.type in L2VPNTypeChoices.P2P:
terminations_count = L2VPNTermination.objects.filter(l2vpn=self.l2vpn).exclude(pk=self.pk).count()
if terminations_count >= 2:
l2vpn_type = self.l2vpn.get_type_display()
raise ValidationError(
f'{l2vpn_type} L2VPNs cannot have more than two terminations; found {terminations_count} already '
f'defined.'
)
@property
def assigned_object_parent(self):
obj_type = ContentType.objects.get_for_model(self.assigned_object)
if obj_type.model == 'vminterface':
return self.assigned_object.virtual_machine
elif obj_type.model == 'interface':
return self.assigned_object.device
elif obj_type.model == 'vminterface':
return self.assigned_object.virtual_machine
return None
@property
def assigned_object_site(self):
return self.assigned_object_parent.site

View File

@@ -1,4 +1,4 @@
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
@@ -8,6 +8,7 @@ from django.urls import reverse
from dcim.models import Interface
from ipam.choices import *
from ipam.constants import *
from ipam.models import L2VPNTermination
from ipam.querysets import VLANQuerySet
from netbox.models import OrganizationalModel, NetBoxModel
from virtualization.models import VMInterface
@@ -173,6 +174,13 @@ class VLAN(NetBoxModel):
blank=True
)
l2vpn_terminations = GenericRelation(
to='ipam.L2VPNTermination',
content_type_field='assigned_object_type',
object_id_field='assigned_object_id',
related_query_name='vlan'
)
objects = VLANQuerySet.as_manager()
clone_fields = [
@@ -227,3 +235,7 @@ class VLAN(NetBoxModel):
Q(untagged_vlan_id=self.pk) |
Q(tagged_vlans=self.pk)
).distinct()
@property
def l2vpn_termination(self):
return self.l2vpn_terminations.first()

View File

@@ -55,9 +55,9 @@ class VRF(NetBoxModel):
blank=True
)
clone_fields = [
clone_fields = (
'tenant', 'enforce_unique', 'description',
]
)
class Meta:
ordering = ('name', 'rd', 'pk') # (name, rd) may be non-unique

View File

@@ -1,5 +1,6 @@
from .fhrp import *
from .ip import *
from .l2vpn import *
from .services import *
from .vlans import *
from .vrfs import *

View File

@@ -32,15 +32,6 @@ PREFIX_LINK = """
<a href="{% if record.pk %}{% url 'ipam:prefix' pk=record.pk %}{% else %}{% url 'ipam:prefix_add' %}?prefix={{ record }}{% if object.vrf %}&vrf={{ object.vrf.pk }}{% endif %}{% if object.site %}&site={{ object.site.pk }}{% endif %}{% if object.tenant %}&tenant_group={{ object.tenant.group.pk }}&tenant={{ object.tenant.pk }}{% endif %}{% endif %}">{{ record.prefix }}</a>
"""
PREFIXFLAT_LINK = """
{% load helpers %}
{% if record.pk %}
<a href="{% url 'ipam:prefix' pk=record.pk %}">{{ record.prefix }}</a>
{% else %}
{{ record.prefix }}
{% endif %}
"""
IPADDRESS_LINK = """
{% if record.pk %}
<a href="{{ record.get_absolute_url }}">{{ record.address }}</a>
@@ -229,9 +220,9 @@ class PrefixTable(TenancyColumnsMixin, NetBoxTable):
export_raw=True,
attrs={'td': {'class': 'text-nowrap'}}
)
prefix_flat = tables.TemplateColumn(
template_code=PREFIXFLAT_LINK,
attrs={'td': {'class': 'text-nowrap'}},
prefix_flat = tables.Column(
accessor=Accessor('prefix'),
linkify=True,
verbose_name='Prefix (Flat)',
)
depth = tables.Column(

View File

@@ -0,0 +1,75 @@
import django_tables2 as tables
from ipam.models import L2VPN, L2VPNTermination
from netbox.tables import NetBoxTable, columns
from tenancy.tables import TenancyColumnsMixin
__all__ = (
'L2VPNTable',
'L2VPNTerminationTable',
)
L2VPN_TARGETS = """
{% for rt in value.all %}
<a href="{{ rt.get_absolute_url }}">{{ rt }}</a>{% if not forloop.last %}<br />{% endif %}
{% endfor %}
"""
class L2VPNTable(TenancyColumnsMixin, NetBoxTable):
pk = columns.ToggleColumn()
name = tables.Column(
linkify=True
)
import_targets = columns.TemplateColumn(
template_code=L2VPN_TARGETS,
orderable=False
)
export_targets = columns.TemplateColumn(
template_code=L2VPN_TARGETS,
orderable=False
)
class Meta(NetBoxTable.Meta):
model = L2VPN
fields = (
'pk', 'name', 'slug', 'type', 'description', 'import_targets', 'export_targets', 'tenant', 'tenant_group',
'actions',
)
default_columns = ('pk', 'name', 'type', 'description', 'actions')
class L2VPNTerminationTable(NetBoxTable):
pk = columns.ToggleColumn()
l2vpn = tables.Column(
verbose_name='L2VPN',
linkify=True
)
assigned_object_type = columns.ContentTypeColumn(
verbose_name='Object Type'
)
assigned_object = tables.Column(
linkify=True,
orderable=False,
verbose_name='Object'
)
assigned_object_parent = tables.Column(
linkify=True,
orderable=False,
verbose_name='Object Parent'
)
assigned_object_site = tables.Column(
linkify=True,
orderable=False,
verbose_name='Object Site'
)
class Meta(NetBoxTable.Meta):
model = L2VPNTermination
fields = (
'pk', 'l2vpn', 'assigned_object_type', 'assigned_object', 'assigned_object_parent', 'assigned_object_site',
'actions',
)
default_columns = (
'pk', 'l2vpn', 'assigned_object_type', 'assigned_object_parent', 'assigned_object', 'actions',
)

View File

@@ -914,3 +914,96 @@ class ServiceTest(APIViewTestCases.APIViewTestCase):
'ports': [6],
},
]
class L2VPNTest(APIViewTestCases.APIViewTestCase):
model = L2VPN
brief_fields = ['display', 'id', 'identifier', 'name', 'slug', 'type', 'url']
create_data = [
{
'name': 'L2VPN 4',
'slug': 'l2vpn-4',
'type': 'vxlan',
'identifier': 33343344
},
{
'name': 'L2VPN 5',
'slug': 'l2vpn-5',
'type': 'vxlan',
'identifier': 33343345
},
{
'name': 'L2VPN 6',
'slug': 'l2vpn-6',
'type': 'vpws',
'identifier': 33343346
},
]
bulk_update_data = {
'description': 'New description',
}
@classmethod
def setUpTestData(cls):
l2vpns = (
L2VPN(name='L2VPN 1', slug='l2vpn-1', type='vxlan', identifier=650001),
L2VPN(name='L2VPN 2', slug='l2vpn-2', type='vpws', identifier=650002),
L2VPN(name='L2VPN 3', slug='l2vpn-3', type='vpls'), # No RD
)
L2VPN.objects.bulk_create(l2vpns)
class L2VPNTerminationTest(APIViewTestCases.APIViewTestCase):
model = L2VPNTermination
brief_fields = ['display', 'id', 'l2vpn', 'url']
@classmethod
def setUpTestData(cls):
vlans = (
VLAN(name='VLAN 1', vid=651),
VLAN(name='VLAN 2', vid=652),
VLAN(name='VLAN 3', vid=653),
VLAN(name='VLAN 4', vid=654),
VLAN(name='VLAN 5', vid=655),
VLAN(name='VLAN 6', vid=656),
VLAN(name='VLAN 7', vid=657)
)
VLAN.objects.bulk_create(vlans)
l2vpns = (
L2VPN(name='L2VPN 1', type='vxlan', identifier=650001),
L2VPN(name='L2VPN 2', type='vpws', identifier=650002),
L2VPN(name='L2VPN 3', type='vpls'), # No RD
)
L2VPN.objects.bulk_create(l2vpns)
l2vpnterminations = (
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[0]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[1]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[2])
)
L2VPNTermination.objects.bulk_create(l2vpnterminations)
cls.create_data = [
{
'l2vpn': l2vpns[0].pk,
'assigned_object_type': 'ipam.vlan',
'assigned_object_id': vlans[3].pk,
},
{
'l2vpn': l2vpns[0].pk,
'assigned_object_type': 'ipam.vlan',
'assigned_object_id': vlans[4].pk,
},
{
'l2vpn': l2vpns[0].pk,
'assigned_object_type': 'ipam.vlan',
'assigned_object_id': vlans[5].pk,
},
]
cls.bulk_update_data = {
'l2vpn': l2vpns[2].pk
}

View File

@@ -1,6 +1,8 @@
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase
from netaddr import IPNetwork
from dcim.choices import InterfaceTypeChoices
from dcim.models import Device, DeviceRole, DeviceType, Interface, Location, Manufacturer, Rack, Region, Site, SiteGroup
from ipam.choices import *
from ipam.filtersets import *
@@ -1463,3 +1465,159 @@ class ServiceTestCase(TestCase, ChangeLoggedFilterSetTests):
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'virtual_machine': [vms[0].name, vms[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class L2VPNTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = L2VPN.objects.all()
filterset = L2VPNFilterSet
@classmethod
def setUpTestData(cls):
route_targets = (
RouteTarget(name='1:1'),
RouteTarget(name='1:2'),
RouteTarget(name='1:3'),
RouteTarget(name='2:1'),
RouteTarget(name='2:2'),
RouteTarget(name='2:3'),
)
RouteTarget.objects.bulk_create(route_targets)
l2vpns = (
L2VPN(name='L2VPN 1', type=L2VPNTypeChoices.TYPE_VXLAN, identifier=65001),
L2VPN(name='L2VPN 2', type=L2VPNTypeChoices.TYPE_VPWS, identifier=65002),
L2VPN(name='L2VPN 3', type=L2VPNTypeChoices.TYPE_VPLS),
)
L2VPN.objects.bulk_create(l2vpns)
l2vpns[0].import_targets.add(route_targets[0])
l2vpns[1].import_targets.add(route_targets[1])
l2vpns[2].import_targets.add(route_targets[2])
l2vpns[0].export_targets.add(route_targets[3])
l2vpns[1].export_targets.add(route_targets[4])
l2vpns[2].export_targets.add(route_targets[5])
def test_name(self):
params = {'name': ['L2VPN 1', 'L2VPN 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_identifier(self):
params = {'identifier': ['65001', '65002']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_type(self):
params = {'type': [L2VPNTypeChoices.TYPE_VXLAN, L2VPNTypeChoices.TYPE_VPWS]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_import_targets(self):
route_targets = RouteTarget.objects.filter(name__in=['1:1', '1:2'])
params = {'import_target_id': [route_targets[0].pk, route_targets[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'import_target': [route_targets[0].name, route_targets[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_export_targets(self):
route_targets = RouteTarget.objects.filter(name__in=['2:1', '2:2'])
params = {'export_target_id': [route_targets[0].pk, route_targets[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'export_target': [route_targets[0].name, route_targets[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class L2VPNTerminationTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = L2VPNTermination.objects.all()
filterset = L2VPNTerminationFilterSet
@classmethod
def setUpTestData(cls):
device = create_test_device('Device 1')
interfaces = (
Interface(name='Interface 1', device=device, type=InterfaceTypeChoices.TYPE_1GE_FIXED),
Interface(name='Interface 2', device=device, type=InterfaceTypeChoices.TYPE_1GE_FIXED),
Interface(name='Interface 3', device=device, type=InterfaceTypeChoices.TYPE_1GE_FIXED),
)
Interface.objects.bulk_create(interfaces)
vm = create_test_virtualmachine('Virtual Machine 1')
vminterfaces = (
VMInterface(name='Interface 1', virtual_machine=vm),
VMInterface(name='Interface 2', virtual_machine=vm),
VMInterface(name='Interface 3', virtual_machine=vm),
)
VMInterface.objects.bulk_create(vminterfaces)
vlans = (
VLAN(name='VLAN 1', vid=101),
VLAN(name='VLAN 2', vid=102),
VLAN(name='VLAN 3', vid=103),
)
VLAN.objects.bulk_create(vlans)
l2vpns = (
L2VPN(name='L2VPN 1', slug='l2vpn-1', type='vxlan', identifier=65001),
L2VPN(name='L2VPN 2', slug='l2vpn-2', type='vpws', identifier=65002),
L2VPN(name='L2VPN 3', slug='l2vpn-3', type='vpls'), # No RD,
)
L2VPN.objects.bulk_create(l2vpns)
l2vpnterminations = (
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[0]),
L2VPNTermination(l2vpn=l2vpns[1], assigned_object=vlans[1]),
L2VPNTermination(l2vpn=l2vpns[2], assigned_object=vlans[2]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=interfaces[0]),
L2VPNTermination(l2vpn=l2vpns[1], assigned_object=interfaces[1]),
L2VPNTermination(l2vpn=l2vpns[2], assigned_object=interfaces[2]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vminterfaces[0]),
L2VPNTermination(l2vpn=l2vpns[1], assigned_object=vminterfaces[1]),
L2VPNTermination(l2vpn=l2vpns[2], assigned_object=vminterfaces[2]),
)
L2VPNTermination.objects.bulk_create(l2vpnterminations)
def test_l2vpn(self):
l2vpns = L2VPN.objects.all()[:2]
params = {'l2vpn_id': [l2vpns[0].pk, l2vpns[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6)
params = {'l2vpn': [l2vpns[0].slug, l2vpns[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6)
def test_content_type(self):
params = {'assigned_object_type_id': ContentType.objects.get(model='vlan').pk}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_interface(self):
interfaces = Interface.objects.all()[:2]
params = {'interface_id': [interfaces[0].pk, interfaces[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_vminterface(self):
vminterfaces = VMInterface.objects.all()[:2]
params = {'vminterface_id': [vminterfaces[0].pk, vminterfaces[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_vlan(self):
vlans = VLAN.objects.all()[:2]
params = {'vlan_id': [vlans[0].pk, vlans[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'vlan': ['VLAN 1', 'VLAN 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
site = Site.objects.all().first()
params = {'site_id': [site.pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'site': ['site-1']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_device(self):
device = Device.objects.all().first()
params = {'device_id': [device.pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'device': ['Device 1']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_virtual_machine(self):
virtual_machine = VirtualMachine.objects.all().first()
params = {'virtual_machine_id': [virtual_machine.pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'virtual_machine': ['Virtual Machine 1']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)

View File

@@ -2,8 +2,9 @@ from netaddr import IPNetwork, IPSet
from django.core.exceptions import ValidationError
from django.test import TestCase, override_settings
from dcim.models import Interface, Device, DeviceRole, DeviceType, Manufacturer, Site
from ipam.choices import IPAddressRoleChoices, PrefixStatusChoices
from ipam.models import Aggregate, IPAddress, IPRange, Prefix, RIR, VLAN, VLANGroup, VRF
from ipam.models import Aggregate, IPAddress, IPRange, Prefix, RIR, VLAN, VLANGroup, VRF, L2VPN, L2VPNTermination
class TestAggregate(TestCase):
@@ -538,3 +539,76 @@ class TestVLANGroup(TestCase):
VLAN.objects.create(name='VLAN 104', vid=104, group=vlangroup)
self.assertEqual(vlangroup.get_next_available_vid(), 105)
class TestL2VPNTermination(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1')
device_type = DeviceType.objects.create(model='Device Type 1', manufacturer=manufacturer)
device_role = DeviceRole.objects.create(name='Switch')
device = Device.objects.create(
name='Device 1',
site=site,
device_type=device_type,
device_role=device_role,
status='active'
)
interfaces = (
Interface(name='Interface 1', device=device, type='1000baset'),
Interface(name='Interface 2', device=device, type='1000baset'),
Interface(name='Interface 3', device=device, type='1000baset'),
Interface(name='Interface 4', device=device, type='1000baset'),
Interface(name='Interface 5', device=device, type='1000baset'),
)
Interface.objects.bulk_create(interfaces)
vlans = (
VLAN(name='VLAN 1', vid=651),
VLAN(name='VLAN 2', vid=652),
VLAN(name='VLAN 3', vid=653),
VLAN(name='VLAN 4', vid=654),
VLAN(name='VLAN 5', vid=655),
VLAN(name='VLAN 6', vid=656),
VLAN(name='VLAN 7', vid=657)
)
VLAN.objects.bulk_create(vlans)
l2vpns = (
L2VPN(name='L2VPN 1', type='vxlan', identifier=650001),
L2VPN(name='L2VPN 2', type='vpws', identifier=650002),
L2VPN(name='L2VPN 3', type='vpls'), # No RD
)
L2VPN.objects.bulk_create(l2vpns)
l2vpnterminations = (
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[0]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[1]),
L2VPNTermination(l2vpn=l2vpns[0], assigned_object=vlans[2])
)
L2VPNTermination.objects.bulk_create(l2vpnterminations)
def test_duplicate_interface_terminations(self):
device = Device.objects.first()
interface = Interface.objects.filter(device=device).first()
l2vpn = L2VPN.objects.first()
L2VPNTermination.objects.create(l2vpn=l2vpn, assigned_object=interface)
duplicate = L2VPNTermination(l2vpn=l2vpn, assigned_object=interface)
self.assertRaises(ValidationError, duplicate.clean)
def test_duplicate_vlan_terminations(self):
vlan = Interface.objects.first()
l2vpn = L2VPN.objects.first()
L2VPNTermination.objects.create(l2vpn=l2vpn, assigned_object=vlan)
duplicate = L2VPNTermination(l2vpn=l2vpn, assigned_object=vlan)
self.assertRaises(ValidationError, duplicate.clean)

View File

@@ -4,11 +4,11 @@ from django.test import override_settings
from django.urls import reverse
from netaddr import IPNetwork
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site, Interface
from ipam.choices import *
from ipam.models import *
from tenancy.models import Tenant
from utilities.testing import ViewTestCases, create_tags
from utilities.testing import ViewTestCases, create_test_device, create_tags
class ASNTestCase(ViewTestCases.PrimaryObjectViewTestCase):
@@ -746,3 +746,120 @@ class ServiceTestCase(ViewTestCases.PrimaryObjectViewTestCase):
self.assertEqual(instance.protocol, service_template.protocol)
self.assertEqual(instance.ports, service_template.ports)
self.assertEqual(instance.description, service_template.description)
class L2VPNTestCase(ViewTestCases.PrimaryObjectViewTestCase):
model = L2VPN
csv_data = (
'name,slug,type,identifier',
'L2VPN 5,l2vpn-5,vxlan,456',
'L2VPN 6,l2vpn-6,vxlan,444',
)
bulk_edit_data = {
'description': 'New Description',
}
@classmethod
def setUpTestData(cls):
rts = (
RouteTarget(name='64534:123'),
RouteTarget(name='64534:321')
)
RouteTarget.objects.bulk_create(rts)
l2vpns = (
L2VPN(name='L2VPN 1', slug='l2vpn-1', type=L2VPNTypeChoices.TYPE_VXLAN, identifier='650001'),
L2VPN(name='L2VPN 2', slug='l2vpn-2', type=L2VPNTypeChoices.TYPE_VXLAN, identifier='650002'),
L2VPN(name='L2VPN 3', slug='l2vpn-3', type=L2VPNTypeChoices.TYPE_VXLAN, identifier='650003')
)
L2VPN.objects.bulk_create(l2vpns)
cls.form_data = {
'name': 'L2VPN 8',
'slug': 'l2vpn-8',
'type': L2VPNTypeChoices.TYPE_VXLAN,
'identifier': 123,
'description': 'Description',
'import_targets': [rts[0].pk],
'export_targets': [rts[1].pk]
}
class L2VPNTerminationTestCase(
ViewTestCases.GetObjectViewTestCase,
ViewTestCases.GetObjectChangelogViewTestCase,
ViewTestCases.CreateObjectViewTestCase,
ViewTestCases.EditObjectViewTestCase,
ViewTestCases.DeleteObjectViewTestCase,
ViewTestCases.ListObjectsViewTestCase,
ViewTestCases.BulkImportObjectsViewTestCase,
ViewTestCases.BulkDeleteObjectsViewTestCase,
):
model = L2VPNTermination
@classmethod
def setUpTestData(cls):
device = create_test_device('Device 1')
interface = Interface.objects.create(name='Interface 1', device=device, type='1000baset')
l2vpn = L2VPN.objects.create(name='L2VPN 1', type=L2VPNTypeChoices.TYPE_VXLAN, identifier=650001)
vlans = (
VLAN(name='Vlan 1', vid=1001),
VLAN(name='Vlan 2', vid=1002),
VLAN(name='Vlan 3', vid=1003),
VLAN(name='Vlan 4', vid=1004),
VLAN(name='Vlan 5', vid=1005),
VLAN(name='Vlan 6', vid=1006)
)
VLAN.objects.bulk_create(vlans)
terminations = (
L2VPNTermination(l2vpn=l2vpn, assigned_object=vlans[0]),
L2VPNTermination(l2vpn=l2vpn, assigned_object=vlans[1]),
L2VPNTermination(l2vpn=l2vpn, assigned_object=vlans[2])
)
L2VPNTermination.objects.bulk_create(terminations)
cls.form_data = {
'l2vpn': l2vpn.pk,
'device': device.pk,
'interface': interface.pk,
}
cls.csv_data = (
"l2vpn,vlan",
"L2VPN 1,Vlan 4",
"L2VPN 1,Vlan 5",
"L2VPN 1,Vlan 6",
)
cls.bulk_edit_data = {}
#
# Custom assertions
#
# TODO: Remove this
def assertInstanceEqual(self, instance, data, exclude=None, api=False):
"""
Override parent
"""
if exclude is None:
exclude = []
fields = [k for k in data.keys() if k not in exclude]
model_dict = self.model_to_dict(instance, fields=fields, api=api)
# Omit any dictionary keys which are not instance attributes or have been excluded
relevant_data = {
k: v for k, v in data.items() if hasattr(instance, k) and k not in exclude
}
# Handle relations on the model
for k, v in model_dict.items():
if isinstance(v, object) and hasattr(v, 'first'):
model_dict[k] = v.first().pk
self.assertDictEqual(model_dict, relevant_data)

View File

@@ -186,4 +186,26 @@ urlpatterns = [
path('services/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='service_changelog', kwargs={'model': Service}),
path('services/<int:pk>/journal/', ObjectJournalView.as_view(), name='service_journal', kwargs={'model': Service}),
# L2VPN
path('l2vpns/', views.L2VPNListView.as_view(), name='l2vpn_list'),
path('l2vpns/add/', views.L2VPNEditView.as_view(), name='l2vpn_add'),
path('l2vpns/import/', views.L2VPNBulkImportView.as_view(), name='l2vpn_import'),
path('l2vpns/edit/', views.L2VPNBulkEditView.as_view(), name='l2vpn_bulk_edit'),
path('l2vpns/delete/', views.L2VPNBulkDeleteView.as_view(), name='l2vpn_bulk_delete'),
path('l2vpns/<int:pk>/', views.L2VPNView.as_view(), name='l2vpn'),
path('l2vpns/<int:pk>/edit/', views.L2VPNEditView.as_view(), name='l2vpn_edit'),
path('l2vpns/<int:pk>/delete/', views.L2VPNDeleteView.as_view(), name='l2vpn_delete'),
path('l2vpns/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='l2vpn_changelog', kwargs={'model': L2VPN}),
path('l2vpns/<int:pk>/journal/', ObjectJournalView.as_view(), name='l2vpn_journal', kwargs={'model': L2VPN}),
path('l2vpn-terminations/', views.L2VPNTerminationListView.as_view(), name='l2vpntermination_list'),
path('l2vpn-terminations/add/', views.L2VPNTerminationEditView.as_view(), name='l2vpntermination_add'),
path('l2vpn-terminations/import/', views.L2VPNTerminationBulkImportView.as_view(), name='l2vpntermination_import'),
path('l2vpn-terminations/edit/', views.L2VPNTerminationBulkEditView.as_view(), name='l2vpntermination_bulk_edit'),
path('l2vpn-terminations/delete/', views.L2VPNTerminationBulkDeleteView.as_view(), name='l2vpntermination_bulk_delete'),
path('l2vpn-terminations/<int:pk>/', views.L2VPNTerminationView.as_view(), name='l2vpntermination'),
path('l2vpn-terminations/<int:pk>/edit/', views.L2VPNTerminationEditView.as_view(), name='l2vpntermination_edit'),
path('l2vpn-terminations/<int:pk>/delete/', views.L2VPNTerminationDeleteView.as_view(), name='l2vpntermination_delete'),
path('l2vpn-terminations/<int:pk>/changelog/', ObjectChangeLogView.as_view(), name='l2vpntermination_changelog', kwargs={'model': L2VPNTermination}),
path('l2vpn-terminations/<int:pk>/journal/', ObjectJournalView.as_view(), name='l2vpntermination_journal', kwargs={'model': L2VPNTermination}),
]

View File

@@ -17,6 +17,7 @@ from . import filtersets, forms, tables
from .constants import *
from .models import *
from .models import ASN
from .tables.l2vpn import L2VPNTable, L2VPNTerminationTable
from .utils import add_requested_prefixes, add_available_ipaddresses, add_available_vlans
@@ -39,11 +40,11 @@ class VRFView(generic.ObjectView):
ipaddress_count = IPAddress.objects.restrict(request.user, 'view').filter(vrf=instance).count()
import_targets_table = tables.RouteTargetTable(
instance.import_targets.prefetch_related('tenant'),
instance.import_targets.all(),
orderable=False
)
export_targets_table = tables.RouteTargetTable(
instance.export_targets.prefetch_related('tenant'),
instance.export_targets.all(),
orderable=False
)
@@ -71,14 +72,14 @@ class VRFBulkImportView(generic.BulkImportView):
class VRFBulkEditView(generic.BulkEditView):
queryset = VRF.objects.prefetch_related('tenant')
queryset = VRF.objects.all()
filterset = filtersets.VRFFilterSet
table = tables.VRFTable
form = forms.VRFBulkEditForm
class VRFBulkDeleteView(generic.BulkDeleteView):
queryset = VRF.objects.prefetch_related('tenant')
queryset = VRF.objects.all()
filterset = filtersets.VRFFilterSet
table = tables.VRFTable
@@ -99,11 +100,11 @@ class RouteTargetView(generic.ObjectView):
def get_extra_context(self, request, instance):
importing_vrfs_table = tables.VRFTable(
instance.importing_vrfs.prefetch_related('tenant'),
instance.importing_vrfs.all(),
orderable=False
)
exporting_vrfs_table = tables.VRFTable(
instance.exporting_vrfs.prefetch_related('tenant'),
instance.exporting_vrfs.all(),
orderable=False
)
@@ -129,14 +130,14 @@ class RouteTargetBulkImportView(generic.BulkImportView):
class RouteTargetBulkEditView(generic.BulkEditView):
queryset = RouteTarget.objects.prefetch_related('tenant')
queryset = RouteTarget.objects.all()
filterset = filtersets.RouteTargetFilterSet
table = tables.RouteTargetTable
form = forms.RouteTargetBulkEditForm
class RouteTargetBulkDeleteView(generic.BulkDeleteView):
queryset = RouteTarget.objects.prefetch_related('tenant')
queryset = RouteTarget.objects.all()
filterset = filtersets.RouteTargetFilterSet
table = tables.RouteTargetTable
@@ -420,7 +421,7 @@ class PrefixListView(generic.ObjectListView):
class PrefixView(generic.ObjectView):
queryset = Prefix.objects.prefetch_related('vrf', 'site__region', 'tenant__group', 'vlan__group', 'role')
queryset = Prefix.objects.all()
def get_extra_context(self, request, instance):
try:
@@ -436,7 +437,7 @@ class PrefixView(generic.ObjectView):
).filter(
prefix__net_contains=str(instance.prefix)
).prefetch_related(
'site', 'role', 'tenant'
'site', 'role', 'tenant', 'vlan',
)
parent_prefix_table = tables.PrefixTable(
list(parent_prefixes),
@@ -450,7 +451,7 @@ class PrefixView(generic.ObjectView):
).exclude(
pk=instance.pk
).prefetch_related(
'site', 'role'
'site', 'role', 'tenant', 'vlan',
)
duplicate_prefix_table = tables.PrefixTable(
list(duplicate_prefixes),
@@ -503,7 +504,7 @@ class PrefixIPRangesView(generic.ObjectChildrenView):
def get_children(self, request, parent):
return parent.get_child_ranges().restrict(request.user, 'view').prefetch_related(
'vrf', 'role', 'tenant', 'tenant__group',
'tenant__group',
)
def get_extra_context(self, request, instance):
@@ -522,7 +523,7 @@ class PrefixIPAddressesView(generic.ObjectChildrenView):
template_name = 'ipam/prefix/ip_addresses.html'
def get_children(self, request, parent):
return parent.get_child_ips().restrict(request.user, 'view').prefetch_related('vrf', 'tenant')
return parent.get_child_ips().restrict(request.user, 'view').prefetch_related('vrf', 'tenant', 'tenant__group')
def prep_table_data(self, request, queryset, parent):
show_available = bool(request.GET.get('show_available', 'true') == 'true')
@@ -555,14 +556,14 @@ class PrefixBulkImportView(generic.BulkImportView):
class PrefixBulkEditView(generic.BulkEditView):
queryset = Prefix.objects.prefetch_related('site', 'vrf__tenant', 'tenant', 'vlan', 'role')
queryset = Prefix.objects.prefetch_related('vrf__tenant')
filterset = filtersets.PrefixFilterSet
table = tables.PrefixTable
form = forms.PrefixBulkEditForm
class PrefixBulkDeleteView(generic.BulkDeleteView):
queryset = Prefix.objects.prefetch_related('site', 'vrf__tenant', 'tenant', 'vlan', 'role')
queryset = Prefix.objects.prefetch_related('vrf__tenant')
filterset = filtersets.PrefixFilterSet
table = tables.PrefixTable
@@ -614,14 +615,14 @@ class IPRangeBulkImportView(generic.BulkImportView):
class IPRangeBulkEditView(generic.BulkEditView):
queryset = IPRange.objects.prefetch_related('vrf', 'tenant')
queryset = IPRange.objects.all()
filterset = filtersets.IPRangeFilterSet
table = tables.IPRangeTable
form = forms.IPRangeBulkEditForm
class IPRangeBulkDeleteView(generic.BulkDeleteView):
queryset = IPRange.objects.prefetch_related('vrf', 'tenant')
queryset = IPRange.objects.all()
filterset = filtersets.IPRangeFilterSet
table = tables.IPRangeTable
@@ -792,14 +793,14 @@ class IPAddressBulkImportView(generic.BulkImportView):
class IPAddressBulkEditView(generic.BulkEditView):
queryset = IPAddress.objects.prefetch_related('vrf__tenant', 'tenant')
queryset = IPAddress.objects.prefetch_related('vrf__tenant')
filterset = filtersets.IPAddressFilterSet
table = tables.IPAddressTable
form = forms.IPAddressBulkEditForm
class IPAddressBulkDeleteView(generic.BulkDeleteView):
queryset = IPAddress.objects.prefetch_related('vrf__tenant', 'tenant')
queryset = IPAddress.objects.prefetch_related('vrf__tenant')
filterset = filtersets.IPAddressFilterSet
table = tables.IPAddressTable
@@ -822,7 +823,8 @@ class VLANGroupView(generic.ObjectView):
def get_extra_context(self, request, instance):
vlans = VLAN.objects.restrict(request.user, 'view').filter(group=instance).prefetch_related(
Prefetch('prefixes', queryset=Prefix.objects.restrict(request.user))
Prefetch('prefixes', queryset=Prefix.objects.restrict(request.user)),
'tenant', 'site', 'role',
).order_by('vid')
vlans_count = vlans.count()
vlans = add_available_vlans(vlans, vlan_group=instance)
@@ -897,7 +899,7 @@ class FHRPGroupView(generic.ObjectView):
def get_extra_context(self, request, instance):
# Get assigned IP addresses
ipaddress_table = tables.AssignedIPAddressesTable(
data=instance.ip_addresses.restrict(request.user, 'view').prefetch_related('vrf', 'tenant'),
data=instance.ip_addresses.restrict(request.user, 'view'),
orderable=False
)
@@ -987,11 +989,11 @@ class VLANListView(generic.ObjectListView):
class VLANView(generic.ObjectView):
queryset = VLAN.objects.prefetch_related('site__region', 'tenant__group', 'role')
queryset = VLAN.objects.all()
def get_extra_context(self, request, instance):
prefixes = Prefix.objects.restrict(request.user, 'view').filter(vlan=instance).prefetch_related(
'vrf', 'site', 'role'
'vrf', 'site', 'role', 'tenant'
)
prefix_table = tables.PrefixTable(list(prefixes), exclude=('vlan', 'utilization'), orderable=False)
@@ -1049,14 +1051,14 @@ class VLANBulkImportView(generic.BulkImportView):
class VLANBulkEditView(generic.BulkEditView):
queryset = VLAN.objects.prefetch_related('site', 'group', 'tenant', 'role')
queryset = VLAN.objects.all()
filterset = filtersets.VLANFilterSet
table = tables.VLANTable
form = forms.VLANBulkEditForm
class VLANBulkDeleteView(generic.BulkDeleteView):
queryset = VLAN.objects.prefetch_related('site', 'group', 'tenant', 'role')
queryset = VLAN.objects.all()
filterset = filtersets.VLANFilterSet
table = tables.VLANTable
@@ -1109,14 +1111,14 @@ class ServiceTemplateBulkDeleteView(generic.BulkDeleteView):
#
class ServiceListView(generic.ObjectListView):
queryset = Service.objects.all()
queryset = Service.objects.prefetch_related('device', 'virtual_machine')
filterset = filtersets.ServiceFilterSet
filterset_form = forms.ServiceFilterForm
table = tables.ServiceTable
class ServiceView(generic.ObjectView):
queryset = Service.objects.prefetch_related('ipaddresses')
queryset = Service.objects.all()
class ServiceCreateView(generic.ObjectEditView):
@@ -1126,7 +1128,7 @@ class ServiceCreateView(generic.ObjectEditView):
class ServiceEditView(generic.ObjectEditView):
queryset = Service.objects.prefetch_related('ipaddresses')
queryset = Service.objects.all()
form = forms.ServiceForm
template_name = 'ipam/service_edit.html'
@@ -1152,3 +1154,105 @@ class ServiceBulkDeleteView(generic.BulkDeleteView):
queryset = Service.objects.prefetch_related('device', 'virtual_machine')
filterset = filtersets.ServiceFilterSet
table = tables.ServiceTable
# L2VPN
class L2VPNListView(generic.ObjectListView):
queryset = L2VPN.objects.all()
table = L2VPNTable
filterset = filtersets.L2VPNFilterSet
filterset_form = forms.L2VPNFilterForm
class L2VPNView(generic.ObjectView):
queryset = L2VPN.objects.all()
def get_extra_context(self, request, instance):
terminations = L2VPNTermination.objects.restrict(request.user, 'view').filter(l2vpn=instance)
terminations_table = tables.L2VPNTerminationTable(terminations, user=request.user, exclude=('l2vpn', ))
terminations_table.configure(request)
import_targets_table = tables.RouteTargetTable(
instance.import_targets.prefetch_related('tenant'),
orderable=False
)
export_targets_table = tables.RouteTargetTable(
instance.export_targets.prefetch_related('tenant'),
orderable=False
)
return {
'terminations_table': terminations_table,
'import_targets_table': import_targets_table,
'export_targets_table': export_targets_table,
}
class L2VPNEditView(generic.ObjectEditView):
queryset = L2VPN.objects.all()
form = forms.L2VPNForm
class L2VPNDeleteView(generic.ObjectDeleteView):
queryset = L2VPN.objects.all()
class L2VPNBulkImportView(generic.BulkImportView):
queryset = L2VPN.objects.all()
model_form = forms.L2VPNCSVForm
table = tables.L2VPNTable
class L2VPNBulkEditView(generic.BulkEditView):
queryset = L2VPN.objects.all()
filterset = filtersets.L2VPNFilterSet
table = tables.L2VPNTable
form = forms.L2VPNBulkEditForm
class L2VPNBulkDeleteView(generic.BulkDeleteView):
queryset = L2VPN.objects.all()
filterset = filtersets.L2VPNFilterSet
table = tables.L2VPNTable
class L2VPNTerminationListView(generic.ObjectListView):
queryset = L2VPNTermination.objects.all()
table = L2VPNTerminationTable
filterset = filtersets.L2VPNTerminationFilterSet
filterset_form = forms.L2VPNTerminationFilterForm
class L2VPNTerminationView(generic.ObjectView):
queryset = L2VPNTermination.objects.all()
class L2VPNTerminationEditView(generic.ObjectEditView):
queryset = L2VPNTermination.objects.all()
form = forms.L2VPNTerminationForm
template_name = 'ipam/l2vpntermination_edit.html'
class L2VPNTerminationDeleteView(generic.ObjectDeleteView):
queryset = L2VPNTermination.objects.all()
class L2VPNTerminationBulkImportView(generic.BulkImportView):
queryset = L2VPNTermination.objects.all()
model_form = forms.L2VPNTerminationCSVForm
table = tables.L2VPNTerminationTable
class L2VPNTerminationBulkEditView(generic.BulkEditView):
queryset = L2VPNTermination.objects.all()
filterset = filtersets.L2VPNTerminationFilterSet
table = tables.L2VPNTerminationTable
form = forms.L2VPNTerminationBulkEditForm
class L2VPNTerminationBulkDeleteView(generic.BulkDeleteView):
queryset = L2VPNTermination.objects.all()
filterset = filtersets.L2VPNTerminationFilterSet
table = tables.L2VPNTerminationTable