Fixes #15924: Fix API interface patch tagged all mode (#18759)

* Fixes: #15924 - Prevent API payload from allowing tagged_vlans while interface mode is set to taged-all

* Prevent cleanup of tagged_vlans when no tagged_vlans set on interface

* Fix test errors

* Remove accidental debug statements

* Update validation to model clean method instead of serializer

* Remove clearing of tagged vlans from `save()`

* Make changes to validation to account for M2M not being available under model in addition to not being able to check incoming vlans under same model.

* Optimize untagged vlan check

* Re-ordering statements in validators

* Forgot to call super().clean()

* Adjust logic for form and serializer.  Add tests

* Fix test failure

* Fix ruff errors

* Fix test by removing now invalid test

* Update serializer, form and tests

* Optimize API test for vlan fields

* Optimize API serializer logic

---------

Co-authored-by: Daniel Sheppard <dans@dansheps.com>
This commit is contained in:
Jeremy Stretch 2025-02-28 11:01:48 -05:00 committed by GitHub
parent 09d867adc3
commit 63a167f130
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 292 additions and 15 deletions

View File

@ -1,3 +1,4 @@
from django.utils.translation import gettext as _
from django.contrib.contenttypes.models import ContentType
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
@ -232,8 +233,56 @@ class InterfaceSerializer(NetBoxModelSerializer, CabledObjectSerializer, Connect
def validate(self, data):
# Validate many-to-many VLAN assignments
if not self.nested:
# Validate 802.1q mode and vlan(s)
mode = None
tagged_vlans = []
# Gather Information
if self.instance:
mode = data.get('mode') if 'mode' in data.keys() else self.instance.mode
untagged_vlan = data.get('untagged_vlan') if 'untagged_vlan' in data.keys() else \
self.instance.untagged_vlan
qinq_svlan = data.get('qinq_svlan') if 'qinq_svlan' in data.keys() else \
self.instance.qinq_svlan
tagged_vlans = data.get('tagged_vlans') if 'tagged_vlans' in data.keys() else \
self.instance.tagged_vlans.all()
else:
mode = data.get('mode', None)
untagged_vlan = data.get('untagged_vlan') if 'untagged_vlan' in data.keys() else None
qinq_svlan = data.get('qinq_svlan') if 'qinq_svlan' in data.keys() else None
tagged_vlans = data.get('tagged_vlans') if 'tagged_vlans' in data.keys() else None
errors = {}
# Non Q-in-Q mode with service vlan set
if mode != InterfaceModeChoices.MODE_Q_IN_Q and qinq_svlan:
errors.update({
'qinq_svlan': _("Interface mode does not support q-in-q service vlan")
})
# Routed mode
if not mode:
# Untagged vlan
if untagged_vlan:
errors.update({
'untagged_vlan': _("Interface mode does not support untagged vlan")
})
# Tagged vlan
if tagged_vlans:
errors.update({
'tagged_vlans': _("Interface mode does not support tagged vlans")
})
# Non-tagged mode
elif mode in (InterfaceModeChoices.MODE_TAGGED_ALL, InterfaceModeChoices.MODE_ACCESS) and tagged_vlans:
errors.update({
'tagged_vlans': _("Interface mode does not support tagged vlans")
})
if errors:
raise serializers.ValidationError(errors)
# Validate many-to-many VLAN assignments
device = self.instance.device if self.instance else data.get('device')
for vlan in data.get('tagged_vlans', []):
if vlan.site not in [device.site, None]:

View File

@ -43,20 +43,14 @@ class InterfaceCommonForm(forms.Form):
super().clean()
parent_field = 'device' if 'device' in self.cleaned_data else 'virtual_machine'
tagged_vlans = self.cleaned_data.get('tagged_vlans')
# Untagged interfaces cannot be assigned tagged VLANs
if self.cleaned_data['mode'] == InterfaceModeChoices.MODE_ACCESS and tagged_vlans:
raise forms.ValidationError({
'mode': _("An access interface cannot have tagged VLANs assigned.")
})
# Remove all tagged VLAN assignments from "tagged all" interfaces
elif self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED_ALL:
self.cleaned_data['tagged_vlans'] = []
if 'tagged_vlans' in self.fields.keys():
tagged_vlans = self.cleaned_data.get('tagged_vlans') if self.is_bound else \
self.get_initial_for_field(self.fields['tagged_vlans'], 'tagged_vlans')
else:
tagged_vlans = []
# Validate tagged VLANs; must be a global VLAN or in the same site
elif self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED and tagged_vlans:
if self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED and tagged_vlans:
valid_sites = [None, self.cleaned_data[parent_field].site]
invalid_vlans = [str(v) for v in tagged_vlans if v.site not in valid_sites]

View File

@ -934,6 +934,8 @@ class Interface(ModularComponentModel, BaseInterface, CabledObjectModel, PathEnd
raise ValidationError({'rf_channel_width': _("Cannot specify custom width with channel selected.")})
# VLAN validation
if not self.mode and self.untagged_vlan:
raise ValidationError({'untagged_vlan': _("Interface mode does not support an untagged vlan.")})
# Validate untagged VLAN
if self.untagged_vlan and self.untagged_vlan.site not in [self.device.site, None]:

View File

@ -1,3 +1,5 @@
import json
from django.test import override_settings
from django.urls import reverse
from django.utils.translation import gettext as _
@ -1748,6 +1750,23 @@ class InterfaceTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase
},
]
def _perform_interface_test_with_invalid_data(self, mode: str = None, invalid_data: dict = {}):
device = Device.objects.first()
data = {
'device': device.pk,
'name': 'Interface 1',
'type': InterfaceTypeChoices.TYPE_1GE_FIXED,
}
data.update({'mode': mode})
data.update(invalid_data)
response = self.client.post(self._get_list_url(), data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
content = json.loads(response.content)
for key in invalid_data.keys():
self.assertIn(key, content)
self.assertIsNone(content.get('data'))
def test_bulk_delete_child_interfaces(self):
interface1 = Interface.objects.get(name='Interface 1')
device = interface1.device
@ -1775,6 +1794,57 @@ class InterfaceTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase
self.client.delete(self._get_list_url(), data, format='json', **self.header)
self.assertEqual(device.interfaces.count(), 2) # Child & parent were both deleted
def test_create_child_interfaces_mode_invalid_data(self):
"""
POST data to test interface mode check and invalid tagged/untagged VLANS.
"""
self.add_permissions('dcim.add_interface')
vlans = VLAN.objects.all()[0:3]
# Routed mode, untagged, tagged and qinq service vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
'qinq_svlan': vlans[2].pk
}
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Routed mode, untagged and tagged vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
}
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Routed mode, untagged vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
}
self._perform_interface_test_with_invalid_data(None, invalid_data)
invalid_data = {
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
}
# Routed mode, qinq service vlan
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Access mode, tagged vlans
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_ACCESS, invalid_data)
# All tagged mode, tagged vlans
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED_ALL, invalid_data)
invalid_data = {
'qinq_svlan': vlans[0].pk,
}
# Routed mode, qinq service vlan
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Access mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_ACCESS, invalid_data)
# Tagged mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED, invalid_data)
# Tagged-all mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED_ALL, invalid_data)
class FrontPortTest(APIViewTestCases.APIViewTestCase):
model = FrontPort

