Merge branch 'develop' into 2921-tags-select2

This commit is contained in:
hSaria
2020-01-21 21:29:21 +00:00
committed by GitHub
24 changed files with 853 additions and 128 deletions

View File

@@ -545,6 +545,7 @@ class InterfaceTypeChoices(ChoiceSet):
TYPE_80211N = 'ieee802.11n'
TYPE_80211AC = 'ieee802.11ac'
TYPE_80211AD = 'ieee802.11ad'
TYPE_80211AX = 'ieee802.11ax'
# Cellular
TYPE_GSM = 'gsm'
@@ -650,6 +651,7 @@ class InterfaceTypeChoices(ChoiceSet):
(TYPE_80211N, 'IEEE 802.11n'),
(TYPE_80211AC, 'IEEE 802.11ac'),
(TYPE_80211AD, 'IEEE 802.11ad'),
(TYPE_80211AX, 'IEEE 802.11ax'),
)
),
(
@@ -800,6 +802,7 @@ class InterfaceTypeChoices(ChoiceSet):
TYPE_SUMMITSTACK128: 5310,
TYPE_SUMMITSTACK256: 5320,
TYPE_SUMMITSTACK512: 5330,
TYPE_OTHER: 32767,
}

View File

@@ -1,6 +1,5 @@
import django_filters
from django.contrib.auth.models import User
from django.db.models import Q
from extras.filters import CustomFieldFilterSet, LocalConfigContextFilterSet, CreatedUpdatedFilterSet
from tenancy.filters import TenancyFilterSet
@@ -356,6 +355,10 @@ class DeviceTypeFilterSet(CustomFieldFilterSet, CreatedUpdatedFilterSet):
method='_pass_through_ports',
label='Has pass-through ports',
)
device_bays = django_filters.BooleanFilter(
method='_device_bays',
label='Has device bays',
)
tag = TagFilter()
class Meta:
@@ -395,6 +398,9 @@ class DeviceTypeFilterSet(CustomFieldFilterSet, CreatedUpdatedFilterSet):
rearport_templates__isnull=value
)
def _device_bays(self, queryset, name, value):
return queryset.exclude(device_bay_templates__isnull=value)
class DeviceTypeComponentFilterSet(NameSlugSearchFilterSet):
devicetype_id = django_filters.ModelMultipleChoiceFilter(
@@ -623,6 +629,10 @@ class DeviceFilterSet(LocalConfigContextFilterSet, TenancyFilterSet, CustomField
method='_pass_through_ports',
label='Has pass-through ports',
)
device_bays = django_filters.BooleanFilter(
method='_device_bays',
label='Has device bays',
)
tag = TagFilter()
class Meta:
@@ -676,21 +686,25 @@ class DeviceFilterSet(LocalConfigContextFilterSet, TenancyFilterSet, CustomField
rearports__isnull=value
)
def _device_bays(self, queryset, name, value):
return queryset.exclude(device_bays__isnull=value)
class DeviceComponentFilterSet(django_filters.FilterSet):
q = django_filters.CharFilter(
method='search',
label='Search',
)
region_id = django_filters.ModelMultipleChoiceFilter(
field_name='device__site__region',
region_id = TreeNodeMultipleChoiceFilter(
queryset=Region.objects.all(),
field_name='device__site__region__in',
label='Region (ID)',
)
region = django_filters.ModelMultipleChoiceFilter(
field_name='device__site__region__in',
region = TreeNodeMultipleChoiceFilter(
queryset=Region.objects.all(),
label='Region name (slug)',
field_name='device__site__region__in',
to_field_name='slug',
label='Region (slug)',
)
site_id = django_filters.ModelMultipleChoiceFilter(
field_name='device__site',
@@ -700,6 +714,7 @@ class DeviceComponentFilterSet(django_filters.FilterSet):
site = django_filters.ModelMultipleChoiceFilter(
field_name='device__site__slug',
queryset=Site.objects.all(),
to_field_name='slug',
label='Site name (slug)',
)
device_id = django_filters.ModelMultipleChoiceFilter(
@@ -787,35 +802,13 @@ class PowerOutletFilterSet(DeviceComponentFilterSet):
fields = ['id', 'name', 'feed_leg', 'description', 'connection_status']
class InterfaceFilterSet(django_filters.FilterSet):
"""
Not using DeviceComponentFilterSet for Interfaces because we need to check for VirtualChassis membership.
"""
class InterfaceFilterSet(DeviceComponentFilterSet):
q = django_filters.CharFilter(
method='search',
label='Search',
)
region_id = django_filters.ModelMultipleChoiceFilter(
field_name='device__site__region',
queryset=Region.objects.all(),
label='Region (ID)',
)
region = django_filters.ModelMultipleChoiceFilter(
field_name='device__site__region__in',
queryset=Region.objects.all(),
label='Region name (slug)',
)
site_id = django_filters.ModelMultipleChoiceFilter(
field_name='device__site',
queryset=Site.objects.all(),
label='Site (ID)',
)
site = django_filters.ModelMultipleChoiceFilter(
field_name='device__site__slug',
to_field_name='slug',
queryset=Site.objects.all(),
label='Site name (slug)',
)
# Override device and device_id filters from DeviceComponentFilterSet to match against any peer virtual chassis
# members
device = MultiValueCharFilter(
method='filter_device',
field_name='name',
@@ -859,14 +852,6 @@ class InterfaceFilterSet(django_filters.FilterSet):
model = Interface
fields = ['id', 'name', 'connection_status', 'type', 'enabled', 'mtu', 'mgmt_only', 'mode', 'description']
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(name__icontains=value) |
Q(description__icontains=value)
).distinct()
def filter_device(self, queryset, name, value):
try:
devices = Device.objects.filter(**{'{}__in'.format(name): value})

View File

@@ -67,21 +67,25 @@ class DeviceComponentFilterForm(BootstrapMixin, forms.Form):
required=False,
label='Search'
)
region = TreeNodeChoiceField(
region = FilterChoiceField(
queryset=Region.objects.all(),
required=False,
widget=APISelect(
api_url="/api/dcim/regions/"
)
)
site = forms.ModelChoiceField(
queryset=Site.objects.all(),
to_field_name='slug',
required=False,
help_text='Name of parent site',
error_messages={
'invalid_choice': 'Site not found.',
}
widget=APISelectMultiple(
api_url='/api/dcim/regions/',
value_field='slug',
filter_for={
'site': 'region'
}
)
)
site = FilterChoiceField(
queryset=Site.objects.all(),
to_field_name='slug',
widget=APISelectMultiple(
api_url="/api/dcim/sites/",
value_field="slug"
)
)

View File

@@ -0,0 +1,20 @@
from django.db import migrations
def interface_type_to_slug(apps, schema_editor):
Interface = apps.get_model('dcim', 'Interface')
Interface.objects.filter(type=32767).update(type='other')
class Migration(migrations.Migration):
dependencies = [
('dcim', '0090_cable_termination_models'),
]
operations = [
# Missed type "other" in the initial migration (see #3967)
migrations.RunPython(
code=interface_type_to_slug
),
]

View File

@@ -395,14 +395,23 @@ class RackElevationHelperMixin:
fill='black'
)
)
link.add(drawing.rect(start, end, fill='#{}'.format(color)))
link.set_desc('{}{} ({}U) {} {}'.format(
device.device_role, device.device_type.display_name,
device.device_type.u_height, device.asset_tag or '', device.serial or ''
))
link.add(drawing.rect(start, end, style='fill: #{}'.format(color), class_='slot'))
hex_color = '#{}'.format(foreground_color(color))
link.add(drawing.text(device.name, insert=text, fill=hex_color))
link.add(drawing.text(str(device), insert=text, fill=hex_color))
@staticmethod
def _draw_device_rear(drawing, device, start, end, text):
drawing.add(drawing.rect(start, end, class_="blocked"))
drawing.add(drawing.text(device.name, insert=text))
rect = drawing.rect(start, end, class_="blocked")
rect.set_desc('{}{} ({}U) {} {}'.format(
device.device_role, device.device_type.display_name,
device.device_type.u_height, device.asset_tag or '', device.serial or ''
))
drawing.add(rect)
drawing.add(drawing.text(str(device), insert=text))
@staticmethod
def _draw_empty(drawing, rack, start, end, text, id_, face_id, class_):
@@ -459,6 +468,21 @@ class RackElevationHelperMixin:
return drawing
def merge_elevations(self, face):
elevation = self.get_rack_units(face=face, expand_devices=False)
other_face = DeviceFaceChoices.FACE_FRONT if face == DeviceFaceChoices.FACE_REAR else DeviceFaceChoices.FACE_REAR
other = self.get_rack_units(face=other_face)
unit_cursor = 0
for u in elevation:
o = other[unit_cursor]
if not u['device'] and o['device']:
u['device'] = o['device']
u['height'] = 1
unit_cursor += u.get('height', 1)
return elevation
def get_elevation_svg(self, face=DeviceFaceChoices.FACE_FRONT, unit_width=230, unit_height=20):
"""
Return an SVG of the rack elevation
@@ -468,7 +492,7 @@ class RackElevationHelperMixin:
:param unit_height: Height of each rack unit for the rendered drawing. Note this is not the total
height of the elevation
"""
elevation = self.get_rack_units(face=face, expand_devices=False)
elevation = self.merge_elevations(face)
reserved_units = self.get_reserved_units().keys()
return self._draw_elevations(elevation, reserved_units, face, unit_width, unit_height)
@@ -1464,7 +1488,7 @@ class Device(ChangeLoggedModel, ConfigContextModel, CustomFieldModel):
try:
# Child devices cannot be assigned to a rack face/unit
if self.device_type.is_child_device and self.face is not None:
if self.device_type.is_child_device and self.face:
raise ValidationError({
'face': "Child device types cannot be assigned to a rack face. This is an attribute of the "
"parent device."

View File

@@ -595,12 +595,11 @@ class DeviceTypeTestCase(TestCase):
params = {'pass_through_ports': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
# TODO: Add device_bay filter
# def test_device_bays(self):
# params = {'device_bays': 'true'}
# self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
# params = {'device_bays': 'false'}
# self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_device_bays(self):
params = {'device_bays': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'device_bays': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
class ConsolePortTemplateTestCase(TestCase):
@@ -1322,12 +1321,11 @@ class DeviceTestCase(TestCase):
params = {'pass_through_ports': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
# TODO: Add device_bay filter
# def test_device_bays(self):
# params = {'device_bays': 'true'}
# self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
# params = {'device_bays': 'false'}
# self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_device_bays(self):
params = {'device_bays': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'device_bays': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_local_context_data(self):
params = {'local_context_data': 'true'}
@@ -1343,16 +1341,28 @@ class ConsolePortTestCase(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site1')
regions = (
Region(name='Region 1', slug='region-1'),
Region(name='Region 2', slug='region-2'),
Region(name='Region 3', slug='region-3'),
)
for region in regions:
region.save()
sites = Site.objects.bulk_create((
Site(name='Site 1', slug='site-1', region=regions[0]),
Site(name='Site 2', slug='site-2', region=regions[1]),
Site(name='Site 3', slug='site-3', region=regions[2]),
Site(name='Site X', slug='site-x'),
))
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=site),
Device(name=None, device_type=device_type, device_role=device_role, site=site), # For cable connections
Device(name='Device 1', device_type=device_type, device_role=device_role, site=sites[0]),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=sites[1]),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=sites[2]),
Device(name=None, device_type=device_type, device_role=device_role, site=sites[3]), # For cable connections
)
Device.objects.bulk_create(devices)
@@ -1392,6 +1402,20 @@ class ConsolePortTestCase(TestCase):
params = {'connection_status': 'True'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
@@ -1413,16 +1437,28 @@ class ConsoleServerPortTestCase(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site1')
regions = (
Region(name='Region 1', slug='region-1'),
Region(name='Region 2', slug='region-2'),
Region(name='Region 3', slug='region-3'),
)
for region in regions:
region.save()
sites = Site.objects.bulk_create((
Site(name='Site 1', slug='site-1', region=regions[0]),
Site(name='Site 2', slug='site-2', region=regions[1]),
Site(name='Site 3', slug='site-3', region=regions[2]),
Site(name='Site X', slug='site-x'),
))
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=site),
Device(name=None, device_type=device_type, device_role=device_role, site=site), # For cable connections
Device(name='Device 1', device_type=device_type, device_role=device_role, site=sites[0]),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=sites[1]),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=sites[2]),
Device(name=None, device_type=device_type, device_role=device_role, site=sites[3]), # For cable connections
)
Device.objects.bulk_create(devices)
@@ -1462,6 +1498,20 @@ class ConsoleServerPortTestCase(TestCase):
params = {'connection_status': 'True'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
@@ -1483,16 +1533,28 @@ class PowerPortTestCase(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site1')
regions = (
Region(name='Region 1', slug='region-1'),
Region(name='Region 2', slug='region-2'),
Region(name='Region 3', slug='region-3'),
)
for region in regions:
region.save()
sites = Site.objects.bulk_create((
Site(name='Site 1', slug='site-1', region=regions[0]),
Site(name='Site 2', slug='site-2', region=regions[1]),
Site(name='Site 3', slug='site-3', region=regions[2]),
Site(name='Site X', slug='site-x'),
))
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=site),
Device(name=None, device_type=device_type, device_role=device_role, site=site), # For cable connections
Device(name='Device 1', device_type=device_type, device_role=device_role, site=sites[0]),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=sites[1]),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=sites[2]),
Device(name=None, device_type=device_type, device_role=device_role, site=sites[3]), # For cable connections
)
Device.objects.bulk_create(devices)
@@ -1540,6 +1602,20 @@ class PowerPortTestCase(TestCase):
params = {'connection_status': 'True'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
@@ -1561,16 +1637,28 @@ class PowerOutletTestCase(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site1')
regions = (
Region(name='Region 1', slug='region-1'),
Region(name='Region 2', slug='region-2'),
Region(name='Region 3', slug='region-3'),
)
for region in regions:
region.save()
sites = Site.objects.bulk_create((
Site(name='Site 1', slug='site-1', region=regions[0]),
Site(name='Site 2', slug='site-2', region=regions[1]),
Site(name='Site 3', slug='site-3', region=regions[2]),
Site(name='Site X', slug='site-x'),
))
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=site),
Device(name=None, device_type=device_type, device_role=device_role, site=site), # For cable connections
Device(name='Device 1', device_type=device_type, device_role=device_role, site=sites[0]),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=sites[1]),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=sites[2]),
Device(name=None, device_type=device_type, device_role=device_role, site=sites[3]), # For cable connections
)
Device.objects.bulk_create(devices)
@@ -1615,6 +1703,20 @@ class PowerOutletTestCase(TestCase):
params = {'connection_status': 'True'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
@@ -1636,16 +1738,28 @@ class InterfaceTestCase(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site1')
regions = (
Region(name='Region 1', slug='region-1'),
Region(name='Region 2', slug='region-2'),
Region(name='Region 3', slug='region-3'),
)
for region in regions:
region.save()
sites = Site.objects.bulk_create((
Site(name='Site 1', slug='site-1', region=regions[0]),
Site(name='Site 2', slug='site-2', region=regions[1]),
Site(name='Site 3', slug='site-3', region=regions[2]),
Site(name='Site X', slug='site-x'),
))
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=site),
Device(name=None, device_type=device_type, device_role=device_role, site=site), # For cable connections
Device(name='Device 1', device_type=device_type, device_role=device_role, site=sites[0]),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=sites[1]),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=sites[2]),
Device(name=None, device_type=device_type, device_role=device_role, site=sites[3]), # For cable connections
)
Device.objects.bulk_create(devices)
@@ -1702,6 +1816,20 @@ class InterfaceTestCase(TestCase):
params = {'description': ['First', 'Second']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
@@ -1737,16 +1865,28 @@ class FrontPortTestCase(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site1')
regions = (
Region(name='Region 1', slug='region-1'),
Region(name='Region 2', slug='region-2'),
Region(name='Region 3', slug='region-3'),
)
for region in regions:
region.save()
sites = Site.objects.bulk_create((
Site(name='Site 1', slug='site-1', region=regions[0]),
Site(name='Site 2', slug='site-2', region=regions[1]),
Site(name='Site 3', slug='site-3', region=regions[2]),
Site(name='Site X', slug='site-x'),
))
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=site),
Device(name=None, device_type=device_type, device_role=device_role, site=site), # For cable connections
Device(name='Device 1', device_type=device_type, device_role=device_role, site=sites[0]),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=sites[1]),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=sites[2]),
Device(name=None, device_type=device_type, device_role=device_role, site=sites[3]), # For cable connections
)
Device.objects.bulk_create(devices)
@@ -1793,6 +1933,20 @@ class FrontPortTestCase(TestCase):
params = {'description': ['First', 'Second']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
@@ -1814,16 +1968,28 @@ class RearPortTestCase(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site1')
regions = (
Region(name='Region 1', slug='region-1'),
Region(name='Region 2', slug='region-2'),
Region(name='Region 3', slug='region-3'),
)
for region in regions:
region.save()
sites = Site.objects.bulk_create((
Site(name='Site 1', slug='site-1', region=regions[0]),
Site(name='Site 2', slug='site-2', region=regions[1]),
Site(name='Site 3', slug='site-3', region=regions[2]),
Site(name='Site X', slug='site-x'),
))
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=site),
Device(name=None, device_type=device_type, device_role=device_role, site=site), # For cable connections
Device(name='Device 1', device_type=device_type, device_role=device_role, site=sites[0]),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=sites[1]),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=sites[2]),
Device(name=None, device_type=device_type, device_role=device_role, site=sites[3]), # For cable connections
)
Device.objects.bulk_create(devices)
@@ -1864,6 +2030,20 @@ class RearPortTestCase(TestCase):
params = {'description': ['First', 'Second']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
@@ -1885,15 +2065,27 @@ class DeviceBayTestCase(TestCase):
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site1')
regions = (
Region(name='Region 1', slug='region-1'),
Region(name='Region 2', slug='region-2'),
Region(name='Region 3', slug='region-3'),
)
for region in regions:
region.save()
sites = Site.objects.bulk_create((
Site(name='Site 1', slug='site-1', region=regions[0]),
Site(name='Site 2', slug='site-2', region=regions[1]),
Site(name='Site 3', slug='site-3', region=regions[2]),
Site(name='Site X', slug='site-x'),
))
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(name='Device 1', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=site),
Device(name='Device 1', device_type=device_type, device_role=device_role, site=sites[0]),
Device(name='Device 2', device_type=device_type, device_role=device_role, site=sites[1]),
Device(name='Device 3', device_type=device_type, device_role=device_role, site=sites[2]),
)
Device.objects.bulk_create(devices)
@@ -1917,6 +2109,20 @@ class DeviceBayTestCase(TestCase):
params = {'description': ['First', 'Second']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}

