Fixes: #15016 - Catch AssertionError from cable trace and throw ValidationError (#16384)

This commit is contained in:
Daniel Sheppard 2025-03-04 12:57:27 -06:00 committed by GitHub
parent d208ddde9a
commit 4ab58f2da9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 34 additions and 15 deletions

View File

@ -0,0 +1,2 @@
class UnsupportedCablePath(Exception):
pass

View File

@ -15,6 +15,7 @@ from dcim.fields import PathField
from dcim.utils import decompile_path_node, object_to_path_node from dcim.utils import decompile_path_node, object_to_path_node
from netbox.models import ChangeLoggedModel, PrimaryModel from netbox.models import ChangeLoggedModel, PrimaryModel
from utilities.conversion import to_meters from utilities.conversion import to_meters
from utilities.exceptions import AbortRequest
from utilities.fields import ColorField from utilities.fields import ColorField
from utilities.querysets import RestrictedQuerySet from utilities.querysets import RestrictedQuerySet
from wireless.models import WirelessLink from wireless.models import WirelessLink
@ -26,6 +27,7 @@ __all__ = (
'CableTermination', 'CableTermination',
) )
from ..exceptions import UnsupportedCablePath
trace_paths = Signal() trace_paths = Signal()
@ -236,8 +238,10 @@ class Cable(PrimaryModel):
for termination in self.b_terminations: for termination in self.b_terminations:
if not termination.pk or termination not in b_terminations: if not termination.pk or termination not in b_terminations:
CableTermination(cable=self, cable_end='B', termination=termination).save() CableTermination(cable=self, cable_end='B', termination=termination).save()
try:
trace_paths.send(Cable, instance=self, created=_created) trace_paths.send(Cable, instance=self, created=_created)
except UnsupportedCablePath as e:
raise AbortRequest(e)
def get_status_color(self): def get_status_color(self):
return LinkStatusChoices.colors.get(self.status) return LinkStatusChoices.colors.get(self.status)
@ -531,8 +535,8 @@ class CablePath(models.Model):
return None return None
# Ensure all originating terminations are attached to the same link # Ensure all originating terminations are attached to the same link
if len(terminations) > 1: if len(terminations) > 1 and not all(t.link == terminations[0].link for t in terminations[1:]):
assert all(t.link == terminations[0].link for t in terminations[1:]) raise UnsupportedCablePath(_("All originating terminations must be attached to the same link"))
path = [] path = []
position_stack = [] position_stack = []
@ -543,12 +547,13 @@ class CablePath(models.Model):
while terminations: while terminations:
# Terminations must all be of the same type # Terminations must all be of the same type
assert all(isinstance(t, type(terminations[0])) for t in terminations[1:]) if not all(isinstance(t, type(terminations[0])) for t in terminations[1:]):
raise UnsupportedCablePath(_("All mid-span terminations must have the same termination type"))
# All mid-span terminations must all be attached to the same device # All mid-span terminations must all be attached to the same device
if not isinstance(terminations[0], PathEndpoint): if (not isinstance(terminations[0], PathEndpoint) and not
assert all(isinstance(t, type(terminations[0])) for t in terminations[1:]) all(t.parent_object == terminations[0].parent_object for t in terminations[1:])):
assert all(t.parent_object == terminations[0].parent_object for t in terminations[1:]) raise UnsupportedCablePath(_("All mid-span terminations must have the same parent object"))
# Check for a split path (e.g. rear port fanning out to multiple front ports with # Check for a split path (e.g. rear port fanning out to multiple front ports with
# different cables attached) # different cables attached)
@ -571,8 +576,10 @@ class CablePath(models.Model):
return None return None
# Otherwise, halt the trace if no link exists # Otherwise, halt the trace if no link exists
break break
assert all(type(link) in (Cable, WirelessLink) for link in links) if not all(type(link) in (Cable, WirelessLink) for link in links):
assert all(isinstance(link, type(links[0])) for link in links) raise UnsupportedCablePath(_("All links must be cable or wireless"))
if not all(isinstance(link, type(links[0])) for link in links):
raise UnsupportedCablePath(_("All links must match first link type"))
# Step 3: Record asymmetric paths as split # Step 3: Record asymmetric paths as split
not_connected_terminations = [termination.link for termination in terminations if termination.link is None] not_connected_terminations = [termination.link for termination in terminations if termination.link is None]
@ -653,14 +660,18 @@ class CablePath(models.Model):
positions = position_stack.pop() positions = position_stack.pop()
# Ensure we have a number of positions equal to the amount of remote terminations # Ensure we have a number of positions equal to the amount of remote terminations
assert len(remote_terminations) == len(positions) if len(remote_terminations) != len(positions):
raise UnsupportedCablePath(
_("All positions counts within the path on opposite ends of links must match")
)
# Get our front ports # Get our front ports
q_filter = Q() q_filter = Q()
for rt in remote_terminations: for rt in remote_terminations:
position = positions.pop() position = positions.pop()
q_filter |= Q(rear_port_id=rt.pk, rear_port_position=position) q_filter |= Q(rear_port_id=rt.pk, rear_port_position=position)
assert q_filter is not Q() if q_filter is Q():
raise UnsupportedCablePath(_("Remote termination position filter is missing"))
front_ports = FrontPort.objects.filter(q_filter) front_ports = FrontPort.objects.filter(q_filter)
# Obtain the individual front ports based on the termination and position # Obtain the individual front ports based on the termination and position
elif position_stack: elif position_stack:

View File

@ -5,6 +5,7 @@ from dcim.choices import LinkStatusChoices
from dcim.models import * from dcim.models import *
from dcim.svg import CableTraceSVG from dcim.svg import CableTraceSVG
from dcim.utils import object_to_path_node from dcim.utils import object_to_path_node
from utilities.exceptions import AbortRequest
class CablePathTestCase(TestCase): class CablePathTestCase(TestCase):
@ -2470,7 +2471,7 @@ class CablePathTestCase(TestCase):
b_terminations=[frontport1, frontport3], b_terminations=[frontport1, frontport3],
label='C1' label='C1'
) )
with self.assertRaises(AssertionError): with self.assertRaises(AbortRequest):
cable1.save() cable1.save()
self.assertPathDoesNotExist( self.assertPathDoesNotExist(
@ -2489,7 +2490,7 @@ class CablePathTestCase(TestCase):
label='C3' label='C3'
) )
with self.assertRaises(AssertionError): with self.assertRaises(AbortRequest):
cable3.save() cable3.save()
self.assertPathDoesNotExist( self.assertPathDoesNotExist(

View File

@ -3,8 +3,10 @@ import logging
from django.db.models.signals import post_save, post_delete from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver from django.dispatch import receiver
from dcim.exceptions import UnsupportedCablePath
from dcim.models import CablePath, Interface from dcim.models import CablePath, Interface
from dcim.utils import create_cablepath from dcim.utils import create_cablepath
from utilities.exceptions import AbortRequest
from .models import WirelessLink from .models import WirelessLink
@ -34,7 +36,10 @@ def update_connected_interfaces(instance, created, raw=False, **kwargs):
# Create/update cable paths # Create/update cable paths
if created: if created:
for interface in (instance.interface_a, instance.interface_b): for interface in (instance.interface_a, instance.interface_b):
create_cablepath([interface]) try:
create_cablepath([interface])
except UnsupportedCablePath as e:
raise AbortRequest(e)
@receiver(post_delete, sender=WirelessLink) @receiver(post_delete, sender=WirelessLink)