From 63a167f1302582b065be74a6021f828e6404d7e4 Mon Sep 17 00:00:00 2001 From: Jeremy Stretch Date: Fri, 28 Feb 2025 11:01:48 -0500 Subject: [PATCH] Fixes #15924: Fix API interface patch tagged all mode (#18759) * Fixes: #15924 - Prevent API payload from allowing tagged_vlans while interface mode is set to taged-all * Prevent cleanup of tagged_vlans when no tagged_vlans set on interface * Fix test errors * Remove accidental debug statements * Update validation to model clean method instead of serializer * Remove clearing of tagged vlans from `save()` * Make changes to validation to account for M2M not being available under model in addition to not being able to check incoming vlans under same model. * Optimize untagged vlan check * Re-ordering statements in validators * Forgot to call super().clean() * Adjust logic for form and serializer. Add tests * Fix test failure * Fix ruff errors * Fix test by removing now invalid test * Update serializer, form and tests * Optimize API test for vlan fields * Optimize API serializer logic --------- Co-authored-by: Daniel Sheppard --- .../api/serializers_/device_components.py | 51 +++++- netbox/dcim/forms/common.py | 18 +- netbox/dcim/models/device_components.py | 2 + netbox/dcim/tests/test_api.py | 70 ++++++++ netbox/dcim/tests/test_forms.py | 166 +++++++++++++++++- 5 files changed, 292 insertions(+), 15 deletions(-) diff --git a/netbox/dcim/api/serializers_/device_components.py b/netbox/dcim/api/serializers_/device_components.py index a6767bb6f..b591030aa 100644 --- a/netbox/dcim/api/serializers_/device_components.py +++ b/netbox/dcim/api/serializers_/device_components.py @@ -1,3 +1,4 @@ +from django.utils.translation import gettext as _ from django.contrib.contenttypes.models import ContentType from drf_spectacular.utils import extend_schema_field from rest_framework import serializers @@ -232,8 +233,56 @@ class InterfaceSerializer(NetBoxModelSerializer, CabledObjectSerializer, Connect def validate(self, data): - # Validate many-to-many VLAN assignments if not self.nested: + + # Validate 802.1q mode and vlan(s) + mode = None + tagged_vlans = [] + + # Gather Information + if self.instance: + mode = data.get('mode') if 'mode' in data.keys() else self.instance.mode + untagged_vlan = data.get('untagged_vlan') if 'untagged_vlan' in data.keys() else \ + self.instance.untagged_vlan + qinq_svlan = data.get('qinq_svlan') if 'qinq_svlan' in data.keys() else \ + self.instance.qinq_svlan + tagged_vlans = data.get('tagged_vlans') if 'tagged_vlans' in data.keys() else \ + self.instance.tagged_vlans.all() + else: + mode = data.get('mode', None) + untagged_vlan = data.get('untagged_vlan') if 'untagged_vlan' in data.keys() else None + qinq_svlan = data.get('qinq_svlan') if 'qinq_svlan' in data.keys() else None + tagged_vlans = data.get('tagged_vlans') if 'tagged_vlans' in data.keys() else None + + errors = {} + + # Non Q-in-Q mode with service vlan set + if mode != InterfaceModeChoices.MODE_Q_IN_Q and qinq_svlan: + errors.update({ + 'qinq_svlan': _("Interface mode does not support q-in-q service vlan") + }) + # Routed mode + if not mode: + # Untagged vlan + if untagged_vlan: + errors.update({ + 'untagged_vlan': _("Interface mode does not support untagged vlan") + }) + # Tagged vlan + if tagged_vlans: + errors.update({ + 'tagged_vlans': _("Interface mode does not support tagged vlans") + }) + # Non-tagged mode + elif mode in (InterfaceModeChoices.MODE_TAGGED_ALL, InterfaceModeChoices.MODE_ACCESS) and tagged_vlans: + errors.update({ + 'tagged_vlans': _("Interface mode does not support tagged vlans") + }) + + if errors: + raise serializers.ValidationError(errors) + + # Validate many-to-many VLAN assignments device = self.instance.device if self.instance else data.get('device') for vlan in data.get('tagged_vlans', []): if vlan.site not in [device.site, None]: diff --git a/netbox/dcim/forms/common.py b/netbox/dcim/forms/common.py index 8ca258f34..23109f66b 100644 --- a/netbox/dcim/forms/common.py +++ b/netbox/dcim/forms/common.py @@ -43,20 +43,14 @@ class InterfaceCommonForm(forms.Form): super().clean() parent_field = 'device' if 'device' in self.cleaned_data else 'virtual_machine' - tagged_vlans = self.cleaned_data.get('tagged_vlans') - - # Untagged interfaces cannot be assigned tagged VLANs - if self.cleaned_data['mode'] == InterfaceModeChoices.MODE_ACCESS and tagged_vlans: - raise forms.ValidationError({ - 'mode': _("An access interface cannot have tagged VLANs assigned.") - }) - - # Remove all tagged VLAN assignments from "tagged all" interfaces - elif self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED_ALL: - self.cleaned_data['tagged_vlans'] = [] + if 'tagged_vlans' in self.fields.keys(): + tagged_vlans = self.cleaned_data.get('tagged_vlans') if self.is_bound else \ + self.get_initial_for_field(self.fields['tagged_vlans'], 'tagged_vlans') + else: + tagged_vlans = [] # Validate tagged VLANs; must be a global VLAN or in the same site - elif self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED and tagged_vlans: + if self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED and tagged_vlans: valid_sites = [None, self.cleaned_data[parent_field].site] invalid_vlans = [str(v) for v in tagged_vlans if v.site not in valid_sites] diff --git a/netbox/dcim/models/device_components.py b/netbox/dcim/models/device_components.py index ce9e5607f..8a8e8f4cc 100644 --- a/netbox/dcim/models/device_components.py +++ b/netbox/dcim/models/device_components.py @@ -934,6 +934,8 @@ class Interface(ModularComponentModel, BaseInterface, CabledObjectModel, PathEnd raise ValidationError({'rf_channel_width': _("Cannot specify custom width with channel selected.")}) # VLAN validation + if not self.mode and self.untagged_vlan: + raise ValidationError({'untagged_vlan': _("Interface mode does not support an untagged vlan.")}) # Validate untagged VLAN if self.untagged_vlan and self.untagged_vlan.site not in [self.device.site, None]: diff --git a/netbox/dcim/tests/test_api.py b/netbox/dcim/tests/test_api.py index 99a446aef..08f93f6ea 100644 --- a/netbox/dcim/tests/test_api.py +++ b/netbox/dcim/tests/test_api.py @@ -1,3 +1,5 @@ +import json + from django.test import override_settings from django.urls import reverse from django.utils.translation import gettext as _ @@ -1748,6 +1750,23 @@ class InterfaceTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase }, ] + def _perform_interface_test_with_invalid_data(self, mode: str = None, invalid_data: dict = {}): + device = Device.objects.first() + data = { + 'device': device.pk, + 'name': 'Interface 1', + 'type': InterfaceTypeChoices.TYPE_1GE_FIXED, + } + data.update({'mode': mode}) + data.update(invalid_data) + + response = self.client.post(self._get_list_url(), data, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + content = json.loads(response.content) + for key in invalid_data.keys(): + self.assertIn(key, content) + self.assertIsNone(content.get('data')) + def test_bulk_delete_child_interfaces(self): interface1 = Interface.objects.get(name='Interface 1') device = interface1.device @@ -1775,6 +1794,57 @@ class InterfaceTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase self.client.delete(self._get_list_url(), data, format='json', **self.header) self.assertEqual(device.interfaces.count(), 2) # Child & parent were both deleted + def test_create_child_interfaces_mode_invalid_data(self): + """ + POST data to test interface mode check and invalid tagged/untagged VLANS. + """ + self.add_permissions('dcim.add_interface') + + vlans = VLAN.objects.all()[0:3] + + # Routed mode, untagged, tagged and qinq service vlan + invalid_data = { + 'untagged_vlan': vlans[0].pk, + 'tagged_vlans': [vlans[1].pk, vlans[2].pk], + 'qinq_svlan': vlans[2].pk + } + self._perform_interface_test_with_invalid_data(None, invalid_data) + + # Routed mode, untagged and tagged vlan + invalid_data = { + 'untagged_vlan': vlans[0].pk, + 'tagged_vlans': [vlans[1].pk, vlans[2].pk], + } + self._perform_interface_test_with_invalid_data(None, invalid_data) + + # Routed mode, untagged vlan + invalid_data = { + 'untagged_vlan': vlans[0].pk, + } + self._perform_interface_test_with_invalid_data(None, invalid_data) + + invalid_data = { + 'tagged_vlans': [vlans[1].pk, vlans[2].pk], + } + # Routed mode, qinq service vlan + self._perform_interface_test_with_invalid_data(None, invalid_data) + # Access mode, tagged vlans + self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_ACCESS, invalid_data) + # All tagged mode, tagged vlans + self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED_ALL, invalid_data) + + invalid_data = { + 'qinq_svlan': vlans[0].pk, + } + # Routed mode, qinq service vlan + self._perform_interface_test_with_invalid_data(None, invalid_data) + # Access mode, qinq service vlan + self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_ACCESS, invalid_data) + # Tagged mode, qinq service vlan + self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED, invalid_data) + # Tagged-all mode, qinq service vlan + self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED_ALL, invalid_data) + class FrontPortTest(APIViewTestCases.APIViewTestCase): model = FrontPort diff --git a/netbox/dcim/tests/test_forms.py b/netbox/dcim/tests/test_forms.py index 7a57bf3f0..89b7508f3 100644 --- a/netbox/dcim/tests/test_forms.py +++ b/netbox/dcim/tests/test_forms.py @@ -1,8 +1,9 @@ from django.test import TestCase -from dcim.choices import DeviceFaceChoices, DeviceStatusChoices, InterfaceTypeChoices +from dcim.choices import DeviceFaceChoices, DeviceStatusChoices, InterfaceTypeChoices, InterfaceModeChoices from dcim.forms import * from dcim.models import * +from ipam.models import VLAN from utilities.testing import create_test_device from virtualization.models import Cluster, ClusterGroup, ClusterType @@ -117,11 +118,23 @@ class DeviceTestCase(TestCase): self.assertIn('position', form.errors) -class LabelTestCase(TestCase): +class InterfaceTestCase(TestCase): @classmethod def setUpTestData(cls): cls.device = create_test_device('Device 1') + cls.vlans = ( + VLAN(name='VLAN 1', vid=1), + VLAN(name='VLAN 2', vid=2), + VLAN(name='VLAN 3', vid=3), + ) + VLAN.objects.bulk_create(cls.vlans) + cls.interface = Interface.objects.create( + device=cls.device, + name='Interface 1', + type=InterfaceTypeChoices.TYPE_1GE_GBIC, + mode=InterfaceModeChoices.MODE_TAGGED, + ) def test_interface_label_count_valid(self): """ @@ -151,3 +164,152 @@ class LabelTestCase(TestCase): self.assertFalse(form.is_valid()) self.assertIn('label', form.errors) + + def test_create_interface_mode_valid_data(self): + """ + Test that saving valid interface mode and tagged/untagged vlans works properly + """ + + # Validate access mode + data = { + 'device': self.device.pk, + 'name': 'ethernet1/1', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': InterfaceModeChoices.MODE_ACCESS, + 'untagged_vlan': self.vlans[0].pk + } + form = InterfaceCreateForm(data) + + self.assertTrue(form.is_valid()) + + # Validate tagged vlans + data = { + 'device': self.device.pk, + 'name': 'ethernet1/2', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': InterfaceModeChoices.MODE_TAGGED, + 'untagged_vlan': self.vlans[0].pk, + 'tagged_vlans': [self.vlans[1].pk, self.vlans[2].pk] + } + form = InterfaceCreateForm(data) + self.assertTrue(form.is_valid()) + + # Validate tagged vlans + data = { + 'device': self.device.pk, + 'name': 'ethernet1/3', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': InterfaceModeChoices.MODE_TAGGED_ALL, + 'untagged_vlan': self.vlans[0].pk, + } + form = InterfaceCreateForm(data) + self.assertTrue(form.is_valid()) + + def test_create_interface_mode_access_invalid_data(self): + """ + Test that saving invalid interface mode and tagged/untagged vlans works properly + """ + data = { + 'device': self.device.pk, + 'name': 'ethernet1/4', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': InterfaceModeChoices.MODE_ACCESS, + 'untagged_vlan': self.vlans[0].pk, + 'tagged_vlans': [self.vlans[1].pk, self.vlans[2].pk] + } + form = InterfaceCreateForm(data) + + self.assertTrue(form.is_valid()) + self.assertIn('untagged_vlan', form.cleaned_data.keys()) + self.assertNotIn('tagged_vlans', form.cleaned_data.keys()) + self.assertNotIn('qinq_svlan', form.cleaned_data.keys()) + + def test_edit_interface_mode_access_invalid_data(self): + """ + Test that saving invalid interface mode and tagged/untagged vlans works properly + """ + data = { + 'device': self.device.pk, + 'name': 'Ethernet 1/5', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': InterfaceModeChoices.MODE_ACCESS, + 'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk] + } + form = InterfaceForm(data, instance=self.interface) + + self.assertTrue(form.is_valid()) + self.assertIn('untagged_vlan', form.cleaned_data.keys()) + self.assertNotIn('tagged_vlans', form.cleaned_data.keys()) + self.assertNotIn('qinq_svlan', form.cleaned_data.keys()) + + def test_create_interface_mode_tagged_all_invalid_data(self): + """ + Test that saving invalid interface mode and tagged/untagged vlans works properly + """ + data = { + 'device': self.device.pk, + 'name': 'ethernet1/6', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': InterfaceModeChoices.MODE_TAGGED_ALL, + 'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk] + } + form = InterfaceCreateForm(data) + + self.assertTrue(form.is_valid()) + self.assertIn('untagged_vlan', form.cleaned_data.keys()) + self.assertNotIn('tagged_vlans', form.cleaned_data.keys()) + self.assertNotIn('qinq_svlan', form.cleaned_data.keys()) + + def test_edit_interface_mode_tagged_all_invalid_data(self): + """ + Test that saving invalid interface mode and tagged/untagged vlans works properly + """ + data = { + 'device': self.device.pk, + 'name': 'Ethernet 1/7', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': InterfaceModeChoices.MODE_TAGGED_ALL, + 'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk] + } + form = InterfaceForm(data) + self.assertTrue(form.is_valid()) + self.assertIn('untagged_vlan', form.cleaned_data.keys()) + self.assertNotIn('tagged_vlans', form.cleaned_data.keys()) + self.assertNotIn('qinq_svlan', form.cleaned_data.keys()) + + def test_create_interface_mode_routed_invalid_data(self): + """ + Test that saving invalid interface mode (routed) and tagged/untagged vlans works properly + """ + data = { + 'device': self.device.pk, + 'name': 'ethernet1/6', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': None, + 'untagged_vlan': self.vlans[0].pk, + 'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk] + } + form = InterfaceCreateForm(data) + + self.assertTrue(form.is_valid()) + self.assertNotIn('untagged_vlan', form.cleaned_data.keys()) + self.assertNotIn('tagged_vlans', form.cleaned_data.keys()) + self.assertNotIn('qinq_svlan', form.cleaned_data.keys()) + + def test_edit_interface_mode_routed_invalid_data(self): + """ + Test that saving invalid interface mode (routed) and tagged/untagged vlans works properly + """ + data = { + 'device': self.device.pk, + 'name': 'Ethernet 1/7', + 'type': InterfaceTypeChoices.TYPE_1GE_GBIC, + 'mode': None, + 'untagged_vlan': self.vlans[0].pk, + 'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk] + } + form = InterfaceForm(data) + self.assertTrue(form.is_valid()) + self.assertNotIn('untagged_vlan', form.cleaned_data.keys()) + self.assertNotIn('tagged_vlans', form.cleaned_data.keys()) + self.assertNotIn('qinq_svlan', form.cleaned_data.keys())