View File

@@ -10,6 +10,7 @@ from django.db import models
from django.http import HttpResponse
from django.template import Template, Context
from django.urls import reverse
from django.utils.text import slugify
from taggit.models import TagBase, GenericTaggedItemBase
from utilities.fields import ColorField
@@ -952,6 +953,13 @@ class Tag(TagBase, ChangeLoggedModel):
def get_absolute_url(self):
return reverse('extras:tag', args=[self.slug])
def slugify(self, tag, i=None):
# Allow Unicode in Tag slugs (avoids empty slugs for Tags with all-Unicode names)
slug = slugify(tag, allow_unicode=True)
if i is not None:
slug += "_%d" % i
return slug
class TaggedItem(GenericTaggedItemBase):
tag = models.ForeignKey(

View File

@@ -3,7 +3,7 @@ from django.test import TestCase
from dcim.models import Site
from extras.choices import TemplateLanguageChoices
from extras.models import Graph
from extras.models import Graph, Tag
class GraphTest(TestCase):
@@ -44,3 +44,12 @@ class GraphTest(TestCase):
self.assertEqual(graph.embed_url(self.site), RENDERED_TEXT)
self.assertEqual(graph.embed_link(self.site), RENDERED_TEXT)
class TagTest(TestCase):
def test_create_tag_unicode(self):
tag = Tag(name='Testing Unicode: 台灣')
tag.save()
self.assertEqual(tag.slug, 'testing-unicode-台灣')

View File

@@ -0,0 +1,89 @@
import django_rq
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from rest_framework import status
from dcim.models import Site
from extras.choices import ObjectChangeActionChoices
from extras.models import Webhook
from utilities.testing import APITestCase
class WebhookTest(APITestCase):
def setUp(self):
super().setUp()
self.queue = django_rq.get_queue('default')
self.queue.empty() # Begin each test with an empty queue
@classmethod
def setUpTestData(cls):
site_ct = ContentType.objects.get_for_model(Site)
PAYLOAD_URL = "http://localhost/"
webhooks = Webhook.objects.bulk_create((
Webhook(name='Site Create Webhook', type_create=True, payload_url=PAYLOAD_URL),
Webhook(name='Site Update Webhook', type_update=True, payload_url=PAYLOAD_URL),
Webhook(name='Site Delete Webhook', type_delete=True, payload_url=PAYLOAD_URL),
))
for webhook in webhooks:
webhook.obj_type.set([site_ct])
def test_enqueue_webhook_create(self):
# Create an object via the REST API
data = {
'name': 'Test Site',
'slug': 'test-site',
}
url = reverse('dcim-api:site-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 1)
# Verify that a job was queued for the object creation webhook
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.args[0], Webhook.objects.get(type_create=True))
self.assertEqual(job.args[1]['id'], response.data['id'])
self.assertEqual(job.args[2], 'site')
self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_CREATE)
def test_enqueue_webhook_update(self):
site = Site.objects.create(name='Site 1', slug='site-1')
# Update an object via the REST API
data = {
'comments': 'Updated the site',
}
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.args[0], Webhook.objects.get(type_update=True))
self.assertEqual(job.args[1]['id'], site.pk)
self.assertEqual(job.args[2], 'site')
self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_UPDATE)
def test_enqueue_webhook_delete(self):
site = Site.objects.create(name='Site 1', slug='site-1')
# Delete an object via the REST API
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.args[0], Webhook.objects.get(type_delete=True))
self.assertEqual(job.args[1]['id'], site.pk)
self.assertEqual(job.args[2], 'site')
self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_DELETE)

