mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-14 01:41:22 -06:00
Merge branch 'develop' into 3090-interface-filtering
This commit is contained in:
commit
b23eaeca54
@ -3,8 +3,18 @@
|
|||||||
## Enhancements
|
## Enhancements
|
||||||
|
|
||||||
* [#2050](https://github.com/netbox-community/netbox/issues/2050) - Preview image attachments when hovering the link
|
* [#2050](https://github.com/netbox-community/netbox/issues/2050) - Preview image attachments when hovering the link
|
||||||
|
* [#2589](https://github.com/netbox-community/netbox/issues/2589) - Toggle for showing available prefixes/ip addresses
|
||||||
* [#3090](https://github.com/netbox-community/netbox/issues/3090) - Add filter field for device interfaces
|
* [#3090](https://github.com/netbox-community/netbox/issues/3090) - Add filter field for device interfaces
|
||||||
* [#3187](https://github.com/netbox-community/netbox/issues/3187) - Add rack selection field to rack elevations
|
* [#3187](https://github.com/netbox-community/netbox/issues/3187) - Add rack selection field to rack elevations
|
||||||
|
* [#3851](https://github.com/netbox-community/netbox/issues/3851) - Allow passing initial data to custom script forms
|
||||||
|
|
||||||
|
## Bug Fixes
|
||||||
|
|
||||||
|
* [#3589](https://github.com/netbox-community/netbox/issues/3589) - Fix validation on tagged VLANs of an interface
|
||||||
|
* [#3853](https://github.com/netbox-community/netbox/issues/3853) - Fix device role link on config context view
|
||||||
|
* [#3856](https://github.com/netbox-community/netbox/issues/3856) - Allow filtering VM interfaces by multiple MAC addresses
|
||||||
|
* [#3857](https://github.com/netbox-community/netbox/issues/3857) - Fix group custom links rendering
|
||||||
|
* [#3862](https://github.com/netbox-community/netbox/issues/3862) - Allow filtering device components by multiple device names
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
@ -8,6 +8,13 @@ from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilte
|
|||||||
from .constants import *
|
from .constants import *
|
||||||
from .models import Circuit, CircuitTermination, CircuitType, Provider
|
from .models import Circuit, CircuitTermination, CircuitType, Provider
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
'CircuitFilter',
|
||||||
|
'CircuitTerminationFilter',
|
||||||
|
'CircuitTypeFilter',
|
||||||
|
'ProviderFilter',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ProviderFilter(CustomFieldFilterSet, CreatedUpdatedFilterSet):
|
class ProviderFilter(CustomFieldFilterSet, CreatedUpdatedFilterSet):
|
||||||
id__in = NumericInFilter(
|
id__in = NumericInFilter(
|
||||||
|
287
netbox/circuits/tests/test_filters.py
Normal file
287
netbox/circuits/tests/test_filters.py
Normal file
@ -0,0 +1,287 @@
|
|||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from circuits.constants import CIRCUIT_STATUS_ACTIVE, CIRCUIT_STATUS_OFFLINE, CIRCUIT_STATUS_PLANNED
|
||||||
|
from circuits.filters import *
|
||||||
|
from circuits.models import Circuit, CircuitTermination, CircuitType, Provider
|
||||||
|
from dcim.models import Region, Site
|
||||||
|
|
||||||
|
|
||||||
|
class ProviderTestCase(TestCase):
|
||||||
|
queryset = Provider.objects.all()
|
||||||
|
filterset = ProviderFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
providers = (
|
||||||
|
Provider(name='Provider 1', slug='provider-1', asn=65001, account='1234'),
|
||||||
|
Provider(name='Provider 2', slug='provider-2', asn=65002, account='2345'),
|
||||||
|
Provider(name='Provider 3', slug='provider-3', asn=65003, account='3456'),
|
||||||
|
Provider(name='Provider 4', slug='provider-4', asn=65004, account='4567'),
|
||||||
|
Provider(name='Provider 5', slug='provider-5', asn=65005, account='5678'),
|
||||||
|
)
|
||||||
|
Provider.objects.bulk_create(providers)
|
||||||
|
|
||||||
|
regions = (
|
||||||
|
Region(name='Test Region 1', slug='test-region-1'),
|
||||||
|
Region(name='Test Region 2', slug='test-region-2'),
|
||||||
|
)
|
||||||
|
# Can't use bulk_create for models with MPTT fields
|
||||||
|
for r in regions:
|
||||||
|
r.save()
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
circuit_types = (
|
||||||
|
CircuitType(name='Test Circuit Type 1', slug='test-circuit-type-1'),
|
||||||
|
CircuitType(name='Test Circuit Type 2', slug='test-circuit-type-2'),
|
||||||
|
)
|
||||||
|
CircuitType.objects.bulk_create(circuit_types)
|
||||||
|
|
||||||
|
circuits = (
|
||||||
|
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 1'),
|
||||||
|
Circuit(provider=providers[1], type=circuit_types[1], cid='Test Circuit 1'),
|
||||||
|
)
|
||||||
|
Circuit.objects.bulk_create(circuits)
|
||||||
|
|
||||||
|
CircuitTermination.objects.bulk_create((
|
||||||
|
CircuitTermination(circuit=circuits[0], site=sites[0], term_side='A', port_speed=1000),
|
||||||
|
CircuitTermination(circuit=circuits[1], site=sites[0], term_side='A', port_speed=1000),
|
||||||
|
))
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Provider 1', 'Provider 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['provider-1', 'provider-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_asn(self):
|
||||||
|
params = {'asn': ['65001', '65002']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_account(self):
|
||||||
|
params = {'account': ['1234', '2345']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:3]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_region(self):
|
||||||
|
regions = Region.objects.all()[:2]
|
||||||
|
params = {'region_id': [regions[0].pk, regions[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'region': [regions[0].slug, regions[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class CircuitTypeTestCase(TestCase):
|
||||||
|
queryset = CircuitType.objects.all()
|
||||||
|
filterset = CircuitTypeFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
CircuitType.objects.bulk_create((
|
||||||
|
CircuitType(name='Circuit Type 1', slug='circuit-type-1'),
|
||||||
|
CircuitType(name='Circuit Type 2', slug='circuit-type-2'),
|
||||||
|
CircuitType(name='Circuit Type 3', slug='circuit-type-3'),
|
||||||
|
))
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
params = {'id': [self.queryset.first().pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Circuit Type 1']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['circuit-type-1']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
|
||||||
|
class CircuitTestCase(TestCase):
|
||||||
|
queryset = Circuit.objects.all()
|
||||||
|
filterset = CircuitFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
regions = (
|
||||||
|
Region(name='Test Region 1', slug='test-region-1'),
|
||||||
|
Region(name='Test Region 2', slug='test-region-2'),
|
||||||
|
Region(name='Test Region 3', slug='test-region-3'),
|
||||||
|
)
|
||||||
|
# Can't use bulk_create for models with MPTT fields
|
||||||
|
for r in regions:
|
||||||
|
r.save()
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
|
||||||
|
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
circuit_types = (
|
||||||
|
CircuitType(name='Test Circuit Type 1', slug='test-circuit-type-1'),
|
||||||
|
CircuitType(name='Test Circuit Type 2', slug='test-circuit-type-2'),
|
||||||
|
)
|
||||||
|
CircuitType.objects.bulk_create(circuit_types)
|
||||||
|
|
||||||
|
providers = (
|
||||||
|
Provider(name='Provider 1', slug='provider-1'),
|
||||||
|
Provider(name='Provider 2', slug='provider-2'),
|
||||||
|
)
|
||||||
|
Provider.objects.bulk_create(providers)
|
||||||
|
|
||||||
|
circuits = (
|
||||||
|
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 1', install_date='2020-01-01', commit_rate=1000, status=CIRCUIT_STATUS_ACTIVE),
|
||||||
|
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 2', install_date='2020-01-02', commit_rate=2000, status=CIRCUIT_STATUS_ACTIVE),
|
||||||
|
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 3', install_date='2020-01-03', commit_rate=3000, status=CIRCUIT_STATUS_PLANNED),
|
||||||
|
Circuit(provider=providers[1], type=circuit_types[1], cid='Test Circuit 4', install_date='2020-01-04', commit_rate=4000, status=CIRCUIT_STATUS_PLANNED),
|
||||||
|
Circuit(provider=providers[1], type=circuit_types[1], cid='Test Circuit 5', install_date='2020-01-05', commit_rate=5000, status=CIRCUIT_STATUS_OFFLINE),
|
||||||
|
Circuit(provider=providers[1], type=circuit_types[1], cid='Test Circuit 6', install_date='2020-01-06', commit_rate=6000, status=CIRCUIT_STATUS_OFFLINE),
|
||||||
|
)
|
||||||
|
Circuit.objects.bulk_create(circuits)
|
||||||
|
|
||||||
|
circuit_terminations = ((
|
||||||
|
CircuitTermination(circuit=circuits[0], site=sites[0], term_side='A', port_speed=1000),
|
||||||
|
CircuitTermination(circuit=circuits[1], site=sites[1], term_side='A', port_speed=1000),
|
||||||
|
CircuitTermination(circuit=circuits[2], site=sites[2], term_side='A', port_speed=1000),
|
||||||
|
))
|
||||||
|
CircuitTermination.objects.bulk_create(circuit_terminations)
|
||||||
|
|
||||||
|
def test_cid(self):
|
||||||
|
params = {'cid': ['Test Circuit 1', 'Test Circuit 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_install_date(self):
|
||||||
|
params = {'install_date': ['2020-01-01', '2020-01-02']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_commit_rate(self):
|
||||||
|
params = {'commit_rate': ['1000', '2000']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:3]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_provider(self):
|
||||||
|
provider = Provider.objects.first()
|
||||||
|
params = {'provider_id': [provider.pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
params = {'provider': [provider.slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_type(self):
|
||||||
|
circuit_type = CircuitType.objects.first()
|
||||||
|
params = {'type_id': [circuit_type.pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
params = {'type': [circuit_type.slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_status(self):
|
||||||
|
params = {'status': [CIRCUIT_STATUS_ACTIVE, CIRCUIT_STATUS_PLANNED]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_region(self):
|
||||||
|
regions = Region.objects.all()[:2]
|
||||||
|
params = {'region_id': [regions[0].pk, regions[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'region': [regions[0].slug, regions[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class CircuitTerminationTestCase(TestCase):
|
||||||
|
queryset = CircuitTermination.objects.all()
|
||||||
|
filterset = CircuitTerminationFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1'),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2'),
|
||||||
|
Site(name='Test Site 3', slug='test-site-3'),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
circuit_types = (
|
||||||
|
CircuitType(name='Test Circuit Type 1', slug='test-circuit-type-1'),
|
||||||
|
)
|
||||||
|
CircuitType.objects.bulk_create(circuit_types)
|
||||||
|
|
||||||
|
providers = (
|
||||||
|
Provider(name='Provider 1', slug='provider-1'),
|
||||||
|
)
|
||||||
|
Provider.objects.bulk_create(providers)
|
||||||
|
|
||||||
|
circuits = (
|
||||||
|
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 1'),
|
||||||
|
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 2'),
|
||||||
|
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 3'),
|
||||||
|
)
|
||||||
|
Circuit.objects.bulk_create(circuits)
|
||||||
|
|
||||||
|
circuit_terminations = ((
|
||||||
|
CircuitTermination(circuit=circuits[0], site=sites[0], term_side='A', port_speed=1000, upstream_speed=1000, xconnect_id='ABC'),
|
||||||
|
CircuitTermination(circuit=circuits[0], site=sites[1], term_side='Z', port_speed=1000, upstream_speed=1000, xconnect_id='DEF'),
|
||||||
|
CircuitTermination(circuit=circuits[1], site=sites[1], term_side='A', port_speed=2000, upstream_speed=2000, xconnect_id='GHI'),
|
||||||
|
CircuitTermination(circuit=circuits[1], site=sites[2], term_side='Z', port_speed=2000, upstream_speed=2000, xconnect_id='JKL'),
|
||||||
|
CircuitTermination(circuit=circuits[2], site=sites[2], term_side='A', port_speed=3000, upstream_speed=3000, xconnect_id='MNO'),
|
||||||
|
CircuitTermination(circuit=circuits[2], site=sites[0], term_side='Z', port_speed=3000, upstream_speed=3000, xconnect_id='PQR'),
|
||||||
|
))
|
||||||
|
CircuitTermination.objects.bulk_create(circuit_terminations)
|
||||||
|
|
||||||
|
def test_term_side(self):
|
||||||
|
params = {'term_side': 'A'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_port_speed(self):
|
||||||
|
params = {'port_speed': ['1000', '2000']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_upstream_speed(self):
|
||||||
|
params = {'upstream_speed': ['1000', '2000']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_xconnect_id(self):
|
||||||
|
params = {'xconnect_id': ['ABC', 'DEF']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_circuit_id(self):
|
||||||
|
circuits = Circuit.objects.all()[:2]
|
||||||
|
params = {'circuit_id': [circuits[0].pk, circuits[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
@ -21,6 +21,45 @@ from .models import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
'CableFilter',
|
||||||
|
'ConsoleConnectionFilter',
|
||||||
|
'ConsolePortFilter',
|
||||||
|
'ConsolePortTemplateFilter',
|
||||||
|
'ConsoleServerPortFilter',
|
||||||
|
'ConsoleServerPortTemplateFilter',
|
||||||
|
'DeviceBayFilter',
|
||||||
|
'DeviceBayTemplateFilter',
|
||||||
|
'DeviceFilter',
|
||||||
|
'DeviceRoleFilter',
|
||||||
|
'DeviceTypeFilter',
|
||||||
|
'FrontPortFilter',
|
||||||
|
'FrontPortTemplateFilter',
|
||||||
|
'InterfaceConnectionFilter',
|
||||||
|
'InterfaceFilter',
|
||||||
|
'InterfaceTemplateFilter',
|
||||||
|
'InventoryItemFilter',
|
||||||
|
'ManufacturerFilter',
|
||||||
|
'PlatformFilter',
|
||||||
|
'PowerConnectionFilter',
|
||||||
|
'PowerFeedFilter',
|
||||||
|
'PowerOutletFilter',
|
||||||
|
'PowerOutletTemplateFilter',
|
||||||
|
'PowerPanelFilter',
|
||||||
|
'PowerPortFilter',
|
||||||
|
'PowerPortTemplateFilter',
|
||||||
|
'RackFilter',
|
||||||
|
'RackGroupFilter',
|
||||||
|
'RackReservationFilter',
|
||||||
|
'RackRoleFilter',
|
||||||
|
'RearPortFilter',
|
||||||
|
'RearPortTemplateFilter',
|
||||||
|
'RegionFilter',
|
||||||
|
'SiteFilter',
|
||||||
|
'VirtualChassisFilter',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class RegionFilter(NameSlugSearchFilterSet):
|
class RegionFilter(NameSlugSearchFilterSet):
|
||||||
parent_id = django_filters.ModelMultipleChoiceFilter(
|
parent_id = django_filters.ModelMultipleChoiceFilter(
|
||||||
queryset=Region.objects.all(),
|
queryset=Region.objects.all(),
|
||||||
@ -646,7 +685,8 @@ class DeviceComponentFilterSet(django_filters.FilterSet):
|
|||||||
queryset=Device.objects.all(),
|
queryset=Device.objects.all(),
|
||||||
label='Device (ID)',
|
label='Device (ID)',
|
||||||
)
|
)
|
||||||
device = django_filters.ModelChoiceFilter(
|
device = django_filters.ModelMultipleChoiceFilter(
|
||||||
|
field_name='device__name',
|
||||||
queryset=Device.objects.all(),
|
queryset=Device.objects.all(),
|
||||||
to_field_name='name',
|
to_field_name='name',
|
||||||
label='Device (name)',
|
label='Device (name)',
|
||||||
|
@ -74,6 +74,17 @@ class InterfaceCommonForm:
|
|||||||
elif self.cleaned_data['mode'] == IFACE_MODE_TAGGED_ALL:
|
elif self.cleaned_data['mode'] == IFACE_MODE_TAGGED_ALL:
|
||||||
self.cleaned_data['tagged_vlans'] = []
|
self.cleaned_data['tagged_vlans'] = []
|
||||||
|
|
||||||
|
# Validate tagged VLANs; must be a global VLAN or in the same site
|
||||||
|
elif self.cleaned_data['mode'] == IFACE_MODE_TAGGED:
|
||||||
|
valid_sites = [None, self.cleaned_data['device'].site]
|
||||||
|
invalid_vlans = [str(v) for v in tagged_vlans if v.site not in valid_sites]
|
||||||
|
|
||||||
|
if invalid_vlans:
|
||||||
|
raise forms.ValidationError({
|
||||||
|
'tagged_vlans': "The tagged VLANs ({}) must belong to the same site as the interface's parent "
|
||||||
|
"device/VM, or they must be global".format(', '.join(invalid_vlans))
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
class BulkRenameForm(forms.Form):
|
class BulkRenameForm(forms.Form):
|
||||||
"""
|
"""
|
||||||
@ -2278,36 +2289,6 @@ class InterfaceForm(InterfaceCommonForm, BootstrapMixin, forms.ModelForm):
|
|||||||
device__in=[self.instance.device, self.instance.device.get_vc_master()], type=IFACE_TYPE_LAG
|
device__in=[self.instance.device, self.instance.device.get_vc_master()], type=IFACE_TYPE_LAG
|
||||||
)
|
)
|
||||||
|
|
||||||
# Limit VLan choices to those in: global vlans, global groups, the current site's group, the current site
|
|
||||||
vlan_choices = []
|
|
||||||
global_vlans = VLAN.objects.filter(site=None, group=None)
|
|
||||||
vlan_choices.append(
|
|
||||||
('Global', [(vlan.pk, vlan) for vlan in global_vlans])
|
|
||||||
)
|
|
||||||
for group in VLANGroup.objects.filter(site=None):
|
|
||||||
global_group_vlans = VLAN.objects.filter(group=group)
|
|
||||||
vlan_choices.append(
|
|
||||||
(group.name, [(vlan.pk, vlan) for vlan in global_group_vlans])
|
|
||||||
)
|
|
||||||
|
|
||||||
site = getattr(self.instance.parent, 'site', None)
|
|
||||||
if site is not None:
|
|
||||||
|
|
||||||
# Add non-grouped site VLANs
|
|
||||||
site_vlans = VLAN.objects.filter(site=site, group=None)
|
|
||||||
vlan_choices.append((site.name, [(vlan.pk, vlan) for vlan in site_vlans]))
|
|
||||||
|
|
||||||
# Add grouped site VLANs
|
|
||||||
for group in VLANGroup.objects.filter(site=site):
|
|
||||||
site_group_vlans = VLAN.objects.filter(group=group)
|
|
||||||
vlan_choices.append((
|
|
||||||
'{} / {}'.format(group.site.name, group.name),
|
|
||||||
[(vlan.pk, vlan) for vlan in site_group_vlans]
|
|
||||||
))
|
|
||||||
|
|
||||||
self.fields['untagged_vlan'].choices = [(None, '---------')] + vlan_choices
|
|
||||||
self.fields['tagged_vlans'].choices = vlan_choices
|
|
||||||
|
|
||||||
|
|
||||||
class InterfaceCreateForm(InterfaceCommonForm, ComponentForm, forms.Form):
|
class InterfaceCreateForm(InterfaceCommonForm, ComponentForm, forms.Form):
|
||||||
name_pattern = ExpandableNameField(
|
name_pattern = ExpandableNameField(
|
||||||
@ -2388,36 +2369,6 @@ class InterfaceCreateForm(InterfaceCommonForm, ComponentForm, forms.Form):
|
|||||||
else:
|
else:
|
||||||
self.fields['lag'].queryset = Interface.objects.none()
|
self.fields['lag'].queryset = Interface.objects.none()
|
||||||
|
|
||||||
# Limit VLan choices to those in: global vlans, global groups, the current site's group, the current site
|
|
||||||
vlan_choices = []
|
|
||||||
global_vlans = VLAN.objects.filter(site=None, group=None)
|
|
||||||
vlan_choices.append(
|
|
||||||
('Global', [(vlan.pk, vlan) for vlan in global_vlans])
|
|
||||||
)
|
|
||||||
for group in VLANGroup.objects.filter(site=None):
|
|
||||||
global_group_vlans = VLAN.objects.filter(group=group)
|
|
||||||
vlan_choices.append(
|
|
||||||
(group.name, [(vlan.pk, vlan) for vlan in global_group_vlans])
|
|
||||||
)
|
|
||||||
|
|
||||||
site = getattr(self.parent, 'site', None)
|
|
||||||
if site is not None:
|
|
||||||
|
|
||||||
# Add non-grouped site VLANs
|
|
||||||
site_vlans = VLAN.objects.filter(site=site, group=None)
|
|
||||||
vlan_choices.append((site.name, [(vlan.pk, vlan) for vlan in site_vlans]))
|
|
||||||
|
|
||||||
# Add grouped site VLANs
|
|
||||||
for group in VLANGroup.objects.filter(site=site):
|
|
||||||
site_group_vlans = VLAN.objects.filter(group=group)
|
|
||||||
vlan_choices.append((
|
|
||||||
'{} / {}'.format(group.site.name, group.name),
|
|
||||||
[(vlan.pk, vlan) for vlan in site_group_vlans]
|
|
||||||
))
|
|
||||||
|
|
||||||
self.fields['untagged_vlan'].choices = [(None, '---------')] + vlan_choices
|
|
||||||
self.fields['tagged_vlans'].choices = vlan_choices
|
|
||||||
|
|
||||||
|
|
||||||
class InterfaceBulkEditForm(InterfaceCommonForm, BootstrapMixin, AddRemoveTagsForm, BulkEditForm):
|
class InterfaceBulkEditForm(InterfaceCommonForm, BootstrapMixin, AddRemoveTagsForm, BulkEditForm):
|
||||||
pk = forms.ModelMultipleChoiceField(
|
pk = forms.ModelMultipleChoiceField(
|
||||||
@ -2500,36 +2451,6 @@ class InterfaceBulkEditForm(InterfaceCommonForm, BootstrapMixin, AddRemoveTagsFo
|
|||||||
else:
|
else:
|
||||||
self.fields['lag'].choices = []
|
self.fields['lag'].choices = []
|
||||||
|
|
||||||
# Limit VLan choices to those in: global vlans, global groups, the current site's group, the current site
|
|
||||||
vlan_choices = []
|
|
||||||
global_vlans = VLAN.objects.filter(site=None, group=None)
|
|
||||||
vlan_choices.append(
|
|
||||||
('Global', [(vlan.pk, vlan) for vlan in global_vlans])
|
|
||||||
)
|
|
||||||
for group in VLANGroup.objects.filter(site=None):
|
|
||||||
global_group_vlans = VLAN.objects.filter(group=group)
|
|
||||||
vlan_choices.append(
|
|
||||||
(group.name, [(vlan.pk, vlan) for vlan in global_group_vlans])
|
|
||||||
)
|
|
||||||
if self.parent_obj is not None:
|
|
||||||
site = getattr(self.parent_obj, 'site', None)
|
|
||||||
if site is not None:
|
|
||||||
|
|
||||||
# Add non-grouped site VLANs
|
|
||||||
site_vlans = VLAN.objects.filter(site=site, group=None)
|
|
||||||
vlan_choices.append((site.name, [(vlan.pk, vlan) for vlan in site_vlans]))
|
|
||||||
|
|
||||||
# Add grouped site VLANs
|
|
||||||
for group in VLANGroup.objects.filter(site=site):
|
|
||||||
site_group_vlans = VLAN.objects.filter(group=group)
|
|
||||||
vlan_choices.append((
|
|
||||||
'{} / {}'.format(group.site.name, group.name),
|
|
||||||
[(vlan.pk, vlan) for vlan in site_group_vlans]
|
|
||||||
))
|
|
||||||
|
|
||||||
self.fields['untagged_vlan'].choices = [(None, '---------')] + vlan_choices
|
|
||||||
self.fields['tagged_vlans'].choices = vlan_choices
|
|
||||||
|
|
||||||
|
|
||||||
class InterfaceBulkRenameForm(BulkRenameForm):
|
class InterfaceBulkRenameForm(BulkRenameForm):
|
||||||
pk = forms.ModelMultipleChoiceField(
|
pk = forms.ModelMultipleChoiceField(
|
||||||
|
2381
netbox/dcim/tests/test_filters.py
Normal file
2381
netbox/dcim/tests/test_filters.py
Normal file
File diff suppressed because it is too large
Load Diff
@ -8,6 +8,20 @@ from .constants import *
|
|||||||
from .models import ConfigContext, CustomField, Graph, ExportTemplate, ObjectChange, Tag, TopologyMap
|
from .models import ConfigContext, CustomField, Graph, ExportTemplate, ObjectChange, Tag, TopologyMap
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
'ConfigContextFilter',
|
||||||
|
'CreatedUpdatedFilterSet',
|
||||||
|
'CustomFieldFilter',
|
||||||
|
'CustomFieldFilterSet',
|
||||||
|
'ExportTemplateFilter',
|
||||||
|
'GraphFilter',
|
||||||
|
'LocalConfigContextFilter',
|
||||||
|
'ObjectChangeFilter',
|
||||||
|
'TagFilter',
|
||||||
|
'TopologyMapFilter',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class CustomFieldFilter(django_filters.Filter):
|
class CustomFieldFilter(django_filters.Filter):
|
||||||
"""
|
"""
|
||||||
Filter objects by the presence of a CustomFieldValue. The filter's name is used as the CustomField name.
|
Filter objects by the presence of a CustomFieldValue. The filter's name is used as the CustomField name.
|
||||||
|
@ -263,12 +263,12 @@ class BaseScript:
|
|||||||
def run(self, data):
|
def run(self, data):
|
||||||
raise NotImplementedError("The script must define a run() method.")
|
raise NotImplementedError("The script must define a run() method.")
|
||||||
|
|
||||||
def as_form(self, data=None, files=None):
|
def as_form(self, data=None, files=None, initial=None):
|
||||||
"""
|
"""
|
||||||
Return a Django form suitable for populating the context data required to run this Script.
|
Return a Django form suitable for populating the context data required to run this Script.
|
||||||
"""
|
"""
|
||||||
vars = self._get_vars()
|
vars = self._get_vars()
|
||||||
form = ScriptForm(vars, data, files, commit_default=getattr(self.Meta, 'commit_default', True))
|
form = ScriptForm(vars, data, files, initial=initial, commit_default=getattr(self.Meta, 'commit_default', True))
|
||||||
|
|
||||||
return form
|
return form
|
||||||
|
|
||||||
|
@ -68,8 +68,9 @@ def custom_links(obj):
|
|||||||
text_rendered = render_jinja2(cl.text, context)
|
text_rendered = render_jinja2(cl.text, context)
|
||||||
if text_rendered:
|
if text_rendered:
|
||||||
link_target = ' target="_blank"' if cl.new_window else ''
|
link_target = ' target="_blank"' if cl.new_window else ''
|
||||||
|
link_rendered = render_jinja2(cl.url, context)
|
||||||
links_rendered.append(
|
links_rendered.append(
|
||||||
GROUP_LINK.format(cl.url, link_target, cl.text)
|
GROUP_LINK.format(link_rendered, link_target, text_rendered)
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
links_rendered.append(
|
links_rendered.append(
|
||||||
|
181
netbox/extras/tests/test_filters.py
Normal file
181
netbox/extras/tests/test_filters.py
Normal file
@ -0,0 +1,181 @@
|
|||||||
|
from django.contrib.contenttypes.models import ContentType
|
||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from dcim.models import DeviceRole, Platform, Region, Site
|
||||||
|
from extras.constants import *
|
||||||
|
from extras.filters import *
|
||||||
|
from extras.models import ConfigContext, ExportTemplate, Graph
|
||||||
|
from tenancy.models import Tenant, TenantGroup
|
||||||
|
|
||||||
|
|
||||||
|
class GraphTestCase(TestCase):
|
||||||
|
queryset = Graph.objects.all()
|
||||||
|
filterset = GraphFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
graphs = (
|
||||||
|
Graph(name='Graph 1', type=GRAPH_TYPE_DEVICE, source='http://example.com/1'),
|
||||||
|
Graph(name='Graph 2', type=GRAPH_TYPE_INTERFACE, source='http://example.com/2'),
|
||||||
|
Graph(name='Graph 3', type=GRAPH_TYPE_SITE, source='http://example.com/3'),
|
||||||
|
)
|
||||||
|
Graph.objects.bulk_create(graphs)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': 'Graph 1'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_type(self):
|
||||||
|
params = {'type': GRAPH_TYPE_DEVICE}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
|
||||||
|
class ExportTemplateTestCase(TestCase):
|
||||||
|
queryset = ExportTemplate.objects.all()
|
||||||
|
filterset = ExportTemplateFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
content_types = ContentType.objects.filter(model__in=['site', 'rack', 'device'])
|
||||||
|
|
||||||
|
export_templates = (
|
||||||
|
ExportTemplate(name='Export Template 1', content_type=content_types[0], template_language=TEMPLATE_LANGUAGE_DJANGO, template_code='TESTING'),
|
||||||
|
ExportTemplate(name='Export Template 2', content_type=content_types[1], template_language=TEMPLATE_LANGUAGE_JINJA2, template_code='TESTING'),
|
||||||
|
ExportTemplate(name='Export Template 3', content_type=content_types[2], template_language=TEMPLATE_LANGUAGE_JINJA2, template_code='TESTING'),
|
||||||
|
)
|
||||||
|
ExportTemplate.objects.bulk_create(export_templates)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': 'Export Template 1'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_content_type(self):
|
||||||
|
params = {'content_type': ContentType.objects.get(model='site').pk}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_template_language(self):
|
||||||
|
params = {'template_language': TEMPLATE_LANGUAGE_JINJA2}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigContextTestCase(TestCase):
|
||||||
|
queryset = ConfigContext.objects.all()
|
||||||
|
filterset = ConfigContextFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
regions = (
|
||||||
|
Region(name='Test Region 1', slug='test-region-1'),
|
||||||
|
Region(name='Test Region 2', slug='test-region-2'),
|
||||||
|
Region(name='Test Region 3', slug='test-region-3'),
|
||||||
|
)
|
||||||
|
# Can't use bulk_create for models with MPTT fields
|
||||||
|
for r in regions:
|
||||||
|
r.save()
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1'),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2'),
|
||||||
|
Site(name='Test Site 3', slug='test-site-3'),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
device_roles = (
|
||||||
|
DeviceRole(name='Device Role 1', slug='device-role-1'),
|
||||||
|
DeviceRole(name='Device Role 2', slug='device-role-2'),
|
||||||
|
DeviceRole(name='Device Role 3', slug='device-role-3'),
|
||||||
|
)
|
||||||
|
DeviceRole.objects.bulk_create(device_roles)
|
||||||
|
|
||||||
|
platforms = (
|
||||||
|
Platform(name='Platform 1', slug='platform-1'),
|
||||||
|
Platform(name='Platform 2', slug='platform-2'),
|
||||||
|
Platform(name='Platform 3', slug='platform-3'),
|
||||||
|
)
|
||||||
|
Platform.objects.bulk_create(platforms)
|
||||||
|
|
||||||
|
tenant_groups = (
|
||||||
|
TenantGroup(name='Tenant Group 1', slug='tenant-group-1'),
|
||||||
|
TenantGroup(name='Tenant Group 2', slug='tenant-group-2'),
|
||||||
|
TenantGroup(name='Tenant Group 3', slug='tenant-group-3'),
|
||||||
|
)
|
||||||
|
TenantGroup.objects.bulk_create(tenant_groups)
|
||||||
|
|
||||||
|
tenants = (
|
||||||
|
Tenant(name='Tenant 1', slug='tenant-1'),
|
||||||
|
Tenant(name='Tenant 2', slug='tenant-2'),
|
||||||
|
Tenant(name='Tenant 3', slug='tenant-3'),
|
||||||
|
)
|
||||||
|
Tenant.objects.bulk_create(tenants)
|
||||||
|
|
||||||
|
for i in range(0, 3):
|
||||||
|
is_active = bool(i % 2)
|
||||||
|
c = ConfigContext.objects.create(
|
||||||
|
name='Config Context {}'.format(i + 1),
|
||||||
|
is_active=is_active,
|
||||||
|
data='{"foo": 123}'
|
||||||
|
)
|
||||||
|
c.regions.set([regions[i]])
|
||||||
|
c.sites.set([sites[i]])
|
||||||
|
c.roles.set([device_roles[i]])
|
||||||
|
c.platforms.set([platforms[i]])
|
||||||
|
c.tenant_groups.set([tenant_groups[i]])
|
||||||
|
c.tenants.set([tenants[i]])
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': 'Config Context 1'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_is_active(self):
|
||||||
|
params = {'is_active': True}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
params = {'is_active': False}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_region(self):
|
||||||
|
regions = Region.objects.all()[:2]
|
||||||
|
params = {'region_id': [regions[0].pk, regions[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'region': [regions[0].slug, regions[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_role(self):
|
||||||
|
device_roles = DeviceRole.objects.all()[:2]
|
||||||
|
params = {'role_id': [device_roles[0].pk, device_roles[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'role': [device_roles[0].slug, device_roles[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_platform(self):
|
||||||
|
platforms = Platform.objects.all()[:2]
|
||||||
|
params = {'platform_id': [platforms[0].pk, platforms[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'platform': [platforms[0].slug, platforms[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_tenant_group(self):
|
||||||
|
tenant_groups = TenantGroup.objects.all()[:2]
|
||||||
|
params = {'tenant_group_id': [tenant_groups[0].pk, tenant_groups[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'tenant_group': [tenant_groups[0].slug, tenant_groups[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_tenant_(self):
|
||||||
|
tenants = Tenant.objects.all()[:2]
|
||||||
|
params = {'tenant_id': [tenants[0].pk, tenants[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'tenant': [tenants[0].slug, tenants[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: ObjectChangeFilter test
|
@ -392,7 +392,7 @@ class ScriptView(PermissionRequiredMixin, View):
|
|||||||
def get(self, request, module, name):
|
def get(self, request, module, name):
|
||||||
|
|
||||||
script = self._get_script(module, name)
|
script = self._get_script(module, name)
|
||||||
form = script.as_form()
|
form = script.as_form(initial=request.GET)
|
||||||
|
|
||||||
return render(request, 'extras/script.html', {
|
return render(request, 'extras/script.html', {
|
||||||
'module': module,
|
'module': module,
|
||||||
|
@ -13,6 +13,19 @@ from .constants import *
|
|||||||
from .models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
|
from .models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
'AggregateFilter',
|
||||||
|
'IPAddressFilter',
|
||||||
|
'PrefixFilter',
|
||||||
|
'RIRFilter',
|
||||||
|
'RoleFilter',
|
||||||
|
'ServiceFilter',
|
||||||
|
'VLANFilter',
|
||||||
|
'VLANGroupFilter',
|
||||||
|
'VRFFilter',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class VRFFilter(TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
|
class VRFFilter(TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
|
||||||
id__in = NumericInFilter(
|
id__in = NumericInFilter(
|
||||||
field_name='id',
|
field_name='id',
|
||||||
|
645
netbox/ipam/tests/test_filters.py
Normal file
645
netbox/ipam/tests/test_filters.py
Normal file
@ -0,0 +1,645 @@
|
|||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from dcim.models import Device, DeviceRole, DeviceType, Interface, Manufacturer, Region, Site
|
||||||
|
from ipam.constants import *
|
||||||
|
from ipam.filters import *
|
||||||
|
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
|
||||||
|
from virtualization.models import Cluster, ClusterType, VirtualMachine
|
||||||
|
|
||||||
|
|
||||||
|
class VRFTestCase(TestCase):
|
||||||
|
queryset = VRF.objects.all()
|
||||||
|
filterset = VRFFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
vrfs = (
|
||||||
|
VRF(name='VRF 1', rd='65000:100', enforce_unique=False),
|
||||||
|
VRF(name='VRF 2', rd='65000:200', enforce_unique=False),
|
||||||
|
VRF(name='VRF 3', rd='65000:300', enforce_unique=False),
|
||||||
|
VRF(name='VRF 4', rd='65000:400', enforce_unique=True),
|
||||||
|
VRF(name='VRF 5', rd='65000:500', enforce_unique=True),
|
||||||
|
VRF(name='VRF 6', rd='65000:600', enforce_unique=True),
|
||||||
|
)
|
||||||
|
VRF.objects.bulk_create(vrfs)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['VRF 1', 'VRF 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_rd(self):
|
||||||
|
params = {'rd': ['65000:100', '65000:200']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_enforce_unique(self):
|
||||||
|
params = {'enforce_unique': 'true'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
params = {'enforce_unique': 'false'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:3]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
|
||||||
|
class RIRTestCase(TestCase):
|
||||||
|
queryset = RIR.objects.all()
|
||||||
|
filterset = RIRFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
rirs = (
|
||||||
|
RIR(name='RIR 1', slug='rir-1', is_private=False),
|
||||||
|
RIR(name='RIR 2', slug='rir-2', is_private=False),
|
||||||
|
RIR(name='RIR 3', slug='rir-3', is_private=False),
|
||||||
|
RIR(name='RIR 4', slug='rir-4', is_private=True),
|
||||||
|
RIR(name='RIR 5', slug='rir-5', is_private=True),
|
||||||
|
RIR(name='RIR 6', slug='rir-6', is_private=True),
|
||||||
|
)
|
||||||
|
RIR.objects.bulk_create(rirs)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['RIR 1', 'RIR 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['rir-1', 'rir-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_is_private(self):
|
||||||
|
params = {'is_private': 'true'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
params = {'is_private': 'false'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:3]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
|
||||||
|
class AggregateTestCase(TestCase):
|
||||||
|
queryset = Aggregate.objects.all()
|
||||||
|
filterset = AggregateFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
rirs = (
|
||||||
|
RIR(name='RIR 1', slug='rir-1'),
|
||||||
|
RIR(name='RIR 2', slug='rir-2'),
|
||||||
|
RIR(name='RIR 3', slug='rir-3'),
|
||||||
|
)
|
||||||
|
RIR.objects.bulk_create(rirs)
|
||||||
|
|
||||||
|
aggregates = (
|
||||||
|
Aggregate(family=4, prefix='10.1.0.0/16', rir=rirs[0], date_added='2020-01-01'),
|
||||||
|
Aggregate(family=4, prefix='10.2.0.0/16', rir=rirs[0], date_added='2020-01-02'),
|
||||||
|
Aggregate(family=4, prefix='10.3.0.0/16', rir=rirs[1], date_added='2020-01-03'),
|
||||||
|
Aggregate(family=6, prefix='2001:db8:1::/48', rir=rirs[1], date_added='2020-01-04'),
|
||||||
|
Aggregate(family=6, prefix='2001:db8:2::/48', rir=rirs[2], date_added='2020-01-05'),
|
||||||
|
Aggregate(family=6, prefix='2001:db8:3::/48', rir=rirs[2], date_added='2020-01-06'),
|
||||||
|
)
|
||||||
|
Aggregate.objects.bulk_create(aggregates)
|
||||||
|
|
||||||
|
def test_family(self):
|
||||||
|
params = {'family': '4'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_date_added(self):
|
||||||
|
params = {'date_added': ['2020-01-01', '2020-01-02']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
# TODO: Test for multiple values
|
||||||
|
def test_prefix(self):
|
||||||
|
params = {'prefix': '10.1.0.0/16'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_rir(self):
|
||||||
|
rirs = RIR.objects.all()[:2]
|
||||||
|
params = {'rir_id': [rirs[0].pk, rirs[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'rir': [rirs[0].slug, rirs[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
|
||||||
|
class RoleTestCase(TestCase):
|
||||||
|
queryset = Role.objects.all()
|
||||||
|
filterset = RoleFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
roles = (
|
||||||
|
Role(name='Role 1', slug='role-1'),
|
||||||
|
Role(name='Role 2', slug='role-2'),
|
||||||
|
Role(name='Role 3', slug='role-3'),
|
||||||
|
)
|
||||||
|
Role.objects.bulk_create(roles)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Role 1', 'Role 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['role-1', 'role-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class PrefixTestCase(TestCase):
|
||||||
|
queryset = Prefix.objects.all()
|
||||||
|
filterset = PrefixFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
regions = (
|
||||||
|
Region(name='Test Region 1', slug='test-region-1'),
|
||||||
|
Region(name='Test Region 2', slug='test-region-2'),
|
||||||
|
Region(name='Test Region 3', slug='test-region-3'),
|
||||||
|
)
|
||||||
|
# Can't use bulk_create for models with MPTT fields
|
||||||
|
for r in regions:
|
||||||
|
r.save()
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
|
||||||
|
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
vrfs = (
|
||||||
|
VRF(name='VRF 1', rd='65000:100'),
|
||||||
|
VRF(name='VRF 2', rd='65000:200'),
|
||||||
|
VRF(name='VRF 3', rd='65000:300'),
|
||||||
|
)
|
||||||
|
VRF.objects.bulk_create(vrfs)
|
||||||
|
|
||||||
|
vlans = (
|
||||||
|
VLAN(vid=1, name='VLAN 1'),
|
||||||
|
VLAN(vid=2, name='VLAN 2'),
|
||||||
|
VLAN(vid=3, name='VLAN 3'),
|
||||||
|
)
|
||||||
|
VLAN.objects.bulk_create(vlans)
|
||||||
|
|
||||||
|
roles = (
|
||||||
|
Role(name='Role 1', slug='role-1'),
|
||||||
|
Role(name='Role 2', slug='role-2'),
|
||||||
|
Role(name='Role 3', slug='role-3'),
|
||||||
|
)
|
||||||
|
Role.objects.bulk_create(roles)
|
||||||
|
|
||||||
|
prefixes = (
|
||||||
|
Prefix(family=4, prefix='10.0.0.0/24', site=None, vrf=None, vlan=None, role=None, is_pool=True),
|
||||||
|
Prefix(family=4, prefix='10.0.1.0/24', site=sites[0], vrf=vrfs[0], vlan=vlans[0], role=roles[0]),
|
||||||
|
Prefix(family=4, prefix='10.0.2.0/24', site=sites[1], vrf=vrfs[1], vlan=vlans[1], role=roles[1], status=PREFIX_STATUS_DEPRECATED),
|
||||||
|
Prefix(family=4, prefix='10.0.3.0/24', site=sites[2], vrf=vrfs[2], vlan=vlans[2], role=roles[2], status=PREFIX_STATUS_RESERVED),
|
||||||
|
Prefix(family=6, prefix='2001:db8::/64', site=None, vrf=None, vlan=None, role=None, is_pool=True),
|
||||||
|
Prefix(family=6, prefix='2001:db8:0:1::/64', site=sites[0], vrf=vrfs[0], vlan=vlans[0], role=roles[0]),
|
||||||
|
Prefix(family=6, prefix='2001:db8:0:2::/64', site=sites[1], vrf=vrfs[1], vlan=vlans[1], role=roles[1], status=PREFIX_STATUS_DEPRECATED),
|
||||||
|
Prefix(family=6, prefix='2001:db8:0:3::/64', site=sites[2], vrf=vrfs[2], vlan=vlans[2], role=roles[2], status=PREFIX_STATUS_RESERVED),
|
||||||
|
Prefix(family=4, prefix='10.0.0.0/16'),
|
||||||
|
Prefix(family=6, prefix='2001:db8::/32'),
|
||||||
|
)
|
||||||
|
Prefix.objects.bulk_create(prefixes)
|
||||||
|
|
||||||
|
def test_family(self):
|
||||||
|
params = {'family': '6'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 5)
|
||||||
|
|
||||||
|
def test_is_pool(self):
|
||||||
|
params = {'is_pool': 'true'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'is_pool': 'false'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 8)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:3]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_within(self):
|
||||||
|
params = {'within': '10.0.0.0/16'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_within_include(self):
|
||||||
|
params = {'within_include': '10.0.0.0/16'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 5)
|
||||||
|
|
||||||
|
def test_contains(self):
|
||||||
|
params = {'contains': '10.0.1.0/24'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'contains': '2001:db8:0:1::/64'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_mask_length(self):
|
||||||
|
params = {'mask_length': '24'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_vrf(self):
|
||||||
|
vrfs = VRF.objects.all()[:2]
|
||||||
|
params = {'vrf_id': [vrfs[0].pk, vrfs[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'vrf': [vrfs[0].rd, vrfs[1].rd]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_region(self):
|
||||||
|
regions = Region.objects.all()[:2]
|
||||||
|
params = {'region_id': [regions[0].pk, regions[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'region': [regions[0].slug, regions[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_vlan(self):
|
||||||
|
vlans = VLAN.objects.all()[:2]
|
||||||
|
params = {'vlan_id': [vlans[0].pk, vlans[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
# TODO: Test for multiple values
|
||||||
|
params = {'vlan_vid': vlans[0].vid}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_role(self):
|
||||||
|
roles = Role.objects.all()[:2]
|
||||||
|
params = {'role_id': [roles[0].pk, roles[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'role': [roles[0].slug, roles[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_status(self):
|
||||||
|
params = {'status': [PREFIX_STATUS_DEPRECATED, PREFIX_STATUS_RESERVED]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
|
||||||
|
class IPAddressTestCase(TestCase):
|
||||||
|
queryset = IPAddress.objects.all()
|
||||||
|
filterset = IPAddressFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
vrfs = (
|
||||||
|
VRF(name='VRF 1', rd='65000:100'),
|
||||||
|
VRF(name='VRF 2', rd='65000:200'),
|
||||||
|
VRF(name='VRF 3', rd='65000:300'),
|
||||||
|
)
|
||||||
|
VRF.objects.bulk_create(vrfs)
|
||||||
|
|
||||||
|
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||||
|
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
|
||||||
|
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
|
||||||
|
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
|
||||||
|
|
||||||
|
devices = (
|
||||||
|
Device(device_type=device_type, name='Device 1', site=site, device_role=device_role),
|
||||||
|
Device(device_type=device_type, name='Device 2', site=site, device_role=device_role),
|
||||||
|
Device(device_type=device_type, name='Device 3', site=site, device_role=device_role),
|
||||||
|
)
|
||||||
|
Device.objects.bulk_create(devices)
|
||||||
|
|
||||||
|
clustertype = ClusterType.objects.create(name='Cluster Type 1', slug='cluster-type-1')
|
||||||
|
cluster = Cluster.objects.create(type=clustertype, name='Cluster 1')
|
||||||
|
|
||||||
|
virtual_machines = (
|
||||||
|
VirtualMachine(name='Virtual Machine 1', cluster=cluster),
|
||||||
|
VirtualMachine(name='Virtual Machine 2', cluster=cluster),
|
||||||
|
VirtualMachine(name='Virtual Machine 3', cluster=cluster),
|
||||||
|
)
|
||||||
|
VirtualMachine.objects.bulk_create(virtual_machines)
|
||||||
|
|
||||||
|
interfaces = (
|
||||||
|
Interface(device=devices[0], name='Interface 1'),
|
||||||
|
Interface(device=devices[1], name='Interface 2'),
|
||||||
|
Interface(device=devices[2], name='Interface 3'),
|
||||||
|
Interface(virtual_machine=virtual_machines[0], name='Interface 1'),
|
||||||
|
Interface(virtual_machine=virtual_machines[1], name='Interface 2'),
|
||||||
|
Interface(virtual_machine=virtual_machines[2], name='Interface 3'),
|
||||||
|
)
|
||||||
|
Interface.objects.bulk_create(interfaces)
|
||||||
|
|
||||||
|
ipaddresses = (
|
||||||
|
IPAddress(family=4, address='10.0.0.1/24', vrf=None, interface=None, status=IPADDRESS_STATUS_ACTIVE, role=None, dns_name='ipaddress-a'),
|
||||||
|
IPAddress(family=4, address='10.0.0.2/24', vrf=vrfs[0], interface=interfaces[0], status=IPADDRESS_STATUS_ACTIVE, role=None, dns_name='ipaddress-b'),
|
||||||
|
IPAddress(family=4, address='10.0.0.3/24', vrf=vrfs[1], interface=interfaces[1], status=IPADDRESS_STATUS_RESERVED, role=IPADDRESS_ROLE_VIP, dns_name='ipaddress-c'),
|
||||||
|
IPAddress(family=4, address='10.0.0.4/24', vrf=vrfs[2], interface=interfaces[2], status=IPADDRESS_STATUS_DEPRECATED, role=IPADDRESS_ROLE_SECONDARY, dns_name='ipaddress-d'),
|
||||||
|
IPAddress(family=6, address='2001:db8::1/64', vrf=None, interface=None, status=IPADDRESS_STATUS_ACTIVE, role=None, dns_name='ipaddress-a'),
|
||||||
|
IPAddress(family=6, address='2001:db8::2/64', vrf=vrfs[0], interface=interfaces[3], status=IPADDRESS_STATUS_ACTIVE, role=None, dns_name='ipaddress-b'),
|
||||||
|
IPAddress(family=6, address='2001:db8::3/64', vrf=vrfs[1], interface=interfaces[4], status=IPADDRESS_STATUS_RESERVED, role=IPADDRESS_ROLE_VIP, dns_name='ipaddress-c'),
|
||||||
|
IPAddress(family=6, address='2001:db8::4/64', vrf=vrfs[2], interface=interfaces[5], status=IPADDRESS_STATUS_DEPRECATED, role=IPADDRESS_ROLE_SECONDARY, dns_name='ipaddress-d'),
|
||||||
|
)
|
||||||
|
IPAddress.objects.bulk_create(ipaddresses)
|
||||||
|
|
||||||
|
def test_family(self):
|
||||||
|
params = {'family': '6'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_dns_name(self):
|
||||||
|
params = {'dns_name': ['ipaddress-a', 'ipaddress-b']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:3]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_parent(self):
|
||||||
|
params = {'parent': '10.0.0.0/24'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'parent': '2001:db8::/64'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def filter_address(self):
|
||||||
|
# Check IPv4 and IPv6, with and without a mask
|
||||||
|
params = {'address': '10.0.0.1/24'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
params = {'address': '10.0.0.1'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
params = {'address': '2001:db8::1/64'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
params = {'address': '2001:db8::1'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_mask_length(self):
|
||||||
|
params = {'mask_length': '24'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_vrf(self):
|
||||||
|
vrfs = VRF.objects.all()[:2]
|
||||||
|
params = {'vrf_id': [vrfs[0].pk, vrfs[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'vrf': [vrfs[0].rd, vrfs[1].rd]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
# TODO: Test for multiple values
|
||||||
|
def test_device(self):
|
||||||
|
device = Device.objects.first()
|
||||||
|
params = {'device_id': device.pk}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
params = {'device': device.name}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_virtual_machine(self):
|
||||||
|
vms = VirtualMachine.objects.all()[:2]
|
||||||
|
params = {'virtual_machine_id': [vms[0].pk, vms[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'virtual_machine': [vms[0].name, vms[1].name]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_interface(self):
|
||||||
|
interfaces = Interface.objects.all()[:2]
|
||||||
|
params = {'interface_id': [interfaces[0].pk, interfaces[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'interface': ['Interface 1', 'Interface 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_assigned_to_interface(self):
|
||||||
|
params = {'assigned_to_interface': 'true'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6)
|
||||||
|
params = {'assigned_to_interface': 'false'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_status(self):
|
||||||
|
params = {'status': [PREFIX_STATUS_DEPRECATED, PREFIX_STATUS_RESERVED]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_role(self):
|
||||||
|
params = {'role': [IPADDRESS_ROLE_SECONDARY, IPADDRESS_ROLE_VIP]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
|
||||||
|
class VLANGroupTestCase(TestCase):
|
||||||
|
queryset = VLANGroup.objects.all()
|
||||||
|
filterset = VLANGroupFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
regions = (
|
||||||
|
Region(name='Test Region 1', slug='test-region-1'),
|
||||||
|
Region(name='Test Region 2', slug='test-region-2'),
|
||||||
|
Region(name='Test Region 3', slug='test-region-3'),
|
||||||
|
)
|
||||||
|
# Can't use bulk_create for models with MPTT fields
|
||||||
|
for r in regions:
|
||||||
|
r.save()
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
|
||||||
|
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
vlan_groups = (
|
||||||
|
VLANGroup(name='VLAN Group 1', slug='vlan-group-1', site=sites[0]),
|
||||||
|
VLANGroup(name='VLAN Group 2', slug='vlan-group-2', site=sites[1]),
|
||||||
|
VLANGroup(name='VLAN Group 3', slug='vlan-group-3', site=sites[2]),
|
||||||
|
VLANGroup(name='VLAN Group 4', slug='vlan-group-4', site=None),
|
||||||
|
)
|
||||||
|
VLANGroup.objects.bulk_create(vlan_groups)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['VLAN Group 1', 'VLAN Group 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['vlan-group-1', 'vlan-group-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_region(self):
|
||||||
|
regions = Region.objects.all()[:2]
|
||||||
|
params = {'region_id': [regions[0].pk, regions[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'region': [regions[0].slug, regions[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class VLANTestCase(TestCase):
|
||||||
|
queryset = VLAN.objects.all()
|
||||||
|
filterset = VLANFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
regions = (
|
||||||
|
Region(name='Test Region 1', slug='test-region-1'),
|
||||||
|
Region(name='Test Region 2', slug='test-region-2'),
|
||||||
|
Region(name='Test Region 3', slug='test-region-3'),
|
||||||
|
)
|
||||||
|
# Can't use bulk_create for models with MPTT fields
|
||||||
|
for r in regions:
|
||||||
|
r.save()
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
|
||||||
|
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
roles = (
|
||||||
|
Role(name='Role 1', slug='role-1'),
|
||||||
|
Role(name='Role 2', slug='role-2'),
|
||||||
|
Role(name='Role 3', slug='role-3'),
|
||||||
|
)
|
||||||
|
Role.objects.bulk_create(roles)
|
||||||
|
|
||||||
|
groups = (
|
||||||
|
VLANGroup(name='VLAN Group 1', slug='vlan-group-1', site=sites[0]),
|
||||||
|
VLANGroup(name='VLAN Group 2', slug='vlan-group-2', site=sites[1]),
|
||||||
|
VLANGroup(name='VLAN Group 3', slug='vlan-group-3', site=None),
|
||||||
|
)
|
||||||
|
VLANGroup.objects.bulk_create(groups)
|
||||||
|
|
||||||
|
vlans = (
|
||||||
|
VLAN(vid=101, name='VLAN 101', site=sites[0], group=groups[0], role=roles[0], status=VLAN_STATUS_ACTIVE),
|
||||||
|
VLAN(vid=102, name='VLAN 102', site=sites[0], group=groups[0], role=roles[0], status=VLAN_STATUS_ACTIVE),
|
||||||
|
VLAN(vid=201, name='VLAN 201', site=sites[1], group=groups[1], role=roles[1], status=VLAN_STATUS_DEPRECATED),
|
||||||
|
VLAN(vid=202, name='VLAN 202', site=sites[1], group=groups[1], role=roles[1], status=VLAN_STATUS_DEPRECATED),
|
||||||
|
VLAN(vid=301, name='VLAN 301', site=sites[2], group=groups[2], role=roles[2], status=VLAN_STATUS_RESERVED),
|
||||||
|
VLAN(vid=302, name='VLAN 302', site=sites[2], group=groups[2], role=roles[2], status=VLAN_STATUS_RESERVED),
|
||||||
|
)
|
||||||
|
VLAN.objects.bulk_create(vlans)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['VLAN 101', 'VLAN 102']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_rd(self):
|
||||||
|
params = {'vid': ['101', '201', '301']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:3]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_region(self):
|
||||||
|
regions = Region.objects.all()[:2]
|
||||||
|
params = {'region_id': [regions[0].pk, regions[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'region': [regions[0].slug, regions[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_group(self):
|
||||||
|
groups = VLANGroup.objects.all()[:2]
|
||||||
|
params = {'group_id': [groups[0].pk, groups[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'group': [groups[0].slug, groups[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_role(self):
|
||||||
|
roles = Role.objects.all()[:2]
|
||||||
|
params = {'role_id': [roles[0].pk, roles[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
params = {'role': [roles[0].slug, roles[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_status(self):
|
||||||
|
params = {'status': [VLAN_STATUS_ACTIVE, VLAN_STATUS_DEPRECATED]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
|
||||||
|
class ServiceTestCase(TestCase):
|
||||||
|
queryset = Service.objects.all()
|
||||||
|
filterset = ServiceFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||||
|
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
|
||||||
|
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
|
||||||
|
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
|
||||||
|
|
||||||
|
devices = (
|
||||||
|
Device(device_type=device_type, name='Device 1', site=site, device_role=device_role),
|
||||||
|
Device(device_type=device_type, name='Device 2', site=site, device_role=device_role),
|
||||||
|
Device(device_type=device_type, name='Device 3', site=site, device_role=device_role),
|
||||||
|
)
|
||||||
|
Device.objects.bulk_create(devices)
|
||||||
|
|
||||||
|
clustertype = ClusterType.objects.create(name='Cluster Type 1', slug='cluster-type-1')
|
||||||
|
cluster = Cluster.objects.create(type=clustertype, name='Cluster 1')
|
||||||
|
|
||||||
|
virtual_machines = (
|
||||||
|
VirtualMachine(name='Virtual Machine 1', cluster=cluster),
|
||||||
|
VirtualMachine(name='Virtual Machine 2', cluster=cluster),
|
||||||
|
VirtualMachine(name='Virtual Machine 3', cluster=cluster),
|
||||||
|
)
|
||||||
|
VirtualMachine.objects.bulk_create(virtual_machines)
|
||||||
|
|
||||||
|
services = (
|
||||||
|
Service(device=devices[0], name='Service 1', protocol=IP_PROTOCOL_TCP, port=1001),
|
||||||
|
Service(device=devices[1], name='Service 2', protocol=IP_PROTOCOL_TCP, port=1002),
|
||||||
|
Service(device=devices[2], name='Service 3', protocol=IP_PROTOCOL_UDP, port=1003),
|
||||||
|
Service(virtual_machine=virtual_machines[0], name='Service 4', protocol=IP_PROTOCOL_TCP, port=2001),
|
||||||
|
Service(virtual_machine=virtual_machines[1], name='Service 5', protocol=IP_PROTOCOL_TCP, port=2002),
|
||||||
|
Service(virtual_machine=virtual_machines[2], name='Service 6', protocol=IP_PROTOCOL_UDP, port=2003),
|
||||||
|
)
|
||||||
|
Service.objects.bulk_create(services)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:3]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Service 1', 'Service 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_protocol(self):
|
||||||
|
params = {'protocol': IP_PROTOCOL_TCP}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
|
||||||
|
|
||||||
|
def test_port(self):
|
||||||
|
params = {'port': ['1001', '1002', '1003']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
|
||||||
|
|
||||||
|
def test_device(self):
|
||||||
|
devices = Device.objects.all()[:2]
|
||||||
|
params = {'device_id': [devices[0].pk, devices[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'device': [devices[0].name, devices[1].name]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_virtual_machine(self):
|
||||||
|
vms = VirtualMachine.objects.all()[:2]
|
||||||
|
params = {'virtual_machine_id': [vms[0].pk, vms[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'virtual_machine': [vms[0].name, vms[1].name]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
@ -333,7 +333,10 @@ class AggregateView(PermissionRequiredMixin, View):
|
|||||||
).annotate_depth(
|
).annotate_depth(
|
||||||
limit=0
|
limit=0
|
||||||
)
|
)
|
||||||
child_prefixes = add_available_prefixes(aggregate.prefix, child_prefixes)
|
|
||||||
|
# Add available prefixes to the table if requested
|
||||||
|
if request.GET.get('show_available', 'true') == 'true':
|
||||||
|
child_prefixes = add_available_prefixes(aggregate.prefix, child_prefixes)
|
||||||
|
|
||||||
prefix_table = tables.PrefixDetailTable(child_prefixes)
|
prefix_table = tables.PrefixDetailTable(child_prefixes)
|
||||||
if request.user.has_perm('ipam.change_prefix') or request.user.has_perm('ipam.delete_prefix'):
|
if request.user.has_perm('ipam.change_prefix') or request.user.has_perm('ipam.delete_prefix'):
|
||||||
@ -356,6 +359,7 @@ class AggregateView(PermissionRequiredMixin, View):
|
|||||||
'aggregate': aggregate,
|
'aggregate': aggregate,
|
||||||
'prefix_table': prefix_table,
|
'prefix_table': prefix_table,
|
||||||
'permissions': permissions,
|
'permissions': permissions,
|
||||||
|
'show_available': request.GET.get('show_available', 'true') == 'true',
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
@ -511,8 +515,8 @@ class PrefixPrefixesView(PermissionRequiredMixin, View):
|
|||||||
'site', 'vlan', 'role',
|
'site', 'vlan', 'role',
|
||||||
).annotate_depth(limit=0)
|
).annotate_depth(limit=0)
|
||||||
|
|
||||||
# Annotate available prefixes
|
# Add available prefixes to the table if requested
|
||||||
if child_prefixes:
|
if child_prefixes and request.GET.get('show_available', 'true') == 'true':
|
||||||
child_prefixes = add_available_prefixes(prefix.prefix, child_prefixes)
|
child_prefixes = add_available_prefixes(prefix.prefix, child_prefixes)
|
||||||
|
|
||||||
prefix_table = tables.PrefixDetailTable(child_prefixes)
|
prefix_table = tables.PrefixDetailTable(child_prefixes)
|
||||||
@ -539,6 +543,7 @@ class PrefixPrefixesView(PermissionRequiredMixin, View):
|
|||||||
'permissions': permissions,
|
'permissions': permissions,
|
||||||
'bulk_querystring': 'vrf_id={}&within={}'.format(prefix.vrf.pk if prefix.vrf else '0', prefix.prefix),
|
'bulk_querystring': 'vrf_id={}&within={}'.format(prefix.vrf.pk if prefix.vrf else '0', prefix.prefix),
|
||||||
'active_tab': 'prefixes',
|
'active_tab': 'prefixes',
|
||||||
|
'show_available': request.GET.get('show_available', 'true') == 'true',
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
@ -553,7 +558,10 @@ class PrefixIPAddressesView(PermissionRequiredMixin, View):
|
|||||||
ipaddresses = prefix.get_child_ips().prefetch_related(
|
ipaddresses = prefix.get_child_ips().prefetch_related(
|
||||||
'vrf', 'interface__device', 'primary_ip4_for', 'primary_ip6_for'
|
'vrf', 'interface__device', 'primary_ip4_for', 'primary_ip6_for'
|
||||||
)
|
)
|
||||||
ipaddresses = add_available_ipaddresses(prefix.prefix, ipaddresses, prefix.is_pool)
|
|
||||||
|
# Add available IP addresses to the table if requested
|
||||||
|
if request.GET.get('show_available', 'true') == 'true':
|
||||||
|
ipaddresses = add_available_ipaddresses(prefix.prefix, ipaddresses, prefix.is_pool)
|
||||||
|
|
||||||
ip_table = tables.IPAddressTable(ipaddresses)
|
ip_table = tables.IPAddressTable(ipaddresses)
|
||||||
if request.user.has_perm('ipam.change_ipaddress') or request.user.has_perm('ipam.delete_ipaddress'):
|
if request.user.has_perm('ipam.change_ipaddress') or request.user.has_perm('ipam.delete_ipaddress'):
|
||||||
@ -579,6 +587,7 @@ class PrefixIPAddressesView(PermissionRequiredMixin, View):
|
|||||||
'permissions': permissions,
|
'permissions': permissions,
|
||||||
'bulk_querystring': 'vrf_id={}&parent={}'.format(prefix.vrf.pk if prefix.vrf else '0', prefix.prefix),
|
'bulk_querystring': 'vrf_id={}&parent={}'.format(prefix.vrf.pk if prefix.vrf else '0', prefix.prefix),
|
||||||
'active_tab': 'ip-addresses',
|
'active_tab': 'ip-addresses',
|
||||||
|
'show_available': request.GET.get('show_available', 'true') == 'true',
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,6 +7,12 @@ from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilte
|
|||||||
from .models import Secret, SecretRole
|
from .models import Secret, SecretRole
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
'SecretFilter',
|
||||||
|
'SecretRoleFilter',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SecretRoleFilter(NameSlugSearchFilterSet):
|
class SecretRoleFilter(NameSlugSearchFilterSet):
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
|
92
netbox/secrets/tests/test_filters.py
Normal file
92
netbox/secrets/tests/test_filters.py
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
|
||||||
|
from secrets.filters import *
|
||||||
|
from secrets.models import Secret, SecretRole
|
||||||
|
|
||||||
|
|
||||||
|
class SecretRoleTestCase(TestCase):
|
||||||
|
queryset = SecretRole.objects.all()
|
||||||
|
filterset = SecretRoleFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
roles = (
|
||||||
|
SecretRole(name='Secret Role 1', slug='secret-role-1'),
|
||||||
|
SecretRole(name='Secret Role 2', slug='secret-role-2'),
|
||||||
|
SecretRole(name='Secret Role 3', slug='secret-role-3'),
|
||||||
|
)
|
||||||
|
SecretRole.objects.bulk_create(roles)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Secret Role 1', 'Secret Role 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['secret-role-1', 'secret-role-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class SecretTestCase(TestCase):
|
||||||
|
queryset = Secret.objects.all()
|
||||||
|
filterset = SecretFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||||
|
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
|
||||||
|
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
|
||||||
|
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
|
||||||
|
|
||||||
|
devices = (
|
||||||
|
Device(device_type=device_type, name='Device 1', site=site, device_role=device_role),
|
||||||
|
Device(device_type=device_type, name='Device 2', site=site, device_role=device_role),
|
||||||
|
Device(device_type=device_type, name='Device 3', site=site, device_role=device_role),
|
||||||
|
)
|
||||||
|
Device.objects.bulk_create(devices)
|
||||||
|
|
||||||
|
roles = (
|
||||||
|
SecretRole(name='Secret Role 1', slug='secret-role-1'),
|
||||||
|
SecretRole(name='Secret Role 2', slug='secret-role-2'),
|
||||||
|
SecretRole(name='Secret Role 3', slug='secret-role-3'),
|
||||||
|
)
|
||||||
|
SecretRole.objects.bulk_create(roles)
|
||||||
|
|
||||||
|
secrets = (
|
||||||
|
Secret(device=devices[0], role=roles[0], name='Secret 1', plaintext='SECRET DATA'),
|
||||||
|
Secret(device=devices[1], role=roles[1], name='Secret 2', plaintext='SECRET DATA'),
|
||||||
|
Secret(device=devices[2], role=roles[2], name='Secret 3', plaintext='SECRET DATA'),
|
||||||
|
)
|
||||||
|
# Must call save() to encrypt Secrets
|
||||||
|
for s in secrets:
|
||||||
|
s.save()
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Secret 1', 'Secret 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_role(self):
|
||||||
|
roles = SecretRole.objects.all()[:2]
|
||||||
|
params = {'role_id': [roles[0].pk, roles[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'role': [roles[0].slug, roles[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_device(self):
|
||||||
|
devices = Device.objects.all()[:2]
|
||||||
|
params = {'device_id': [devices[0].pk, devices[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'device': [devices[0].name, devices[1].name]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
@ -112,7 +112,7 @@
|
|||||||
{% if configcontext.roles.all %}
|
{% if configcontext.roles.all %}
|
||||||
<ul>
|
<ul>
|
||||||
{% for role in configcontext.roles.all %}
|
{% for role in configcontext.roles.all %}
|
||||||
<li><a href="{{ role.get_absolute_url }}">{{ role }}</a></li>
|
<li><a href="{% url 'dcim:device_list' %}?role={{ role.slug }}">{{ role }}</a></li>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
</ul>
|
</ul>
|
||||||
{% else %}
|
{% else %}
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
</div>
|
</div>
|
||||||
<h1>{% block title %}{{ aggregate }}{% endblock %}</h1>
|
<h1>{% block title %}{{ aggregate }}{% endblock %}</h1>
|
||||||
{% include 'inc/created_updated.html' with obj=aggregate %}
|
{% include 'inc/created_updated.html' with obj=aggregate %}
|
||||||
|
{% include 'ipam/inc/toggle_available.html' %}
|
||||||
<div class="pull-right noprint">
|
<div class="pull-right noprint">
|
||||||
{% custom_links aggregate %}
|
{% custom_links aggregate %}
|
||||||
</div>
|
</div>
|
||||||
|
9
netbox/templates/ipam/inc/toggle_available.html
Normal file
9
netbox/templates/ipam/inc/toggle_available.html
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
{% load helpers %}
|
||||||
|
{% if show_available is not None %}
|
||||||
|
<div class="pull-right">
|
||||||
|
<div class="btn-group" role="group">
|
||||||
|
<a href="{{ request.path }}{% querystring request show_available='true' %}" class="btn btn-default{% if show_available %} active disabled{% endif %}">Show available</a>
|
||||||
|
<a href="{{ request.path }}{% querystring request show_available='false' %}" class="btn btn-default{% if not show_available %} active disabled{% endif %}">Hide available</a>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
@ -53,6 +53,7 @@
|
|||||||
</div>
|
</div>
|
||||||
<h1>{% block title %}{{ prefix }}{% endblock %}</h1>
|
<h1>{% block title %}{{ prefix }}{% endblock %}</h1>
|
||||||
{% include 'inc/created_updated.html' with obj=prefix %}
|
{% include 'inc/created_updated.html' with obj=prefix %}
|
||||||
|
{% include 'ipam/inc/toggle_available.html' %}
|
||||||
<div class="pull-right noprint">
|
<div class="pull-right noprint">
|
||||||
{% custom_links prefix %}
|
{% custom_links prefix %}
|
||||||
</div>
|
</div>
|
||||||
|
@ -6,6 +6,12 @@ from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilte
|
|||||||
from .models import Tenant, TenantGroup
|
from .models import Tenant, TenantGroup
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
'TenantFilter',
|
||||||
|
'TenantGroupFilter',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TenantGroupFilter(NameSlugSearchFilterSet):
|
class TenantGroupFilter(NameSlugSearchFilterSet):
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
|
74
netbox/tenancy/tests/test_filters.py
Normal file
74
netbox/tenancy/tests/test_filters.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from tenancy.filters import *
|
||||||
|
from tenancy.models import Tenant, TenantGroup
|
||||||
|
|
||||||
|
|
||||||
|
class TenantGroupTestCase(TestCase):
|
||||||
|
queryset = TenantGroup.objects.all()
|
||||||
|
filterset = TenantGroupFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
groups = (
|
||||||
|
TenantGroup(name='Tenant Group 1', slug='tenant-group-1'),
|
||||||
|
TenantGroup(name='Tenant Group 2', slug='tenant-group-2'),
|
||||||
|
TenantGroup(name='Tenant Group 3', slug='tenant-group-3'),
|
||||||
|
)
|
||||||
|
TenantGroup.objects.bulk_create(groups)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Tenant Group 1', 'Tenant Group 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['tenant-group-1', 'tenant-group-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class TenantTestCase(TestCase):
|
||||||
|
queryset = Tenant.objects.all()
|
||||||
|
filterset = TenantFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
groups = (
|
||||||
|
TenantGroup(name='Tenant Group 1', slug='tenant-group-1'),
|
||||||
|
TenantGroup(name='Tenant Group 2', slug='tenant-group-2'),
|
||||||
|
TenantGroup(name='Tenant Group 3', slug='tenant-group-3'),
|
||||||
|
)
|
||||||
|
TenantGroup.objects.bulk_create(groups)
|
||||||
|
|
||||||
|
tenants = (
|
||||||
|
Tenant(name='Tenant 1', slug='tenant-1', group=groups[0]),
|
||||||
|
Tenant(name='Tenant 2', slug='tenant-2', group=groups[1]),
|
||||||
|
Tenant(name='Tenant 3', slug='tenant-3', group=groups[2]),
|
||||||
|
)
|
||||||
|
Tenant.objects.bulk_create(tenants)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Tenant 1', 'Tenant 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['tenant-1', 'tenant-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_group(self):
|
||||||
|
group = TenantGroup.objects.all()[:2]
|
||||||
|
params = {'group_id': [group[0].pk, group[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'group': [group[0].slug, group[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
@ -1,7 +1,5 @@
|
|||||||
import django_filters
|
import django_filters
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from netaddr import EUI
|
|
||||||
from netaddr.core import AddrFormatError
|
|
||||||
|
|
||||||
from dcim.models import DeviceRole, Interface, Platform, Region, Site
|
from dcim.models import DeviceRole, Interface, Platform, Region, Site
|
||||||
from extras.filters import CustomFieldFilterSet, CreatedUpdatedFilterSet
|
from extras.filters import CustomFieldFilterSet, CreatedUpdatedFilterSet
|
||||||
@ -13,6 +11,15 @@ from .constants import *
|
|||||||
from .models import Cluster, ClusterGroup, ClusterType, VirtualMachine
|
from .models import Cluster, ClusterGroup, ClusterType, VirtualMachine
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
'ClusterFilter',
|
||||||
|
'ClusterGroupFilter',
|
||||||
|
'ClusterTypeFilter',
|
||||||
|
'InterfaceFilter',
|
||||||
|
'VirtualMachineFilter',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ClusterTypeFilter(NameSlugSearchFilterSet):
|
class ClusterTypeFilter(NameSlugSearchFilterSet):
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
@ -208,8 +215,7 @@ class InterfaceFilter(django_filters.FilterSet):
|
|||||||
to_field_name='name',
|
to_field_name='name',
|
||||||
label='Virtual machine',
|
label='Virtual machine',
|
||||||
)
|
)
|
||||||
mac_address = django_filters.CharFilter(
|
mac_address = MultiValueMACAddressFilter(
|
||||||
method='_mac_address',
|
|
||||||
label='MAC address',
|
label='MAC address',
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -217,16 +223,6 @@ class InterfaceFilter(django_filters.FilterSet):
|
|||||||
model = Interface
|
model = Interface
|
||||||
fields = ['id', 'name', 'enabled', 'mtu']
|
fields = ['id', 'name', 'enabled', 'mtu']
|
||||||
|
|
||||||
def _mac_address(self, queryset, name, value):
|
|
||||||
value = value.strip()
|
|
||||||
if not value:
|
|
||||||
return queryset
|
|
||||||
try:
|
|
||||||
mac = EUI(value.strip())
|
|
||||||
return queryset.filter(mac_address=mac)
|
|
||||||
except AddrFormatError:
|
|
||||||
return queryset.none()
|
|
||||||
|
|
||||||
def search(self, queryset, name, value):
|
def search(self, queryset, name, value):
|
||||||
if not value.strip():
|
if not value.strip():
|
||||||
return queryset
|
return queryset
|
||||||
|
367
netbox/virtualization/tests/test_filters.py
Normal file
367
netbox/virtualization/tests/test_filters.py
Normal file
@ -0,0 +1,367 @@
|
|||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from dcim.models import DeviceRole, Interface, Platform, Region, Site
|
||||||
|
from virtualization.constants import *
|
||||||
|
from virtualization.filters import *
|
||||||
|
from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine
|
||||||
|
|
||||||
|
|
||||||
|
class ClusterTypeTestCase(TestCase):
|
||||||
|
queryset = ClusterType.objects.all()
|
||||||
|
filterset = ClusterTypeFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
cluster_types = (
|
||||||
|
ClusterType(name='Cluster Type 1', slug='cluster-type-1'),
|
||||||
|
ClusterType(name='Cluster Type 2', slug='cluster-type-2'),
|
||||||
|
ClusterType(name='Cluster Type 3', slug='cluster-type-3'),
|
||||||
|
)
|
||||||
|
ClusterType.objects.bulk_create(cluster_types)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Cluster Type 1', 'Cluster Type 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['cluster-type-1', 'cluster-type-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class ClusterGroupTestCase(TestCase):
|
||||||
|
queryset = ClusterGroup.objects.all()
|
||||||
|
filterset = ClusterGroupFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
cluster_groups = (
|
||||||
|
ClusterGroup(name='Cluster Group 1', slug='cluster-group-1'),
|
||||||
|
ClusterGroup(name='Cluster Group 2', slug='cluster-group-2'),
|
||||||
|
ClusterGroup(name='Cluster Group 3', slug='cluster-group-3'),
|
||||||
|
)
|
||||||
|
ClusterGroup.objects.bulk_create(cluster_groups)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Cluster Group 1', 'Cluster Group 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_slug(self):
|
||||||
|
params = {'slug': ['cluster-group-1', 'cluster-group-2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class ClusterTestCase(TestCase):
|
||||||
|
queryset = Cluster.objects.all()
|
||||||
|
filterset = ClusterFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
cluster_types = (
|
||||||
|
ClusterType(name='Cluster Type 1', slug='cluster-type-1'),
|
||||||
|
ClusterType(name='Cluster Type 2', slug='cluster-type-2'),
|
||||||
|
ClusterType(name='Cluster Type 3', slug='cluster-type-3'),
|
||||||
|
)
|
||||||
|
ClusterType.objects.bulk_create(cluster_types)
|
||||||
|
|
||||||
|
cluster_groups = (
|
||||||
|
ClusterGroup(name='Cluster Group 1', slug='cluster-group-1'),
|
||||||
|
ClusterGroup(name='Cluster Group 2', slug='cluster-group-2'),
|
||||||
|
ClusterGroup(name='Cluster Group 3', slug='cluster-group-3'),
|
||||||
|
)
|
||||||
|
ClusterGroup.objects.bulk_create(cluster_groups)
|
||||||
|
|
||||||
|
regions = (
|
||||||
|
Region(name='Test Region 1', slug='test-region-1'),
|
||||||
|
Region(name='Test Region 2', slug='test-region-2'),
|
||||||
|
Region(name='Test Region 3', slug='test-region-3'),
|
||||||
|
)
|
||||||
|
# Can't use bulk_create for models with MPTT fields
|
||||||
|
for r in regions:
|
||||||
|
r.save()
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
|
||||||
|
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
clusters = (
|
||||||
|
Cluster(name='Cluster 1', type=cluster_types[0], group=cluster_groups[0], site=sites[0]),
|
||||||
|
Cluster(name='Cluster 2', type=cluster_types[1], group=cluster_groups[1], site=sites[1]),
|
||||||
|
Cluster(name='Cluster 3', type=cluster_types[2], group=cluster_groups[2], site=sites[2]),
|
||||||
|
)
|
||||||
|
Cluster.objects.bulk_create(clusters)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Cluster 1', 'Cluster 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_region(self):
|
||||||
|
regions = Region.objects.all()[:2]
|
||||||
|
params = {'region_id': [regions[0].pk, regions[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'region': [regions[0].slug, regions[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_group(self):
|
||||||
|
groups = ClusterGroup.objects.all()[:2]
|
||||||
|
params = {'group_id': [groups[0].pk, groups[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'group': [groups[0].slug, groups[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_type(self):
|
||||||
|
types = ClusterType.objects.all()[:2]
|
||||||
|
params = {'type_id': [types[0].pk, types[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'type': [types[0].slug, types[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class VirtualMachineTestCase(TestCase):
|
||||||
|
queryset = VirtualMachine.objects.all()
|
||||||
|
filterset = VirtualMachineFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
cluster_types = (
|
||||||
|
ClusterType(name='Cluster Type 1', slug='cluster-type-1'),
|
||||||
|
ClusterType(name='Cluster Type 2', slug='cluster-type-2'),
|
||||||
|
ClusterType(name='Cluster Type 3', slug='cluster-type-3'),
|
||||||
|
)
|
||||||
|
ClusterType.objects.bulk_create(cluster_types)
|
||||||
|
|
||||||
|
cluster_groups = (
|
||||||
|
ClusterGroup(name='Cluster Group 1', slug='cluster-group-1'),
|
||||||
|
ClusterGroup(name='Cluster Group 2', slug='cluster-group-2'),
|
||||||
|
ClusterGroup(name='Cluster Group 3', slug='cluster-group-3'),
|
||||||
|
)
|
||||||
|
ClusterGroup.objects.bulk_create(cluster_groups)
|
||||||
|
|
||||||
|
regions = (
|
||||||
|
Region(name='Test Region 1', slug='test-region-1'),
|
||||||
|
Region(name='Test Region 2', slug='test-region-2'),
|
||||||
|
Region(name='Test Region 3', slug='test-region-3'),
|
||||||
|
)
|
||||||
|
# Can't use bulk_create for models with MPTT fields
|
||||||
|
for r in regions:
|
||||||
|
r.save()
|
||||||
|
|
||||||
|
sites = (
|
||||||
|
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
|
||||||
|
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
|
||||||
|
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
|
||||||
|
)
|
||||||
|
Site.objects.bulk_create(sites)
|
||||||
|
|
||||||
|
clusters = (
|
||||||
|
Cluster(name='Cluster 1', type=cluster_types[0], group=cluster_groups[0], site=sites[0]),
|
||||||
|
Cluster(name='Cluster 2', type=cluster_types[1], group=cluster_groups[1], site=sites[1]),
|
||||||
|
Cluster(name='Cluster 3', type=cluster_types[2], group=cluster_groups[2], site=sites[2]),
|
||||||
|
)
|
||||||
|
Cluster.objects.bulk_create(clusters)
|
||||||
|
|
||||||
|
platforms = (
|
||||||
|
Platform(name='Platform 1', slug='platform-1'),
|
||||||
|
Platform(name='Platform 2', slug='platform-2'),
|
||||||
|
Platform(name='Platform 3', slug='platform-3'),
|
||||||
|
)
|
||||||
|
Platform.objects.bulk_create(platforms)
|
||||||
|
|
||||||
|
roles = (
|
||||||
|
DeviceRole(name='Device Role 1', slug='device-role-1'),
|
||||||
|
DeviceRole(name='Device Role 2', slug='device-role-2'),
|
||||||
|
DeviceRole(name='Device Role 3', slug='device-role-3'),
|
||||||
|
)
|
||||||
|
DeviceRole.objects.bulk_create(roles)
|
||||||
|
|
||||||
|
vms = (
|
||||||
|
VirtualMachine(name='Virtual Machine 1', cluster=clusters[0], platform=platforms[0], role=roles[0], status=DEVICE_STATUS_ACTIVE, vcpus=1, memory=1, disk=1),
|
||||||
|
VirtualMachine(name='Virtual Machine 2', cluster=clusters[1], platform=platforms[1], role=roles[1], status=DEVICE_STATUS_STAGED, vcpus=2, memory=2, disk=2),
|
||||||
|
VirtualMachine(name='Virtual Machine 3', cluster=clusters[2], platform=platforms[2], role=roles[2], status=DEVICE_STATUS_OFFLINE, vcpus=3, memory=3, disk=3),
|
||||||
|
)
|
||||||
|
VirtualMachine.objects.bulk_create(vms)
|
||||||
|
|
||||||
|
interfaces = (
|
||||||
|
Interface(virtual_machine=vms[0], name='Interface 1', mac_address='00-00-00-00-00-01'),
|
||||||
|
Interface(virtual_machine=vms[1], name='Interface 2', mac_address='00-00-00-00-00-02'),
|
||||||
|
Interface(virtual_machine=vms[2], name='Interface 3', mac_address='00-00-00-00-00-03'),
|
||||||
|
)
|
||||||
|
Interface.objects.bulk_create(interfaces)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Virtual Machine 1', 'Virtual Machine 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_vcpus(self):
|
||||||
|
params = {'vcpus': [1, 2]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_memory(self):
|
||||||
|
params = {'memory': [1, 2]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_disk(self):
|
||||||
|
params = {'disk': [1, 2]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_id__in(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id__in': ','.join([str(id) for id in id_list])}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_status(self):
|
||||||
|
params = {'status': [DEVICE_STATUS_ACTIVE, DEVICE_STATUS_STAGED]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_cluster_group(self):
|
||||||
|
groups = ClusterGroup.objects.all()[:2]
|
||||||
|
params = {'cluster_group_id': [groups[0].pk, groups[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'cluster_group': [groups[0].slug, groups[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_cluster_type(self):
|
||||||
|
types = ClusterType.objects.all()[:2]
|
||||||
|
params = {'cluster_type_id': [types[0].pk, types[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'cluster_type': [types[0].slug, types[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_cluster(self):
|
||||||
|
clusters = Cluster.objects.all()[:2]
|
||||||
|
params = {'cluster_id': [clusters[0].pk, clusters[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
# TODO: 'cluster' should match on name
|
||||||
|
# params = {'cluster': [clusters[0].name, clusters[1].name]}
|
||||||
|
# self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_region(self):
|
||||||
|
regions = Region.objects.all()[:2]
|
||||||
|
params = {'region_id': [regions[0].pk, regions[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'region': [regions[0].slug, regions[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_site(self):
|
||||||
|
sites = Site.objects.all()[:2]
|
||||||
|
params = {'site_id': [sites[0].pk, sites[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'site': [sites[0].slug, sites[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_role(self):
|
||||||
|
roles = DeviceRole.objects.all()[:2]
|
||||||
|
params = {'role_id': [roles[0].pk, roles[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'role': [roles[0].slug, roles[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_platform(self):
|
||||||
|
platforms = Platform.objects.all()[:2]
|
||||||
|
params = {'platform_id': [platforms[0].pk, platforms[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'platform': [platforms[0].slug, platforms[1].slug]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_mac_address(self):
|
||||||
|
params = {'mac_address': ['00-00-00-00-00-01', '00-00-00-00-00-02']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
|
||||||
|
class InterfaceTestCase(TestCase):
|
||||||
|
queryset = Interface.objects.all()
|
||||||
|
filterset = InterfaceFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpTestData(cls):
|
||||||
|
|
||||||
|
cluster_types = (
|
||||||
|
ClusterType(name='Cluster Type 1', slug='cluster-type-1'),
|
||||||
|
ClusterType(name='Cluster Type 2', slug='cluster-type-2'),
|
||||||
|
ClusterType(name='Cluster Type 3', slug='cluster-type-3'),
|
||||||
|
)
|
||||||
|
ClusterType.objects.bulk_create(cluster_types)
|
||||||
|
|
||||||
|
clusters = (
|
||||||
|
Cluster(name='Cluster 1', type=cluster_types[0]),
|
||||||
|
Cluster(name='Cluster 2', type=cluster_types[1]),
|
||||||
|
Cluster(name='Cluster 3', type=cluster_types[2]),
|
||||||
|
)
|
||||||
|
Cluster.objects.bulk_create(clusters)
|
||||||
|
|
||||||
|
vms = (
|
||||||
|
VirtualMachine(name='Virtual Machine 1', cluster=clusters[0]),
|
||||||
|
VirtualMachine(name='Virtual Machine 2', cluster=clusters[1]),
|
||||||
|
VirtualMachine(name='Virtual Machine 3', cluster=clusters[2]),
|
||||||
|
)
|
||||||
|
VirtualMachine.objects.bulk_create(vms)
|
||||||
|
|
||||||
|
interfaces = (
|
||||||
|
Interface(virtual_machine=vms[0], name='Interface 1', enabled=True, mtu=100, mac_address='00-00-00-00-00-01'),
|
||||||
|
Interface(virtual_machine=vms[1], name='Interface 2', enabled=True, mtu=200, mac_address='00-00-00-00-00-02'),
|
||||||
|
Interface(virtual_machine=vms[2], name='Interface 3', enabled=False, mtu=300, mac_address='00-00-00-00-00-03'),
|
||||||
|
)
|
||||||
|
Interface.objects.bulk_create(interfaces)
|
||||||
|
|
||||||
|
def test_id(self):
|
||||||
|
id_list = self.queryset.values_list('id', flat=True)[:2]
|
||||||
|
params = {'id': [str(id) for id in id_list]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
params = {'name': ['Interface 1', 'Interface 2']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_assigned_to_interface(self):
|
||||||
|
params = {'enabled': 'true'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'enabled': 'false'}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||||
|
|
||||||
|
def test_mtu(self):
|
||||||
|
params = {'mtu': [100, 200]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_virtual_machine(self):
|
||||||
|
vms = VirtualMachine.objects.all()[:2]
|
||||||
|
params = {'virtual_machine_id': [vms[0].pk, vms[1].pk]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
params = {'virtual_machine': [vms[0].name, vms[1].name]}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||||
|
|
||||||
|
def test_mac_address(self):
|
||||||
|
params = {'mac_address': ['00-00-00-00-00-01', '00-00-00-00-00-02']}
|
||||||
|
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
Loading…
Reference in New Issue
Block a user