mirror of
https://github.com/netbox-community/netbox.git
synced 2026-01-09 05:12:18 -06:00
Compare commits
12 Commits
20923-dcim
...
1598ca9108
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1598ca9108 | ||
|
|
21f4036782 | ||
|
|
ce3738572c | ||
|
|
cbb979934e | ||
|
|
642d83a4c6 | ||
|
|
a06c12c6b8 | ||
|
|
59afa0b41d | ||
|
|
14b246cb8a | ||
|
|
f0507d00bf | ||
|
|
77b389f105 | ||
|
|
9ae53fc232 | ||
|
|
c111c08315 |
@@ -20,4 +20,4 @@ class ManufacturerSerializer(NetBoxModelSerializer):
|
||||
'id', 'url', 'display_url', 'display', 'name', 'slug', 'description', 'tags', 'custom_fields',
|
||||
'created', 'last_updated', 'devicetype_count', 'inventoryitem_count', 'platform_count',
|
||||
]
|
||||
brief_fields = ('id', 'url', 'display', 'name', 'slug', 'description', 'devicetype_count')
|
||||
brief_fields = ('id', 'url', 'display', 'name', 'slug', 'description')
|
||||
|
||||
@@ -5,6 +5,7 @@ from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.utils.safestring import mark_safe
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from circuits.models import Circuit
|
||||
from dcim.choices import *
|
||||
from dcim.constants import *
|
||||
from dcim.models import *
|
||||
@@ -1414,19 +1415,52 @@ class MACAddressImportForm(NetBoxModelImportForm):
|
||||
#
|
||||
|
||||
class CableImportForm(NetBoxModelImportForm):
|
||||
"""
|
||||
CSV bulk import form for cables.
|
||||
|
||||
Supports dynamic parent model resolution - terminations are identified by their parent
|
||||
object (device, circuit, or power panel) and termination name.
|
||||
|
||||
The parent field resolves to different models based on the termination type
|
||||
See CABLE_PARENT_MAPPING for supported termination types.
|
||||
"""
|
||||
|
||||
# Map cable termination content types to their parent model and lookup field.
|
||||
#
|
||||
# This mapping enables dynamic parent model resolution during cable CSV imports.
|
||||
# Each entry maps a termination type to a tuple of (parent_content_type, accessor):
|
||||
#
|
||||
# Format: 'app.model': ('parent_app.ParentModel', 'accessor')
|
||||
#
|
||||
CABLE_PARENT_MAPPING = {
|
||||
'dcim.interface': ('dcim.Device', 'name'),
|
||||
'dcim.consoleport': ('dcim.Device', 'name'),
|
||||
'dcim.consoleserverport': ('dcim.Device', 'name'),
|
||||
'dcim.powerport': ('dcim.Device', 'name'),
|
||||
'dcim.poweroutlet': ('dcim.Device', 'name'),
|
||||
'dcim.frontport': ('dcim.Device', 'name'),
|
||||
'dcim.rearport': ('dcim.Device', 'name'),
|
||||
'circuits.circuittermination': ('circuits.Circuit', 'cid'),
|
||||
'dcim.powerfeed': ('dcim.PowerPanel', 'name'),
|
||||
}
|
||||
# Map parent model name to (parent_field_name, termination_name_field, value_transform)
|
||||
TERMINATION_FIELDS = {
|
||||
'Circuit': ('circuit', 'term_side', str.upper),
|
||||
'Device': ('device', 'name', None),
|
||||
'PowerPanel': ('power_panel', 'name', None),
|
||||
}
|
||||
|
||||
# Termination A
|
||||
side_a_site = CSVModelChoiceField(
|
||||
label=_('Side A site'),
|
||||
queryset=Site.objects.all(),
|
||||
required=False,
|
||||
to_field_name='name',
|
||||
help_text=_('Site of parent device A (if any)'),
|
||||
help_text=_('Site of parent A (if any)')
|
||||
)
|
||||
side_a_device = CSVModelChoiceField(
|
||||
label=_('Side A device'),
|
||||
queryset=Device.objects.all(),
|
||||
to_field_name='name',
|
||||
help_text=_('Device name')
|
||||
side_a_parent = forms.CharField(
|
||||
label=_('Side A parent'),
|
||||
help_text=_('Device name, Circuit CID, or Power Panel name')
|
||||
)
|
||||
side_a_type = CSVContentTypeField(
|
||||
label=_('Side A type'),
|
||||
@@ -1445,13 +1479,11 @@ class CableImportForm(NetBoxModelImportForm):
|
||||
queryset=Site.objects.all(),
|
||||
required=False,
|
||||
to_field_name='name',
|
||||
help_text=_('Site of parent device B (if any)'),
|
||||
help_text=_('Site of parent B (if any)')
|
||||
)
|
||||
side_b_device = CSVModelChoiceField(
|
||||
label=_('Side B device'),
|
||||
queryset=Device.objects.all(),
|
||||
to_field_name='name',
|
||||
help_text=_('Device name')
|
||||
side_b_parent = forms.CharField(
|
||||
label=_('Side B parent'),
|
||||
help_text=_('Device name, Circuit CID, or Power Panel name')
|
||||
)
|
||||
side_b_type = CSVContentTypeField(
|
||||
label=_('Side B type'),
|
||||
@@ -1500,7 +1532,7 @@ class CableImportForm(NetBoxModelImportForm):
|
||||
class Meta:
|
||||
model = Cable
|
||||
fields = [
|
||||
'side_a_site', 'side_a_device', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_device', 'side_b_type',
|
||||
'side_a_site', 'side_a_parent', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_parent', 'side_b_type',
|
||||
'side_b_name', 'type', 'status', 'tenant', 'label', 'color', 'length', 'length_unit', 'description',
|
||||
'comments', 'tags',
|
||||
]
|
||||
@@ -1508,21 +1540,6 @@ class CableImportForm(NetBoxModelImportForm):
|
||||
def __init__(self, data=None, *args, **kwargs):
|
||||
super().__init__(data, *args, **kwargs)
|
||||
|
||||
if data:
|
||||
# Limit choices for side_a_device to the assigned side_a_site
|
||||
if side_a_site := data.get('side_a_site'):
|
||||
side_a_device_params = {f'site__{self.fields["side_a_site"].to_field_name}': side_a_site}
|
||||
self.fields['side_a_device'].queryset = self.fields['side_a_device'].queryset.filter(
|
||||
**side_a_device_params
|
||||
)
|
||||
|
||||
# Limit choices for side_b_device to the assigned side_b_site
|
||||
if side_b_site := data.get('side_b_site'):
|
||||
side_b_device_params = {f'site__{self.fields["side_b_site"].to_field_name}': side_b_site}
|
||||
self.fields['side_b_device'].queryset = self.fields['side_b_device'].queryset.filter(
|
||||
**side_b_device_params
|
||||
)
|
||||
|
||||
def _clean_side(self, side):
|
||||
"""
|
||||
Derive a Cable's A/B termination objects.
|
||||
@@ -1531,31 +1548,118 @@ class CableImportForm(NetBoxModelImportForm):
|
||||
"""
|
||||
assert side in 'ab', f"Invalid side designation: {side}"
|
||||
|
||||
device = self.cleaned_data.get(f'side_{side}_device')
|
||||
content_type = self.cleaned_data.get(f'side_{side}_type')
|
||||
site = self.cleaned_data.get(f'side_{side}_site')
|
||||
parent_value = self.cleaned_data.get(f'side_{side}_parent')
|
||||
name = self.cleaned_data.get(f'side_{side}_name')
|
||||
if not device or not content_type or not name:
|
||||
|
||||
if not parent_value or not content_type or not name: # pragma: no cover
|
||||
return None
|
||||
|
||||
model = content_type.model_class()
|
||||
# Get the parent model mapping from the submitted content_type
|
||||
parent_map = self.CABLE_PARENT_MAPPING.get(f'{content_type.app_label}.{content_type.model}')
|
||||
# This should never happen
|
||||
assert parent_map, (
|
||||
'Unknown cable termination content type parent mapping: '
|
||||
f'{content_type.app_label}.{content_type.model}'
|
||||
)
|
||||
|
||||
parent_content_type, parent_accessor = parent_map
|
||||
parent_app_label, parent_model_name = parent_content_type.split('.')
|
||||
|
||||
# Get the parent model class
|
||||
try:
|
||||
if device.virtual_chassis and device.virtual_chassis.master == device and \
|
||||
model.objects.filter(device=device, name=name).count() == 0:
|
||||
termination_object = model.objects.get(device__in=device.virtual_chassis.members.all(), name=name)
|
||||
else:
|
||||
termination_object = model.objects.get(device=device, name=name)
|
||||
if termination_object.cable is not None and termination_object.cable != self.instance:
|
||||
raise forms.ValidationError(
|
||||
_("Side {side_upper}: {device} {termination_object} is already connected").format(
|
||||
side_upper=side.upper(), device=device, termination_object=termination_object
|
||||
)
|
||||
)
|
||||
except ObjectDoesNotExist:
|
||||
parent_ct = ContentType.objects.get(app_label=parent_app_label.lower(), model=parent_model_name.lower())
|
||||
parent_model: Device | PowerPanel | Circuit = parent_ct.model_class()
|
||||
except ContentType.DoesNotExist: # pragma: no cover
|
||||
# This should never happen
|
||||
raise AssertionError(f'Unknown cable termination parent content type: {parent_content_type}')
|
||||
|
||||
# Build query for parent lookup
|
||||
parent_query = {parent_accessor: parent_value}
|
||||
# Add site to query if provided
|
||||
if site:
|
||||
parent_query['site'] = site
|
||||
|
||||
# Look up the parent object
|
||||
try:
|
||||
parent_object = parent_model.objects.get(**parent_query)
|
||||
except parent_model.DoesNotExist:
|
||||
raise forms.ValidationError(
|
||||
_("{side_upper} side termination not found: {device} {name}").format(
|
||||
side_upper=side.upper(), device=device, name=name
|
||||
_('Side {side_upper}: {model_name} not found: {value}').format(
|
||||
side_upper=side.upper(), model_name=parent_model.__name__, value=parent_value
|
||||
)
|
||||
)
|
||||
except parent_model.MultipleObjectsReturned:
|
||||
raise forms.ValidationError(
|
||||
_('Side {side_upper}: Multiple {model_name} objects found: {value}').format(
|
||||
side_upper=side.upper(), model_name=parent_model.__name__, value=parent_value
|
||||
)
|
||||
)
|
||||
|
||||
# Get the termination model class
|
||||
termination_model = content_type.model_class()
|
||||
|
||||
# Build the query to find the termination object
|
||||
field_mapping = self.TERMINATION_FIELDS.get(parent_model.__name__)
|
||||
if not field_mapping: # pragma: no cover
|
||||
return None
|
||||
|
||||
parent_field, name_field, value_transform = field_mapping
|
||||
query = {parent_field: parent_object}
|
||||
|
||||
if value_transform:
|
||||
name = value_transform(name)
|
||||
|
||||
if name:
|
||||
query[name_field] = name
|
||||
|
||||
# Add site to query if provided (for site-scoped parents)
|
||||
if site and parent_field in ('device', 'power_panel'):
|
||||
query[f'{parent_field}__site'] = site
|
||||
|
||||
# Look up the termination object
|
||||
try:
|
||||
# Handle virtual chassis for device-based terminations
|
||||
if (parent_field == 'device' and
|
||||
parent_object.virtual_chassis and
|
||||
parent_object.virtual_chassis.master == parent_object and
|
||||
termination_model.objects.filter(**query).count() == 0):
|
||||
query[f'{parent_field}__in'] = parent_object.virtual_chassis.members.all()
|
||||
query.pop(parent_field, None)
|
||||
termination_object = termination_model.objects.get(**query)
|
||||
else:
|
||||
termination_object = termination_model.objects.get(**query)
|
||||
|
||||
# Check if already connected to a cable
|
||||
if termination_object.cable is not None and termination_object.cable != self.instance:
|
||||
raise forms.ValidationError(
|
||||
_('Side {side_upper}: {parent} {termination} is already connected').format(
|
||||
side_upper=side.upper(), parent=parent_object, termination=termination_object
|
||||
)
|
||||
)
|
||||
|
||||
# Circuit terminations can also be connected to provider networks
|
||||
if (name_field == 'term_side' and
|
||||
hasattr(termination_object, '_provider_network') and
|
||||
termination_object._provider_network is not None):
|
||||
raise forms.ValidationError(
|
||||
_('Side {side_upper}: {parent} {termination} is already connected to a provider network').format(
|
||||
side_upper=side.upper(), parent=parent_object, termination=termination_object
|
||||
)
|
||||
)
|
||||
except termination_model.DoesNotExist:
|
||||
raise forms.ValidationError(
|
||||
_('Side {side_upper}: {model_name} not found: {parent} {name}').format(
|
||||
side_upper=side.upper(),
|
||||
model_name=termination_model.__name__,
|
||||
parent=parent_object, name=name or '',
|
||||
),
|
||||
)
|
||||
except termination_model.MultipleObjectsReturned: # pragma: no cover
|
||||
# This should never happen
|
||||
raise AssertionError('Multiple termination objects returned for query: {query}'.format(query=query))
|
||||
|
||||
setattr(self.instance, f'{side}_terminations', [termination_object])
|
||||
return termination_object
|
||||
|
||||
|
||||
@@ -531,7 +531,7 @@ class RackReservationTest(APIViewTestCases.APIViewTestCase):
|
||||
|
||||
class ManufacturerTest(APIViewTestCases.APIViewTestCase):
|
||||
model = Manufacturer
|
||||
brief_fields = ['description', 'devicetype_count', 'display', 'id', 'name', 'slug', 'url']
|
||||
brief_fields = ['description', 'display', 'id', 'name', 'slug', 'url']
|
||||
create_data = [
|
||||
{
|
||||
'name': 'Manufacturer 4',
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
from django.test import TestCase
|
||||
|
||||
from circuits.models import Circuit, CircuitTermination, CircuitType, Provider, ProviderNetwork
|
||||
from dcim.choices import (
|
||||
DeviceFaceChoices, DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices, PortTypeChoices,
|
||||
PowerOutletStatusChoices,
|
||||
CableTypeChoices, DeviceFaceChoices, DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices,
|
||||
PortTypeChoices, PowerOutletStatusChoices,
|
||||
)
|
||||
from dcim.forms import *
|
||||
from dcim.models import *
|
||||
@@ -411,3 +412,204 @@ class InterfaceTestCase(TestCase):
|
||||
self.assertNotIn('untagged_vlan', form.cleaned_data.keys())
|
||||
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
|
||||
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
|
||||
|
||||
|
||||
class CableImportFormTestCase(TestCase):
|
||||
"""
|
||||
Test cases for CableImportForm error handling and edge cases.
|
||||
|
||||
Note: Happy path scenarios (successful cable creation) are covered by
|
||||
dcim.tests.test_views.CableTestCase which tests the bulk import view.
|
||||
These tests focus on validation errors and edge cases not covered by the view tests.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Create sites
|
||||
cls.site_a = Site.objects.create(name='Site A', slug='site-a')
|
||||
cls.site_b = Site.objects.create(name='Site B', slug='site-b')
|
||||
|
||||
# Create manufacturer and device type
|
||||
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
|
||||
device_type = DeviceType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='Device Type 1',
|
||||
slug='device-type-1',
|
||||
)
|
||||
role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1', color='ff0000')
|
||||
|
||||
# Create devices
|
||||
cls.device_a1 = Device.objects.create(
|
||||
name='Device-A1',
|
||||
device_type=device_type,
|
||||
role=role,
|
||||
site=cls.site_a,
|
||||
)
|
||||
cls.device_a2 = Device.objects.create(
|
||||
name='Device-A2',
|
||||
device_type=device_type,
|
||||
role=role,
|
||||
site=cls.site_a,
|
||||
)
|
||||
# Device with same name in different site
|
||||
cls.device_b_duplicate = Device.objects.create(
|
||||
name='Device-A1', # Same name as device_a1
|
||||
device_type=device_type,
|
||||
role=role,
|
||||
site=cls.site_b,
|
||||
)
|
||||
|
||||
# Create interfaces
|
||||
cls.interface_a1_eth0 = Interface.objects.create(
|
||||
device=cls.device_a1,
|
||||
name='eth0',
|
||||
type=InterfaceTypeChoices.TYPE_1GE_FIXED,
|
||||
)
|
||||
cls.interface_a2_eth0 = Interface.objects.create(
|
||||
device=cls.device_a2,
|
||||
name='eth0',
|
||||
type=InterfaceTypeChoices.TYPE_1GE_FIXED,
|
||||
)
|
||||
|
||||
# Create circuit for testing circuit not found error
|
||||
provider = Provider.objects.create(name='Provider 1', slug='provider-1')
|
||||
circuit_type = CircuitType.objects.create(name='Circuit Type 1', slug='circuit-type-1')
|
||||
cls.circuit = Circuit.objects.create(
|
||||
provider=provider,
|
||||
type=circuit_type,
|
||||
cid='CIRCUIT-001',
|
||||
)
|
||||
cls.circuit_term_a = CircuitTermination.objects.create(
|
||||
circuit=cls.circuit,
|
||||
term_side='A',
|
||||
)
|
||||
|
||||
# Create provider network for testing provider network validation
|
||||
cls.provider_network = ProviderNetwork.objects.create(
|
||||
provider=provider,
|
||||
name='Provider Network 1',
|
||||
)
|
||||
|
||||
def test_device_not_found(self):
|
||||
"""Test error when parent device is not found."""
|
||||
form = CableImportForm(data={
|
||||
'side_a_site': 'Site A',
|
||||
'side_a_parent': 'NonexistentDevice',
|
||||
'side_a_type': 'dcim.interface',
|
||||
'side_a_name': 'eth0',
|
||||
'side_b_site': 'Site A',
|
||||
'side_b_parent': 'Device-A2',
|
||||
'side_b_type': 'dcim.interface',
|
||||
'side_b_name': 'eth0',
|
||||
'type': CableTypeChoices.TYPE_CAT6,
|
||||
'status': 'connected',
|
||||
})
|
||||
self.assertFalse(form.is_valid())
|
||||
self.assertIn('Side A: Device not found: NonexistentDevice', str(form.errors))
|
||||
|
||||
def test_circuit_not_found(self):
|
||||
"""Test error when circuit is not found."""
|
||||
form = CableImportForm(data={
|
||||
'side_a_site': None,
|
||||
'side_a_parent': 'NONEXISTENT-CID',
|
||||
'side_a_type': 'circuits.circuittermination',
|
||||
'side_a_name': 'A',
|
||||
'side_b_site': 'Site A',
|
||||
'side_b_parent': 'Device-A1',
|
||||
'side_b_type': 'dcim.interface',
|
||||
'side_b_name': 'eth0',
|
||||
'type': CableTypeChoices.TYPE_MMF_OM4,
|
||||
'status': 'connected',
|
||||
})
|
||||
self.assertFalse(form.is_valid())
|
||||
self.assertIn('Side A: Circuit not found: NONEXISTENT-CID', str(form.errors))
|
||||
|
||||
def test_termination_not_found(self):
|
||||
"""Test error when termination is not found on parent."""
|
||||
form = CableImportForm(data={
|
||||
'side_a_site': 'Site A',
|
||||
'side_a_parent': 'Device-A1',
|
||||
'side_a_type': 'dcim.interface',
|
||||
'side_a_name': 'eth999', # Nonexistent interface
|
||||
'side_b_site': 'Site A',
|
||||
'side_b_parent': 'Device-A2',
|
||||
'side_b_type': 'dcim.interface',
|
||||
'side_b_name': 'eth0',
|
||||
'type': CableTypeChoices.TYPE_CAT6,
|
||||
'status': 'connected',
|
||||
})
|
||||
self.assertFalse(form.is_valid())
|
||||
self.assertIn('Side A: Interface not found', str(form.errors))
|
||||
|
||||
def test_termination_already_cabled(self):
|
||||
"""Test error when termination is already connected to a cable."""
|
||||
# Create an existing cable
|
||||
existing_cable = Cable.objects.create(type=CableTypeChoices.TYPE_CAT6, status='connected')
|
||||
self.interface_a1_eth0.cable = existing_cable
|
||||
self.interface_a1_eth0.save()
|
||||
|
||||
form = CableImportForm(data={
|
||||
'side_a_site': 'Site A',
|
||||
'side_a_parent': 'Device-A1',
|
||||
'side_a_type': 'dcim.interface',
|
||||
'side_a_name': 'eth0',
|
||||
'side_b_site': 'Site A',
|
||||
'side_b_parent': 'Device-A2',
|
||||
'side_b_type': 'dcim.interface',
|
||||
'side_b_name': 'eth0',
|
||||
'type': CableTypeChoices.TYPE_CAT6,
|
||||
'status': 'connected',
|
||||
})
|
||||
self.assertFalse(form.is_valid())
|
||||
self.assertIn('already connected', str(form.errors))
|
||||
|
||||
def test_circuit_termination_with_provider_network(self):
|
||||
"""Test error when circuit termination is already connected to a provider network."""
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
# Connect circuit termination to provider network
|
||||
circuit_term = CircuitTermination.objects.get(pk=self.circuit_term_a.pk)
|
||||
pn_ct = ContentType.objects.get_for_model(ProviderNetwork)
|
||||
circuit_term.termination_type = pn_ct
|
||||
circuit_term.termination_id = self.provider_network.pk
|
||||
circuit_term.save()
|
||||
|
||||
try:
|
||||
form = CableImportForm(data={
|
||||
'side_a_site': None,
|
||||
'side_a_parent': 'CIRCUIT-001',
|
||||
'side_a_type': 'circuits.circuittermination',
|
||||
'side_a_name': 'A',
|
||||
'side_b_site': 'Site A',
|
||||
'side_b_parent': 'Device-A1',
|
||||
'side_b_type': 'dcim.interface',
|
||||
'side_b_name': 'eth0',
|
||||
'type': CableTypeChoices.TYPE_MMF_OM4,
|
||||
'status': 'connected',
|
||||
})
|
||||
self.assertFalse(form.is_valid())
|
||||
self.assertIn('already connected to a provider network', str(form.errors))
|
||||
finally:
|
||||
# Clean up: remove provider network connection
|
||||
circuit_term.termination_type = None
|
||||
circuit_term.termination_id = None
|
||||
circuit_term.save()
|
||||
|
||||
def test_multiple_parents_without_site(self):
|
||||
"""Test error when multiple parent objects are found without site scoping."""
|
||||
# Device-A1 exists in both site_a and site_b
|
||||
# Try to find device without specifying site
|
||||
form = CableImportForm(data={
|
||||
'side_a_site': '', # Empty site - should cause multiple matches
|
||||
'side_a_parent': 'Device-A1',
|
||||
'side_a_type': 'dcim.interface',
|
||||
'side_a_name': 'eth0',
|
||||
'side_b_site': 'Site A',
|
||||
'side_b_parent': 'Device-A2',
|
||||
'side_b_type': 'dcim.interface',
|
||||
'side_b_name': 'eth0',
|
||||
'type': CableTypeChoices.TYPE_CAT6,
|
||||
'status': 'connected',
|
||||
})
|
||||
self.assertFalse(form.is_valid())
|
||||
self.assertIn('Multiple Device objects found', str(form.errors))
|
||||
|
||||
@@ -11,6 +11,7 @@ from core.models import ObjectType
|
||||
from dcim.choices import *
|
||||
from dcim.constants import *
|
||||
from dcim.models import *
|
||||
from circuits.models import Circuit, CircuitTermination, CircuitType, Provider
|
||||
from ipam.models import ASN, RIR, VLAN, VRF
|
||||
from netbox.choices import CSVDelimiterChoices, ImportFormatChoices, WeightUnitChoices
|
||||
from tenancy.models import Tenant
|
||||
@@ -3495,7 +3496,7 @@ class CableTestCase(
|
||||
Interface(device=devices[4], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED),
|
||||
Interface(device=devices[4], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED),
|
||||
|
||||
# Device 1, Site 2
|
||||
# Device 5, Site 2
|
||||
Interface(device=devices[5], name='Interface 1', type=InterfaceTypeChoices.TYPE_1GE_FIXED),
|
||||
Interface(device=devices[5], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED),
|
||||
Interface(device=devices[5], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED),
|
||||
@@ -3507,6 +3508,22 @@ class CableTestCase(
|
||||
)
|
||||
Interface.objects.bulk_create(interfaces)
|
||||
|
||||
ConsolePort.objects.create(device=devices[0], name='Console 1')
|
||||
ConsoleServerPort.objects.create(device=devices[1], name='Console Server 1')
|
||||
|
||||
power_panel = PowerPanel.objects.create(site=sites[0], name='Power Panel 1')
|
||||
PowerFeed.objects.create(power_panel=power_panel, name='Feed 1')
|
||||
PowerPort.objects.create(device=devices[0], name='PSU1')
|
||||
|
||||
provider = Provider.objects.create(name='Provider 1', slug='provider-1')
|
||||
circuit_type = CircuitType.objects.create(name='Circuit Type 1', slug='circuit-type-1')
|
||||
circuit = Circuit.objects.create(provider=provider, type=circuit_type, cid='CIRCUIT-001')
|
||||
circuit_terminations = (
|
||||
CircuitTermination(circuit=circuit, term_side='A'),
|
||||
CircuitTermination(circuit=circuit, term_side='Z'),
|
||||
)
|
||||
CircuitTermination.objects.bulk_create(circuit_terminations)
|
||||
|
||||
cable1 = Cable(a_terminations=[interfaces[0]], b_terminations=[interfaces[3]], type=CableTypeChoices.TYPE_CAT6)
|
||||
cable1.save()
|
||||
cable2 = Cable(a_terminations=[interfaces[1]], b_terminations=[interfaces[4]], type=CableTypeChoices.TYPE_CAT6)
|
||||
@@ -3532,7 +3549,7 @@ class CableTestCase(
|
||||
|
||||
cls.csv_data = {
|
||||
'default': (
|
||||
"side_a_device,side_a_type,side_a_name,side_b_device,side_b_type,side_b_name",
|
||||
"side_a_parent,side_a_type,side_a_name,side_b_parent,side_b_type,side_b_name",
|
||||
"Device 4,dcim.interface,Interface 1,Device 5,dcim.interface,Interface 1",
|
||||
"Device 3,dcim.interface,Interface 2,Device 4,dcim.interface,Interface 2",
|
||||
"Device 3,dcim.interface,Interface 3,Device 4,dcim.interface,Interface 3",
|
||||
@@ -3545,12 +3562,28 @@ class CableTestCase(
|
||||
'site-filtering': (
|
||||
# Ensure that CSV bulk import supports assigning terminations from parent devices
|
||||
# that share the same device name, provided those devices belong to different sites.
|
||||
"side_a_site,side_a_device,side_a_type,side_a_name,side_b_site,side_b_device,side_b_type,side_b_name",
|
||||
"side_a_site,side_a_parent,side_a_type,side_a_name,side_b_site,side_b_parent,side_b_type,side_b_name",
|
||||
"Site 1,Device 3,dcim.interface,Interface 1,Site 2,Device 1,dcim.interface,Interface 1",
|
||||
"Site 1,Device 3,dcim.interface,Interface 2,Site 2,Device 1,dcim.interface,Interface 2",
|
||||
"Site 1,Device 3,dcim.interface,Interface 3,Site 2,Device 1,dcim.interface,Interface 3",
|
||||
"Site 1,Device 1,dcim.interface,Device 2 Interface,Site 2,Device 1,dcim.interface,Interface 4",
|
||||
"Site 1,Device 1,dcim.interface,Device 3 Interface,Site 2,Device 1,dcim.interface,Interface 5",
|
||||
),
|
||||
'circuits': (
|
||||
# Test circuit termination to interface cables
|
||||
"side_a_parent,side_a_type,side_a_name,side_b_site,side_b_parent,side_b_type,side_b_name",
|
||||
"CIRCUIT-001,circuits.circuittermination,A,Site 1,Device 4,dcim.interface,Interface 2",
|
||||
"CIRCUIT-001,circuits.circuittermination,z,Site 2,Device 5,dcim.interface,Interface 2",
|
||||
),
|
||||
'power': (
|
||||
# Test power feed to power port cables
|
||||
"side_a_site,side_a_parent,side_a_type,side_a_name,side_b_site,side_b_parent,side_b_type,side_b_name",
|
||||
"Site 1,Power Panel 1,dcim.powerfeed,Feed 1,Site 1,Device 1,dcim.powerport,PSU1",
|
||||
),
|
||||
'console': (
|
||||
# Test console port to console server port cables
|
||||
"side_a_site,side_a_parent,side_a_type,side_a_name,side_b_site,side_b_parent,side_b_type,side_b_name",
|
||||
"Site 1,Device 1,dcim.consoleport,Console 1,Site 1,Device 2,dcim.consoleserverport,Console Server 1",
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -119,7 +119,9 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
|
||||
if snapshots:
|
||||
params["snapshots"] = snapshots
|
||||
if request:
|
||||
params["request"] = copy_safe_request(request)
|
||||
# Exclude FILES - webhooks don't need uploaded files,
|
||||
# which can cause pickle errors with Pillow.
|
||||
params["request"] = copy_safe_request(request, include_files=False)
|
||||
|
||||
# Enqueue the task
|
||||
rq_queue.enqueue(
|
||||
|
||||
@@ -230,10 +230,6 @@ class PrefixImportForm(ScopedImportForm, NetBoxModelImportForm):
|
||||
query |= Q(**{
|
||||
f"site__{self.fields['vlan_site'].to_field_name}": vlan_site
|
||||
})
|
||||
# Don't Forget to include VLANs without a site in the filter
|
||||
query |= Q(**{
|
||||
f"site__{self.fields['vlan_site'].to_field_name}__isnull": True
|
||||
})
|
||||
|
||||
if vlan_group:
|
||||
query &= Q(**{
|
||||
|
||||
@@ -564,6 +564,82 @@ vlan: 102
|
||||
self.assertEqual(prefix.vlan.vid, 102)
|
||||
self.assertEqual(prefix.scope, site)
|
||||
|
||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||
def test_prefix_import_with_vlan_site_multiple_vlans_same_vid(self):
|
||||
"""
|
||||
Test import when multiple VLANs exist with the same vid but different sites.
|
||||
Ref: #20560
|
||||
"""
|
||||
site1 = Site.objects.get(name='Site 1')
|
||||
site2 = Site.objects.get(name='Site 2')
|
||||
|
||||
# Create VLANs with the same vid but different sites
|
||||
vlan1 = VLAN.objects.create(vid=1, name='VLAN1-Site1', site=site1)
|
||||
VLAN.objects.create(vid=1, name='VLAN1-Site2', site=site2) # Create ambiguity
|
||||
|
||||
# Import prefix with vlan_site specified
|
||||
IMPORT_DATA = f"""
|
||||
prefix: 10.11.0.0/22
|
||||
status: active
|
||||
scope_type: dcim.site
|
||||
scope_id: {site1.pk}
|
||||
vlan_site: {site1.name}
|
||||
vlan: 1
|
||||
description: LOC02-MGMT
|
||||
"""
|
||||
|
||||
# Add all required permissions to the test user
|
||||
self.add_permissions('ipam.view_prefix', 'ipam.add_prefix')
|
||||
|
||||
form_data = {
|
||||
'data': IMPORT_DATA,
|
||||
'format': 'yaml'
|
||||
}
|
||||
response = self.client.post(reverse('ipam:prefix_bulk_import'), data=form_data, follow=True)
|
||||
self.assertHttpStatus(response, 200)
|
||||
|
||||
# Verify the prefix was created with the correct VLAN
|
||||
prefix = Prefix.objects.get(prefix='10.11.0.0/22')
|
||||
self.assertEqual(prefix.vlan, vlan1)
|
||||
|
||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||
def test_prefix_import_with_vlan_site_and_global_vlan(self):
|
||||
"""
|
||||
Test import when a global VLAN (no site) and site-specific VLAN exist with same vid.
|
||||
When vlan_site is specified, should prefer the site-specific VLAN.
|
||||
Ref: #20560
|
||||
"""
|
||||
site1 = Site.objects.get(name='Site 1')
|
||||
|
||||
# Create a global VLAN (no site) and a site-specific VLAN with the same vid
|
||||
VLAN.objects.create(vid=10, name='VLAN10-Global', site=None) # Create ambiguity
|
||||
vlan_site = VLAN.objects.create(vid=10, name='VLAN10-Site1', site=site1)
|
||||
|
||||
# Import prefix with vlan_site specified
|
||||
IMPORT_DATA = f"""
|
||||
prefix: 10.12.0.0/22
|
||||
status: active
|
||||
scope_type: dcim.site
|
||||
scope_id: {site1.pk}
|
||||
vlan_site: {site1.name}
|
||||
vlan: 10
|
||||
description: Test Site-Specific VLAN
|
||||
"""
|
||||
|
||||
# Add all required permissions to the test user
|
||||
self.add_permissions('ipam.view_prefix', 'ipam.add_prefix')
|
||||
|
||||
form_data = {
|
||||
'data': IMPORT_DATA,
|
||||
'format': 'yaml'
|
||||
}
|
||||
response = self.client.post(reverse('ipam:prefix_bulk_import'), data=form_data, follow=True)
|
||||
self.assertHttpStatus(response, 200)
|
||||
|
||||
# Verify the prefix was created with the site-specific VLAN (not the global one)
|
||||
prefix = Prefix.objects.get(prefix='10.12.0.0/22')
|
||||
self.assertEqual(prefix.vlan, vlan_site)
|
||||
|
||||
|
||||
class IPRangeTestCase(ViewTestCases.PrimaryObjectViewTestCase):
|
||||
model = IPRange
|
||||
|
||||
2
netbox/project-static/dist/netbox.css
vendored
2
netbox/project-static/dist/netbox.css
vendored
File diff suppressed because one or more lines are too long
@@ -36,7 +36,6 @@ form.object-edit {
|
||||
// Make optgroup labels sticky when scrolling through select elements
|
||||
select[multiple] {
|
||||
optgroup {
|
||||
position: sticky;
|
||||
top: 0;
|
||||
background-color: var(--bs-body-bg);
|
||||
font-style: normal;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -35,27 +35,34 @@ class NetBoxFakeRequest:
|
||||
# Utility functions
|
||||
#
|
||||
|
||||
def copy_safe_request(request):
|
||||
def copy_safe_request(request, include_files=True):
|
||||
"""
|
||||
Copy selected attributes from a request object into a new fake request object. This is needed in places where
|
||||
thread safe pickling of the useful request data is needed.
|
||||
|
||||
Args:
|
||||
request: The original request object
|
||||
include_files: Whether to include request.FILES.
|
||||
"""
|
||||
meta = {
|
||||
k: request.META[k]
|
||||
for k in HTTP_REQUEST_META_SAFE_COPY
|
||||
if k in request.META and isinstance(request.META[k], str)
|
||||
}
|
||||
return NetBoxFakeRequest({
|
||||
data = {
|
||||
'META': meta,
|
||||
'COOKIES': request.COOKIES,
|
||||
'POST': request.POST,
|
||||
'GET': request.GET,
|
||||
'FILES': request.FILES,
|
||||
'user': request.user,
|
||||
'method': request.method,
|
||||
'path': request.path,
|
||||
'id': getattr(request, 'id', None), # UUID assigned by middleware
|
||||
})
|
||||
}
|
||||
if include_files:
|
||||
data['FILES'] = request.FILES
|
||||
|
||||
return NetBoxFakeRequest(data)
|
||||
|
||||
|
||||
def get_client_ip(request, additional_headers=()):
|
||||
|
||||
Reference in New Issue
Block a user