View File

@ -1,8 +1,9 @@
from django.test import TestCase
from dcim.choices import DeviceFaceChoices, DeviceStatusChoices, InterfaceTypeChoices
from dcim.choices import DeviceFaceChoices, DeviceStatusChoices, InterfaceTypeChoices, InterfaceModeChoices
from dcim.forms import *
from dcim.models import *
from ipam.models import VLAN
from utilities.testing import create_test_device
from virtualization.models import Cluster, ClusterGroup, ClusterType
@ -117,11 +118,23 @@ class DeviceTestCase(TestCase):
self.assertIn('position', form.errors)
class LabelTestCase(TestCase):
class InterfaceTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.device = create_test_device('Device 1')
cls.vlans = (
VLAN(name='VLAN 1', vid=1),
VLAN(name='VLAN 2', vid=2),
VLAN(name='VLAN 3', vid=3),
)
VLAN.objects.bulk_create(cls.vlans)
cls.interface = Interface.objects.create(
device=cls.device,
name='Interface 1',
type=InterfaceTypeChoices.TYPE_1GE_GBIC,
mode=InterfaceModeChoices.MODE_TAGGED,
)
def test_interface_label_count_valid(self):
"""
@ -151,3 +164,152 @@ class LabelTestCase(TestCase):
self.assertFalse(form.is_valid())
self.assertIn('label', form.errors)
def test_create_interface_mode_valid_data(self):
"""
Test that saving valid interface mode and tagged/untagged vlans works properly
"""
# Validate access mode
data = {
'device': self.device.pk,
'name': 'ethernet1/1',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'untagged_vlan': self.vlans[0].pk
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
# Validate tagged vlans
data = {
'device': self.device.pk,
'name': 'ethernet1/2',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
# Validate tagged vlans
data = {
'device': self.device.pk,
'name': 'ethernet1/3',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'untagged_vlan': self.vlans[0].pk,
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
def test_create_interface_mode_access_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/4',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_edit_interface_mode_access_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/5',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data, instance=self.interface)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_create_interface_mode_tagged_all_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/6',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_edit_interface_mode_tagged_all_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/7',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_create_interface_mode_routed_invalid_data(self):
"""
Test that saving invalid interface mode (routed) and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/6',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': None,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
self.assertNotIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_edit_interface_mode_routed_invalid_data(self):
"""
Test that saving invalid interface mode (routed) and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/7',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': None,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data)
self.assertTrue(form.is_valid())
self.assertNotIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())