View File

@@ -6,8 +6,7 @@ import requests
from django_rq import job
from rest_framework.utils.encoders import JSONEncoder
from .choices import ObjectChangeActionChoices
from .constants import *
from .choices import ObjectChangeActionChoices, WebhookContentTypeChoices
@job('default')
@@ -35,9 +34,9 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque
'headers': headers
}
if webhook.http_content_type == WEBHOOK_CT_JSON:
if webhook.http_content_type == WebhookContentTypeChoices.CONTENTTYPE_JSON:
params.update({'data': json.dumps(payload, cls=JSONEncoder)})
elif webhook.http_content_type == WEBHOOK_CT_X_WWW_FORM_ENCODED:
elif webhook.http_content_type == WebhookContentTypeChoices.CONTENTTYPE_FORMDATA:
params.update({'data': payload})
prepared_request = requests.Request(**params).prepare()
@@ -61,5 +60,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque
return 'Status {} returned, webhook successfully processed.'.format(response.status_code)
else:
raise requests.exceptions.RequestException(
"Status {} returned with content '{}', webhook FAILED to process.".format(response.status_code, response.content)
"Status {} returned with content '{}', webhook FAILED to process.".format(
response.status_code, response.content
)
)

View File

@@ -7,7 +7,7 @@ from rest_framework import status
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from ipam.choices import *
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
from utilities.testing import APITestCase, choices_to_dict
from utilities.testing import APITestCase, choices_to_dict, disable_warnings
class AppTest(APITestCase):
@@ -1007,7 +1007,8 @@ class VLANTest(APITestCase):
self.prefix1.save()
url = reverse('ipam-api:vlan-detail', kwargs={'pk': self.vlan1.pk})
response = self.client.delete(url, **self.header)
with disable_warnings('django.request'):
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_409_CONFLICT)

