diff --git a/netbox/dcim/models/cables.py b/netbox/dcim/models/cables.py index d397e92ac..b848e5fd2 100644 --- a/netbox/dcim/models/cables.py +++ b/netbox/dcim/models/cables.py @@ -19,7 +19,7 @@ from utilities.fields import ColorField from utilities.querysets import RestrictedQuerySet from utilities.utils import to_meters from wireless.models import WirelessLink -from .device_components import FrontPort, RearPort +from .device_components import FrontPort, RearPort, PathEndpoint __all__ = ( 'Cable', @@ -501,6 +501,10 @@ class CablePath(models.Model): # Terminations must all be of the same type assert all(isinstance(t, type(terminations[0])) for t in terminations[1:]) + # All mid-span terminations must all be attached to the same device + if not isinstance(terminations[0], PathEndpoint): + assert all(t.device == terminations[0].device for t in terminations[1:]) + # Check for a split path (e.g. rear port fanning out to multiple front ports with # different cables attached) if len(set(t.link for t in terminations)) > 1 and ( @@ -525,7 +529,7 @@ class CablePath(models.Model): assert all(type(link) in (Cable, WirelessLink) for link in links) # Step 3: Record the links - path.append([object_to_path_node(link) for link in links]) + path.append([object_to_path_node(link) for link in list(set(links))]) # Step 4: Update the path status if a link is not connected links_status = [ @@ -576,7 +580,12 @@ class CablePath(models.Model): terminations = rear_ports elif isinstance(remote_terminations[0], RearPort): - if len(remote_terminations) > 1 and position_stack: + if len(remote_terminations) == 1 and remote_terminations[0].positions == 1: + front_ports = FrontPort.objects.filter( + rear_port_id__in=[rp.pk for rp in remote_terminations], + rear_port_position=1 + ) + elif len(remote_terminations) > 1 and position_stack: positions = position_stack.pop() assert len(remote_terminations) == len(positions) q_filter = Q() @@ -590,11 +599,6 @@ class CablePath(models.Model): rear_port_id=remote_terminations[0].pk, rear_port_position__in=position_stack.pop() ) - elif len(remote_terminations) == 1: - front_ports = FrontPort.objects.filter( - rear_port_id__in=[rp.pk for rp in remote_terminations], - rear_port_position=1 - ) else: # No position indicated: path has split, so we stop at the RearPorts is_split = True @@ -636,12 +640,14 @@ class CablePath(models.Model): is_complete = True break - return cls( + cablepath = cls( path=path, is_complete=is_complete, is_active=is_active, is_split=is_split ) + print(f'{cablepath}::{cablepath.path}:{is_complete}:{is_active}:{is_split}') + return cablepath def retrace(self): """ diff --git a/netbox/dcim/tests/test_cablepaths.py b/netbox/dcim/tests/test_cablepaths.py index 3367a3efe..a34db383d 100644 --- a/netbox/dcim/tests/test_cablepaths.py +++ b/netbox/dcim/tests/test_cablepaths.py @@ -1695,6 +1695,78 @@ class CablePathTestCase(TestCase): self.assertPathIsSet(interface3, path3) self.assertPathIsSet(interface4, path4) + def test_219_interface_to_interface_duplex_via_multiple_rearports(self): + """ + [IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2] + [FP3] [RP3] --C4-- [RP4] [FP4] + """ + interface1 = Interface.objects.create(device=self.device, name='Interface 1') + interface2 = Interface.objects.create(device=self.device, name='Interface 2') + rearport1 = RearPort.objects.create(device=self.device, name='Rear Port 1', positions=1) + rearport2 = RearPort.objects.create(device=self.device, name='Rear Port 2', positions=1) + rearport3 = RearPort.objects.create(device=self.device, name='Rear Port 3', positions=1) + rearport4 = RearPort.objects.create(device=self.device, name='Rear Port 4', positions=1) + frontport1 = FrontPort.objects.create( + device=self.device, name='Front Port 1', rear_port=rearport1, rear_port_position=1 + ) + frontport2 = FrontPort.objects.create( + device=self.device, name='Front Port 2', rear_port=rearport2, rear_port_position=1 + ) + frontport3 = FrontPort.objects.create( + device=self.device, name='Front Port 3', rear_port=rearport3, rear_port_position=1 + ) + frontport4 = FrontPort.objects.create( + device=self.device, name='Front Port 4', rear_port=rearport4, rear_port_position=1 + ) + + cable2 = Cable( + a_terminations=[rearport1], + b_terminations=[rearport2] + ) + cable2.save() + cable4 = Cable( + a_terminations=[rearport3], + b_terminations=[rearport4] + ) + cable4.save() + self.assertEqual(CablePath.objects.count(), 0) + + # Create cable1 + cable1 = Cable( + a_terminations=[interface1], + b_terminations=[frontport1, frontport3] + ) + cable1.save() + self.assertPathExists( + (interface1, cable1, (frontport1, frontport3), (rearport1, rearport3), (cable2, cable4), (rearport2, rearport4), (frontport2, frontport4)), + is_complete=False + ) + self.assertEqual(CablePath.objects.count(), 1) + + # Create cable 3 + cable3 = Cable( + a_terminations=[frontport2, frontport4], + b_terminations=[interface2] + ) + cable3.save() + self.assertPathExists( + ( + interface1, cable1, (frontport1, frontport3), (rearport1, rearport3), (cable2, cable4), + (rearport2, rearport4), (frontport2, frontport4), cable3, interface2 + ), + is_complete=True, + is_active=True + ) + self.assertPathExists( + ( + interface2, cable3, (frontport2, frontport4), (rearport2, rearport4), (cable2, cable4), + (rearport1, rearport3), (frontport1, frontport3), cable1, interface1 + ), + is_complete=True, + is_active=True + ) + self.assertEqual(CablePath.objects.count(), 2) + def test_301_create_path_via_existing_cable(self): """ [IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]