Compare commits

..

25 Commits

Author SHA1 Message Date
Daniel Sheppard
3cc15ecaf0 Merge branch 'feature' into feature-ip-prefix-link
Some checks failed
CI / build (20.x, 3.12) (push) Has been cancelled
CI / build (20.x, 3.13) (push) Has been cancelled
CI / build (20.x, 3.14) (push) Has been cancelled
2025-12-28 11:41:39 -06:00
Daniel Sheppard
5ada585129 Remove unrelated development path
Some checks failed
CI / build (20.x, 3.12) (push) Has been cancelled
CI / build (20.x, 3.13) (push) Has been cancelled
2025-11-25 08:16:46 -06:00
Daniel Sheppard
b03158f1de Add pgtrigger as dependancy
Some checks are pending
CI / build (20.x, 3.12) (push) Waiting to run
CI / build (20.x, 3.13) (push) Waiting to run
2025-11-25 00:06:35 -06:00
Daniel Sheppard
bdde4b7e94 Switch to using triggers
Still outstanding:

* IPAddress and IPRange triggers
* Triggers for VRF changes on Prefix
* Triggers for changing to "container" on Prefix
* Rework logic for saving on all models
2025-11-25 00:01:23 -06:00
Daniel Sheppard
905656f13e Add migration 2025-11-07 09:10:51 -06:00
Daniel Sheppard
42c2dc57f8 Develop triggers for setting parents 2025-11-07 09:02:30 -06:00
Daniel Sheppard
56673f4d88 Signal optimizations 2025-11-05 18:10:38 -06:00
Daniel Sheppard
955c64b68c Re-order migrations 2025-09-14 10:49:44 -05:00
Daniel Sheppard
912e6e4fb1 Merge branch 'feature' of https://github.com/netbox-community/netbox into feature-ip-prefix-link
Some checks failed
CI / build (20.x, 3.12) (push) Has been cancelled
CI / build (20.x, 3.13) (push) Has been cancelled
2025-09-11 20:46:47 -05:00
Daniel Sheppard
90d277610c Merge branch 'feature' of https://github.com/netbox-community/netbox into feature-ip-prefix-link 2025-09-03 22:15:38 -05:00
Daniel Sheppard
b1bc933e98 Clean up Prefix TODOs
Some checks failed
CI / build (20.x, 3.10) (push) Has been cancelled
CI / build (20.x, 3.11) (push) Has been cancelled
CI / build (20.x, 3.12) (push) Has been cancelled
2025-08-08 09:29:12 -05:00
Daniel Sheppard
b54196f595 Merge branch 'feature' into feature-ip-prefix-link
Some checks are pending
CI / build (20.x, 3.10) (push) Waiting to run
CI / build (20.x, 3.11) (push) Waiting to run
CI / build (20.x, 3.12) (push) Waiting to run
2025-08-07 21:33:58 -05:00
Daniel Sheppard
0d31449df8 Optimize prefix assignment. Fix tests
Some checks failed
CI / build (20.x, 3.10) (push) Has been cancelled
CI / build (20.x, 3.11) (push) Has been cancelled
CI / build (20.x, 3.12) (push) Has been cancelled
2025-07-10 12:59:59 -05:00
Daniel Sheppard
76e85683ac Re-apply de-duplication to IPRangeSerializer 2025-07-09 13:18:28 -05:00
Daniel Sheppard
f844ec5703 Merge branch 'feature' into feature-ip-prefix-link 2025-07-09 13:00:51 -05:00
Daniel Sheppard
7eb3a8d379 Fix some tests 2025-07-09 12:56:05 -05:00
Daniel Sheppard
ade4354ca4 Fix some test errors 2025-07-09 11:12:52 -05:00
Daniel Sheppard
697d5bd876 Slightly DRY the migration 2025-07-09 10:52:24 -05:00
Daniel Sheppard
b19f81cede More work on IP Address/Range and Prefix relationship 2025-07-09 10:36:41 -05:00
Daniel Sheppard
c5e7b21147 Add additional FKs 2025-06-05 09:43:18 -05:00
Daniel Sheppard
c211b624d0 Merge branch 'feature' of https://github.com/netbox-community/netbox into feature-ip-prefix-link
# Conflicts:
#	netbox/ipam/forms/bulk_import.py
2025-05-15 08:40:58 -05:00
Daniel Sheppard
4c8301b3a5 Update migration 2025-05-15 08:38:03 -05:00
Daniel Sheppard
68d0b58293 Update migration 2025-04-10 08:22:23 -05:00
Daniel Sheppard
738ef63527 Update from feature 2025-04-09 10:24:56 -05:00
Daniel Sheppard
747fef0bc2 Work on IP to Prefix ForeignKey relationship 2025-02-24 14:03:18 -06:00
36 changed files with 1228 additions and 848 deletions

View File

@@ -35,6 +35,11 @@ django-mptt==0.17.0
# https://github.com/Xof/django-pglocks/blob/master/CHANGES.txt
django-pglocks
# Manager for managing PostgreSQL triggers
# https://github.com/AmbitionEng/django-pgtrigger/blob/main/CHANGELOG.md
django-pgtrigger
# Prometheus metrics library for Django
# https://github.com/korfuri/django-prometheus/blob/master/CHANGELOG.md
django-prometheus

View File

@@ -22,7 +22,7 @@
#### Lookup Modifiers in Filter Forms ([#7604](https://github.com/netbox-community/netbox/issues/7604))
Most object list filters within the UI have been extended to include optional lookup modifiers to support more complex queries. For instance, filters for numeric values now include a dropdown where a user can select "less than," "greater than," or "not" in addition to the default equivalency match. The specific modifiers available depend on the type of each filter. Plugins can register their own filtersets using the `register_filterset()` decorator to enable this new functionality.
Most object list filters within the UI have been extended to include optional lookup modifiers to support more complex queries. For instance, filters for numeric values now include a dropdown where a user can select "less than," "greater than," or "not" in addition to the default equivalency match. The specific modifiers available depend on the type of each filter.
(Note that this feature does not introduce any new filters. Rather, it makes available in the UI filters which already exist.)

View File

@@ -126,26 +126,18 @@ class ModuleCommonForm(forms.Form):
_("Cannot install module with placeholder values in a module bay with no position defined.")
)
token_count = template.name.count(MODULE_TOKEN)
# Validate: depth must be >= token_count (can't expand tokens without context)
if len(module_bays) < token_count:
if len(module_bays) != template.name.count(MODULE_TOKEN):
raise forms.ValidationError(
_(
"Cannot install module with placeholder values in a module bay tree {level} in tree "
"but {tokens} placeholders given."
).format(
level=len(module_bays), tokens=token_count
level=len(module_bays), tokens=template.name.count(MODULE_TOKEN)
)
)
if token_count == 1:
# Single token: substitute with full path (e.g., "1/1" for depth 2)
full_path = '/'.join([mb.position for mb in module_bays])
resolved_name = resolved_name.replace(MODULE_TOKEN, full_path, 1)
else:
# Multiple tokens: substitute level-by-level (existing behavior)
for mb in module_bays:
resolved_name = resolved_name.replace(MODULE_TOKEN, mb.position, 1)
for module_bay in module_bays:
resolved_name = resolved_name.replace(MODULE_TOKEN, module_bay.position, 1)
existing_item = installed_components.get(resolved_name)

View File

@@ -175,16 +175,9 @@ class ModularComponentTemplateModel(ComponentTemplateModel):
if module:
modules = self._get_module_tree(module)
token_count = self.name.count(MODULE_TOKEN)
name = self.name
if token_count == 1:
# Single token: substitute with full path (e.g., "1/1" for depth 2)
full_path = '/'.join([m.module_bay.position for m in modules])
name = name.replace(MODULE_TOKEN, full_path, 1)
else:
# Multiple tokens: substitute level-by-level (existing behavior)
for m in modules:
name = name.replace(MODULE_TOKEN, m.module_bay.position, 1)
for module in modules:
name = name.replace(MODULE_TOKEN, module.module_bay.position, 1)
return name
return self.name
@@ -194,16 +187,9 @@ class ModularComponentTemplateModel(ComponentTemplateModel):
if module:
modules = self._get_module_tree(module)
token_count = self.label.count(MODULE_TOKEN)
label = self.label
if token_count == 1:
# Single token: substitute with full path (e.g., "1/1" for depth 2)
full_path = '/'.join([m.module_bay.position for m in modules])
label = label.replace(MODULE_TOKEN, full_path, 1)
else:
# Multiple tokens: substitute level-by-level (existing behavior)
for m in modules:
label = label.replace(MODULE_TOKEN, m.module_bay.position, 1)
for module in modules:
label = label.replace(MODULE_TOKEN, module.module_bay.position, 1)
return label
return self.label

View File

