mirror of
https://github.com/netbox-community/netbox.git
synced 2026-01-11 06:12:16 -06:00
Compare commits
7 Commits
20068-impo
...
f97f14bbc7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f97f14bbc7 | ||
|
|
269112a565 | ||
|
|
c6672538ac | ||
|
|
9ae53fc232 | ||
|
|
6efb258b9f | ||
|
|
da1e0f4b53 | ||
|
|
7f39f75d3d |
@@ -681,8 +681,8 @@ class ModuleBayTemplate(ModularComponentTemplateModel):
|
||||
|
||||
def instantiate(self, **kwargs):
|
||||
return self.component_model(
|
||||
name=self.name,
|
||||
label=self.label,
|
||||
name=self.resolve_name(kwargs.get('module')),
|
||||
label=self.resolve_label(kwargs.get('module')),
|
||||
position=self.position,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@@ -7,7 +7,6 @@ from django.utils.translation import gettext_lazy as _
|
||||
from jsonschema.exceptions import ValidationError as JSONValidationError
|
||||
|
||||
from dcim.choices import *
|
||||
from dcim.constants import MODULE_TOKEN
|
||||
from dcim.utils import update_interface_bridges
|
||||
from extras.models import ConfigContextModel, CustomField
|
||||
from netbox.models import PrimaryModel
|
||||
@@ -331,7 +330,6 @@ class Module(PrimaryModel, ConfigContextModel):
|
||||
else:
|
||||
# ModuleBays must be saved individually for MPTT
|
||||
for instance in create_instances:
|
||||
instance.name = instance.name.replace(MODULE_TOKEN, str(self.module_bay.position))
|
||||
instance.save()
|
||||
|
||||
update_fields = ['module']
|
||||
|
||||
@@ -792,8 +792,54 @@ class ModuleBayTestCase(TestCase):
|
||||
)
|
||||
device.consoleports.first()
|
||||
|
||||
def test_nested_module_token(self):
|
||||
pass
|
||||
@tag('regression') # #19918
|
||||
def test_nested_module_bay_label_resolution(self):
|
||||
"""Test that nested module bay labels properly resolve {module} placeholders"""
|
||||
manufacturer = Manufacturer.objects.first()
|
||||
site = Site.objects.first()
|
||||
device_role = DeviceRole.objects.first()
|
||||
|
||||
# Create device type with module bay template (position='A')
|
||||
device_type = DeviceType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='Device with Bays',
|
||||
slug='device-with-bays'
|
||||
)
|
||||
ModuleBayTemplate.objects.create(
|
||||
device_type=device_type,
|
||||
name='Bay A',
|
||||
position='A'
|
||||
)
|
||||
|
||||
# Create module type with nested bay template using {module} placeholder
|
||||
module_type = ModuleType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='Module with Nested Bays'
|
||||
)
|
||||
ModuleBayTemplate.objects.create(
|
||||
module_type=module_type,
|
||||
name='SFP {module}-21',
|
||||
label='{module}-21',
|
||||
position='21'
|
||||
)
|
||||
|
||||
# Create device and install module
|
||||
device = Device.objects.create(
|
||||
name='Test Device',
|
||||
device_type=device_type,
|
||||
role=device_role,
|
||||
site=site
|
||||
)
|
||||
module_bay = device.modulebays.get(name='Bay A')
|
||||
module = Module.objects.create(
|
||||
device=device,
|
||||
module_bay=module_bay,
|
||||
module_type=module_type
|
||||
)
|
||||
|
||||
# Verify nested bay label resolves {module} to parent position
|
||||
nested_bay = module.modulebays.get(name='SFP A-21')
|
||||
self.assertEqual(nested_bay.label, 'A-21')
|
||||
|
||||
|
||||
class CableTestCase(TestCase):
|
||||
|
||||
@@ -2,11 +2,14 @@ import logging
|
||||
import traceback
|
||||
from contextlib import ExitStack
|
||||
|
||||
from django.db import transaction
|
||||
from django.db import router, transaction
|
||||
from django.db import DEFAULT_DB_ALIAS
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from core.signals import clear_events
|
||||
from dcim.models import Device
|
||||
from extras.models import Script as ScriptModel
|
||||
from netbox.context_managers import event_tracking
|
||||
from netbox.jobs import JobRunner
|
||||
from netbox.registry import registry
|
||||
from utilities.exceptions import AbortScript, AbortTransaction
|
||||
@@ -42,10 +45,21 @@ class ScriptJob(JobRunner):
|
||||
# A script can modify multiple models so need to do an atomic lock on
|
||||
# both the default database (for non ChangeLogged models) and potentially
|
||||
# any other database (for ChangeLogged models)
|
||||
with transaction.atomic():
|
||||
script.output = script.run(data, commit)
|
||||
if not commit:
|
||||
raise AbortTransaction()
|
||||
changeloged_db = router.db_for_write(Device)
|
||||
with transaction.atomic(using=DEFAULT_DB_ALIAS):
|
||||
# If branch database is different from default, wrap in a second atomic transaction
|
||||
# Note: Don't add any extra code between the two atomic transactions,
|
||||
# otherwise the changes might get committed to the default database
|
||||
# if there are any raised exceptions.
|
||||
if changeloged_db != DEFAULT_DB_ALIAS:
|
||||
with transaction.atomic(using=changeloged_db):
|
||||
script.output = script.run(data, commit)
|
||||
if not commit:
|
||||
raise AbortTransaction()
|
||||
else:
|
||||
script.output = script.run(data, commit)
|
||||
if not commit:
|
||||
raise AbortTransaction()
|
||||
except AbortTransaction:
|
||||
script.log_info(message=_("Database changes have been reverted automatically."))
|
||||
if script.failed:
|
||||
@@ -108,14 +122,14 @@ class ScriptJob(JobRunner):
|
||||
script.request = request
|
||||
self.logger.debug(f"Request ID: {request.id if request else None}")
|
||||
|
||||
# Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
|
||||
# change logging, event rules, etc.
|
||||
if commit:
|
||||
self.logger.info("Executing script (commit enabled)")
|
||||
with ExitStack() as stack:
|
||||
for request_processor in registry['request_processors']:
|
||||
stack.enter_context(request_processor(request))
|
||||
self.run_script(script, request, data, commit)
|
||||
else:
|
||||
self.logger.warning("Executing script (commit disabled)")
|
||||
|
||||
with ExitStack() as stack:
|
||||
for request_processor in registry['request_processors']:
|
||||
if not commit and request_processor is event_tracking:
|
||||
continue
|
||||
stack.enter_context(request_processor(request))
|
||||
self.run_script(script, request, data, commit)
|
||||
|
||||
@@ -230,10 +230,6 @@ class PrefixImportForm(ScopedImportForm, NetBoxModelImportForm):
|
||||
query |= Q(**{
|
||||
f"site__{self.fields['vlan_site'].to_field_name}": vlan_site
|
||||
})
|
||||
# Don't Forget to include VLANs without a site in the filter
|
||||
query |= Q(**{
|
||||
f"site__{self.fields['vlan_site'].to_field_name}__isnull": True
|
||||
})
|
||||
|
||||
if vlan_group:
|
||||
query &= Q(**{
|
||||
|
||||
@@ -564,6 +564,82 @@ vlan: 102
|
||||
self.assertEqual(prefix.vlan.vid, 102)
|
||||
self.assertEqual(prefix.scope, site)
|
||||
|
||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||
def test_prefix_import_with_vlan_site_multiple_vlans_same_vid(self):
|
||||
"""
|
||||
Test import when multiple VLANs exist with the same vid but different sites.
|
||||
Ref: #20560
|
||||
"""
|
||||
site1 = Site.objects.get(name='Site 1')
|
||||
site2 = Site.objects.get(name='Site 2')
|
||||
|
||||
# Create VLANs with the same vid but different sites
|
||||
vlan1 = VLAN.objects.create(vid=1, name='VLAN1-Site1', site=site1)
|
||||
VLAN.objects.create(vid=1, name='VLAN1-Site2', site=site2) # Create ambiguity
|
||||
|
||||
# Import prefix with vlan_site specified
|
||||
IMPORT_DATA = f"""
|
||||
prefix: 10.11.0.0/22
|
||||
status: active
|
||||
scope_type: dcim.site
|
||||
scope_id: {site1.pk}
|
||||
vlan_site: {site1.name}
|
||||
vlan: 1
|
||||
description: LOC02-MGMT
|
||||
"""
|
||||
|
||||
# Add all required permissions to the test user
|
||||
self.add_permissions('ipam.view_prefix', 'ipam.add_prefix')
|
||||
|
||||
form_data = {
|
||||
'data': IMPORT_DATA,
|
||||
'format': 'yaml'
|
||||
}
|
||||
response = self.client.post(reverse('ipam:prefix_bulk_import'), data=form_data, follow=True)
|
||||
self.assertHttpStatus(response, 200)
|
||||
|
||||
# Verify the prefix was created with the correct VLAN
|
||||
prefix = Prefix.objects.get(prefix='10.11.0.0/22')
|
||||
self.assertEqual(prefix.vlan, vlan1)
|
||||
|
||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||
def test_prefix_import_with_vlan_site_and_global_vlan(self):
|
||||
"""
|
||||
Test import when a global VLAN (no site) and site-specific VLAN exist with same vid.
|
||||
When vlan_site is specified, should prefer the site-specific VLAN.
|
||||
Ref: #20560
|
||||
"""
|
||||
site1 = Site.objects.get(name='Site 1')
|
||||
|
||||
# Create a global VLAN (no site) and a site-specific VLAN with the same vid
|
||||
VLAN.objects.create(vid=10, name='VLAN10-Global', site=None) # Create ambiguity
|
||||
vlan_site = VLAN.objects.create(vid=10, name='VLAN10-Site1', site=site1)
|
||||
|
||||
# Import prefix with vlan_site specified
|
||||
IMPORT_DATA = f"""
|
||||
prefix: 10.12.0.0/22
|
||||
status: active
|
||||
scope_type: dcim.site
|
||||
scope_id: {site1.pk}
|
||||
vlan_site: {site1.name}
|
||||
vlan: 10
|
||||
description: Test Site-Specific VLAN
|
||||
"""
|
||||
|
||||
# Add all required permissions to the test user
|
||||
self.add_permissions('ipam.view_prefix', 'ipam.add_prefix')
|
||||
|
||||
form_data = {
|
||||
'data': IMPORT_DATA,
|
||||
'format': 'yaml'
|
||||
}
|
||||
response = self.client.post(reverse('ipam:prefix_bulk_import'), data=form_data, follow=True)
|
||||
self.assertHttpStatus(response, 200)
|
||||
|
||||
# Verify the prefix was created with the site-specific VLAN (not the global one)
|
||||
prefix = Prefix.objects.get(prefix='10.12.0.0/22')
|
||||
self.assertEqual(prefix.vlan, vlan_site)
|
||||
|
||||
|
||||
class IPRangeTestCase(ViewTestCases.PrimaryObjectViewTestCase):
|
||||
model = IPRange
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user