Fix failures and add test

This commit is contained in:
Daniel Sheppard 2023-08-04 23:54:45 -05:00
parent 32c4f3c8a6
commit a797995ebb
2 changed files with 87 additions and 9 deletions

View File

@ -19,7 +19,7 @@ from utilities.fields import ColorField
from utilities.querysets import RestrictedQuerySet
from utilities.utils import to_meters
from wireless.models import WirelessLink
from .device_components import FrontPort, RearPort
from .device_components import FrontPort, RearPort, PathEndpoint
__all__ = (
'Cable',
@ -501,6 +501,10 @@ class CablePath(models.Model):
# Terminations must all be of the same type
assert all(isinstance(t, type(terminations[0])) for t in terminations[1:])
# All mid-span terminations must all be attached to the same device
if not isinstance(terminations[0], PathEndpoint):
assert all(t.device == terminations[0].device for t in terminations[1:])
# Check for a split path (e.g. rear port fanning out to multiple front ports with
# different cables attached)
if len(set(t.link for t in terminations)) > 1 and (
@ -525,7 +529,7 @@ class CablePath(models.Model):
assert all(type(link) in (Cable, WirelessLink) for link in links)
# Step 3: Record the links
path.append([object_to_path_node(link) for link in links])
path.append([object_to_path_node(link) for link in list(set(links))])
# Step 4: Update the path status if a link is not connected
links_status = [
@ -576,7 +580,12 @@ class CablePath(models.Model):
terminations = rear_ports
elif isinstance(remote_terminations[0], RearPort):
if len(remote_terminations) > 1 and position_stack:
if len(remote_terminations) == 1 and remote_terminations[0].positions == 1:
front_ports = FrontPort.objects.filter(
rear_port_id__in=[rp.pk for rp in remote_terminations],
rear_port_position=1
)
elif len(remote_terminations) > 1 and position_stack:
positions = position_stack.pop()
assert len(remote_terminations) == len(positions)
q_filter = Q()
@ -590,11 +599,6 @@ class CablePath(models.Model):
rear_port_id=remote_terminations[0].pk,
rear_port_position__in=position_stack.pop()
)
elif len(remote_terminations) == 1:
front_ports = FrontPort.objects.filter(
rear_port_id__in=[rp.pk for rp in remote_terminations],
rear_port_position=1
)
else:
# No position indicated: path has split, so we stop at the RearPorts
is_split = True
@ -636,12 +640,14 @@ class CablePath(models.Model):
is_complete = True
break
return cls(
cablepath = cls(
path=path,
is_complete=is_complete,
is_active=is_active,
is_split=is_split
)
print(f'{cablepath}::{cablepath.path}:{is_complete}:{is_active}:{is_split}')
return cablepath
def retrace(self):
"""

View File

@ -1695,6 +1695,78 @@ class CablePathTestCase(TestCase):
self.assertPathIsSet(interface3, path3)
self.assertPathIsSet(interface4, path4)
def test_219_interface_to_interface_duplex_via_multiple_rearports(self):
"""
[IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]
[FP3] [RP3] --C4-- [RP4] [FP4]
"""
interface1 = Interface.objects.create(device=self.device, name='Interface 1')
interface2 = Interface.objects.create(device=self.device, name='Interface 2')
rearport1 = RearPort.objects.create(device=self.device, name='Rear Port 1', positions=1)
rearport2 = RearPort.objects.create(device=self.device, name='Rear Port 2', positions=1)
rearport3 = RearPort.objects.create(device=self.device, name='Rear Port 3', positions=1)
rearport4 = RearPort.objects.create(device=self.device, name='Rear Port 4', positions=1)
frontport1 = FrontPort.objects.create(
device=self.device, name='Front Port 1', rear_port=rearport1, rear_port_position=1
)
frontport2 = FrontPort.objects.create(
device=self.device, name='Front Port 2', rear_port=rearport2, rear_port_position=1
)
frontport3 = FrontPort.objects.create(
device=self.device, name='Front Port 3', rear_port=rearport3, rear_port_position=1
)
frontport4 = FrontPort.objects.create(
device=self.device, name='Front Port 4', rear_port=rearport4, rear_port_position=1
)
cable2 = Cable(
a_terminations=[rearport1],
b_terminations=[rearport2]
)
cable2.save()
cable4 = Cable(
a_terminations=[rearport3],
b_terminations=[rearport4]
)
cable4.save()
self.assertEqual(CablePath.objects.count(), 0)
# Create cable1
cable1 = Cable(
a_terminations=[interface1],
b_terminations=[frontport1, frontport3]
)
cable1.save()
self.assertPathExists(
(interface1, cable1, (frontport1, frontport3), (rearport1, rearport3), (cable2, cable4), (rearport2, rearport4), (frontport2, frontport4)),
is_complete=False
)
self.assertEqual(CablePath.objects.count(), 1)
# Create cable 3
cable3 = Cable(
a_terminations=[frontport2, frontport4],
b_terminations=[interface2]
)
cable3.save()
self.assertPathExists(
(
interface1, cable1, (frontport1, frontport3), (rearport1, rearport3), (cable2, cable4),
(rearport2, rearport4), (frontport2, frontport4), cable3, interface2
),
is_complete=True,
is_active=True
)
self.assertPathExists(
(
interface2, cable3, (frontport2, frontport4), (rearport2, rearport4), (cable2, cable4),
(rearport1, rearport3), (frontport1, frontport3), cable1, interface1
),
is_complete=True,
is_active=True
)
self.assertEqual(CablePath.objects.count(), 2)
def test_301_create_path_via_existing_cable(self):
"""
[IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]