View File

@@ -2,12 +2,199 @@ import netaddr
from django.core.exceptions import ValidationError
from django.test import TestCase, override_settings
from ipam.choices import IPAddressRoleChoices
from ipam.models import IPAddress, Prefix, VRF
from ipam.choices import IPAddressRoleChoices, PrefixStatusChoices
from ipam.models import Aggregate, IPAddress, Prefix, RIR, VLAN, VLANGroup, VRF
class TestAggregate(TestCase):
def test_get_utilization(self):
rir = RIR.objects.create(name='RIR 1', slug='rir-1')
aggregate = Aggregate(prefix=netaddr.IPNetwork('10.0.0.0/8'), rir=rir)
aggregate.save()
# 25% utilization
Prefix.objects.bulk_create((
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.0/12')),
Prefix(family=4, prefix=netaddr.IPNetwork('10.16.0.0/12')),
Prefix(family=4, prefix=netaddr.IPNetwork('10.32.0.0/12')),
Prefix(family=4, prefix=netaddr.IPNetwork('10.48.0.0/12')),
))
self.assertEqual(aggregate.get_utilization(), 25)
# 50% utilization
Prefix.objects.bulk_create((
Prefix(family=4, prefix=netaddr.IPNetwork('10.64.0.0/10')),
))
self.assertEqual(aggregate.get_utilization(), 50)
# 100% utilization
Prefix.objects.bulk_create((
Prefix(family=4, prefix=netaddr.IPNetwork('10.128.0.0/9')),
))
self.assertEqual(aggregate.get_utilization(), 100)
class TestPrefix(TestCase):
def test_get_duplicates(self):
prefixes = Prefix.objects.bulk_create((
Prefix(family=4, prefix=netaddr.IPNetwork('192.0.2.0/24')),
Prefix(family=4, prefix=netaddr.IPNetwork('192.0.2.0/24')),
Prefix(family=4, prefix=netaddr.IPNetwork('192.0.2.0/24')),
))
duplicate_prefix_pks = [p.pk for p in prefixes[0].get_duplicates()]
self.assertSetEqual(set(duplicate_prefix_pks), {prefixes[1].pk, prefixes[2].pk})
def test_get_child_prefixes(self):
vrfs = VRF.objects.bulk_create((
VRF(name='VRF 1'),
VRF(name='VRF 2'),
VRF(name='VRF 3'),
))
prefixes = Prefix.objects.bulk_create((
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.0/24'), vrf=None),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.1.0/24'), vrf=vrfs[0]),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.2.0/24'), vrf=vrfs[1]),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.3.0/24'), vrf=vrfs[2]),
))
child_prefix_pks = {p.pk for p in prefixes[0].get_child_prefixes()}
# Global container should return all children
self.assertSetEqual(child_prefix_pks, {prefixes[1].pk, prefixes[2].pk, prefixes[3].pk, prefixes[4].pk})
prefixes[0].vrf = vrfs[0]
prefixes[0].save()
child_prefix_pks = {p.pk for p in prefixes[0].get_child_prefixes()}
# VRF container is limited to its own VRF
self.assertSetEqual(child_prefix_pks, {prefixes[2].pk})
def test_get_child_ips(self):
vrfs = VRF.objects.bulk_create((
VRF(name='VRF 1'),
VRF(name='VRF 2'),
VRF(name='VRF 3'),
))
parent_prefix = Prefix.objects.create(
family=4, prefix=netaddr.IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER
)
ips = IPAddress.objects.bulk_create((
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.1/24'), vrf=None),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
))
child_ip_pks = {p.pk for p in parent_prefix.get_child_ips()}
# Global container should return all children
self.assertSetEqual(child_ip_pks, {ips[0].pk, ips[1].pk, ips[2].pk, ips[3].pk})
parent_prefix.vrf = vrfs[0]
parent_prefix.save()
child_ip_pks = {p.pk for p in parent_prefix.get_child_ips()}
# VRF container is limited to its own VRF
self.assertSetEqual(child_ip_pks, {ips[1].pk})
def test_get_available_prefixes(self):
prefixes = Prefix.objects.bulk_create((
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.0/20')),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.32.0/20')),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.128.0/18')),
))
missing_prefixes = netaddr.IPSet([
netaddr.IPNetwork('10.0.16.0/20'),
netaddr.IPNetwork('10.0.48.0/20'),
netaddr.IPNetwork('10.0.64.0/18'),
netaddr.IPNetwork('10.0.192.0/18'),
])
available_prefixes = prefixes[0].get_available_prefixes()
self.assertEqual(available_prefixes, missing_prefixes)
def test_get_available_ips(self):
parent_prefix = Prefix.objects.create(family=4, prefix=netaddr.IPNetwork('10.0.0.0/28'))
IPAddress.objects.bulk_create((
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.1/26')),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.3/26')),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.5/26')),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.7/26')),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.9/26')),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.11/26')),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.13/26')),
))
missing_ips = netaddr.IPSet([
'10.0.0.2/32',
'10.0.0.4/32',
'10.0.0.6/32',
'10.0.0.8/32',
'10.0.0.10/32',
'10.0.0.12/32',
'10.0.0.14/32',
])
available_ips = parent_prefix.get_available_ips()
self.assertEqual(available_ips, missing_ips)
def test_get_first_available_prefix(self):
prefixes = Prefix.objects.bulk_create((
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.0/24')),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.1.0/24')),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.2.0/24')),
))
self.assertEqual(prefixes[0].get_first_available_prefix(), netaddr.IPNetwork('10.0.3.0/24'))
Prefix.objects.create(family=4, prefix=netaddr.IPNetwork('10.0.3.0/24'))
self.assertEqual(prefixes[0].get_first_available_prefix(), netaddr.IPNetwork('10.0.4.0/22'))
def test_get_first_available_ip(self):
parent_prefix = Prefix.objects.create(family=4, prefix=netaddr.IPNetwork('10.0.0.0/24'))
IPAddress.objects.bulk_create((
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.1/24')),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.2/24')),
IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.3/24')),
))
self.assertEqual(parent_prefix.get_first_available_ip(), '10.0.0.4/24')
IPAddress.objects.create(family=4, address=netaddr.IPNetwork('10.0.0.4/24'))
self.assertEqual(parent_prefix.get_first_available_ip(), '10.0.0.5/24')
def test_get_utilization(self):
# Container Prefix
prefix = Prefix.objects.create(
family=4,
prefix=netaddr.IPNetwork('10.0.0.0/24'),
status=PrefixStatusChoices.STATUS_CONTAINER
)
Prefix.objects.bulk_create((
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.0/26')),
Prefix(family=4, prefix=netaddr.IPNetwork('10.0.0.128/26')),
))
self.assertEqual(prefix.get_utilization(), 50)
# Non-container Prefix
prefix.status = PrefixStatusChoices.STATUS_ACTIVE
prefix.save()
IPAddress.objects.bulk_create(
# Create 32 IPAddresses within the Prefix
[IPAddress(family=4, address=netaddr.IPNetwork('10.0.0.{}/24'.format(i))) for i in range(1, 33)]
)
self.assertEqual(prefix.get_utilization(), 12) # ~= 12%
#
# Uniqueness enforcement tests
#
@override_settings(ENFORCE_GLOBAL_UNIQUE=False)
def test_duplicate_global(self):
Prefix.objects.create(prefix=netaddr.IPNetwork('192.0.2.0/24'))
@@ -35,6 +222,20 @@ class TestPrefix(TestCase):
class TestIPAddress(TestCase):
def test_get_duplicates(self):
ips = IPAddress.objects.bulk_create((
IPAddress(family=4, address=netaddr.IPNetwork('192.0.2.1/24')),
IPAddress(family=4, address=netaddr.IPNetwork('192.0.2.1/24')),
IPAddress(family=4, address=netaddr.IPNetwork('192.0.2.1/24')),
))
duplicate_ip_pks = [p.pk for p in ips[0].get_duplicates()]
self.assertSetEqual(set(duplicate_ip_pks), {ips[1].pk, ips[2].pk})
#
# Uniqueness enforcement tests
#
@override_settings(ENFORCE_GLOBAL_UNIQUE=False)
def test_duplicate_global(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
@@ -63,3 +264,22 @@ class TestIPAddress(TestCase):
def test_duplicate_nonunique_role(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
class TestVLANGroup(TestCase):
def test_get_next_available_vid(self):
vlangroup = VLANGroup.objects.create(name='VLAN Group 1', slug='vlan-group-1')
VLAN.objects.bulk_create((
VLAN(name='VLAN 1', vid=1, group=vlangroup),
VLAN(name='VLAN 2', vid=2, group=vlangroup),
VLAN(name='VLAN 3', vid=3, group=vlangroup),
VLAN(name='VLAN 5', vid=5, group=vlangroup),
))
self.assertEqual(vlangroup.get_next_available_vid(), 4)
VLAN.objects.bulk_create((
VLAN(name='VLAN 4', vid=4, group=vlangroup),
))
self.assertEqual(vlangroup.get_next_available_vid(), 6)

View File

@@ -12,7 +12,7 @@ from django.core.exceptions import ImproperlyConfigured
# Environment setup
#
VERSION = '2.7.2-dev'
VERSION = '2.7.3-dev'
# Hostname
HOSTNAME = platform.node()

View File

@@ -16,6 +16,8 @@ def validate_rsa_key(key, is_secret=True):
"""
Validate the format and type of an RSA key.
"""
if key.startswith('ssh-rsa '):
raise forms.ValidationError("OpenSSH line format is not supported. Please ensure that your public is in PEM (base64) format.")
try:
key = RSA.importKey(key)
except ValueError:

View File

@@ -36,3 +36,5 @@ GY2b4PKuSTcsYjbg8adOGzFL9RXLI1X4PHNCzD/Y1vdM3jJXv+luk3TU+JIbzJeN
5ZEEz+sIdlMPCAACaZAY/t9Kd/LxHr0o4K/6gqkZIukxFCK6sN53gibAXfaKc4xl
qQIDAQAB
-----END PUBLIC KEY-----"""
SSH_PUBLIC_KEY = """ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCy2yMGnuvmM5CnFG8CsohfUYobXU7+pz/RJtvUUnARAY11Ybc3cn0tvzn4aPxclX8+514n6R7jJCZuVGJXXapqZDq2l+PLmgLhyBJxE9qq7rbp4EAJiUP0inDyf8qFzSKT7Rm8cjHvY3v2GI32JUXuWACA23t5YPUqVglkjfdVX8VHJh6fMQrQ4O3CKKh2x0S82UHH7SaYH0HqOknPgyRQ+ZQorUU25IpzJPesk29nN3DYqfY+VQsKJOLglWvoapaZiu+wK/7ovXqYXNuhfAwlkjbCRKjwix1kZjtDS44US1//BCaT7AeuwMpFLI44v/VajoxTfE0h74Mxl48mNt7Qme4lbXxH8yMa6HNfDp4vjnxPE1CWuSrFo4G+HI1rc22qSmw9e67qIGRbcI7/cIFpeBvnfCCgWrqWZ6ZzdAZJCnu7/aWn00+VG+54GFmJ+3R2xhWcu+Uzn+o1aWROtUuzq0qR6zdXME3A0Oud2uQrQAiAGFdWpfvcOEbD+tlPNDk= test"""

View File

@@ -0,0 +1,33 @@
from django.test import TestCase
from secrets.forms import UserKeyForm
from secrets.models import UserKey
from utilities.testing import create_test_user
from .constants import PUBLIC_KEY, SSH_PUBLIC_KEY
class UserKeyFormTestCase(TestCase):
def setUp(self):
user = create_test_user(
permissions=[
'secrets.view_secretrole',
'secrets.add_secretrole',
]
)
self.userkey = UserKey(user=user)
def test_upload_rsakey(self):
form = UserKeyForm(
data={'public_key': PUBLIC_KEY},
instance=self.userkey,
)
self.assertTrue(form.is_valid())
self.assertTrue(form.save())
def test_upload_sshkey(self):
form = UserKeyForm(
data={'public_key': SSH_PUBLIC_KEY},
instance=self.userkey,
)
print(form.is_valid())
self.assertFalse(form.is_valid())

View File

@@ -84,7 +84,7 @@
</a>
</li>
{% if perms.dcim.napalm_read %}
{% if device.status != 1 %}
{% if device.status != 'active' %}
{% include 'dcim/inc/device_napalm_tabs.html' with disabled_message='Device must be in active status' %}
{% elif not device.platform %}
{% include 'dcim/inc/device_napalm_tabs.html' with disabled_message='No platform assigned to this device' %}

View File

@@ -1,3 +1,6 @@
import logging
from contextlib import contextmanager
from django.contrib.auth.models import Permission, User
from rest_framework.test import APITestCase as _APITestCase
@@ -62,3 +65,15 @@ def choices_to_dict(choices_list):
return {
choice['value']: choice['label'] for choice in choices_list
}
@contextmanager
def disable_warnings(logger_name):
"""
Temporarily suppress expected warning messages to keep the test output clean.
"""
logger = logging.getLogger(logger_name)
current_level = logger.level
logger.setLevel(logging.ERROR)
yield
logger.setLevel(current_level)

View File

@@ -9,7 +9,7 @@ from dcim.models import Region, Site
from extras.choices import CustomFieldTypeChoices
from extras.models import CustomField
from ipam.models import VLAN
from utilities.testing import APITestCase
from utilities.testing import APITestCase, disable_warnings
class WritableNestedSerializerTest(APITestCase):
@@ -50,7 +50,8 @@ class WritableNestedSerializerTest(APITestCase):
}
url = reverse('ipam-api:vlan-list')
response = self.client.post(url, data, format='json', **self.header)
with disable_warnings('django.request'):
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertEqual(VLAN.objects.count(), 0)
@@ -85,7 +86,8 @@ class WritableNestedSerializerTest(APITestCase):
}
url = reverse('ipam-api:vlan-list')
response = self.client.post(url, data, format='json', **self.header)
with disable_warnings('django.request'):
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertEqual(VLAN.objects.count(), 0)
@@ -104,7 +106,8 @@ class WritableNestedSerializerTest(APITestCase):
}
url = reverse('ipam-api:vlan-list')
response = self.client.post(url, data, format='json', **self.header)
with disable_warnings('django.request'):
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertEqual(VLAN.objects.count(), 0)
@@ -119,7 +122,8 @@ class WritableNestedSerializerTest(APITestCase):
}
url = reverse('ipam-api:vlan-list')
response = self.client.post(url, data, format='json', **self.header)
with disable_warnings('django.request'):
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertEqual(VLAN.objects.count(), 0)

View File

@@ -5,7 +5,7 @@ from rest_framework import status
from dcim.choices import InterfaceModeChoices
from dcim.models import Interface
from ipam.models import IPAddress, VLAN
from utilities.testing import APITestCase, choices_to_dict
from utilities.testing import APITestCase, choices_to_dict, disable_warnings
from virtualization.choices import *
from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine
@@ -417,7 +417,8 @@ class VirtualMachineTest(APITestCase):
}
url = reverse('virtualization-api:virtualmachine-list')
response = self.client.post(url, data, format='json', **self.header)
with disable_warnings('django.request'):
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertEqual(VirtualMachine.objects.count(), 4)