diff --git a/netbox/dcim/models/devices.py b/netbox/dcim/models/devices.py index 3b13b1f73..0cb1ea970 100644 --- a/netbox/dcim/models/devices.py +++ b/netbox/dcim/models/devices.py @@ -980,6 +980,9 @@ class Cable(ChangeLoggedModel, CustomFieldModel): # A copy of the PK to be used by __str__ in case the object is deleted self._pk = self.pk + # Cache the original status so we can check later if it's been changed + self._orig_status = self.status + @classmethod def from_db(cls, db, field_names, values): """ @@ -992,8 +995,6 @@ class Cable(ChangeLoggedModel, CustomFieldModel): instance._orig_termination_b_type_id = instance.termination_b_type_id instance._orig_termination_b_id = instance.termination_b_id - instance._orig_status = instance.status - return instance def __str__(self): diff --git a/netbox/dcim/signals.py b/netbox/dcim/signals.py index 0c5da6160..9b0493e34 100644 --- a/netbox/dcim/signals.py +++ b/netbox/dcim/signals.py @@ -86,7 +86,7 @@ def update_connected_endpoints(instance, created, **kwargs): # may change in the future.) However, we do need to capture status changes and update # any CablePaths accordingly. if instance.status != CableStatusChoices.STATUS_CONNECTED: - CablePath.objects.filter(path__contains=object_to_path_node(instance)).update(is_connected=False) + CablePath.objects.filter(path__contains=[object_to_path_node(instance)]).update(is_connected=False) else: rebuild_paths(instance) diff --git a/netbox/dcim/tests/test_cablepaths.py b/netbox/dcim/tests/test_cablepaths.py index 529935eb3..362b3804b 100644 --- a/netbox/dcim/tests/test_cablepaths.py +++ b/netbox/dcim/tests/test_cablepaths.py @@ -2,6 +2,7 @@ from django.contrib.contenttypes.models import ContentType from django.test import TestCase from circuits.models import * +from dcim.choices import CableStatusChoices from dcim.models import * from dcim.utils import objects_to_path @@ -70,13 +71,14 @@ class CablePathTestCase(TestCase): ] CircuitTermination.objects.bulk_create(cls.circuit_terminations) - def assertPathExists(self, origin, destination, path=None, msg=None): + def assertPathExists(self, origin, destination, path=None, is_connected=None, msg=None): """ Assert that a CablePath from origin to destination with a specific intermediate path exists. :param origin: Originating endpoint :param destination: Terminating endpoint, or None :param path: Sequence of objects comprising the intermediate path (optional) + :param is_connected: Boolean indicating whether the end-to-end path is complete and active (optional) :param msg: Custom failure message (optional) """ kwargs = { @@ -91,6 +93,8 @@ class CablePathTestCase(TestCase): kwargs['destination_id__isnull'] = True if path is not None: kwargs['path'] = objects_to_path(*path) + if is_connected is not None: + kwargs['is_connected'] = is_connected if msg is None: if destination is not None: msg = f"Missing path from {origin} to {destination}" @@ -108,12 +112,14 @@ class CablePathTestCase(TestCase): self.assertPathExists( origin=self.interfaces[0], destination=self.interfaces[1], - path=(cable1,) + path=(cable1,), + is_connected=True ) self.assertPathExists( origin=self.interfaces[1], destination=self.interfaces[0], - path=(cable1,) + path=(cable1,), + is_connected=True ) self.assertEqual(CablePath.objects.count(), 2) @@ -133,7 +139,8 @@ class CablePathTestCase(TestCase): self.assertPathExists( origin=self.interfaces[0], destination=None, - path=(cable1, self.front_ports[16], self.rear_ports[4]) + path=(cable1, self.front_ports[16], self.rear_ports[4]), + is_connected=False ) self.assertEqual(CablePath.objects.count(), 1) @@ -143,12 +150,14 @@ class CablePathTestCase(TestCase): self.assertPathExists( origin=self.interfaces[0], destination=self.interfaces[1], - path=(cable1, self.front_ports[16], self.rear_ports[4], cable2) + path=(cable1, self.front_ports[16], self.rear_ports[4], cable2), + is_connected=True ) self.assertPathExists( origin=self.interfaces[1], destination=self.interfaces[0], - path=(cable2, self.rear_ports[4], self.front_ports[16], cable1) + path=(cable2, self.rear_ports[4], self.front_ports[16], cable1), + is_connected=True ) self.assertEqual(CablePath.objects.count(), 2) @@ -157,7 +166,8 @@ class CablePathTestCase(TestCase): self.assertPathExists( origin=self.interfaces[0], destination=None, - path=(cable1, self.front_ports[16], self.rear_ports[4]) + path=(cable1, self.front_ports[16], self.rear_ports[4]), + is_connected=False ) self.assertEqual(CablePath.objects.count(), 1) @@ -174,12 +184,14 @@ class CablePathTestCase(TestCase): self.assertPathExists( origin=self.interfaces[0], destination=None, - path=(cable1, self.front_ports[0], self.rear_ports[0]) + path=(cable1, self.front_ports[0], self.rear_ports[0]), + is_connected=False ) self.assertPathExists( origin=self.interfaces[1], destination=None, - path=(cable2, self.front_ports[1], self.rear_ports[0]) + path=(cable2, self.front_ports[1], self.rear_ports[0]), + is_connected=False ) self.assertEqual(CablePath.objects.count(), 2) @@ -189,12 +201,14 @@ class CablePathTestCase(TestCase): self.assertPathExists( origin=self.interfaces[0], destination=None, - path=(cable1, self.front_ports[0], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[4]) + path=(cable1, self.front_ports[0], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[4]), + is_connected=False ) self.assertPathExists( origin=self.interfaces[1], destination=None, - path=(cable2, self.front_ports[1], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[5]) + path=(cable2, self.front_ports[1], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[5]), + is_connected=False ) self.assertEqual(CablePath.objects.count(), 2) @@ -209,7 +223,8 @@ class CablePathTestCase(TestCase): path=( cable1, self.front_ports[0], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[4], cable4, - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[1], @@ -217,7 +232,8 @@ class CablePathTestCase(TestCase): path=( cable2, self.front_ports[1], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[5], cable5, - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[2], @@ -225,7 +241,8 @@ class CablePathTestCase(TestCase): path=( cable4, self.front_ports[4], self.rear_ports[1], cable3, self.rear_ports[0], self.front_ports[0], cable1 - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[3], @@ -233,7 +250,8 @@ class CablePathTestCase(TestCase): path=( cable5, self.front_ports[5], self.rear_ports[1], cable3, self.rear_ports[0], self.front_ports[1], cable2 - ) + ), + is_connected=True ) self.assertEqual(CablePath.objects.count(), 4) @@ -277,7 +295,8 @@ class CablePathTestCase(TestCase): cable1, self.front_ports[0], self.rear_ports[0], cable3, self.front_ports[4], self.rear_ports[1], cable4, self.rear_ports[2], self.front_ports[8], cable5, self.rear_ports[3], self.front_ports[12], cable6 - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[1], @@ -286,7 +305,8 @@ class CablePathTestCase(TestCase): cable2, self.front_ports[1], self.rear_ports[0], cable3, self.front_ports[4], self.rear_ports[1], cable4, self.rear_ports[2], self.front_ports[8], cable5, self.rear_ports[3], self.front_ports[13], cable7 - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[2], @@ -295,7 +315,8 @@ class CablePathTestCase(TestCase): cable6, self.front_ports[12], self.rear_ports[3], cable5, self.front_ports[8], self.rear_ports[2], cable4, self.rear_ports[1], self.front_ports[4], cable3, self.rear_ports[0], self.front_ports[0], cable1 - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[3], @@ -304,7 +325,8 @@ class CablePathTestCase(TestCase): cable7, self.front_ports[13], self.rear_ports[3], cable5, self.front_ports[8], self.rear_ports[2], cable4, self.rear_ports[1], self.front_ports[4], cable3, self.rear_ports[0], self.front_ports[1], cable2 - ) + ), + is_connected=True ) self.assertEqual(CablePath.objects.count(), 4) @@ -342,7 +364,8 @@ class CablePathTestCase(TestCase): path=( cable1, self.front_ports[0], self.rear_ports[0], cable3, self.front_ports[16], self.rear_ports[4], cable4, self.rear_ports[1], self.front_ports[4], cable5 - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[1], @@ -350,7 +373,8 @@ class CablePathTestCase(TestCase): path=( cable2, self.front_ports[1], self.rear_ports[0], cable3, self.front_ports[16], self.rear_ports[4], cable4, self.rear_ports[1], self.front_ports[5], cable6 - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[2], @@ -358,7 +382,8 @@ class CablePathTestCase(TestCase): path=( cable5, self.front_ports[4], self.rear_ports[1], cable4, self.rear_ports[4], self.front_ports[16], cable3, self.rear_ports[0], self.front_ports[0], cable1 - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[3], @@ -366,7 +391,8 @@ class CablePathTestCase(TestCase): path=( cable6, self.front_ports[5], self.rear_ports[1], cable4, self.rear_ports[4], self.front_ports[16], cable3, self.rear_ports[0], self.front_ports[1], cable2 - ) + ), + is_connected=True ) self.assertEqual(CablePath.objects.count(), 4) @@ -392,7 +418,8 @@ class CablePathTestCase(TestCase): self.assertPathExists( origin=self.interfaces[0], destination=None, - path=(cable1, self.front_ports[16], self.rear_ports[4], cable2, self.rear_ports[5], self.front_ports[17]) + path=(cable1, self.front_ports[16], self.rear_ports[4], cable2, self.rear_ports[5], self.front_ports[17]), + is_connected=False ) self.assertEqual(CablePath.objects.count(), 1) @@ -405,7 +432,8 @@ class CablePathTestCase(TestCase): path=( cable1, self.front_ports[16], self.rear_ports[4], cable2, self.rear_ports[5], self.front_ports[17], cable3, - ) + ), + is_connected=True ) self.assertPathExists( origin=self.interfaces[1], @@ -413,6 +441,54 @@ class CablePathTestCase(TestCase): path=( cable3, self.front_ports[17], self.rear_ports[5], cable2, self.rear_ports[4], self.front_ports[16], cable1, - ) + ), + is_connected=True + ) + self.assertEqual(CablePath.objects.count(), 2) + + def test_07_change_cable_status(self): + """ + [IF1] --C1-- [FP5] [RP5] --C2-- [IF2] + """ + # Create cables 1 and 2 + cable1 = Cable(termination_a=self.interfaces[0], termination_b=self.front_ports[16]) + cable1.save() + cable2 = Cable(termination_a=self.rear_ports[4], termination_b=self.interfaces[1]) + cable2.save() + self.assertEqual(CablePath.objects.filter(is_connected=True).count(), 2) + self.assertEqual(CablePath.objects.count(), 2) + + # Change cable 2's status to "planned" + cable2.status = CableStatusChoices.STATUS_PLANNED + cable2.save() + self.assertPathExists( + origin=self.interfaces[0], + destination=self.interfaces[1], + path=(cable1, self.front_ports[16], self.rear_ports[4], cable2), + is_connected=False + ) + self.assertPathExists( + origin=self.interfaces[1], + destination=self.interfaces[0], + path=(cable2, self.rear_ports[4], self.front_ports[16], cable1), + is_connected=False + ) + self.assertEqual(CablePath.objects.count(), 2) + + # Change cable 2's status to "connected" + cable2 = Cable.objects.get(pk=cable2.pk) + cable2.status = CableStatusChoices.STATUS_CONNECTED + cable2.save() + self.assertPathExists( + origin=self.interfaces[0], + destination=self.interfaces[1], + path=(cable1, self.front_ports[16], self.rear_ports[4], cable2), + is_connected=True + ) + self.assertPathExists( + origin=self.interfaces[1], + destination=self.interfaces[0], + path=(cable2, self.rear_ports[4], self.front_ports[16], cable1), + is_connected=True ) self.assertEqual(CablePath.objects.count(), 2) diff --git a/netbox/dcim/utils.py b/netbox/dcim/utils.py index f97a1e8f0..16d0753ba 100644 --- a/netbox/dcim/utils.py +++ b/netbox/dcim/utils.py @@ -63,4 +63,7 @@ def trace_path(node): destination = peer_termination break + if destination is None: + is_connected = False + return path, destination, is_connected