@@ -848,720 +848,6 @@ class ModuleBayTestCase(TestCase):
nested_bay = module.modulebays.get(name='SFP A-21')
self.assertEqual(nested_bay.label, 'A-21')
def test_nested_module_single_placeholder_full_path(self):
"""
Test that installing a module at depth=2 with a single {module} placeholder
in the interface template name resolves to the full path (e.g., "1/1").
Regression test for transceiver modeling use case.
"""
manufacturer = Manufacturer.objects.first()
site = Site.objects.first()
device_role = DeviceRole.objects.first()
# Create device type with module bay template
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='Chassis Device',
slug='chassis-device'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Line Card Bay 1',
position='1'
)
# Create line card module type with nested module bay
line_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='Line Card'
)
ModuleBayTemplate.objects.create(
module_type=line_card_type,
name='SFP Bay {module}/1',
label='SFP {module}/1',
position='1'
)
ModuleBayTemplate.objects.create(
module_type=line_card_type,
name='SFP Bay {module}/2',
label='SFP {module}/2',
position='2'
)
# Create SFP module type with interface using single {module} placeholder
sfp_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='SFP Transceiver'
)
InterfaceTemplate.objects.create(
module_type=sfp_type,
name='SFP {module}',
label='{module}',
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
)
# Create device
device = Device.objects.create(
name='Test Chassis',
device_type=device_type,
role=device_role,
site=site
)
# Install line card in bay 1
line_card_bay = device.modulebays.get(name='Line Card Bay 1')
line_card = Module.objects.create(
device=device,
module_bay=line_card_bay,
module_type=line_card_type
)
# Install SFP in nested bay 1 (depth=2)
sfp_bay_1 = line_card.modulebays.get(name='SFP Bay 1/1')
sfp_module_1 = Module.objects.create(
device=device,
module_bay=sfp_bay_1,
module_type=sfp_type
)
# Verify interface name resolves to full path "1/1"
interface_1 = sfp_module_1.interfaces.first()
self.assertEqual(interface_1.name, 'SFP 1/1')
self.assertEqual(interface_1.label, '1/1')
# Install second SFP in nested bay 2 (depth=2) - verifies uniqueness
sfp_bay_2 = line_card.modulebays.get(name='SFP Bay 1/2')
sfp_module_2 = Module.objects.create(
device=device,
module_bay=sfp_bay_2,
module_type=sfp_type
)
# Verify second interface name resolves to full path "1/2"
interface_2 = sfp_module_2.interfaces.first()
self.assertEqual(interface_2.name, 'SFP 1/2')
self.assertEqual(interface_2.label, '1/2')
def test_single_placeholder_direct_install_depth_1(self):
"""
Test that installing a module directly at depth=1 with a single {module}
placeholder still resolves correctly (just the position, not a path).
"""
manufacturer = Manufacturer.objects.first()
site = Site.objects.first()
device_role = DeviceRole.objects.first()
# Create device type with module bay template
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='Simple Chassis',
slug='simple-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='SFP Bay 1',
position='1'
)
# Create SFP module type with interface using single {module} placeholder
sfp_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='Direct SFP'
)
InterfaceTemplate.objects.create(
module_type=sfp_type,
name='SFP {module}',
label='{module}',
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
)
# Create device
device = Device.objects.create(
name='Test Simple Chassis',
device_type=device_type,
role=device_role,
site=site
)
# Install SFP directly in bay 1 (depth=1)
sfp_bay = device.modulebays.get(name='SFP Bay 1')
sfp_module = Module.objects.create(
device=device,
module_bay=sfp_bay,
module_type=sfp_type
)
# Verify interface name resolves to just "1"
interface = sfp_module.interfaces.first()
self.assertEqual(interface.name, 'SFP 1')
self.assertEqual(interface.label, '1')
def test_multi_token_level_by_level_depth_2(self):
"""
T1: Multi-token behavior remains unchanged at depth=2.
Ensure legacy {module}/{module} still resolves level-by-level.
"""
site = Site.objects.create(name='T1 Site', slug='t1-site')
manufacturer = Manufacturer.objects.create(name='T1 Manufacturer', slug='t1-manufacturer')
device_role = DeviceRole.objects.create(name='T1 Role', slug='t1-role')
# Create device type with module bay
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='T1 Chassis',
slug='t1-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Bay 1',
position='1'
)
# Create line card module type with nested bay
line_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T1 Line Card'
)
ModuleBayTemplate.objects.create(
module_type=line_card_type,
name='Nested Bay 2',
position='2'
)
# Create SFP module type with 2-token interface template
sfp_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T1 SFP'
)
InterfaceTemplate.objects.create(
module_type=sfp_type,
name='SFP {module}/{module}',
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
)
# Create device and install modules
device = Device.objects.create(
name='T1 Device',
device_type=device_type,
role=device_role,
site=site
)
# Install line card at position 1
line_card_bay = device.modulebays.get(name='Bay 1')
line_card = Module.objects.create(
device=device,
module_bay=line_card_bay,
module_type=line_card_type
)
# Install SFP at nested bay (position 2)
sfp_bay = line_card.modulebays.get(name='Nested Bay 2')
sfp_module = Module.objects.create(
device=device,
module_bay=sfp_bay,
module_type=sfp_type
)
# Verify level-by-level substitution: 1/2 (not 1/2/1/2)
interface = sfp_module.interfaces.first()
self.assertEqual(interface.name, 'SFP 1/2')
def test_multi_token_deeper_tree_only_consumes_tokens(self):
"""
T2: Multi-token with deeper tree only consumes tokens (depth=3, tokens=2).
2 tokens → 2 levels, even if tree is deeper.
"""
site = Site.objects.create(name='T2 Site', slug='t2-site')
manufacturer = Manufacturer.objects.create(name='T2 Manufacturer', slug='t2-manufacturer')
device_role = DeviceRole.objects.create(name='T2 Role', slug='t2-role')
# Create device type with module bay
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='T2 Chassis',
slug='t2-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Bay 1',
position='1'
)
# Create level 2 module type with nested bay
level2_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T2 Level2'
)
ModuleBayTemplate.objects.create(
module_type=level2_type,
name='Level2 Bay',
position='1'
)
# Create level 3 module type with nested bay
level3_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T2 Level3'
)
ModuleBayTemplate.objects.create(
module_type=level3_type,
name='Level3 Bay',
position='1'
)
# Create leaf module type with 2-token interface template
leaf_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T2 Leaf'
)
InterfaceTemplate.objects.create(
module_type=leaf_type,
name='SFP {module}/{module}',
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
)
# Create device and install 3 levels of modules
device = Device.objects.create(
name='T2 Device',
device_type=device_type,
role=device_role,
site=site
)
# Level 1
bay1 = device.modulebays.get(name='Bay 1')
module1 = Module.objects.create(
device=device,
module_bay=bay1,
module_type=level2_type
)
# Level 2
bay2 = module1.modulebays.get(name='Level2 Bay')
module2 = Module.objects.create(
device=device,
module_bay=bay2,
module_type=level3_type
)
# Level 3 (leaf)
bay3 = module2.modulebays.get(name='Level3 Bay')
leaf_module = Module.objects.create(
device=device,
module_bay=bay3,
module_type=leaf_type
)
# Verify: 2 tokens → consumes first 2 levels only: "1/1" (not "1/1/1")
interface = leaf_module.interfaces.first()
self.assertEqual(interface.name, 'SFP 1/1')
def test_too_many_tokens_fails_validation(self):
"""
T3: Too-many-tokens still fails (depth=2, tokens=3).
Confirms the validation prevents impossible substitution.
"""
from dcim.forms import ModuleForm
site = Site.objects.create(name='T3 Site', slug='t3-site')
manufacturer = Manufacturer.objects.create(name='T3 Manufacturer', slug='t3-manufacturer')
device_role = DeviceRole.objects.create(name='T3 Role', slug='t3-role')
# Create device type with module bay
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='T3 Chassis',
slug='t3-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Bay 1',
position='1'
)
# Create line card module type with nested bay
line_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T3 Line Card'
)
ModuleBayTemplate.objects.create(
module_type=line_card_type,
name='Nested Bay',
position='1'
)
# Create leaf module type with 3-token interface template (too many!)
leaf_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T3 Leaf'
)
InterfaceTemplate.objects.create(
module_type=leaf_type,
name='{module}/{module}/{module}',
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
)
# Create device and install line card
device = Device.objects.create(
name='T3 Device',
device_type=device_type,
role=device_role,
site=site
)
bay1 = device.modulebays.get(name='Bay 1')
line_card = Module.objects.create(
device=device,
module_bay=bay1,
module_type=line_card_type
)
# Attempt to install leaf module at depth=2 with 3 tokens - should fail
nested_bay = line_card.modulebays.get(name='Nested Bay')
form = ModuleForm(data={
'device': device.pk,
'module_bay': nested_bay.pk,
'module_type': leaf_type.pk,
'status': 'active',
'replicate_components': True,
'adopt_components': False,
})
self.assertFalse(form.is_valid())
# Check the error message mentions the mismatch
self.assertIn('2', str(form.errors))
self.assertIn('3', str(form.errors))
def test_label_substitution_matches_name_depth_2(self):
"""
T4: Label substitution works the same way as name (depth=2 single-token).
"""
site = Site.objects.create(name='T4 Site', slug='t4-site')
manufacturer = Manufacturer.objects.create(name='T4 Manufacturer', slug='t4-manufacturer')
device_role = DeviceRole.objects.create(name='T4 Role', slug='t4-role')
# Create device type with module bay
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='T4 Chassis',
slug='t4-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Bay 1',
position='1'
)
# Create line card module type with nested bay at position 2
line_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T4 Line Card'
)
ModuleBayTemplate.objects.create(
module_type=line_card_type,
name='Nested Bay',
position='2'
)
# Create leaf module type with single-token name AND label
leaf_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T4 Leaf'
)
InterfaceTemplate.objects.create(
module_type=leaf_type,
name='SFP {module}',
label='LBL {module}',
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
)
# Create device and install modules
device = Device.objects.create(
name='T4 Device',
device_type=device_type,
role=device_role,
site=site
)
bay1 = device.modulebays.get(name='Bay 1')
line_card = Module.objects.create(
device=device,
module_bay=bay1,
module_type=line_card_type
)
nested_bay = line_card.modulebays.get(name='Nested Bay')
leaf_module = Module.objects.create(
device=device,
module_bay=nested_bay,
module_type=leaf_type
)
# Verify both name and label resolve to full path
interface = leaf_module.interfaces.first()
self.assertEqual(interface.name, 'SFP 1/2')
self.assertEqual(interface.label, 'LBL 1/2')
def test_non_interface_component_template_substitution(self):
"""
T5: Non-interface modular component templates (ConsolePortTemplate).
Ensures the fix is general to all ModularComponentTemplateModel subclasses.
"""
site = Site.objects.create(name='T5 Site', slug='t5-site')
manufacturer = Manufacturer.objects.create(name='T5 Manufacturer', slug='t5-manufacturer')
device_role = DeviceRole.objects.create(name='T5 Role', slug='t5-role')
# Create device type with module bay
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='T5 Chassis',
slug='t5-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Bay 1',
position='1'
)
# Create line card module type with nested bay at position 2
line_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T5 Line Card'
)
ModuleBayTemplate.objects.create(
module_type=line_card_type,
name='Nested Bay',
position='2'
)
# Create leaf module type with ConsolePortTemplate using single token
leaf_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T5 Leaf'
)
ConsolePortTemplate.objects.create(
module_type=leaf_type,
name='Console {module}',
label='{module}'
)
# Create device and install modules
device = Device.objects.create(
name='T5 Device',
device_type=device_type,
role=device_role,
site=site
)
bay1 = device.modulebays.get(name='Bay 1')
line_card = Module.objects.create(
device=device,
module_bay=bay1,
module_type=line_card_type
)
nested_bay = line_card.modulebays.get(name='Nested Bay')
leaf_module = Module.objects.create(
device=device,
module_bay=nested_bay,
module_type=leaf_type
)
# Verify ConsolePort resolves with full path
console_port = leaf_module.consoleports.first()
self.assertEqual(console_port.name, 'Console 1/2')
self.assertEqual(console_port.label, '1/2')
def test_positions_with_slashes_join_correctly(self):
"""
T6: Positions that already contain slashes don't break joining (depth=2, single token).
Some platforms use positions like 0/1 (PIC/port style) even before nesting.
"""
site = Site.objects.create(name='T6 Site', slug='t6-site')
manufacturer = Manufacturer.objects.create(name='T6 Manufacturer', slug='t6-manufacturer')
device_role = DeviceRole.objects.create(name='T6 Role', slug='t6-role')
# Create device type with module bay using slash in position
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='T6 Chassis',
slug='t6-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='PIC Bay',
position='0/1' # Position already contains slash
)
# Create line card module type with nested bay at position 2
line_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T6 Line Card'
)
ModuleBayTemplate.objects.create(
module_type=line_card_type,
name='Nested Bay',
position='2'
)
# Create leaf module type with single-token interface template
leaf_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T6 Leaf'
)
InterfaceTemplate.objects.create(
module_type=leaf_type,
name='Gi{module}',
type=InterfaceTypeChoices.TYPE_1GE_FIXED
)
# Create device and install modules
device = Device.objects.create(
name='T6 Device',
device_type=device_type,
role=device_role,
site=site
)
bay1 = device.modulebays.get(name='PIC Bay')
line_card = Module.objects.create(
device=device,
module_bay=bay1,
module_type=line_card_type
)
nested_bay = line_card.modulebays.get(name='Nested Bay')
leaf_module = Module.objects.create(
device=device,
module_bay=nested_bay,
module_type=leaf_type
)
# Verify: 0/1 + 2 = 0/1/2
interface = leaf_module.interfaces.first()
self.assertEqual(interface.name, 'Gi0/1/2')
def test_depth_1_single_token_no_extra_slashes(self):
"""
T7: Ensure depth=1 single-token still resolves to the position, not an unnecessary "path join".
"""
site = Site.objects.create(name='T7 Site', slug='t7-site')
manufacturer = Manufacturer.objects.create(name='T7 Manufacturer', slug='t7-manufacturer')
device_role = DeviceRole.objects.create(name='T7 Role', slug='t7-role')
# Create device type with module bay at position 7
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='T7 Chassis',
slug='t7-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Bay 7',
position='7'
)
# Create module type with single-token template
module_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T7 Module'
)
InterfaceTemplate.objects.create(
module_type=module_type,
name='{module}',
type=InterfaceTypeChoices.TYPE_1GE_FIXED
)
# Create device and install module directly at depth=1
device = Device.objects.create(
name='T7 Device',
device_type=device_type,
role=device_role,
site=site
)
bay = device.modulebays.get(name='Bay 7')
module = Module.objects.create(
device=device,
module_bay=bay,
module_type=module_type
)
# Verify: just "7", not "7/" or similar
interface = module.interfaces.first()
self.assertEqual(interface.name, '7')
def test_multi_occurrence_tokens_level_by_level(self):
"""
T8: Multiple occurrences of {module} in a single template (token_count > 1) still level-by-level.
Ensure the token_count logic and replacement loop behaves with duplicated patterns.
"""
site = Site.objects.create(name='T8 Site', slug='t8-site')
manufacturer = Manufacturer.objects.create(name='T8 Manufacturer', slug='t8-manufacturer')
device_role = DeviceRole.objects.create(name='T8 Role', slug='t8-role')
# Create device type with module bay
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='T8 Chassis',
slug='t8-chassis'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Bay 1',
position='1'
)
# Create line card module type with nested bay at position 2
line_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T8 Line Card'
)
ModuleBayTemplate.objects.create(
module_type=line_card_type,
name='Nested Bay',
position='2'
)
# Create leaf module type with 2-token template (non-slash separator)
leaf_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='T8 Leaf'
)
InterfaceTemplate.objects.create(
module_type=leaf_type,
name='X{module}-Y{module}',
type=InterfaceTypeChoices.TYPE_1GE_FIXED
)
# Create device and install modules
device = Device.objects.create(
name='T8 Device',
device_type=device_type,
role=device_role,
site=site
)
bay1 = device.modulebays.get(name='Bay 1')
line_card = Module.objects.create(
device=device,
module_bay=bay1,
module_type=line_card_type
)
nested_bay = line_card.modulebays.get(name='Nested Bay')
leaf_module = Module.objects.create(
device=device,
module_bay=nested_bay,
module_type=leaf_type
)
# Verify: X1-Y2 (level-by-level, not full-path stuffed into first)
interface = leaf_module.interfaces.first()
self.assertEqual(interface.name, 'X1-Y2')
class CableTestCase(TestCase):

View File

@@ -51,14 +51,7 @@ class ImageAttachmentsPanel(panels.ObjectsTablePanel):
]
def __init__(self, **kwargs):
super().__init__(
'extras.imageattachment',
filters={
'object_type_id': lambda ctx: ContentType.objects.get_for_model(ctx['object']).pk,
'object_id': lambda ctx: ctx['object'].pk,
},
**kwargs,
)
super().__init__('extras.imageattachment', **kwargs)
class TagsPanel(panels.ObjectPanel):

View File

@@ -59,18 +59,24 @@ class PrefixSerializer(PrimaryModelSerializer):
vlan = VLANSerializer(nested=True, required=False, allow_null=True)
status = ChoiceField(choices=PrefixStatusChoices, required=False)
role = RoleSerializer(nested=True, required=False, allow_null=True)
children = serializers.IntegerField(read_only=True)
_children = serializers.IntegerField(read_only=True)
_depth = serializers.IntegerField(read_only=True)
prefix = IPNetworkField()
class Meta:
model = Prefix
fields = [
'id', 'url', 'display_url', 'display', 'family', 'prefix', 'vrf', 'scope_type', 'scope_id', 'scope',
'tenant', 'vlan', 'status', 'role', 'is_pool', 'mark_utilized', 'description', 'owner', 'comments', 'tags',
'custom_fields', 'created', 'last_updated', 'children', '_depth',
'id', 'url', 'display_url', 'display', 'family', 'aggregate', 'parent', 'prefix', 'vrf', 'scope_type',
'scope_id', 'scope', 'tenant', 'vlan', 'status', 'role', 'is_pool', 'mark_utilized', 'description',
'owner', 'comments', 'tags', 'custom_fields', 'created', 'last_updated', '_children', '_depth',
]
brief_fields = ('id', 'url', 'display', 'family', 'prefix', 'description', '_depth')
brief_fields = ('id', 'url', 'display', 'family', 'aggregate', 'parent', 'prefix', 'description', '_depth')
def get_fields(self):
fields = super(PrefixSerializer, self).get_fields()
fields['parent'] = PrefixSerializer(nested=True, read_only=True)
return fields
class PrefixLengthSerializer(serializers.Serializer):
@@ -124,7 +130,9 @@ class AvailablePrefixSerializer(serializers.Serializer):
# IP ranges
#
class IPRangeSerializer(PrimaryModelSerializer):
prefix = PrefixSerializer(nested=True, required=False, allow_null=True)
family = ChoiceField(choices=IPAddressFamilyChoices, read_only=True)
start_address = IPAddressField()
end_address = IPAddressField()
@@ -136,11 +144,11 @@ class IPRangeSerializer(PrimaryModelSerializer):
class Meta:
model = IPRange
fields = [
'id', 'url', 'display_url', 'display', 'family', 'start_address', 'end_address', 'size', 'vrf', 'tenant',
'status', 'role', 'description', 'owner', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
'id', 'url', 'display_url', 'display', 'family', 'prefix', 'start_address', 'end_address', 'size', 'vrf',
'tenant', 'status', 'role', 'description', 'owner', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
'mark_populated', 'mark_utilized',
]
brief_fields = ('id', 'url', 'display', 'family', 'start_address', 'end_address', 'description')
brief_fields = ('id', 'url', 'display', 'family', 'prefix', 'start_address', 'end_address', 'description')
#
@@ -148,6 +156,7 @@ class IPRangeSerializer(PrimaryModelSerializer):
#
class IPAddressSerializer(PrimaryModelSerializer):
prefix = PrefixSerializer(nested=True, required=False, allow_null=True)
family = ChoiceField(choices=IPAddressFamilyChoices, read_only=True)
address = IPAddressField()
vrf = VRFSerializer(nested=True, required=False, allow_null=True)
@@ -166,11 +175,11 @@ class IPAddressSerializer(PrimaryModelSerializer):
class Meta:
model = IPAddress
fields = [
'id', 'url', 'display_url', 'display', 'family', 'address', 'vrf', 'tenant', 'status', 'role',
'id', 'url', 'display_url', 'display', 'family', 'prefix', 'address', 'vrf', 'tenant', 'status', 'role',
'assigned_object_type', 'assigned_object_id', 'assigned_object', 'nat_inside', 'nat_outside',
'dns_name', 'description', 'owner', 'comments', 'tags', 'custom_fields', 'created', 'last_updated',
]
brief_fields = ('id', 'url', 'display', 'family', 'address', 'description')
brief_fields = ('id', 'url', 'display', 'family', 'prefix', 'address', 'description')
class AvailableIPSerializer(serializers.Serializer):

View File

@@ -340,6 +340,26 @@ class PrefixFilterSet(PrimaryModelFilterSet, ScopedFilterSet, TenancyFilterSet,
field_name='prefix',
lookup_expr='net_mask_length__lte'
)
aggregate_id = django_filters.ModelMultipleChoiceFilter(
queryset=Aggregate.objects.all(),
label=_('Aggregate'),
)
aggregate = django_filters.ModelMultipleChoiceFilter(
field_name='aggregate__prefix',
queryset=Aggregate.objects.all(),
to_field_name='prefix',
label=_('Aggregate (Prefix)'),
)
parent_id = django_filters.ModelMultipleChoiceFilter(
queryset=Prefix.objects.all(),
label=_('Parent Prefix'),
)
parent = django_filters.ModelMultipleChoiceFilter(
field_name='parent__prefix',
queryset=Prefix.objects.all(),
to_field_name='prefix',
label=_('Parent Prefix (Prefix)'),
)
vrf_id = django_filters.ModelMultipleChoiceFilter(
queryset=VRF.objects.all(),
label=_('VRF'),
@@ -484,6 +504,16 @@ class IPRangeFilterSet(PrimaryModelFilterSet, TenancyFilterSet, ContactModelFilt
method='search_contains',
label=_('Ranges which contain this prefix or IP'),
)
prefix_id = django_filters.ModelMultipleChoiceFilter(
queryset=Prefix.objects.all(),
label=_('Prefix (ID)'),
)
prefix = django_filters.ModelMultipleChoiceFilter(
field_name='prefix__prefix',
queryset=Prefix.objects.all(),
to_field_name='prefix',
label=_('Prefix'),
)
vrf_id = django_filters.ModelMultipleChoiceFilter(
queryset=VRF.objects.all(),
label=_('VRF'),
@@ -569,6 +599,16 @@ class IPAddressFilterSet(PrimaryModelFilterSet, TenancyFilterSet, ContactModelFi
method='search_by_parent',
label=_('Parent prefix'),
)
prefix_id = django_filters.ModelMultipleChoiceFilter(
queryset=Prefix.objects.all(),
label=_('Prefix (ID)'),
)
prefix = django_filters.ModelMultipleChoiceFilter(
field_name='prefix__prefix',
queryset=Prefix.objects.all(),
to_field_name='prefix',
label=_('Prefix (prefix)'),
)
address = MultiValueCharFilter(
method='filter_address',
label=_('Address'),

View File

@@ -168,6 +168,11 @@ class RoleBulkEditForm(OrganizationalModelBulkEditForm):
class PrefixBulkEditForm(ScopedBulkEditForm, PrimaryModelBulkEditForm):
parent = DynamicModelChoiceField(
queryset=Prefix.objects.all(),
required=False,
label=_('Parent Prefix')
)
vlan_group = DynamicModelChoiceField(
queryset=VLANGroup.objects.all(),
required=False,
@@ -221,7 +226,7 @@ class PrefixBulkEditForm(ScopedBulkEditForm, PrimaryModelBulkEditForm):
model = Prefix
fieldsets = (
FieldSet('tenant', 'status', 'role', 'description'),
FieldSet('vrf', 'prefix_length', 'is_pool', 'mark_utilized', name=_('Addressing')),
FieldSet('parent', 'vrf', 'prefix_length', 'is_pool', 'mark_utilized', name=_('Addressing')),
FieldSet('scope_type', 'scope', name=_('Scope')),
FieldSet('vlan_group', 'vlan', name=_('VLAN Assignment')),
)
@@ -231,6 +236,11 @@ class PrefixBulkEditForm(ScopedBulkEditForm, PrimaryModelBulkEditForm):
class IPRangeBulkEditForm(PrimaryModelBulkEditForm):
prefix = DynamicModelChoiceField(
queryset=Prefix.objects.all(),
required=False,
label=_('Prefix')
)
vrf = DynamicModelChoiceField(
queryset=VRF.objects.all(),
required=False,
@@ -272,6 +282,16 @@ class IPRangeBulkEditForm(PrimaryModelBulkEditForm):
class IPAddressBulkEditForm(PrimaryModelBulkEditForm):
prefix = DynamicModelChoiceField(
queryset=Prefix.objects.all(),
required=False,
label=_('Prefix')
)
prefix = DynamicModelChoiceField(
queryset=Prefix.objects.all(),
required=False,
label=_('Prefix')
)
vrf = DynamicModelChoiceField(
queryset=VRF.objects.all(),
required=False,
@@ -307,10 +327,10 @@ class IPAddressBulkEditForm(PrimaryModelBulkEditForm):
model = IPAddress
fieldsets = (
FieldSet('status', 'role', 'tenant', 'description'),
FieldSet('vrf', 'mask_length', 'dns_name', name=_('Addressing')),
FieldSet('prefix', 'vrf', 'mask_length', 'dns_name', name=_('Addressing')),
)
nullable_fields = (
'vrf', 'role', 'tenant', 'dns_name', 'description', 'comments',
'prefix', 'vrf', 'role', 'tenant', 'dns_name', 'description', 'comments',
)

View File

@@ -155,6 +155,18 @@ class RoleImportForm(OrganizationalModelImportForm):
class PrefixImportForm(ScopedImportForm, PrimaryModelImportForm):
aggregate = CSVModelChoiceField(
label=_('Aggregate'),
queryset=Aggregate.objects.all(),
to_field_name='prefix',
required=False
)
parent = CSVModelChoiceField(
label=_('Prefix'),
queryset=Prefix.objects.all(),
to_field_name='prefix',
required=False
)
vrf = CSVModelChoiceField(
label=_('VRF'),
queryset=VRF.objects.all(),
@@ -242,8 +254,26 @@ class PrefixImportForm(ScopedImportForm, PrimaryModelImportForm):
queryset = self.fields['vlan'].queryset.filter(query)
self.fields['vlan'].queryset = queryset
# Limit Prefix queryset by assigned vrf
vrf = data.get('vrf')
query = Q()
if vrf:
query &= Q(**{
f"vrf__{self.fields['vrf'].to_field_name}": vrf
})
queryset = self.fields['parent'].queryset.filter(query)
self.fields['parent'].queryset = queryset
class IPRangeImportForm(PrimaryModelImportForm):
prefix = CSVModelChoiceField(
label=_('Prefix'),
queryset=Prefix.objects.all(),
to_field_name='prefix',
required=True,
help_text=_('Assigned prefix')
)
vrf = CSVModelChoiceField(
label=_('VRF'),
queryset=VRF.objects.all(),
@@ -278,8 +308,29 @@ class IPRangeImportForm(PrimaryModelImportForm):
'description', 'owner', 'comments', 'tags',
)
def __init__(self, data=None, *args, **kwargs):
super().__init__(data, *args, **kwargs)
# Limit Prefix queryset by assigned vrf
vrf = data.get('vrf')
query = Q()
if vrf:
query &= Q(**{
f"vrf__{self.fields['vrf'].to_field_name}": vrf
})
queryset = self.fields['prefix'].queryset.filter(query)
self.fields['prefix'].queryset = queryset
class IPAddressImportForm(PrimaryModelImportForm):
prefix = CSVModelChoiceField(
label=_('Prefix'),
queryset=Prefix.objects.all(),
required=False,
to_field_name='prefix',
help_text=_('Assigned prefix')
)
vrf = CSVModelChoiceField(
label=_('VRF'),
queryset=VRF.objects.all(),
@@ -347,8 +398,8 @@ class IPAddressImportForm(PrimaryModelImportForm):
class Meta:
model = IPAddress
fields = [
'address', 'vrf', 'tenant', 'status', 'role', 'device', 'virtual_machine', 'interface', 'fhrp_group',
'is_primary', 'is_oob', 'dns_name', 'description', 'owner', 'comments', 'tags',
'prefix', 'address', 'vrf', 'tenant', 'status', 'role', 'device', 'virtual_machine', 'interface',
'fhrp_group', 'is_primary', 'is_oob', 'dns_name', 'owner', 'description', 'comments', 'tags',
]
def __init__(self, data=None, *args, **kwargs):
@@ -356,6 +407,15 @@ class IPAddressImportForm(PrimaryModelImportForm):
if data:
# Limit Prefix queryset by assigned vrf
vrf = data.get('vrf')
query = Q()
if vrf:
query &= Q(**{f"vrf__{self.fields['vrf'].to_field_name}": vrf})
queryset = self.fields['prefix'].queryset.filter(query)
self.fields['prefix'].queryset = queryset
# Limit interface queryset by assigned device
if data.get('device'):
self.fields['interface'].queryset = Interface.objects.filter(

View File

@@ -211,6 +211,12 @@ class PrefixFilterForm(ContactModelFilterForm, TenancyFilterForm, PrimaryModelFi
choices=PREFIX_MASK_LENGTH_CHOICES,
label=_('Mask length')
)
aggregate_id = DynamicModelMultipleChoiceField(
queryset=Aggregate.objects.all(),
required=False,
label=_('Aggregate'),
null_option='Global'
)
vrf_id = DynamicModelMultipleChoiceField(
queryset=VRF.objects.all(),
required=False,
@@ -285,10 +291,18 @@ class IPRangeFilterForm(ContactModelFilterForm, TenancyFilterForm, PrimaryModelF
model = IPRange
fieldsets = (
FieldSet('q', 'filter_id', 'tag', 'owner_id'),
FieldSet('family', 'vrf_id', 'status', 'role_id', 'mark_populated', 'mark_utilized', name=_('Attributes')),
FieldSet(
'prefix', 'family', 'vrf_id', 'status', 'role_id', 'mark_populated', 'mark_utilized', name=_('Attributes')
),
FieldSet('tenant_group_id', 'tenant_id', name=_('Tenant')),
FieldSet('contact', 'contact_role', 'contact_group', name=_('Contacts')),
)
prefix = DynamicModelMultipleChoiceField(
queryset=Prefix.objects.all(),
required=False,
label=_('Prefix'),
null_option='None'
)
family = forms.ChoiceField(
required=False,
choices=add_blank_choice(IPAddressFamilyChoices),
@@ -333,7 +347,7 @@ class IPAddressFilterForm(ContactModelFilterForm, TenancyFilterForm, PrimaryMode
fieldsets = (
FieldSet('q', 'filter_id', 'tag', 'owner_id'),
FieldSet(
'parent', 'family', 'status', 'role', 'mask_length', 'assigned_to_interface', 'dns_name',
'prefix', 'parent', 'family', 'status', 'role', 'mask_length', 'assigned_to_interface', 'dns_name',
name=_('Attributes')
),
FieldSet('vrf_id', 'present_in_vrf_id', name=_('VRF')),
@@ -341,7 +355,7 @@ class IPAddressFilterForm(ContactModelFilterForm, TenancyFilterForm, PrimaryMode
FieldSet('device_id', 'virtual_machine_id', name=_('Device/VM')),
FieldSet('contact', 'contact_role', 'contact_group', name=_('Contacts')),
)
selector_fields = ('filter_id', 'q', 'region_id', 'group_id', 'parent', 'status', 'role')
selector_fields = ('filter_id', 'q', 'region_id', 'group_id', 'prefix_id', 'parent', 'status', 'role')
parent = forms.CharField(
required=False,
widget=forms.TextInput(
@@ -361,6 +375,11 @@ class IPAddressFilterForm(ContactModelFilterForm, TenancyFilterForm, PrimaryMode
choices=IPADDRESS_MASK_LENGTH_CHOICES,
label=_('Mask length')
)
prefix_id = DynamicModelMultipleChoiceField(
queryset=Prefix.objects.all(),
required=False,
label=_('Prefix'),
)
vrf_id = DynamicModelMultipleChoiceField(
queryset=VRF.objects.all(),
required=False,

View File

@@ -241,6 +241,11 @@ class PrefixForm(TenancyForm, ScopedForm, PrimaryModelForm):
class IPRangeForm(TenancyForm, PrimaryModelForm):
prefix = DynamicModelChoiceField(
queryset=Prefix.objects.all(),
required=False,
label=_('Prefix')
)
vrf = DynamicModelChoiceField(
queryset=VRF.objects.all(),
required=False,
@@ -255,8 +260,8 @@ class IPRangeForm(TenancyForm, PrimaryModelForm):
fieldsets = (
FieldSet(
'vrf', 'start_address', 'end_address', 'role', 'status', 'mark_populated', 'mark_utilized', 'description',
'tags', name=_('IP Range')
'prefix', 'vrf', 'start_address', 'end_address', 'role', 'status', 'mark_populated', 'mark_utilized',
'description', 'tags', name=_('IP Range')
),
FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
)
@@ -264,12 +269,21 @@ class IPRangeForm(TenancyForm, PrimaryModelForm):
class Meta:
model = IPRange
fields = [
'vrf', 'start_address', 'end_address', 'status', 'role', 'tenant_group', 'tenant', 'mark_populated',
'mark_utilized', 'description', 'owner', 'comments', 'tags',
'prefix', 'vrf', 'start_address', 'end_address', 'status', 'role', 'tenant_group', 'tenant',
'mark_populated', 'mark_utilized', 'description', 'owner', 'comments', 'tags',
]
class IPAddressForm(TenancyForm, PrimaryModelForm):
prefix = DynamicModelChoiceField(
queryset=Prefix.objects.all(),
required=False,
context={
'vrf': 'vrf',
},
selector=True,
label=_('Prefix'),
)
interface = DynamicModelChoiceField(
queryset=Interface.objects.all(),
required=False,
@@ -315,7 +329,7 @@ class IPAddressForm(TenancyForm, PrimaryModelForm):
)
fieldsets = (
FieldSet('address', 'status', 'role', 'vrf', 'dns_name', 'description', 'tags', name=_('IP Address')),
FieldSet('prefix', 'address', 'status', 'role', 'vrf', 'dns_name', 'description', 'tags', name=_('IP Address')),
FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
FieldSet(
TabbedGroups(
@@ -331,8 +345,8 @@ class IPAddressForm(TenancyForm, PrimaryModelForm):
class Meta:
model = IPAddress
fields = [
'address', 'vrf', 'status', 'role', 'dns_name', 'primary_for_parent', 'oob_for_parent', 'nat_inside',
'tenant_group', 'tenant', 'description', 'owner', 'comments', 'tags',
'prefix', 'address', 'vrf', 'status', 'role', 'dns_name', 'primary_for_parent', 'oob_for_parent',
'nat_inside', 'tenant_group', 'tenant', 'description', 'owner', 'comments', 'tags',
]
def __init__(self, *args, **kwargs):
@@ -457,6 +471,15 @@ class IPAddressForm(TenancyForm, PrimaryModelForm):
class IPAddressBulkAddForm(TenancyForm, NetBoxModelForm):
prefix = DynamicModelChoiceField(
queryset=Prefix.objects.all(),
required=False,
context={
'vrf': 'vrf',
},
selector=True,
label=_('Prefix'),
)
vrf = DynamicModelChoiceField(
queryset=VRF.objects.all(),
required=False,
@@ -466,7 +489,7 @@ class IPAddressBulkAddForm(TenancyForm, NetBoxModelForm):
class Meta:
model = IPAddress
fields = [
'address', 'vrf', 'status', 'role', 'dns_name', 'description', 'tenant_group', 'tenant', 'tags',
'address', 'prefix', 'vrf', 'status', 'role', 'dns_name', 'description', 'tenant_group', 'tenant', 'tags',
]

View File

@@ -170,6 +170,7 @@ class FHRPGroupAssignmentFilter(ChangeLoggedModelFilter):
@strawberry_django.filter_type(models.IPAddress, lookups=True)
class IPAddressFilter(ContactFilterMixin, TenancyFilterMixin, PrimaryModelFilter):
prefix: Annotated['PrefixFilter', strawberry.lazy('ipam.graphql.filters')] | None = strawberry_django.filter_field()
address: FilterLookup[str] | None = strawberry_django.filter_field()
vrf: Annotated['VRFFilter', strawberry.lazy('ipam.graphql.filters')] | None = strawberry_django.filter_field()
vrf_id: ID | None = strawberry_django.filter_field()
@@ -221,6 +222,7 @@ class IPAddressFilter(ContactFilterMixin, TenancyFilterMixin, PrimaryModelFilter
@strawberry_django.filter_type(models.IPRange, lookups=True)
class IPRangeFilter(ContactFilterMixin, TenancyFilterMixin, PrimaryModelFilter):
prefix: Annotated['PrefixFilter', strawberry.lazy('ipam.graphql.filters')] | None = strawberry_django.filter_field()
start_address: FilterLookup[str] | None = strawberry_django.filter_field()
end_address: FilterLookup[str] | None = strawberry_django.filter_field()
size: Annotated['IntegerLookup', strawberry.lazy('netbox.graphql.filter_lookups')] | None = (
@@ -275,6 +277,10 @@ class IPRangeFilter(ContactFilterMixin, TenancyFilterMixin, PrimaryModelFilter):
@strawberry_django.filter_type(models.Prefix, lookups=True)
class PrefixFilter(ContactFilterMixin, ScopedFilterMixin, TenancyFilterMixin, PrimaryModelFilter):
aggregate: Annotated['AggregateFilter', strawberry.lazy('ipam.graphql.filters')] | None = (
strawberry_django.filter_field()
)
parent: Annotated['PrefixFilter', strawberry.lazy('ipam.graphql.filters')] | None = strawberry_django.filter_field()
prefix: FilterLookup[str] | None = strawberry_django.filter_field()
vrf: Annotated['VRFFilter', strawberry.lazy('ipam.graphql.filters')] | None = strawberry_django.filter_field()
vrf_id: ID | None = strawberry_django.filter_field()

View File

@@ -143,6 +143,7 @@ class FHRPGroupAssignmentType(BaseObjectType):
)
class IPAddressType(ContactsMixin, BaseIPAddressFamilyType, PrimaryObjectType):
address: str
prefix: Annotated["PrefixType", strawberry.lazy('ipam.graphql.types')] | None
vrf: Annotated["VRFType", strawberry.lazy('ipam.graphql.types')] | None
tenant: Annotated["TenantType", strawberry.lazy('tenancy.graphql.types')] | None
nat_inside: Annotated["IPAddressType", strawberry.lazy('ipam.graphql.types')] | None
@@ -167,6 +168,7 @@ class IPAddressType(ContactsMixin, BaseIPAddressFamilyType, PrimaryObjectType):
pagination=True
)
class IPRangeType(ContactsMixin, PrimaryObjectType):
prefix: Annotated["PrefixType", strawberry.lazy('ipam.graphql.types')] | None
start_address: str
end_address: str
vrf: Annotated["VRFType", strawberry.lazy('ipam.graphql.types')] | None
@@ -181,6 +183,8 @@ class IPRangeType(ContactsMixin, PrimaryObjectType):
pagination=True
)
class PrefixType(ContactsMixin, BaseIPAddressFamilyType, PrimaryObjectType):
aggregate: Annotated["AggregateType", strawberry.lazy('ipam.graphql.types')] | None
parent: Annotated["PrefixType", strawberry.lazy('ipam.graphql.types')] | None
prefix: str
vrf: Annotated["VRFType", strawberry.lazy('ipam.graphql.types')] | None
tenant: Annotated["TenantType", strawberry.lazy('tenancy.graphql.types')] | None

View File

@@ -0,0 +1,58 @@
# Generated by Django 5.0.9 on 2025-02-20 16:49
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ipam', '0082_add_prefix_network_containment_indexes'),
]
operations = [
migrations.AddField(
model_name='prefix',
name='parent',
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='children',
to='ipam.prefix',
),
),
migrations.AddField(
model_name='ipaddress',
name='prefix',
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='ip_addresses',
to='ipam.prefix',
),
),
migrations.AddField(
model_name='iprange',
name='prefix',
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='ip_ranges',
to='ipam.prefix',
),
),
migrations.AddField(
model_name='prefix',
name='aggregate',
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='prefixes',
to='ipam.aggregate',
),
),
]

View File

@@ -0,0 +1,134 @@
# Generated by Django 5.0.9 on 2025-02-20 16:49
import sys
import time
from django.db import migrations, models
from ipam.choices import PrefixStatusChoices
def draw_progress(count, total, length=20):
if total == 0:
return
progress = count / total
percent = int(progress * 100)
bar = int(progress * length)
sys.stdout.write('\r')
sys.stdout.write(f"[{'=' * bar:{length}s}] {percent}%")
sys.stdout.flush()
def set_prefix(apps, schema_editor, model, attr='address', parent_attr='prefix', parent_model='Prefix'):
start = time.time()
ChildModel = apps.get_model('ipam', model)
ParentModel = apps.get_model('ipam', parent_model)
addresses = ChildModel.objects.all()
total = addresses.count()
if total == 0:
return
print('\r\n')
print(f'Migrating {parent_model}')
print('\r\n')
i = 0
draw_progress(i, total, 50)
for address in addresses:
i += 1
address_attr = getattr(address, attr)
prefixes = ParentModel.objects.filter(
prefix__net_contains_or_equals=str(address_attr.ip),
prefix__net_mask_length__lte=address_attr.prefixlen,
)
setattr(address, parent_attr, prefixes.last())
try:
address.save()
except Exception as e:
print(f'Error at {address}')
raise e
draw_progress(i, total, 50)
end = time.time()
print(f"\r\nElapsed Time: {end - start:.2f}s")
def set_ipaddress_prefix(apps, schema_editor):
set_prefix(apps, schema_editor, 'IPAddress')
def unset_ipaddress_prefix(apps, schema_editor):
IPAddress = apps.get_model('ipam', 'IPAddress')
IPAddress.objects.update(prefix=None)
def set_iprange_prefix(apps, schema_editor):
set_prefix(apps, schema_editor, 'IPRange', 'start_address')
def unset_iprange_prefix(apps, schema_editor):
IPRange = apps.get_model('ipam', 'IPRange')
IPRange.objects.update(prefix=None)
def set_prefix_aggregate(apps, schema_editor):
set_prefix(apps, schema_editor, 'Prefix', 'prefix', 'aggregate', 'Aggregate')
def unset_prefix_aggregate(apps, schema_editor):
Prefix = apps.get_model('ipam', 'Prefix')
Prefix.objects.update(aggregate=None)
def set_prefix_parent(apps, schema_editor):
Prefix = apps.get_model('ipam', 'Prefix')
start = time.time()
addresses = Prefix.objects.all()
i = 0
total = addresses.count()
if total == 0:
return
print('\r\n')
draw_progress(i, total, 50)
for address in addresses:
i += 1
prefixes = Prefix.objects.exclude(pk=address.pk).filter(
models.Q(
vrf=address.vrf,
prefix__net_contains=str(address.prefix.ip)
) | models.Q(
vrf=None,
status=PrefixStatusChoices.STATUS_CONTAINER,
prefix__net_contains=str(address.prefix.ip),
)
)
if not prefixes.exists():
draw_progress(i, total, 50)
continue
address.parent = prefixes.last()
address.save()
draw_progress(i, total, 50)
end = time.time()
print(f"\r\nElapsed Time: {end - start:.2f}s")
def unset_prefix_parent(apps, schema_editor):
Prefix = apps.get_model('ipam', 'Prefix')
Prefix.objects.update(parent=None)
class Migration(migrations.Migration):
dependencies = [
('ipam', '0083_ipaddress_iprange_prefix_parent'),
]
operations = [
migrations.RunPython(set_ipaddress_prefix, unset_ipaddress_prefix),
migrations.RunPython(set_iprange_prefix, unset_iprange_prefix),
migrations.RunPython(set_prefix_aggregate, unset_prefix_aggregate),
migrations.RunPython(set_prefix_parent, unset_prefix_parent),
]

View File

@@ -0,0 +1,43 @@
# Generated by Django 5.2.5 on 2025-11-06 03:24
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('ipam', '0083_ipaddress_iprange_prefix_parent_data'),
]
operations = [
pgtrigger.migrations.AddTrigger(
model_name='prefix',
trigger=pgtrigger.compiler.Trigger(
name='ipam_prefix_delete',
sql=pgtrigger.compiler.UpsertTriggerSql(
func="\n-- Update Child Prefix's with Prefix's PARENT\nUPDATE ipam_prefix SET parent_id=OLD.parent_id WHERE parent_id=OLD.id;\nRETURN OLD;\n", # noqa: E501
hash='899e1943cb201118be7ef02f36f49747224774f2',
operation='DELETE',
pgid='pgtrigger_ipam_prefix_delete_e7810',
table='ipam_prefix',
when='BEFORE',
),
),
),
pgtrigger.migrations.AddTrigger(
model_name='prefix',
trigger=pgtrigger.compiler.Trigger(
name='ipam_prefix_insert',
sql=pgtrigger.compiler.UpsertTriggerSql(
func="\nUPDATE ipam_prefix\nSET parent_id=NEW.id \nWHERE \n prefix << NEW.prefix\n AND\n (\n (vrf_id = NEW.vrf_id OR (vrf_id IS NULL AND NEW.vrf_id IS NULL))\n OR\n (\n NEW.vrf_id IS NULL\n AND\n NEW.status = 'container'\n AND\n NOT EXISTS(\n SELECT 1 FROM ipam_prefix p WHERE p.prefix >> ipam_prefix.prefix AND p.vrf_id = ipam_prefix.vrf_id\n )\n )\n )\n AND id != NEW.id\n AND NOT EXISTS (\n SELECT 1 FROM ipam_prefix p\n WHERE\n p.prefix >> ipam_prefix.prefix\n AND p.prefix << NEW.prefix\n AND (\n (p.vrf_id = ipam_prefix.vrf_id OR (p.vrf_id IS NULL AND ipam_prefix.vrf_id IS NULL))\n OR\n (p.vrf_id IS NULL AND p.status = 'container')\n )\n AND p.id != NEW.id\n )\n;\nRETURN NEW;\n", # noqa: E501
hash='0e05bbe61861227a9eb710b6c94bae9e0cc7119e',
operation='INSERT',
pgid='pgtrigger_ipam_prefix_insert_46c72',
table='ipam_prefix',
when='AFTER',
),
),
),
]

View File

@@ -0,0 +1,25 @@
# Generated by Django 5.2.5 on 2025-11-25 03:53
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ipam', '0084_prefix_ipam_prefix_delete_prefix_ipam_prefix_insert'),
]
operations = [
migrations.AlterField(
model_name='prefix',
name='parent',
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.DO_NOTHING,
related_name='children',
to='ipam.prefix',
),
),
]

View File

@@ -0,0 +1,65 @@
# Generated by Django 5.2.5 on 2025-11-25 06:00
import pgtrigger.compiler
import pgtrigger.migrations
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('ipam', '0085_alter_prefix_parent'),
]
operations = [
pgtrigger.migrations.RemoveTrigger(
model_name='prefix',
name='ipam_prefix_delete',
),
pgtrigger.migrations.RemoveTrigger(
model_name='prefix',
name='ipam_prefix_insert',
),
pgtrigger.migrations.AddTrigger(
model_name='prefix',
trigger=pgtrigger.compiler.Trigger(
name='ipam_prefix_delete',
sql=pgtrigger.compiler.UpsertTriggerSql(
func="\n-- Update Child Prefix's with Prefix's PARENT This is a safe assumption based on the fact that the parent would be the\n-- next direct parent for anything else that could contain this prefix\nUPDATE ipam_prefix SET parent_id=OLD.parent_id WHERE parent_id=OLD.id;\nRETURN OLD;\n", # noqa: E501
hash='ee3f890009c05a3617428158e7b6f3d77317885d',
operation='DELETE',
pgid='pgtrigger_ipam_prefix_delete_e7810',
table='ipam_prefix',
when='BEFORE',
),
),
),
pgtrigger.migrations.AddTrigger(
model_name='prefix',
trigger=pgtrigger.compiler.Trigger(
name='ipam_prefix_insert',
sql=pgtrigger.compiler.UpsertTriggerSql(
func="\n-- Update the prefix with the new parent if the parent is the most appropriate prefix\nUPDATE ipam_prefix\nSET parent_id=NEW.id\nWHERE\n prefix << NEW.prefix\n AND\n (\n (vrf_id = NEW.vrf_id OR (vrf_id IS NULL AND NEW.vrf_id IS NULL))\n OR\n (\n NEW.vrf_id IS NULL\n AND\n NEW.status = 'container'\n AND\n NOT EXISTS(\n SELECT 1 FROM ipam_prefix p WHERE p.prefix >> ipam_prefix.prefix AND p.vrf_id = ipam_prefix.vrf_id\n )\n )\n )\n AND id != NEW.id\n AND NOT EXISTS (\n SELECT 1 FROM ipam_prefix p\n WHERE\n p.prefix >> ipam_prefix.prefix\n AND p.prefix << NEW.prefix\n AND (\n (p.vrf_id = ipam_prefix.vrf_id OR (p.vrf_id IS NULL AND ipam_prefix.vrf_id IS NULL))\n OR\n (p.vrf_id IS NULL AND p.status = 'container')\n )\n AND p.id != NEW.id\n )\n;\nRETURN NEW;\n", # noqa: E501
hash='1d71498f09e767183d3b0d29c06c9ac9e2cc000a',
operation='INSERT',
pgid='pgtrigger_ipam_prefix_insert_46c72',
table='ipam_prefix',
when='AFTER',
),
),
),
pgtrigger.migrations.AddTrigger(
model_name='prefix',
trigger=pgtrigger.compiler.Trigger(
name='ipam_prefix_update',
sql=pgtrigger.compiler.UpsertTriggerSql(
func="\n-- When a prefix changes, reassign any IPAddresses that no longer\n-- fall within the new prefix range to the parent prefix (or set null if no parent exists)\nUPDATE ipam_prefix\nSET parent_id = OLD.parent_id\nWHERE\n parent_id = NEW.id\n -- IP address no longer contained within the updated prefix\n AND NOT (prefix << NEW.prefix);\n\n-- Update the prefix with the new parent if the parent is the most appropriate prefix\nUPDATE ipam_prefix\nSET parent_id=NEW.id\nWHERE\n prefix << NEW.prefix\n AND\n (\n (vrf_id = NEW.vrf_id OR (vrf_id IS NULL AND NEW.vrf_id IS NULL))\n OR\n (\n NEW.vrf_id IS NULL\n AND\n NEW.status = 'container'\n AND\n NOT EXISTS(\n SELECT 1 FROM ipam_prefix p WHERE p.prefix >> ipam_prefix.prefix AND p.vrf_id = ipam_prefix.vrf_id\n )\n )\n )\n AND id != NEW.id\n AND NOT EXISTS (\n SELECT 1 FROM ipam_prefix p\n WHERE\n p.prefix >> ipam_prefix.prefix\n AND p.prefix << NEW.prefix\n AND (\n (p.vrf_id = ipam_prefix.vrf_id OR (p.vrf_id IS NULL AND ipam_prefix.vrf_id IS NULL))\n OR\n (p.vrf_id IS NULL AND p.status = 'container')\n )\n AND p.id != NEW.id\n )\n;\nRETURN NEW;\n", # noqa: E501
hash='747230a84703df5a4aa3d32e7f45b5a32525b799',
operation='UPDATE',
pgid='pgtrigger_ipam_prefix_update_e5fca',
table='ipam_prefix',
when='AFTER',
),
),
),
]

View File

@@ -1,4 +1,5 @@
import netaddr
import pgtrigger
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.indexes import GistIndex
@@ -8,6 +9,7 @@ from django.db.models import F
from django.db.models.functions import Cast
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from netaddr.ip import IPNetwork
from dcim.models.mixins import CachedScopeMixin
from ipam.choices import *
@@ -16,6 +18,8 @@ from ipam.fields import IPNetworkField, IPAddressField
from ipam.lookups import Host
from ipam.managers import IPAddressManager
from ipam.querysets import PrefixQuerySet
from ipam.triggers import ipam_prefix_delete_adjust_prefix_parent, ipam_prefix_insert_adjust_prefix_parent, \
ipam_prefix_update_adjust_prefix_parent
from ipam.validators import DNSValidator
from netbox.config import get_config
from netbox.models import OrganizationalModel, PrimaryModel
@@ -185,31 +189,28 @@ class Aggregate(ContactsMixin, GetAvailablePrefixesMixin, PrimaryModel):
return min(utilization, 100)
class Role(OrganizationalModel):
"""
A Role represents the functional role of a Prefix or VLAN; for example, "Customer," "Infrastructure," or
"Management."
"""
weight = models.PositiveSmallIntegerField(
verbose_name=_('weight'),
default=1000
)
class Meta:
ordering = ('weight', 'name')
verbose_name = _('role')
verbose_name_plural = _('roles')
def __str__(self):
return self.name
class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, PrimaryModel):
"""
A Prefix represents an IPv4 or IPv6 network, including mask length. Prefixes can optionally be scoped to certain
areas and/or assigned to VRFs. A Prefix must be assigned a status and may optionally be assigned a used-define Role.
A Prefix can also be assigned to a VLAN where appropriate.
"""
aggregate = models.ForeignKey(
to='ipam.Aggregate',
on_delete=models.SET_NULL, # This is handled by triggers
related_name='prefixes',
blank=True,
null=True,
verbose_name=_('aggregate')
)
parent = models.ForeignKey(
to='ipam.Prefix',
on_delete=models.DO_NOTHING,
related_name='children',
blank=True,
null=True,
verbose_name=_('Prefix')
)
prefix = IPNetworkField(
verbose_name=_('prefix'),
help_text=_('IPv4 or IPv6 network with mask')
@@ -284,8 +285,32 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
verbose_name_plural = _('prefixes')
indexes = (
models.Index(fields=('scope_type', 'scope_id')),
GistIndex(fields=['prefix'], name='ipam_prefix_gist_idx', opclasses=['inet_ops']),
)
GistIndex(
fields=['prefix'],
name='ipam_prefix_gist_idx',
opclasses=['inet_ops'],
),
)
triggers = (
pgtrigger.Trigger(
name='ipam_prefix_delete',
operation=pgtrigger.Delete,
when=pgtrigger.Before,
func=ipam_prefix_delete_adjust_prefix_parent,
),
pgtrigger.Trigger(
name='ipam_prefix_insert',
operation=pgtrigger.Insert,
when=pgtrigger.After,
func=ipam_prefix_insert_adjust_prefix_parent,
),
pgtrigger.Trigger(
name='ipam_prefix_update',
operation=pgtrigger.Update,
when=pgtrigger.After,
func=ipam_prefix_update_adjust_prefix_parent,
),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -301,6 +326,8 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
super().clean()
if self.prefix:
if not isinstance(self.prefix, IPNetwork):
self.prefix = IPNetwork(self.prefix)
# /0 masks are not acceptable
if self.prefix.prefixlen == 0:
@@ -308,6 +335,17 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
'prefix': _("Cannot create prefix with /0 mask.")
})
if self.parent:
if self.prefix not in self.parent.prefix:
raise ValidationError({
'parent': _("Prefix must be part of parent prefix.")
})
if self.parent.status != PrefixStatusChoices.STATUS_CONTAINER and self.vrf != self.parent.vrf:
raise ValidationError({
'vrf': _("VRF must match the parent VRF.")
})
# Enforce unique IP space (if applicable)
if (self.vrf is None and get_config().ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
duplicate_prefixes = self.get_duplicates()
@@ -321,6 +359,14 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
})
def save(self, *args, **kwargs):
vrf_id = self.vrf.pk if self.vrf else None
if not self.pk and not self.parent:
parent = self.find_parent_prefix(self)
self.parent = parent
elif self.parent and (self.prefix != self._prefix or vrf_id != self._vrf_id):
parent = self.find_parent_prefix(self)
self.parent = parent
if isinstance(self.prefix, netaddr.IPNetwork):
@@ -346,11 +392,11 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
return netaddr.IPAddress(self.prefix).format(netaddr.ipv6_full)
@property
def depth(self):
def depth_count(self):
return self._depth
@property
def children(self):
def children_count(self):
return self._children
def _set_prefix_length(self, value):
@@ -490,11 +536,52 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
return min(utilization, 100)
@classmethod
def find_parent_prefix(cls, network):
prefixes = Prefix.objects.filter(
models.Q(
vrf=network.vrf,
prefix__net_contains=str(network.prefix)
) | models.Q(
vrf=None,
status=PrefixStatusChoices.STATUS_CONTAINER,
prefix__net_contains=str(network.prefix),
)
)
return prefixes.last()
class Role(OrganizationalModel):
"""
A Role represents the functional role of a Prefix or VLAN; for example, "Customer," "Infrastructure," or
"Management."
"""
weight = models.PositiveSmallIntegerField(
verbose_name=_('weight'),
default=1000
)
class Meta:
ordering = ('weight', 'name')
verbose_name = _('role')
verbose_name_plural = _('roles')
def __str__(self):
return self.name
class IPRange(ContactsMixin, PrimaryModel):
"""
A range of IP addresses, defined by start and end addresses.
"""
prefix = models.ForeignKey(
to='ipam.Prefix',
on_delete=models.SET_NULL,
related_name='ip_ranges',
null=True,
blank=True,
verbose_name=_('prefix'),
)
start_address = IPAddressField(
verbose_name=_('start address'),
help_text=_('IPv4 or IPv6 address (with mask)')
@@ -564,6 +651,27 @@ class IPRange(ContactsMixin, PrimaryModel):
super().clean()
if self.start_address and self.end_address:
# If prefix is set, validate suitability
if self.prefix:
# Check that start address and end address are within the prefix range
if self.start_address not in self.prefix.prefix and self.end_address not in self.prefix.prefix:
raise ValidationError({
'start_address': _("Start address must be part of the selected prefix"),
'end_address': _("End address must be part of the selected prefix.")
})
elif self.start_address not in self.prefix.prefix:
raise ValidationError({
'start_address': _("Start address must be part of the selected prefix")
})
elif self.end_address not in self.prefix.prefix:
raise ValidationError({
'end_address': _("End address must be part of the selected prefix.")
})
# Check that VRF matches prefix VRF
if self.vrf != self.prefix.vrf:
raise ValidationError({
'vrf': _("VRF must match the prefix VRF.")
})
# Check that start & end IP versions match
if self.start_address.version != self.end_address.version:
@@ -720,6 +828,14 @@ class IPRange(ContactsMixin, PrimaryModel):
return min(float(child_count) / self.size * 100, 100)
@classmethod
def find_prefix(self, address):
prefixes = Prefix.objects.filter(
models.Q(prefix__net_contains=address.start_address) & Q(prefix__net_contains=address.end_address),
vrf=address.vrf,
)
return prefixes.last()
class IPAddress(ContactsMixin, PrimaryModel):
"""
@@ -732,6 +848,14 @@ class IPAddress(ContactsMixin, PrimaryModel):
for example, when mapping public addresses to private addresses. When an Interface has been assigned an IPAddress
which has a NAT outside IP, that Interface's Device can use either the inside or outside IP as its primary IP.
"""
prefix = models.ForeignKey(
to='ipam.Prefix',
on_delete=models.SET_NULL,
related_name='ip_addresses',
blank=True,
null=True,
verbose_name=_('Prefix')
)
address = IPAddressField(
verbose_name=_('address'),
help_text=_('IPv4 or IPv6 address (with mask)')
@@ -819,6 +943,7 @@ class IPAddress(ContactsMixin, PrimaryModel):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._address = self.address
# Denote the original assigned object (if any) for validation in clean()
self._original_assigned_object_id = self.__dict__.get('assigned_object_id')
self._original_assigned_object_type_id = self.__dict__.get('assigned_object_type_id')
@@ -865,6 +990,16 @@ class IPAddress(ContactsMixin, PrimaryModel):
super().clean()
if self.address:
# If prefix is set, validate suitability
if self.prefix:
if self.address not in self.prefix.prefix:
raise ValidationError({
'prefix': _("IP address must be part of the selected prefix.")
})
if self.vrf != self.prefix.vrf:
raise ValidationError({
'vrf': _("IP address VRF must match the prefix VRF.")
})
# /0 masks are not acceptable
if self.address.prefixlen == 0:
@@ -1005,3 +1140,8 @@ class IPAddress(ContactsMixin, PrimaryModel):
def get_role_color(self):
return IPAddressRoleChoices.colors.get(self.role)
@classmethod
def find_prefix(self, address):
prefixes = Prefix.objects.filter(prefix__net_contains=address.address, vrf=address.vrf)
return prefixes.last()

View File

@@ -53,11 +53,12 @@ class IPAddressIndex(SearchIndex):
model = models.IPAddress
fields = (
('address', 100),
('prefix', 200),
('dns_name', 300),
('description', 500),
('comments', 5000),
)
display_attrs = ('vrf', 'tenant', 'status', 'role', 'description')
display_attrs = ('prefix', 'vrf', 'tenant', 'status', 'role', 'description')
@register_search
@@ -66,10 +67,11 @@ class IPRangeIndex(SearchIndex):
fields = (
('start_address', 100),
('end_address', 300),
('prefix', 400),
('description', 500),
('comments', 5000),
)
display_attrs = ('vrf', 'tenant', 'status', 'role', 'description')
display_attrs = ('prefix', 'vrf', 'tenant', 'status', 'role', 'description')
@register_search
@@ -77,10 +79,12 @@ class PrefixIndex(SearchIndex):
model = models.Prefix
fields = (
('prefix', 110),
('parent', 200),
('aggregate', 300),
('description', 500),
('comments', 5000),
)
display_attrs = ('scope', 'vrf', 'tenant', 'vlan', 'status', 'role', 'description')
display_attrs = ('scope', 'aggregate', 'parent', 'vrf', 'tenant', 'vlan', 'status', 'role', 'description')
@register_search

View File

@@ -152,6 +152,10 @@ class PrefixUtilizationColumn(columns.UtilizationColumn):
class PrefixTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
parent = tables.Column(
verbose_name=_('Parent'),
linkify=True
)
prefix = columns.TemplateColumn(
verbose_name=_('Prefix'),
template_code=PREFIX_LINK_WITH_DEPTH,
@@ -230,9 +234,9 @@ class PrefixTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
class Meta(PrimaryModelTable.Meta):
model = Prefix
fields = (
'pk', 'id', 'prefix', 'prefix_flat', 'status', 'children', 'vrf', 'utilization', 'tenant', 'tenant_group',
'scope', 'scope_type', 'vlan_group', 'vlan', 'role', 'is_pool', 'mark_utilized', 'description', 'contacts',
'comments', 'tags', 'created', 'last_updated',
'pk', 'id', 'prefix', 'status', 'parent', 'prefix', 'prefix_flat', 'children', 'vrf', 'utilization',
'tenant', 'tenant_group', 'scope', 'scope_type', 'vlan_group', 'vlan', 'role', 'is_pool', 'mark_utilized',
'contacts', 'description', 'comments', 'tags', 'created', 'last_updated',
)
default_columns = (
'pk', 'prefix', 'status', 'children', 'vrf', 'utilization', 'tenant', 'scope', 'vlan', 'role',
@@ -246,8 +250,11 @@ class PrefixTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
#
# IP ranges
#
class IPRangeTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
prefix = tables.Column(
verbose_name=_('Prefix'),
linkify=True
)
start_address = tables.Column(
verbose_name=_('Start address'),
linkify=True
@@ -284,9 +291,9 @@ class IPRangeTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
class Meta(PrimaryModelTable.Meta):
model = IPRange
fields = (
'pk', 'id', 'start_address', 'end_address', 'size', 'vrf', 'status', 'role', 'tenant', 'tenant_group',
'mark_populated', 'mark_utilized', 'utilization', 'description', 'contacts', 'comments', 'tags',
'created', 'last_updated',
'pk', 'id', 'start_address', 'end_address', 'prefix', 'size', 'vrf', 'status', 'role', 'tenant',
'tenant_group', 'mark_populated', 'mark_utilized', 'utilization', 'description', 'contacts',
'comments', 'tags', 'created', 'last_updated',
)
default_columns = (
'pk', 'start_address', 'end_address', 'size', 'vrf', 'status', 'role', 'tenant', 'description',
@@ -301,10 +308,18 @@ class IPRangeTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
#
class IPAddressTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
prefix = tables.Column(
verbose_name=_('Prefix'),
linkify=True
)
address = tables.TemplateColumn(
template_code=IPADDRESS_LINK,
verbose_name=_('IP Address')
)
prefix = tables.Column(
linkify=True,
verbose_name=_('Prefix')
)
vrf = tables.TemplateColumn(
template_code=VRF_LINK,
verbose_name=_('VRF')
@@ -353,8 +368,9 @@ class IPAddressTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable
class Meta(PrimaryModelTable.Meta):
model = IPAddress
fields = (
'pk', 'id', 'address', 'vrf', 'status', 'role', 'tenant', 'tenant_group', 'nat_inside', 'nat_outside',
'assigned', 'dns_name', 'description', 'comments', 'contacts', 'tags', 'created', 'last_updated',
'pk', 'id', 'address', 'vrf', 'prefix', 'status', 'role', 'tenant', 'tenant_group', 'nat_inside',
'nat_outside', 'assigned', 'dns_name', 'description', 'comments', 'contacts','tags', 'created',
'last_updated',
)
default_columns = (
'pk', 'address', 'vrf', 'status', 'role', 'tenant', 'assigned', 'dns_name', 'description',

View File

@@ -16,12 +16,20 @@ PREFIX_COPY_BUTTON = """
PREFIX_LINK_WITH_DEPTH = """
{% load helpers %}
{% if record.depth %}
<div class="record-depth">
{% for i in record.depth|as_range %}
<span>•</span>
{% endfor %}
</div>
{% if record.depth_count %}
{% if object %}
<div class="record-depth">
{% for i in record.depth_count|parent_depth:object|as_range %}
<span>•</span>
{% endfor %}
</div>
{% else %}
<div class="record-depth">
{% for i in record.depth_count|as_range %}
<span>•</span>
{% endfor %}
</div>
{% endif %}
{% endif %}
""" + PREFIX_LINK

View File

@@ -407,7 +407,8 @@ class RoleTest(APIViewTestCases.APIViewTestCase):
class PrefixTest(APIViewTestCases.APIViewTestCase):
model = Prefix
brief_fields = ['_depth', 'description', 'display', 'family', 'id', 'prefix', 'url']
# TODO: Alter for parent prefix
brief_fields = ['_depth', 'aggregate', 'description', 'display', 'family', 'id', 'parent', 'prefix', 'url']
create_data = [
{
'prefix': '192.168.4.0/24',
@@ -622,7 +623,8 @@ class PrefixTest(APIViewTestCases.APIViewTestCase):
class IPRangeTest(APIViewTestCases.APIViewTestCase):
model = IPRange
brief_fields = ['description', 'display', 'end_address', 'family', 'id', 'start_address', 'url']
# TODO: Alter for parent prefix
brief_fields = ['description', 'display', 'end_address', 'family', 'id', 'prefix', 'start_address', 'url']
create_data = [
{
'start_address': '192.168.4.10/24',
@@ -780,7 +782,8 @@ class IPRangeTest(APIViewTestCases.APIViewTestCase):
class IPAddressTest(APIViewTestCases.APIViewTestCase):
model = IPAddress
brief_fields = ['address', 'description', 'display', 'family', 'id', 'url']
# TODO: Alter for parent prefix
brief_fields = ['address', 'description', 'display', 'family', 'id', 'prefix', 'url']
create_data = [
{
'address': '192.168.0.4/24',

View File

@@ -901,6 +901,10 @@ class PrefixTestCase(TestCase, ChangeLoggedFilterSetTests):
params = {'description': ['foobar1', 'foobar2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
# TODO: Test for parent prefix
# TODO: Test for children?
# TODO: Test for aggregate
class IPRangeTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = IPRange.objects.all()
@@ -1079,6 +1083,7 @@ class IPRangeTestCase(TestCase, ChangeLoggedFilterSetTests):
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_parent(self):
# TODO: Alter for prefix
params = {'parent': ['10.0.1.0/24', '10.0.2.0/24']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'parent': ['10.0.1.0/25']} # Range 10.0.1.100-199 is not fully contained by 10.0.1.0/25
@@ -1318,6 +1323,7 @@ class IPAddressTestCase(TestCase, ChangeLoggedFilterSetTests):
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_parent(self):
# TODO: Alter for prefix
params = {'parent': ['10.0.0.0/30', '2001:db8::/126']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 8)

View File

@@ -39,6 +39,26 @@ class TestAggregate(TestCase):
class TestIPRange(TestCase):
@classmethod
def setUpTestData(cls):
cls.vrf = VRF.objects.create(name='VRF A', rd='1:1')
cls.prefixes = (
# IPv4
Prefix(prefix='192.0.0.0/16'),
Prefix(prefix='192.0.2.0/24'),
Prefix(prefix='192.0.0.0/16', vrf=cls.vrf),
# IPv6
Prefix(prefix='2001:db8::/32'),
Prefix(prefix='2001:db8::/64'),
)
for prefix in cls.prefixes:
prefix.clean()
prefix.save()
def test_overlapping_range(self):
iprange_192_168 = IPRange.objects.create(
@@ -87,6 +107,69 @@ class TestIPRange(TestCase):
)
iprange_4_198_201.clean()
def test_parent_prefix(self):
ranges = (
IPRange(
start_address=IPNetwork('192.0.0.1/24'),
end_address=IPNetwork('192.0.0.254/24'),
prefix=self.prefixes[0]
),
IPRange(
start_address=IPNetwork('192.0.2.1/24'),
end_address=IPNetwork('192.0.2.254/24'),
prefix=self.prefixes[1]
),
IPRange(
start_address=IPNetwork('192.0.2.1/24'),
end_address=IPNetwork('192.0.2.254/24'),
vrf=self.vrf,
prefix=self.prefixes[2]
),
IPRange(
start_address=IPNetwork('2001:db8::/64'),
end_address=IPNetwork('2001:db8::ffff/64'),
prefix=self.prefixes[4]
),
IPRange(
start_address=IPNetwork('2001:db8:2::/64'),
end_address=IPNetwork('2001:db8:2::ffff/64'),
prefix=self.prefixes[3]
),
)
for range in ranges:
range.clean()
range.save()
self.assertEqual(ranges[0].prefix, self.prefixes[0])
self.assertEqual(ranges[1].prefix, self.prefixes[1])
self.assertEqual(ranges[2].prefix, self.prefixes[2])
self.assertEqual(ranges[3].prefix, self.prefixes[4])
def test_parent_prefix_change(self):
range = IPRange(
start_address=IPNetwork('192.0.1.1/24'),
end_address=IPNetwork('192.0.1.254/24'),
prefix=self.prefixes[0]
)
range.clean()
range.save()
prefix = Prefix(prefix='192.0.0.0/17')
prefix.clean()
prefix.save()
range.refresh_from_db()
self.assertEqual(range.prefix, prefix)
# TODO: Prefix Altered
# TODO: Prefix Deleted
# TODO: Prefix falls outside range
# TODO: Prefix VRF does not match range VRF
class TestPrefix(TestCase):
@@ -169,19 +252,21 @@ class TestPrefix(TestCase):
prefix=IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER
)
ips = IPAddress.objects.bulk_create((
IPAddress(address=IPNetwork('10.0.0.1/24'), vrf=None),
IPAddress(address=IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
IPAddress(address=IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
IPAddress(address=IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
IPAddress(prefix=parent_prefix, address=IPNetwork('10.0.0.1/24'), vrf=None),
IPAddress(prefix=parent_prefix, address=IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
IPAddress(prefix=parent_prefix, address=IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
IPAddress(prefix=parent_prefix, address=IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
))
child_ip_pks = {p.pk for p in parent_prefix.get_child_ips()}
child_ip_pks = {p.pk for p in parent_prefix.ip_addresses.all()}
# Global container should return all children
self.assertSetEqual(child_ip_pks, {ips[0].pk, ips[1].pk, ips[2].pk, ips[3].pk})
parent_prefix.vrf = vrfs[0]
parent_prefix.save()
child_ip_pks = {p.pk for p in parent_prefix.get_child_ips()}
parent_prefix.refresh_from_db()
child_ip_pks = {p.pk for p in parent_prefix.ip_addresses.all()}
# VRF container is limited to its own VRF
self.assertSetEqual(child_ip_pks, {ips[1].pk})
@@ -344,17 +429,21 @@ class TestPrefixHierarchy(TestCase):
prefixes = (
# IPv4
Prefix(prefix='10.0.0.0/8', _depth=0, _children=2),
Prefix(prefix='10.0.0.0/16', _depth=1, _children=1),
Prefix(prefix='10.0.0.0/24', _depth=2, _children=0),
Prefix(prefix='10.0.0.0/8'),
Prefix(prefix='10.0.0.0/16'),
Prefix(prefix='10.0.0.0/24'),
Prefix(prefix='192.168.0.0/16'),
# IPv6
Prefix(prefix='2001:db8::/32', _depth=0, _children=2),
Prefix(prefix='2001:db8::/40', _depth=1, _children=1),
Prefix(prefix='2001:db8::/48', _depth=2, _children=0),
Prefix(prefix='2001:db8::/32'),
Prefix(prefix='2001:db8::/40'),
Prefix(prefix='2001:db8::/48'),
)
Prefix.objects.bulk_create(prefixes)
for prefix in prefixes:
prefix.clean()
prefix.save()
def test_create_prefix4(self):
# Create 10.0.0.0/12
@@ -362,15 +451,19 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/12'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 2)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2].parent.prefix, IPNetwork('10.0.0.0/12'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[3].parent.prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[3]._depth, 3)
self.assertEqual(prefixes[3]._children, 0)
@@ -380,15 +473,19 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/36'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 2)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2].parent.prefix, IPNetwork('2001:db8::/36'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[3].parent.prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[3]._depth, 3)
self.assertEqual(prefixes[3]._children, 0)
@@ -400,12 +497,15 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 2)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/12'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2].parent.prefix, IPNetwork('10.0.0.0/12'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 0)
@@ -417,12 +517,15 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 2)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/36'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2].parent.prefix, IPNetwork('2001:db8::/36'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 0)
@@ -437,14 +540,17 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(vrf__isnull=True, prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
prefixes = Prefix.objects.filter(vrf=vrf)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 0)
@@ -459,14 +565,17 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(vrf__isnull=True, prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
prefixes = Prefix.objects.filter(vrf=vrf)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 0)
@@ -476,9 +585,11 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
@@ -488,9 +599,11 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
@@ -500,15 +613,20 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2].parent.prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[2]._depth, 1)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('10.0.0.0/24'))
# TODO: How to we resolve the parent for duplicate prefixes
self.assertEqual(prefixes[3].parent.prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[3]._depth, 2)
self.assertEqual(prefixes[3]._children, 0)
@@ -518,20 +636,158 @@ class TestPrefixHierarchy(TestCase):
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0].parent, None)
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[1].parent.prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2].parent.prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[2]._depth, 1)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[3].parent.prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[3]._depth, 2)
self.assertEqual(prefixes[3]._children, 0)
class TestTriggers(TestCase):
"""
Test the automatic updating of depth and child count in response to changes made within
the prefix hierarchy.
"""
@classmethod
def setUpTestData(cls):
vrfs = (
VRF(name='VRF A'),
VRF(name='VRF B'),
)
for vrf in vrfs:
vrf.clean()
vrf.save()
cls.prefixes = (
# IPv4
Prefix(prefix='10.0.0.0/8'),
Prefix(prefix='10.0.0.0/16'),
Prefix(prefix='10.0.0.0/22'),
Prefix(prefix='10.0.0.0/23'),
Prefix(prefix='10.0.2.0/23'),
Prefix(prefix='10.0.0.0/24'),
Prefix(prefix='10.0.1.0/24'),
Prefix(prefix='10.0.2.0/24'),
Prefix(prefix='10.0.3.0/24'),
Prefix(prefix='10.1.0.0/16', status='container'),
Prefix(prefix='10.1.0.0/22', vrf=vrfs[0]),
Prefix(prefix='10.1.0.0/23', vrf=vrfs[0]),
Prefix(prefix='10.1.2.0/23', vrf=vrfs[0]),
Prefix(prefix='10.1.0.0/24', vrf=vrfs[0]),
Prefix(prefix='10.1.1.0/24', vrf=vrfs[0]),
Prefix(prefix='10.1.2.0/24', vrf=vrfs[0]),
Prefix(prefix='10.1.3.0/24', vrf=vrfs[0]),
)
for prefix in cls.prefixes:
prefix.clean()
prefix.save()
def test_current_hierarchy(self):
self.assertIsNone(Prefix.objects.get(prefix='10.0.0.0/8').parent)
self.assertEqual(Prefix.objects.get(prefix='10.0.0.0/16').parent, Prefix.objects.get(prefix='10.0.0.0/8'))
self.assertEqual(Prefix.objects.get(prefix='10.0.0.0/22').parent, Prefix.objects.get(prefix='10.0.0.0/16'))
self.assertEqual(Prefix.objects.get(prefix='10.0.0.0/23').parent, Prefix.objects.get(prefix='10.0.0.0/22'))
self.assertEqual(Prefix.objects.get(prefix='10.0.2.0/23').parent, Prefix.objects.get(prefix='10.0.0.0/22'))
self.assertEqual(Prefix.objects.get(prefix='10.0.0.0/24').parent, Prefix.objects.get(prefix='10.0.0.0/23'))
self.assertEqual(Prefix.objects.get(prefix='10.0.1.0/24').parent, Prefix.objects.get(prefix='10.0.0.0/23'))
self.assertEqual(Prefix.objects.get(prefix='10.0.2.0/24').parent, Prefix.objects.get(prefix='10.0.2.0/23'))
self.assertEqual(Prefix.objects.get(prefix='10.0.3.0/24').parent, Prefix.objects.get(prefix='10.0.2.0/23'))
def test_basic_insert(self):
pfx = Prefix.objects.create(prefix='10.0.0.0/21')
self.assertIsNotNone(Prefix.objects.get(prefix='10.0.0.0/22').parent)
self.assertEqual(Prefix.objects.get(prefix='10.0.0.0/22').parent, pfx)
def test_vrf_insert(self):
vrf = VRF.objects.get(name='VRF A')
pfx = Prefix.objects.create(prefix='10.1.0.0/21', vrf=vrf)
parent = Prefix.objects.get(prefix='10.1.0.0/16')
self.assertIsNotNone(Prefix.objects.get(prefix='10.1.0.0/21').parent)
self.assertEqual(Prefix.objects.get(prefix='10.1.0.0/21').parent, parent)
self.assertIsNotNone(Prefix.objects.get(prefix='10.1.0.0/22').parent)
self.assertEqual(Prefix.objects.get(prefix='10.1.0.0/22').parent, pfx)
def test_basic_delete(self):
Prefix.objects.get(prefix='10.0.0.0/23').delete()
parent = Prefix.objects.get(prefix='10.0.0.0/22')
self.assertEqual(Prefix.objects.get(prefix='10.0.0.0/24').parent, parent)
self.assertEqual(Prefix.objects.get(prefix='10.0.1.0/24').parent, parent)
self.assertEqual(Prefix.objects.get(prefix='10.0.2.0/24').parent, Prefix.objects.get(prefix='10.0.2.0/23'))
def test_vrf_delete(self):
Prefix.objects.get(prefix='10.1.0.0/23').delete()
parent = Prefix.objects.get(prefix='10.1.0.0/22')
self.assertEqual(Prefix.objects.get(prefix='10.1.0.0/24').parent, parent)
self.assertEqual(Prefix.objects.get(prefix='10.1.1.0/24').parent, parent)
self.assertEqual(Prefix.objects.get(prefix='10.1.2.0/24').parent, Prefix.objects.get(prefix='10.1.2.0/23'))
def test_basic_update(self):
pfx = Prefix.objects.get(prefix='10.0.0.0/23')
parent = Prefix.objects.get(prefix='10.0.0.0/22')
pfx.prefix = '10.3.0.0/23'
pfx.parent = Prefix.objects.get(prefix='10.0.0.0/8')
pfx.clean()
pfx.save()
self.assertEqual(Prefix.objects.get(prefix='10.0.0.0/24').parent, parent)
self.assertEqual(Prefix.objects.get(prefix='10.0.1.0/24').parent, parent)
self.assertEqual(Prefix.objects.get(prefix='10.0.2.0/24').parent, Prefix.objects.get(prefix='10.0.2.0/23'))
def test_vrf_update(self):
pfx = Prefix.objects.get(prefix='10.1.0.0/23')
parent = Prefix.objects.get(prefix='10.1.0.0/22')
pfx.prefix = '10.3.0.0/23'
pfx.parent = None
pfx.clean()
pfx.save()
self.assertEqual(Prefix.objects.get(prefix='10.1.0.0/24').parent, parent)
self.assertEqual(Prefix.objects.get(prefix='10.1.1.0/24').parent, parent)
self.assertEqual(Prefix.objects.get(prefix='10.1.2.0/24').parent, Prefix.objects.get(prefix='10.1.2.0/23'))
# TODO: Test VRF Changes
class TestIPAddress(TestCase):
"""
Test the automatic updating of depth and child count in response to changes made within
the prefix hierarchy.
"""
@classmethod
def setUpTestData(cls):
cls.vrf = VRF.objects.create(name='VRF A', rd='1:1')
cls.prefixes = (
# IPv4
Prefix(prefix='192.0.0.0/16'),
Prefix(prefix='192.0.2.0/24'),
Prefix(prefix='192.0.0.0/16', vrf=cls.vrf),
# IPv6
Prefix(prefix='2001:db8::/32'),
Prefix(prefix='2001:db8::/64'),
)
for prefix in cls.prefixes:
prefix.clean()
prefix.save()
def test_get_duplicates(self):
ips = IPAddress.objects.bulk_create((
@@ -543,6 +799,44 @@ class TestIPAddress(TestCase):
self.assertSetEqual(set(duplicate_ip_pks), {ips[1].pk, ips[2].pk})
def test_parent_prefix(self):
ips = (
IPAddress(address=IPNetwork('192.0.0.1/24'), prefix=self.prefixes[0]),
IPAddress(address=IPNetwork('192.0.2.1/24'), prefix=self.prefixes[1]),
IPAddress(address=IPNetwork('192.0.2.1/24'), vrf=self.vrf, prefix=self.prefixes[2]),
IPAddress(address=IPNetwork('2001:db8::/64'), prefix=self.prefixes[4]),
IPAddress(address=IPNetwork('2001:db8:2::/64'), prefix=self.prefixes[3]),
)
for ip in ips:
ip.clean()
ip.save()
self.assertEqual(ips[0].prefix, self.prefixes[0])
self.assertEqual(ips[1].prefix, self.prefixes[1])
self.assertEqual(ips[2].prefix, self.prefixes[2])
self.assertEqual(ips[3].prefix, self.prefixes[4])
self.assertEqual(ips[4].prefix, self.prefixes[3])
def test_parent_prefix_change(self):
ip = IPAddress(address=IPNetwork('192.0.1.1/24'), prefix=self.prefixes[0])
ip.clean()
ip.save()
prefix = Prefix(prefix='192.0.1.0/17')
prefix.clean()
prefix.save()
ip.refresh_from_db()
self.assertEqual(ip.prefix, prefix)
# TODO: Prefix Altered
# TODO: Prefix Deleted
# TODO: Prefix does not contain IP Address
# TODO: Prefix VRF does not match IP Address VRF
#
# Uniqueness enforcement tests
#
@@ -559,13 +853,20 @@ class TestIPAddress(TestCase):
self.assertRaises(ValidationError, duplicate_ip.clean)
def test_duplicate_vrf(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False)
vrf = VRF.objects.get(rd='1:1')
vrf.enforce_unique = False
vrf.clean()
vrf.save()
IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertIsNone(duplicate_ip.clean())
def test_duplicate_vrf_unique(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True)
vrf = VRF.objects.get(rd='1:1')
vrf.enforce_unique = True
vrf.clean()
vrf.save()
IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)

View File

@@ -421,6 +421,7 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
tags = create_tags('Alpha', 'Bravo', 'Charlie')
# TODO: Alter for prefix
cls.form_data = {
'prefix': IPNetwork('192.0.2.0/24'),
'scope_type': ContentType.objects.get_for_model(Site).pk,
@@ -436,6 +437,7 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
}
site = sites[0].pk
# TODO: Alter for prefix
cls.csv_data = (
"vrf,prefix,status,scope_type,scope_id",
f"VRF 1,10.4.0.0/16,active,dcim.site,{site}",
@@ -443,6 +445,7 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
f"VRF 1,10.6.0.0/16,active,dcim.site,{site}",
)
# TODO: Alter for prefix
cls.csv_update_data = (
"id,description,status",
f"{prefixes[0].pk},New description 7,{PrefixStatusChoices.STATUS_RESERVED}",
@@ -450,6 +453,7 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
f"{prefixes[2].pk},New description 9,{PrefixStatusChoices.STATUS_RESERVED}",
)
# TODO: Alter for prefix
cls.bulk_edit_data = {
'vrf': vrfs[1].pk,
'tenant': None,
@@ -477,9 +481,9 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
def test_prefix_ipranges(self):
prefix = Prefix.objects.create(prefix=IPNetwork('192.168.0.0/16'))
ip_ranges = (
IPRange(start_address='192.168.0.1/24', end_address='192.168.0.100/24', size=99),
IPRange(start_address='192.168.1.1/24', end_address='192.168.1.100/24', size=99),
IPRange(start_address='192.168.2.1/24', end_address='192.168.2.100/24', size=99),
IPRange(prefix=prefix, start_address='192.168.0.1/24', end_address='192.168.0.100/24', size=99),
IPRange(prefix=prefix, start_address='192.168.1.1/24', end_address='192.168.1.100/24', size=99),
IPRange(prefix=prefix, start_address='192.168.2.1/24', end_address='192.168.2.100/24', size=99),
)
IPRange.objects.bulk_create(ip_ranges)
self.assertEqual(prefix.get_child_ranges().count(), 3)
@@ -491,12 +495,12 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
def test_prefix_ipaddresses(self):
prefix = Prefix.objects.create(prefix=IPNetwork('192.168.0.0/16'))
ip_addresses = (
IPAddress(address=IPNetwork('192.168.0.1/16')),
IPAddress(address=IPNetwork('192.168.0.2/16')),
IPAddress(address=IPNetwork('192.168.0.3/16')),
IPAddress(prefix=prefix, address=IPNetwork('192.168.0.1/16')),
IPAddress(prefix=prefix, address=IPNetwork('192.168.0.2/16')),
IPAddress(prefix=prefix, address=IPNetwork('192.168.0.3/16')),
)
IPAddress.objects.bulk_create(ip_addresses)
self.assertEqual(prefix.get_child_ips().count(), 3)
self.assertEqual(prefix.ip_addresses.all().count(), 3)
url = reverse('ipam:prefix_ipaddresses', kwargs={'pk': prefix.pk})
self.assertHttpStatus(self.client.get(url), 200)
@@ -594,6 +598,7 @@ class IPRangeTestCase(ViewTestCases.PrimaryObjectViewTestCase):
tags = create_tags('Alpha', 'Bravo', 'Charlie')
# TODO: Alter for prefix
cls.form_data = {
'start_address': IPNetwork('192.0.5.10/24'),
'end_address': IPNetwork('192.0.5.100/24'),
@@ -607,6 +612,7 @@ class IPRangeTestCase(ViewTestCases.PrimaryObjectViewTestCase):
'tags': [t.pk for t in tags],
}
# TODO: Alter for prefix
cls.csv_data = (
"vrf,start_address,end_address,status",
"VRF 1,10.1.0.1/16,10.1.9.254/16,active",
@@ -614,6 +620,7 @@ class IPRangeTestCase(ViewTestCases.PrimaryObjectViewTestCase):
"VRF 1,10.3.0.1/16,10.3.9.254/16,active",
)
# TODO: Alter for prefix
cls.csv_update_data = (
"id,description,status",
f"{ip_ranges[0].pk},New description 7,{IPRangeStatusChoices.STATUS_RESERVED}",
@@ -621,6 +628,7 @@ class IPRangeTestCase(ViewTestCases.PrimaryObjectViewTestCase):
f"{ip_ranges[2].pk},New description 9,{IPRangeStatusChoices.STATUS_RESERVED}",
)
# TODO: Alter for prefix
cls.bulk_edit_data = {
'vrf': vrfs[1].pk,
'tenant': None,
@@ -687,6 +695,7 @@ class IPAddressTestCase(ViewTestCases.PrimaryObjectViewTestCase):
),
)
FHRPGroup.objects.bulk_create(fhrp_groups)
# TODO: Alter for prefix
cls.form_data = {
'vrf': vrfs[1].pk,
'address': IPNetwork('192.0.2.99/24'),
@@ -699,6 +708,7 @@ class IPAddressTestCase(ViewTestCases.PrimaryObjectViewTestCase):
'tags': [t.pk for t in tags],
}
# TODO: Alter for prefix
cls.csv_data = (
"vrf,address,status,fhrp_group",
"VRF 1,192.0.2.4/24,active,FHRP Group 1",
@@ -706,6 +716,7 @@ class IPAddressTestCase(ViewTestCases.PrimaryObjectViewTestCase):
"VRF 1,192.0.2.6/24,active,FHRP Group 3",
)
# TODO: Alter for prefix
cls.csv_update_data = (
"id,description,status",
f"{ipaddresses[0].pk},New description 7,{IPAddressStatusChoices.STATUS_RESERVED}",
@@ -713,6 +724,7 @@ class IPAddressTestCase(ViewTestCases.PrimaryObjectViewTestCase):
f"{ipaddresses[2].pk},New description 9,{IPAddressStatusChoices.STATUS_RESERVED}",
)
# TODO: Alter for prefix
cls.bulk_edit_data = {
'vrf': vrfs[1].pk,
'tenant': None,

91
netbox/ipam/triggers.py Normal file
View File

@@ -0,0 +1,91 @@
ipam_prefix_delete_adjust_prefix_parent = """
-- Update Child Prefix's with Prefix's PARENT This is a safe assumption based on the fact that the parent would be the
-- next direct parent for anything else that could contain this prefix
UPDATE ipam_prefix SET parent_id=OLD.parent_id WHERE parent_id=OLD.id;
RETURN OLD;
"""
ipam_prefix_insert_adjust_prefix_parent = """
-- Update the prefix with the new parent if the parent is the most appropriate prefix
UPDATE ipam_prefix
SET parent_id=NEW.id
WHERE
prefix << NEW.prefix
AND
(
(vrf_id = NEW.vrf_id OR (vrf_id IS NULL AND NEW.vrf_id IS NULL))
OR
(
NEW.vrf_id IS NULL
AND
NEW.status = 'container'
AND
NOT EXISTS(
SELECT 1 FROM ipam_prefix p WHERE p.prefix >> ipam_prefix.prefix AND p.vrf_id = ipam_prefix.vrf_id
)
)
)
AND id != NEW.id
AND NOT EXISTS (
SELECT 1 FROM ipam_prefix p
WHERE
p.prefix >> ipam_prefix.prefix
AND p.prefix << NEW.prefix
AND (
(p.vrf_id = ipam_prefix.vrf_id OR (p.vrf_id IS NULL AND ipam_prefix.vrf_id IS NULL))
OR
(p.vrf_id IS NULL AND p.status = 'container')
)
AND p.id != NEW.id
)
;
RETURN NEW;
"""
ipam_prefix_update_adjust_prefix_parent = """
-- When a prefix changes, reassign any IPAddresses that no longer
-- fall within the new prefix range to the parent prefix (or set null if no parent exists)
UPDATE ipam_prefix
SET parent_id = OLD.parent_id
WHERE
parent_id = NEW.id
-- IP address no longer contained within the updated prefix
AND NOT (prefix << NEW.prefix);
-- Update the prefix with the new parent if the parent is the most appropriate prefix
UPDATE ipam_prefix
SET parent_id=NEW.id
WHERE
prefix << NEW.prefix
AND
(
(vrf_id = NEW.vrf_id OR (vrf_id IS NULL AND NEW.vrf_id IS NULL))
OR
(
NEW.vrf_id IS NULL
AND
NEW.status = 'container'
AND
NOT EXISTS(
SELECT 1 FROM ipam_prefix p WHERE p.prefix >> ipam_prefix.prefix AND p.vrf_id = ipam_prefix.vrf_id
)
)
)
AND id != NEW.id
AND NOT EXISTS (
SELECT 1 FROM ipam_prefix p
WHERE
p.prefix >> ipam_prefix.prefix
AND p.prefix << NEW.prefix
AND (
(p.vrf_id = ipam_prefix.vrf_id OR (p.vrf_id IS NULL AND ipam_prefix.vrf_id IS NULL))
OR
(p.vrf_id IS NULL AND p.status = 'container')
)
AND p.id != NEW.id
)
;
RETURN NEW;
"""

View File

@@ -687,13 +687,13 @@ class PrefixIPAddressesView(generic.ObjectChildrenView):
template_name = 'ipam/prefix/ip_addresses.html'
tab = ViewTab(
label=_('IP Addresses'),
badge=lambda x: x.get_child_ips().count(),
badge=lambda x: x.ip_addresses.count(),
permission='ipam.view_ipaddress',
weight=700
)
def get_children(self, request, parent):
return parent.get_child_ips().restrict(request.user, 'view').prefetch_related('vrf', 'tenant', 'tenant__group')
return parent.ip_addresses.restrict(request.user, 'view').prefetch_related('vrf', 'tenant', 'tenant__group')
def prep_table_data(self, request, queryset, parent):
if not request.GET.get('q') and not get_table_ordering(request, self.table):

View File

@@ -454,6 +454,7 @@ INSTALLED_APPS = [
'sorl.thumbnail',
'taggit',
'timezone_field',
'pgtrigger',
'core',
'account',
'circuits',

View File

@@ -14,6 +14,10 @@
<th scope="row">{% trans "Family" %}</th>
<td>IPv{{ object.family }}</td>
</tr>
<tr>
<th scope="row">{% trans "Prefix" %}</th>
<td>{{ object.prefix|linkify|placeholder }}</td>
</tr>
<tr>
<th scope="row">{% trans "VRF" %}</th>
<td>

View File

@@ -14,6 +14,7 @@
<div class="row">
<h2 class="col-9 offset-3">{% trans "IP Addresses" %}</h2>
</div>
{% render_field model_form.prefix %}
{% render_field form.pattern %}
{% render_field model_form.status %}
{% render_field model_form.role %}

View File

@@ -13,6 +13,10 @@
<th scope="row">{% trans "Family" %}</th>
<td>IPv{{ object.family }}</td>
</tr>
<tr>
<th scope="row">{% trans "Prefix" %}</th>
<td>{{ object.prefix|linkify|placeholder }}</td>
</tr>
<tr>
<th scope="row">{% trans "Starting Address" %}</th>
<td>{{ object.start_address }}</td>

View File

@@ -109,7 +109,7 @@
{% endif %}
</td>
</tr>
{% with child_ip_count=object.get_child_ips.count %}
{% with child_ip_count=object.ip_addresses.count %}
<tr>
<th scope="row">{% trans "Child IPs" %}</th>
<td>

View File

@@ -280,6 +280,26 @@ def as_range(n):
return range(n)
@register.filter()
def parent_depth(n, parent=None):
"""
Return the depth of a node based on the parent's depth
"""
parent_depth = 0
if parent and hasattr(parent, 'depth_count'):
parent_depth = parent.depth_count + 1
elif parent and hasattr(parent, 'depth'):
try:
parent_depth = int(parent.depth) + 1
except TypeError:
pass
try:
depth = int(n) - int(parent_depth)
except TypeError:
return n
return depth
@register.filter()
def meters_to_feet(n):
"""

View File

@@ -7,6 +7,7 @@ django-graphiql-debug-toolbar==0.2.0
django-htmx==1.27.0
django-mptt==0.17.0
django-pglocks==1.0.4
django-pgtrigger==4.15.4
django-prometheus==2.4.1
django-redis==6.0.0
django-rich==2.2.0