Merge pull request #3908 from netbox-community/develop

Release v2.6.12
This commit is contained in:
Jeremy Stretch 2020-01-13 13:25:21 -05:00 committed by GitHub
commit 946779000f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 5140 additions and 484 deletions

23
.github/lock.yml vendored Normal file
View File

@ -0,0 +1,23 @@
# Configuration for Lock (https://github.com/apps/lock)
# Number of days of inactivity before a closed issue or pull request is locked
daysUntilLock: 90
# Skip issues and pull requests created before a given timestamp. Timestamp must
# follow ISO 8601 (`YYYY-MM-DD`). Set to `false` to disable
skipCreatedBefore: 2020-01-01
# Issues and pull requests with these labels will be ignored. Set to `[]` to disable
exemptLabels: []
# Label to add before locking, such as `outdated`. Set to `false` to disable
lockLabel: false
# Comment to post before locking. Set to `false` to disable
lockComment: false
# Assign `resolved` as the reason for locking. Set to `false` to disable
setLockReason: true
# Limit to only `issues` or `pulls`
# only: issues

7
.github/stale.yml vendored
View File

@ -1,20 +1,27 @@
# Configuration for Stale (https://github.com/apps/stale)
# Number of days of inactivity before an issue becomes stale
daysUntilStale: 14
# Number of days of inactivity before a stale issue is closed
daysUntilClose: 7
# Issues with these labels will never be considered stale
exemptLabels:
- "status: accepted"
- "status: gathering feedback"
- "status: blocked"
# Label to use when marking an issue as stale
staleLabel: wontfix
# Comment to post when marking an issue as stale. Set to `false` to disable
markComment: >
This issue has been automatically marked as stale because it has not had
recent activity. It will be closed if no further activity occurs. NetBox
is governed by a small group of core maintainers which means not all opened
issues may receive direct feedback. Please see our [contributing guide](https://github.com/netbox-community/netbox/blob/develop/CONTRIBUTING.md).
# Comment to post when closing a stale issue. Set to `false` to disable
closeComment: >
This issue has been automatically closed due to lack of activity. In an

View File

@ -0,0 +1,65 @@
# NAPALM
NetBox supports integration with the [NAPALM automation](https://napalm-automation.net/) library. NAPALM allows NetBox to fetch live data from devices and return it to a requester via its REST API.
!!! info
To enable the integration, the NAPALM library must be installed. See [installation steps](../../installation/2-netbox/#napalm-automation-optional) for more information.
```
GET /api/dcim/devices/1/napalm/?method=get_environment
{
"get_environment": {
...
}
}
```
## Authentication
By default, the [`NAPALM_USERNAME`](../../configuration/optional-settings/#napalm_username) and [`NAPALM_PASSWORD`](../../configuration/optional-settings/#napalm_password) are used for NAPALM authentication. They can be overridden for an individual API call through the `X-NAPALM-Username` and `X-NAPALM-Password` headers.
```
$ curl "http://localhost/api/dcim/devices/1/napalm/?method=get_environment" \
-H "Authorization: Token f4b378553dacfcfd44c5a0b9ae49b57e29c552b5" \
-H "Content-Type: application/json" \
-H "Accept: application/json; indent=4" \
-H "X-NAPALM-Username: foo" \
-H "X-NAPALM-Password: bar"
```
## Method Support
The list of supported NAPALM methods depends on the [NAPALM driver](https://napalm.readthedocs.io/en/latest/support/index.html#general-support-matrix) configured for the platform of a device. NetBox only supports [get](https://napalm.readthedocs.io/en/latest/support/index.html#getters-support-matrix) methods.
## Multiple Methods
More than one method in an API call can be invoked by adding multiple `method` parameters. For example:
```
GET /api/dcim/devices/1/napalm/?method=get_ntp_servers&method=get_ntp_peers
{
"get_ntp_servers": {
...
},
"get_ntp_peers": {
...
}
}
```
## Optional Arguments
The behavior of NAPALM drivers can be adjusted according to the [optional arguments](https://napalm.readthedocs.io/en/latest/support/index.html#optional-arguments). NetBox exposes those arguments using headers prefixed with `X-NAPALM-`.
For instance, the SSH port is changed to 2222 in this API call:
```
$ curl "http://localhost/api/dcim/devices/1/napalm/?method=get_environment" \
-H "Authorization: Token f4b378553dacfcfd44c5a0b9ae49b57e29c552b5" \
-H "Content-Type: application/json" \
-H "Accept: application/json; indent=4" \
-H "X-NAPALM-port: 2222"
```

View File

@ -69,6 +69,14 @@ If the new field will be included in the object list view, add a column to the m
Edit the object's view template to display the new field. There may also be a custom add/edit form template that needs to be updated.
### 11. Adjust API and model tests
### 11. Create/extend test cases
Extend the model and/or API tests to verify that the new field and any accompanying validation logic perform as expected. This is especially important for relational fields.
Create or extend the relevant test cases to verify that the new field and any accompanying validation logic perform as expected. This is especially important for relational fields. NetBox incorporates various test suites, including:
* API serializer/view tests
* Filter tests
* Form tests
* Model tests
* View tests
Be diligent to ensure all of the relevant test suites are adapted or extended as necessary to test any new functionality.

View File

@ -1,3 +1,40 @@
# v2.6.12 (2020-01-13)
## Enhancements
* [#1982](https://github.com/netbox-community/netbox/issues/1982) - Improved NAPALM method documentation in Swagger (OpenAPI)
* [#2050](https://github.com/netbox-community/netbox/issues/2050) - Preview image attachments when hovering over the link
* [#2113](https://github.com/netbox-community/netbox/issues/2113) - Allow NAPALM driver settings to be changed with request headers
* [#2589](https://github.com/netbox-community/netbox/issues/2589) - Toggle the display of child prefixes/IP addresses
* [#3009](https://github.com/netbox-community/netbox/issues/3009) - Search by description when assigning IP address to interfaces
* [#3021](https://github.com/netbox-community/netbox/issues/3021) - Add `tenant` filter field for cables
* [#3090](https://github.com/netbox-community/netbox/issues/3090) - Enable filtering of interfaces by name on the device view
* [#3187](https://github.com/netbox-community/netbox/issues/3187) - Add rack selection field to rack elevations view
* [#3393](https://github.com/netbox-community/netbox/issues/3393) - Paginate assigned circuits at the provider details view
* [#3440](https://github.com/netbox-community/netbox/issues/3440) - Add total path length to cable trace
* [#3491](https://github.com/netbox-community/netbox/issues/3491) - Include content of response on webhook error
* [#3623](https://github.com/netbox-community/netbox/issues/3623) - Enable word expansion during interface creation
* [#3668](https://github.com/netbox-community/netbox/issues/3668) - Enable searching by DNS name when assigning IP address
* [#3851](https://github.com/netbox-community/netbox/issues/3851) - Allow passing initial data to custom script forms
* [#3891](https://github.com/netbox-community/netbox/issues/3891) - Add `local_context_data` filter for virtual machines
## Bug Fixes
* [#3589](https://github.com/netbox-community/netbox/issues/3589) - Fix validation on tagged VLANs of an interface
* [#3849](https://github.com/netbox-community/netbox/issues/3849) - Fix ordering of models when dumping data to JSON
* [#3853](https://github.com/netbox-community/netbox/issues/3853) - Fix device role link on config context view
* [#3856](https://github.com/netbox-community/netbox/issues/3856) - Allow filtering VM interfaces by multiple MAC addresses
* [#3857](https://github.com/netbox-community/netbox/issues/3857) - Fix rendering of grouped custom links
* [#3862](https://github.com/netbox-community/netbox/issues/3862) - Allow filtering device components by multiple device names
* [#3864](https://github.com/netbox-community/netbox/issues/3864) - Disallow /0 masks for prefixes and IP addresses
* [#3872](https://github.com/netbox-community/netbox/issues/3872) - Paginate related IPs on the IP address view
* [#3876](https://github.com/netbox-community/netbox/issues/3876) - Fix minimum/maximum value rendering for site ASN field
* [#3882](https://github.com/netbox-community/netbox/issues/3882) - Fix filtering of devices by rack group
* [#3898](https://github.com/netbox-community/netbox/issues/3898) - Fix references to deleted cables without a label
* [#3905](https://github.com/netbox-community/netbox/issues/3905) - Fix divide-by-zero on power feeds with low power values
---
# v2.6.11 (2020-01-03)
## Bug Fixes

View File

@ -35,6 +35,7 @@ pages:
- Custom Scripts: 'additional-features/custom-scripts.md'
- Export Templates: 'additional-features/export-templates.md'
- Graphs: 'additional-features/graphs.md'
- NAPALM: 'additional-features/napalm.md'
- Prometheus Metrics: 'additional-features/prometheus-metrics.md'
- Reports: 'additional-features/reports.md'
- Tags: 'additional-features/tags.md'

View File

@ -8,6 +8,13 @@ from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilte
from .constants import *
from .models import Circuit, CircuitTermination, CircuitType, Provider
__all__ = (
'CircuitFilter',
'CircuitTerminationFilter',
'CircuitTypeFilter',
'ProviderFilter',
)
class ProviderFilter(CustomFieldFilterSet, CreatedUpdatedFilterSet):
id__in = NumericInFilter(

View File

@ -0,0 +1,287 @@
from django.test import TestCase
from circuits.constants import CIRCUIT_STATUS_ACTIVE, CIRCUIT_STATUS_OFFLINE, CIRCUIT_STATUS_PLANNED
from circuits.filters import *
from circuits.models import Circuit, CircuitTermination, CircuitType, Provider
from dcim.models import Region, Site
class ProviderTestCase(TestCase):
queryset = Provider.objects.all()
filterset = ProviderFilter
@classmethod
def setUpTestData(cls):
providers = (
Provider(name='Provider 1', slug='provider-1', asn=65001, account='1234'),
Provider(name='Provider 2', slug='provider-2', asn=65002, account='2345'),
Provider(name='Provider 3', slug='provider-3', asn=65003, account='3456'),
Provider(name='Provider 4', slug='provider-4', asn=65004, account='4567'),
Provider(name='Provider 5', slug='provider-5', asn=65005, account='5678'),
)
Provider.objects.bulk_create(providers)
regions = (
Region(name='Test Region 1', slug='test-region-1'),
Region(name='Test Region 2', slug='test-region-2'),
)
# Can't use bulk_create for models with MPTT fields
for r in regions:
r.save()
sites = (
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
)
Site.objects.bulk_create(sites)
circuit_types = (
CircuitType(name='Test Circuit Type 1', slug='test-circuit-type-1'),
CircuitType(name='Test Circuit Type 2', slug='test-circuit-type-2'),
)
CircuitType.objects.bulk_create(circuit_types)
circuits = (
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 1'),
Circuit(provider=providers[1], type=circuit_types[1], cid='Test Circuit 1'),
)
Circuit.objects.bulk_create(circuits)
CircuitTermination.objects.bulk_create((
CircuitTermination(circuit=circuits[0], site=sites[0], term_side='A', port_speed=1000),
CircuitTermination(circuit=circuits[1], site=sites[0], term_side='A', port_speed=1000),
))
def test_name(self):
params = {'name': ['Provider 1', 'Provider 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['provider-1', 'provider-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_asn(self):
params = {'asn': ['65001', '65002']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_account(self):
params = {'account': ['1234', '2345']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:3]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class CircuitTypeTestCase(TestCase):
queryset = CircuitType.objects.all()
filterset = CircuitTypeFilter
@classmethod
def setUpTestData(cls):
CircuitType.objects.bulk_create((
CircuitType(name='Circuit Type 1', slug='circuit-type-1'),
CircuitType(name='Circuit Type 2', slug='circuit-type-2'),
CircuitType(name='Circuit Type 3', slug='circuit-type-3'),
))
def test_id(self):
params = {'id': [self.queryset.first().pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_name(self):
params = {'name': ['Circuit Type 1']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_slug(self):
params = {'slug': ['circuit-type-1']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
class CircuitTestCase(TestCase):
queryset = Circuit.objects.all()
filterset = CircuitFilter
@classmethod
def setUpTestData(cls):
regions = (
Region(name='Test Region 1', slug='test-region-1'),
Region(name='Test Region 2', slug='test-region-2'),
Region(name='Test Region 3', slug='test-region-3'),
)
# Can't use bulk_create for models with MPTT fields
for r in regions:
r.save()
sites = (
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
)
Site.objects.bulk_create(sites)
circuit_types = (
CircuitType(name='Test Circuit Type 1', slug='test-circuit-type-1'),
CircuitType(name='Test Circuit Type 2', slug='test-circuit-type-2'),
)
CircuitType.objects.bulk_create(circuit_types)
providers = (
Provider(name='Provider 1', slug='provider-1'),
Provider(name='Provider 2', slug='provider-2'),
)
Provider.objects.bulk_create(providers)
circuits = (
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 1', install_date='2020-01-01', commit_rate=1000, status=CIRCUIT_STATUS_ACTIVE),
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 2', install_date='2020-01-02', commit_rate=2000, status=CIRCUIT_STATUS_ACTIVE),
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 3', install_date='2020-01-03', commit_rate=3000, status=CIRCUIT_STATUS_PLANNED),
Circuit(provider=providers[1], type=circuit_types[1], cid='Test Circuit 4', install_date='2020-01-04', commit_rate=4000, status=CIRCUIT_STATUS_PLANNED),
Circuit(provider=providers[1], type=circuit_types[1], cid='Test Circuit 5', install_date='2020-01-05', commit_rate=5000, status=CIRCUIT_STATUS_OFFLINE),
Circuit(provider=providers[1], type=circuit_types[1], cid='Test Circuit 6', install_date='2020-01-06', commit_rate=6000, status=CIRCUIT_STATUS_OFFLINE),
)
Circuit.objects.bulk_create(circuits)
circuit_terminations = ((
CircuitTermination(circuit=circuits[0], site=sites[0], term_side='A', port_speed=1000),
CircuitTermination(circuit=circuits[1], site=sites[1], term_side='A', port_speed=1000),
CircuitTermination(circuit=circuits[2], site=sites[2], term_side='A', port_speed=1000),
))
CircuitTermination.objects.bulk_create(circuit_terminations)
def test_cid(self):
params = {'cid': ['Test Circuit 1', 'Test Circuit 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_install_date(self):
params = {'install_date': ['2020-01-01', '2020-01-02']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_commit_rate(self):
params = {'commit_rate': ['1000', '2000']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:3]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_provider(self):
provider = Provider.objects.first()
params = {'provider_id': [provider.pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'provider': [provider.slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_type(self):
circuit_type = CircuitType.objects.first()
params = {'type_id': [circuit_type.pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'type': [circuit_type.slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_status(self):
params = {'status': [CIRCUIT_STATUS_ACTIVE, CIRCUIT_STATUS_PLANNED]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class CircuitTerminationTestCase(TestCase):
queryset = CircuitTermination.objects.all()
filterset = CircuitTerminationFilter
@classmethod
def setUpTestData(cls):
sites = (
Site(name='Test Site 1', slug='test-site-1'),
Site(name='Test Site 2', slug='test-site-2'),
Site(name='Test Site 3', slug='test-site-3'),
)
Site.objects.bulk_create(sites)
circuit_types = (
CircuitType(name='Test Circuit Type 1', slug='test-circuit-type-1'),
)
CircuitType.objects.bulk_create(circuit_types)
providers = (
Provider(name='Provider 1', slug='provider-1'),
)
Provider.objects.bulk_create(providers)
circuits = (
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 1'),
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 2'),
Circuit(provider=providers[0], type=circuit_types[0], cid='Test Circuit 3'),
)
Circuit.objects.bulk_create(circuits)
circuit_terminations = ((
CircuitTermination(circuit=circuits[0], site=sites[0], term_side='A', port_speed=1000, upstream_speed=1000, xconnect_id='ABC'),
CircuitTermination(circuit=circuits[0], site=sites[1], term_side='Z', port_speed=1000, upstream_speed=1000, xconnect_id='DEF'),
CircuitTermination(circuit=circuits[1], site=sites[1], term_side='A', port_speed=2000, upstream_speed=2000, xconnect_id='GHI'),
CircuitTermination(circuit=circuits[1], site=sites[2], term_side='Z', port_speed=2000, upstream_speed=2000, xconnect_id='JKL'),
CircuitTermination(circuit=circuits[2], site=sites[2], term_side='A', port_speed=3000, upstream_speed=3000, xconnect_id='MNO'),
CircuitTermination(circuit=circuits[2], site=sites[0], term_side='Z', port_speed=3000, upstream_speed=3000, xconnect_id='PQR'),
))
CircuitTermination.objects.bulk_create(circuit_terminations)
def test_term_side(self):
params = {'term_side': 'A'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_port_speed(self):
params = {'port_speed': ['1000', '2000']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_upstream_speed(self):
params = {'upstream_speed': ['1000', '2000']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_xconnect_id(self):
params = {'xconnect_id': ['ABC', 'DEF']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_circuit_id(self):
circuits = Circuit.objects.all()[:2]
params = {'circuit_id': [circuits[0].pk, circuits[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)

View File

@ -1,3 +1,4 @@
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import permission_required
from django.contrib.auth.mixins import PermissionRequiredMixin
@ -5,9 +6,11 @@ from django.db import transaction
from django.db.models import Count, OuterRef, Subquery
from django.shortcuts import get_object_or_404, redirect, render
from django.views.generic import View
from django_tables2 import RequestConfig
from extras.models import Graph, GRAPH_TYPE_PROVIDER
from utilities.forms import ConfirmationForm
from utilities.paginator import EnhancedPaginator
from utilities.views import (
BulkDeleteView, BulkEditView, BulkImportView, ObjectDeleteView, ObjectEditView, ObjectListView,
)
@ -38,9 +41,18 @@ class ProviderView(PermissionRequiredMixin, View):
circuits = Circuit.objects.filter(provider=provider).prefetch_related('type', 'tenant', 'terminations__site')
show_graphs = Graph.objects.filter(type=GRAPH_TYPE_PROVIDER).exists()
circuits_table = tables.CircuitTable(circuits, orderable=False)
circuits_table.columns.hide('provider')
paginate = {
'paginator_class': EnhancedPaginator,
'per_page': request.GET.get('per_page', settings.PAGINATE_COUNT)
}
RequestConfig(request, paginate).configure(circuits_table)
return render(request, 'circuits/provider.html', {
'provider': provider,
'circuits': circuits,
'circuits_table': circuits_table,
'show_graphs': show_graphs,
})

View File

@ -370,6 +370,10 @@ class DeviceWithConfigContextSerializer(DeviceSerializer):
return obj.get_config_context()
class DeviceNAPALMSerializer(serializers.Serializer):
method = serializers.DictField()
class ConsoleServerPortSerializer(TaggitSerializer, ConnectedEndpointSerializer):
device = NestedDeviceSerializer()
cable = NestedCableSerializer(read_only=True)

View File

@ -358,6 +358,17 @@ class DeviceViewSet(CustomFieldModelViewSet):
return Response(serializer.data)
@swagger_auto_schema(
manual_parameters=[
Parameter(
name='method',
in_='query',
required=True,
type=openapi.TYPE_STRING
)
],
responses={'200': serializers.DeviceNAPALMSerializer}
)
@action(detail=True, url_path='napalm')
def napalm(self, request, pk):
"""
@ -396,13 +407,29 @@ class DeviceViewSet(CustomFieldModelViewSet):
napalm_methods = request.GET.getlist('method')
response = OrderedDict([(m, None) for m in napalm_methods])
ip_address = str(device.primary_ip.address.ip)
username = settings.NAPALM_USERNAME
password = settings.NAPALM_PASSWORD
optional_args = settings.NAPALM_ARGS.copy()
if device.platform.napalm_args is not None:
optional_args.update(device.platform.napalm_args)
# Update NAPALM parameters according to the request headers
for header in request.headers:
if header[:9].lower() != 'x-napalm-':
continue
key = header[9:]
if key.lower() == 'username':
username = request.headers[header]
elif key.lower() == 'password':
password = request.headers[header]
elif key:
optional_args[key.lower()] = request.headers[header]
d = driver(
hostname=ip_address,
username=settings.NAPALM_USERNAME,
password=settings.NAPALM_PASSWORD,
username=username,
password=password,
timeout=settings.NAPALM_TIMEOUT,
optional_args=optional_args
)

View File

@ -1,4 +1,8 @@
# BGP ASN bounds
BGP_ASN_MIN = 1
BGP_ASN_MAX = 2**32 - 1
# Rack types
RACK_TYPE_2POST = 100
RACK_TYPE_4POST = 200

View File

@ -3,14 +3,21 @@ from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models
from netaddr import AddrFormatError, EUI, mac_unix_expanded
from .constants import *
class ASNField(models.BigIntegerField):
description = "32-bit ASN field"
default_validators = [
MinValueValidator(1),
MaxValueValidator(4294967295),
MinValueValidator(BGP_ASN_MIN),
MaxValueValidator(BGP_ASN_MAX),
]
def formfield(self, **kwargs):
defaults = {'min_value': BGP_ASN_MIN, 'max_value': BGP_ASN_MAX}
defaults.update(**kwargs)
return super().formfield(**defaults)
class mac_unix_expanded_uppercase(mac_unix_expanded):
word_fmt = '%.2X'

View File

@ -21,6 +21,45 @@ from .models import (
)
__all__ = (
'CableFilter',
'ConsoleConnectionFilter',
'ConsolePortFilter',
'ConsolePortTemplateFilter',
'ConsoleServerPortFilter',
'ConsoleServerPortTemplateFilter',
'DeviceBayFilter',
'DeviceBayTemplateFilter',
'DeviceFilter',
'DeviceRoleFilter',
'DeviceTypeFilter',
'FrontPortFilter',
'FrontPortTemplateFilter',
'InterfaceConnectionFilter',
'InterfaceFilter',
'InterfaceTemplateFilter',
'InventoryItemFilter',
'ManufacturerFilter',
'PlatformFilter',
'PowerConnectionFilter',
'PowerFeedFilter',
'PowerOutletFilter',
'PowerOutletTemplateFilter',
'PowerPanelFilter',
'PowerPortFilter',
'PowerPortTemplateFilter',
'RackFilter',
'RackGroupFilter',
'RackReservationFilter',
'RackRoleFilter',
'RearPortFilter',
'RearPortTemplateFilter',
'RegionFilter',
'SiteFilter',
'VirtualChassisFilter',
)
class RegionFilter(NameSlugSearchFilterSet):
parent_id = django_filters.ModelMultipleChoiceFilter(
queryset=Region.objects.all(),
@ -646,7 +685,8 @@ class DeviceComponentFilterSet(django_filters.FilterSet):
queryset=Device.objects.all(),
label='Device (ID)',
)
device = django_filters.ModelChoiceFilter(
device = django_filters.ModelMultipleChoiceFilter(
field_name='device__name',
queryset=Device.objects.all(),
to_field_name='name',
label='Device (name)',
@ -1010,6 +1050,14 @@ class CableFilter(django_filters.FilterSet):
method='filter_device',
field_name='device__site__slug'
)
tenant_id = MultiValueNumberFilter(
method='filter_device',
field_name='device__tenant_id'
)
tenant = MultiValueNumberFilter(
method='filter_device',
field_name='device__tenant__slug'
)
class Meta:
model = Cable

View File

@ -74,6 +74,17 @@ class InterfaceCommonForm:
elif self.cleaned_data['mode'] == IFACE_MODE_TAGGED_ALL:
self.cleaned_data['tagged_vlans'] = []
# Validate tagged VLANs; must be a global VLAN or in the same site
elif self.cleaned_data['mode'] == IFACE_MODE_TAGGED:
valid_sites = [None, self.cleaned_data['device'].site]
invalid_vlans = [str(v) for v in tagged_vlans if v.site not in valid_sites]
if invalid_vlans:
raise forms.ValidationError({
'tagged_vlans': "The tagged VLANs ({}) must belong to the same site as the interface's parent "
"device/VM, or they must be global".format(', '.join(invalid_vlans))
})
class BulkRenameForm(forms.Form):
"""
@ -281,8 +292,8 @@ class SiteBulkEditForm(BootstrapMixin, AddRemoveTagsForm, CustomFieldBulkEditFor
)
)
asn = forms.IntegerField(
min_value=1,
max_value=4294967295,
min_value=BGP_ASN_MIN,
max_value=BGP_ASN_MAX,
required=False,
label='ASN'
)
@ -703,6 +714,34 @@ class RackFilterForm(BootstrapMixin, TenancyFilterForm, CustomFieldFilterForm):
)
#
# Rack elevations
#
class RackElevationFilterForm(RackFilterForm):
field_order = ['q', 'region', 'site', 'group_id', 'id', 'status', 'role', 'tenant_group', 'tenant']
id = ChainedModelChoiceField(
queryset=Rack.objects.all(),
label='Rack',
chains=(
('site', 'site'),
('group_id', 'group_id'),
),
required=False,
widget=APISelectMultiple(
api_url='/api/dcim/racks/',
display_field='display_name',
)
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Filter the rack field based on the site and group
self.fields['site'].widget.add_filter_for('id', 'site')
self.fields['group_id'].widget.add_filter_for('id', 'group_id')
#
# Rack reservations
#
@ -1752,7 +1791,7 @@ class DeviceBulkEditForm(BootstrapMixin, AddRemoveTagsForm, CustomFieldBulkEditF
class DeviceFilterForm(BootstrapMixin, LocalConfigContextFilterForm, TenancyFilterForm, CustomFieldFilterForm):
model = Device
field_order = [
'q', 'region', 'site', 'group_id', 'rack_id', 'status', 'role', 'tenant_group', 'tenant',
'q', 'region', 'site', 'rack_group_id', 'rack_id', 'status', 'role', 'tenant_group', 'tenant',
'manufacturer_id', 'device_type_id', 'mac_address', 'has_primary_ip',
]
q = forms.CharField(
@ -1778,12 +1817,12 @@ class DeviceFilterForm(BootstrapMixin, LocalConfigContextFilterForm, TenancyFilt
api_url="/api/dcim/sites/",
value_field="slug",
filter_for={
'group_id': 'site',
'rack_group_id': 'site',
'rack_id': 'site',
}
)
)
group_id = FilterChoiceField(
rack_group_id = FilterChoiceField(
queryset=RackGroup.objects.prefetch_related(
'site'
),
@ -2250,36 +2289,6 @@ class InterfaceForm(InterfaceCommonForm, BootstrapMixin, forms.ModelForm):
device__in=[self.instance.device, self.instance.device.get_vc_master()], type=IFACE_TYPE_LAG
)
# Limit VLan choices to those in: global vlans, global groups, the current site's group, the current site
vlan_choices = []
global_vlans = VLAN.objects.filter(site=None, group=None)
vlan_choices.append(
('Global', [(vlan.pk, vlan) for vlan in global_vlans])
)
for group in VLANGroup.objects.filter(site=None):
global_group_vlans = VLAN.objects.filter(group=group)
vlan_choices.append(
(group.name, [(vlan.pk, vlan) for vlan in global_group_vlans])
)
site = getattr(self.instance.parent, 'site', None)
if site is not None:
# Add non-grouped site VLANs
site_vlans = VLAN.objects.filter(site=site, group=None)
vlan_choices.append((site.name, [(vlan.pk, vlan) for vlan in site_vlans]))
# Add grouped site VLANs
for group in VLANGroup.objects.filter(site=site):
site_group_vlans = VLAN.objects.filter(group=group)
vlan_choices.append((
'{} / {}'.format(group.site.name, group.name),
[(vlan.pk, vlan) for vlan in site_group_vlans]
))
self.fields['untagged_vlan'].choices = [(None, '---------')] + vlan_choices
self.fields['tagged_vlans'].choices = vlan_choices
class InterfaceCreateForm(InterfaceCommonForm, ComponentForm, forms.Form):
name_pattern = ExpandableNameField(
@ -2360,36 +2369,6 @@ class InterfaceCreateForm(InterfaceCommonForm, ComponentForm, forms.Form):
else:
self.fields['lag'].queryset = Interface.objects.none()
# Limit VLan choices to those in: global vlans, global groups, the current site's group, the current site
vlan_choices = []
global_vlans = VLAN.objects.filter(site=None, group=None)
vlan_choices.append(
('Global', [(vlan.pk, vlan) for vlan in global_vlans])
)
for group in VLANGroup.objects.filter(site=None):
global_group_vlans = VLAN.objects.filter(group=group)
vlan_choices.append(
(group.name, [(vlan.pk, vlan) for vlan in global_group_vlans])
)
site = getattr(self.parent, 'site', None)
if site is not None:
# Add non-grouped site VLANs
site_vlans = VLAN.objects.filter(site=site, group=None)
vlan_choices.append((site.name, [(vlan.pk, vlan) for vlan in site_vlans]))
# Add grouped site VLANs
for group in VLANGroup.objects.filter(site=site):
site_group_vlans = VLAN.objects.filter(group=group)
vlan_choices.append((
'{} / {}'.format(group.site.name, group.name),
[(vlan.pk, vlan) for vlan in site_group_vlans]
))
self.fields['untagged_vlan'].choices = [(None, '---------')] + vlan_choices
self.fields['tagged_vlans'].choices = vlan_choices
class InterfaceBulkEditForm(InterfaceCommonForm, BootstrapMixin, AddRemoveTagsForm, BulkEditForm):
pk = forms.ModelMultipleChoiceField(
@ -2472,36 +2451,6 @@ class InterfaceBulkEditForm(InterfaceCommonForm, BootstrapMixin, AddRemoveTagsFo
else:
self.fields['lag'].choices = []
# Limit VLan choices to those in: global vlans, global groups, the current site's group, the current site
vlan_choices = []
global_vlans = VLAN.objects.filter(site=None, group=None)
vlan_choices.append(
('Global', [(vlan.pk, vlan) for vlan in global_vlans])
)
for group in VLANGroup.objects.filter(site=None):
global_group_vlans = VLAN.objects.filter(group=group)
vlan_choices.append(
(group.name, [(vlan.pk, vlan) for vlan in global_group_vlans])
)
if self.parent_obj is not None:
site = getattr(self.parent_obj, 'site', None)
if site is not None:
# Add non-grouped site VLANs
site_vlans = VLAN.objects.filter(site=site, group=None)
vlan_choices.append((site.name, [(vlan.pk, vlan) for vlan in site_vlans]))
# Add grouped site VLANs
for group in VLANGroup.objects.filter(site=site):
site_group_vlans = VLAN.objects.filter(group=group)
vlan_choices.append((
'{} / {}'.format(group.site.name, group.name),
[(vlan.pk, vlan) for vlan in site_group_vlans]
))
self.fields['untagged_vlan'].choices = [(None, '---------')] + vlan_choices
self.fields['tagged_vlans'].choices = vlan_choices
class InterfaceBulkRenameForm(BulkRenameForm):
pk = forms.ModelMultipleChoiceField(
@ -2855,6 +2804,7 @@ class ConnectCableToCircuitTerminationForm(BootstrapMixin, ChainedFieldsMixin, f
termination_b_provider = forms.ModelChoiceField(
queryset=Provider.objects.all(),
label='Provider',
required=False,
widget=APISelect(
api_url='/api/circuits/providers/',
filter_for={
@ -2908,6 +2858,7 @@ class ConnectCableToPowerFeedForm(BootstrapMixin, ChainedFieldsMixin, forms.Mode
termination_b_site = forms.ModelChoiceField(
queryset=Site.objects.all(),
label='Site',
required=False,
widget=APISelect(
api_url='/api/dcim/sites/',
display_field='cid',
@ -2939,6 +2890,7 @@ class ConnectCableToPowerFeedForm(BootstrapMixin, ChainedFieldsMixin, forms.Mode
('rack_group', 'termination_b_rackgroup'),
),
label='Power Panel',
required=False,
widget=APISelect(
api_url='/api/dcim/power-panels/',
filter_for={
@ -3170,6 +3122,17 @@ class CableFilterForm(BootstrapMixin, forms.Form):
}
)
)
tenant = FilterChoiceField(
queryset=Tenant.objects.all(),
to_field_name='slug',
widget=APISelectMultiple(
api_url="/api/tenancy/tenants/",
value_field='slug',
filter_for={
'device_id': 'tenant',
}
)
)
rack_id = FilterChoiceField(
queryset=Rack.objects.all(),
label='Rack',

View File

@ -2755,259 +2755,6 @@ class VirtualChassis(ChangeLoggedModel):
)
#
# Cables
#
class Cable(ChangeLoggedModel):
"""
A physical connection between two endpoints.
"""
termination_a_type = models.ForeignKey(
to=ContentType,
limit_choices_to={'model__in': CABLE_TERMINATION_TYPES},
on_delete=models.PROTECT,
related_name='+'
)
termination_a_id = models.PositiveIntegerField()
termination_a = GenericForeignKey(
ct_field='termination_a_type',
fk_field='termination_a_id'
)
termination_b_type = models.ForeignKey(
to=ContentType,
limit_choices_to={'model__in': CABLE_TERMINATION_TYPES},
on_delete=models.PROTECT,
related_name='+'
)
termination_b_id = models.PositiveIntegerField()
termination_b = GenericForeignKey(
ct_field='termination_b_type',
fk_field='termination_b_id'
)
type = models.PositiveSmallIntegerField(
choices=CABLE_TYPE_CHOICES,
blank=True,
null=True
)
status = models.BooleanField(
choices=CONNECTION_STATUS_CHOICES,
default=CONNECTION_STATUS_CONNECTED
)
label = models.CharField(
max_length=100,
blank=True
)
color = ColorField(
blank=True
)
length = models.PositiveSmallIntegerField(
blank=True,
null=True
)
length_unit = models.PositiveSmallIntegerField(
choices=CABLE_LENGTH_UNIT_CHOICES,
blank=True,
null=True
)
# Stores the normalized length (in meters) for database ordering
_abs_length = models.DecimalField(
max_digits=10,
decimal_places=4,
blank=True,
null=True
)
# Cache the associated device (where applicable) for the A and B terminations. This enables filtering of Cables by
# their associated Devices.
_termination_a_device = models.ForeignKey(
to=Device,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
_termination_b_device = models.ForeignKey(
to=Device,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
csv_headers = [
'termination_a_type', 'termination_a_id', 'termination_b_type', 'termination_b_id', 'type', 'status', 'label',
'color', 'length', 'length_unit',
]
class Meta:
ordering = ['pk']
unique_together = (
('termination_a_type', 'termination_a_id'),
('termination_b_type', 'termination_b_id'),
)
def __str__(self):
if self.label:
return self.label
# Save a copy of the PK on the instance since it's nullified if .delete() is called
if not hasattr(self, 'id_string'):
self.id_string = '#{}'.format(self.pk)
return self.id_string
def get_absolute_url(self):
return reverse('dcim:cable', args=[self.pk])
def clean(self):
# Validate that termination A exists
if not hasattr(self, 'termination_a_type'):
raise ValidationError('Termination A type has not been specified')
try:
self.termination_a_type.model_class().objects.get(pk=self.termination_a_id)
except ObjectDoesNotExist:
raise ValidationError({
'termination_a': 'Invalid ID for type {}'.format(self.termination_a_type)
})
# Validate that termination B exists
if not hasattr(self, 'termination_b_type'):
raise ValidationError('Termination B type has not been specified')
try:
self.termination_b_type.model_class().objects.get(pk=self.termination_b_id)
except ObjectDoesNotExist:
raise ValidationError({
'termination_b': 'Invalid ID for type {}'.format(self.termination_b_type)
})
type_a = self.termination_a_type.model
type_b = self.termination_b_type.model
# Validate interface types
if type_a == 'interface' and self.termination_a.type in NONCONNECTABLE_IFACE_TYPES:
raise ValidationError({
'termination_a_id': 'Cables cannot be terminated to {} interfaces'.format(
self.termination_a.get_type_display()
)
})
if type_b == 'interface' and self.termination_b.type in NONCONNECTABLE_IFACE_TYPES:
raise ValidationError({
'termination_b_id': 'Cables cannot be terminated to {} interfaces'.format(
self.termination_b.get_type_display()
)
})
# Check that termination types are compatible
if type_b not in COMPATIBLE_TERMINATION_TYPES.get(type_a):
raise ValidationError("Incompatible termination types: {} and {}".format(
self.termination_a_type, self.termination_b_type
))
# A component with multiple positions must be connected to a component with an equal number of positions
term_a_positions = getattr(self.termination_a, 'positions', 1)
term_b_positions = getattr(self.termination_b, 'positions', 1)
if term_a_positions != term_b_positions:
raise ValidationError(
"{} has {} positions and {} has {}. Both terminations must have the same number of positions.".format(
self.termination_a, term_a_positions, self.termination_b, term_b_positions
)
)
# A termination point cannot be connected to itself
if self.termination_a == self.termination_b:
raise ValidationError("Cannot connect {} to itself".format(self.termination_a_type))
# A front port cannot be connected to its corresponding rear port
if (
type_a in ['frontport', 'rearport'] and
type_b in ['frontport', 'rearport'] and
(
getattr(self.termination_a, 'rear_port', None) == self.termination_b or
getattr(self.termination_b, 'rear_port', None) == self.termination_a
)
):
raise ValidationError("A front port cannot be connected to it corresponding rear port")
# Check for an existing Cable connected to either termination object
if self.termination_a.cable not in (None, self):
raise ValidationError("{} already has a cable attached (#{})".format(
self.termination_a, self.termination_a.cable_id
))
if self.termination_b.cable not in (None, self):
raise ValidationError("{} already has a cable attached (#{})".format(
self.termination_b, self.termination_b.cable_id
))
# Validate length and length_unit
if self.length is not None and self.length_unit is None:
raise ValidationError("Must specify a unit when setting a cable length")
elif self.length is None:
self.length_unit = None
def save(self, *args, **kwargs):
# Store the given length (if any) in meters for use in database ordering
if self.length and self.length_unit:
self._abs_length = to_meters(self.length, self.length_unit)
# Store the parent Device for the A and B terminations (if applicable) to enable filtering
if hasattr(self.termination_a, 'device'):
self._termination_a_device = self.termination_a.device
if hasattr(self.termination_b, 'device'):
self._termination_b_device = self.termination_b.device
super().save(*args, **kwargs)
def to_csv(self):
return (
'{}.{}'.format(self.termination_a_type.app_label, self.termination_a_type.model),
self.termination_a_id,
'{}.{}'.format(self.termination_b_type.app_label, self.termination_b_type.model),
self.termination_b_id,
self.get_type_display(),
self.get_status_display(),
self.label,
self.color,
self.length,
self.length_unit,
)
def get_status_class(self):
return 'success' if self.status else 'info'
def get_compatible_types(self):
"""
Return all termination types compatible with termination A.
"""
if self.termination_a is None:
return
return COMPATIBLE_TERMINATION_TYPES[self.termination_a._meta.model_name]
def get_path_endpoints(self):
"""
Traverse both ends of a cable path and return its connected endpoints. Note that one or both endpoints may be
None.
"""
a_path = self.termination_b.trace()
b_path = self.termination_a.trace()
# Determine overall path status (connected or planned)
if self.status == CONNECTION_STATUS_PLANNED:
path_status = CONNECTION_STATUS_PLANNED
else:
path_status = CONNECTION_STATUS_CONNECTED
for segment in a_path[1:] + b_path[1:]:
if segment[1] is None or segment[1].status == CONNECTION_STATUS_PLANNED:
path_status = CONNECTION_STATUS_PLANNED
break
a_endpoint = a_path[-1][2]
b_endpoint = b_path[-1][2]
return a_endpoint, b_endpoint, path_status
#
# Power
#
@ -3187,3 +2934,260 @@ class PowerFeed(ChangeLoggedModel, CableTermination, CustomFieldModel):
def get_status_class(self):
return STATUS_CLASSES[self.status]
#
# Cables
#
class Cable(ChangeLoggedModel):
"""
A physical connection between two endpoints.
"""
termination_a_type = models.ForeignKey(
to=ContentType,
limit_choices_to={'model__in': CABLE_TERMINATION_TYPES},
on_delete=models.PROTECT,
related_name='+'
)
termination_a_id = models.PositiveIntegerField()
termination_a = GenericForeignKey(
ct_field='termination_a_type',
fk_field='termination_a_id'
)
termination_b_type = models.ForeignKey(
to=ContentType,
limit_choices_to={'model__in': CABLE_TERMINATION_TYPES},
on_delete=models.PROTECT,
related_name='+'
)
termination_b_id = models.PositiveIntegerField()
termination_b = GenericForeignKey(
ct_field='termination_b_type',
fk_field='termination_b_id'
)
type = models.PositiveSmallIntegerField(
choices=CABLE_TYPE_CHOICES,
blank=True,
null=True
)
status = models.BooleanField(
choices=CONNECTION_STATUS_CHOICES,
default=CONNECTION_STATUS_CONNECTED
)
label = models.CharField(
max_length=100,
blank=True
)
color = ColorField(
blank=True
)
length = models.PositiveSmallIntegerField(
blank=True,
null=True
)
length_unit = models.PositiveSmallIntegerField(
choices=CABLE_LENGTH_UNIT_CHOICES,
blank=True,
null=True
)
# Stores the normalized length (in meters) for database ordering
_abs_length = models.DecimalField(
max_digits=10,
decimal_places=4,
blank=True,
null=True
)
# Cache the associated device (where applicable) for the A and B terminations. This enables filtering of Cables by
# their associated Devices.
_termination_a_device = models.ForeignKey(
to=Device,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
_termination_b_device = models.ForeignKey(
to=Device,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
csv_headers = [
'termination_a_type', 'termination_a_id', 'termination_b_type', 'termination_b_id', 'type', 'status', 'label',
'color', 'length', 'length_unit',
]
class Meta:
ordering = ['pk']
unique_together = (
('termination_a_type', 'termination_a_id'),
('termination_b_type', 'termination_b_id'),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# A copy of the PK to be used by __str__ in case the object is deleted
self._pk = self.pk
def __str__(self):
return self.label or '#{}'.format(self._pk)
def get_absolute_url(self):
return reverse('dcim:cable', args=[self.pk])
def clean(self):
# Validate that termination A exists
if not hasattr(self, 'termination_a_type'):
raise ValidationError('Termination A type has not been specified')
try:
self.termination_a_type.model_class().objects.get(pk=self.termination_a_id)
except ObjectDoesNotExist:
raise ValidationError({
'termination_a': 'Invalid ID for type {}'.format(self.termination_a_type)
})
# Validate that termination B exists
if not hasattr(self, 'termination_b_type'):
raise ValidationError('Termination B type has not been specified')
try:
self.termination_b_type.model_class().objects.get(pk=self.termination_b_id)
except ObjectDoesNotExist:
raise ValidationError({
'termination_b': 'Invalid ID for type {}'.format(self.termination_b_type)
})
type_a = self.termination_a_type.model
type_b = self.termination_b_type.model
# Validate interface types
if type_a == 'interface' and self.termination_a.type in NONCONNECTABLE_IFACE_TYPES:
raise ValidationError({
'termination_a_id': 'Cables cannot be terminated to {} interfaces'.format(
self.termination_a.get_type_display()
)
})
if type_b == 'interface' and self.termination_b.type in NONCONNECTABLE_IFACE_TYPES:
raise ValidationError({
'termination_b_id': 'Cables cannot be terminated to {} interfaces'.format(
self.termination_b.get_type_display()
)
})
# Check that termination types are compatible
if type_b not in COMPATIBLE_TERMINATION_TYPES.get(type_a):
raise ValidationError("Incompatible termination types: {} and {}".format(
self.termination_a_type, self.termination_b_type
))
# A component with multiple positions must be connected to a component with an equal number of positions
term_a_positions = getattr(self.termination_a, 'positions', 1)
term_b_positions = getattr(self.termination_b, 'positions', 1)
if term_a_positions != term_b_positions:
raise ValidationError(
"{} has {} positions and {} has {}. Both terminations must have the same number of positions.".format(
self.termination_a, term_a_positions, self.termination_b, term_b_positions
)
)
# A termination point cannot be connected to itself
if self.termination_a == self.termination_b:
raise ValidationError("Cannot connect {} to itself".format(self.termination_a_type))
# A front port cannot be connected to its corresponding rear port
if (
type_a in ['frontport', 'rearport'] and
type_b in ['frontport', 'rearport'] and
(
getattr(self.termination_a, 'rear_port', None) == self.termination_b or
getattr(self.termination_b, 'rear_port', None) == self.termination_a
)
):
raise ValidationError("A front port cannot be connected to it corresponding rear port")
# Check for an existing Cable connected to either termination object
if self.termination_a.cable not in (None, self):
raise ValidationError("{} already has a cable attached (#{})".format(
self.termination_a, self.termination_a.cable_id
))
if self.termination_b.cable not in (None, self):
raise ValidationError("{} already has a cable attached (#{})".format(
self.termination_b, self.termination_b.cable_id
))
# Validate length and length_unit
if self.length is not None and self.length_unit is None:
raise ValidationError("Must specify a unit when setting a cable length")
elif self.length is None:
self.length_unit = None
def save(self, *args, **kwargs):
# Store the given length (if any) in meters for use in database ordering
if self.length and self.length_unit:
self._abs_length = to_meters(self.length, self.length_unit)
else:
self._abs_length = None
# Store the parent Device for the A and B terminations (if applicable) to enable filtering
if hasattr(self.termination_a, 'device'):
self._termination_a_device = self.termination_a.device
if hasattr(self.termination_b, 'device'):
self._termination_b_device = self.termination_b.device
super().save(*args, **kwargs)
# Update the private pk used in __str__ in case this is a new object (i.e. just got its pk)
self._pk = self.pk
def to_csv(self):
return (
'{}.{}'.format(self.termination_a_type.app_label, self.termination_a_type.model),
self.termination_a_id,
'{}.{}'.format(self.termination_b_type.app_label, self.termination_b_type.model),
self.termination_b_id,
self.get_type_display(),
self.get_status_display(),
self.label,
self.color,
self.length,
self.length_unit,
)
def get_status_class(self):
return 'success' if self.status else 'info'
def get_compatible_types(self):
"""
Return all termination types compatible with termination A.
"""
if self.termination_a is None:
return
return COMPATIBLE_TERMINATION_TYPES[self.termination_a._meta.model_name]
def get_path_endpoints(self):
"""
Traverse both ends of a cable path and return its connected endpoints. Note that one or both endpoints may be
None.
"""
a_path = self.termination_b.trace()
b_path = self.termination_a.trace()
# Determine overall path status (connected or planned)
if self.status == CONNECTION_STATUS_PLANNED:
path_status = CONNECTION_STATUS_PLANNED
else:
path_status = CONNECTION_STATUS_CONNECTED
for segment in a_path[1:] + b_path[1:]:
if segment[1] is None or segment[1].status == CONNECTION_STATUS_PLANNED:
path_status = CONNECTION_STATUS_PLANNED
break
a_endpoint = a_path[-1][2]
b_endpoint = b_path[-1][2]
return a_endpoint, b_endpoint, path_status

File diff suppressed because it is too large Load Diff

View File

@ -325,9 +325,12 @@ class CableTestCase(TestCase):
def test_cable_deletion(self):
"""
When a Cable is deleted, the `cable` field on its termination points must be nullified.
When a Cable is deleted, the `cable` field on its termination points must be nullified. The str() method
should still return the PK of the string even after being nullified.
"""
self.cable.delete()
self.assertIsNone(self.cable.pk)
self.assertNotEqual(str(self.cable), '#None')
interface1 = Interface.objects.get(pk=self.interface1.pk)
self.assertIsNone(interface1.cable)
interface2 = Interface.objects.get(pk=self.interface2.pk)

View File

@ -388,7 +388,7 @@ class RackElevationListView(PermissionRequiredMixin, View):
'page': page,
'total_count': total_count,
'face_id': face_id,
'filter_form': forms.RackFilterForm(request.GET),
'filter_form': forms.RackElevationFilterForm(request.GET),
})
@ -1754,10 +1754,13 @@ class CableTraceView(PermissionRequiredMixin, View):
def get(self, request, model, pk):
obj = get_object_or_404(model, pk=pk)
trace = obj.trace(follow_circuits=True)
total_length = sum([entry[1]._abs_length for entry in trace if entry[1] and entry[1]._abs_length])
return render(request, 'dcim/cable_trace.html', {
'obj': obj,
'trace': obj.trace(follow_circuits=True),
'trace': trace,
'total_length': total_length,
})

View File

@ -8,6 +8,20 @@ from .constants import *
from .models import ConfigContext, CustomField, Graph, ExportTemplate, ObjectChange, Tag, TopologyMap
__all__ = (
'ConfigContextFilter',
'CreatedUpdatedFilterSet',
'CustomFieldFilter',
'CustomFieldFilterSet',
'ExportTemplateFilter',
'GraphFilter',
'LocalConfigContextFilter',
'ObjectChangeFilter',
'TagFilter',
'TopologyMapFilter',
)
class CustomFieldFilter(django_filters.Filter):
"""
Filter objects by the presence of a CustomFieldValue. The filter's name is used as the CustomField name.

View File

@ -263,12 +263,12 @@ class BaseScript:
def run(self, data):
raise NotImplementedError("The script must define a run() method.")
def as_form(self, data=None, files=None):
def as_form(self, data=None, files=None, initial=None):
"""
Return a Django form suitable for populating the context data required to run this Script.
"""
vars = self._get_vars()
form = ScriptForm(vars, data, files, commit_default=getattr(self.Meta, 'commit_default', True))
form = ScriptForm(vars, data, files, initial=initial, commit_default=getattr(self.Meta, 'commit_default', True))
return form

View File

@ -68,8 +68,9 @@ def custom_links(obj):
text_rendered = render_jinja2(cl.text, context)
if text_rendered:
link_target = ' target="_blank"' if cl.new_window else ''
link_rendered = render_jinja2(cl.url, context)
links_rendered.append(
GROUP_LINK.format(cl.url, link_target, cl.text)
GROUP_LINK.format(link_rendered, link_target, text_rendered)
)
except Exception as e:
links_rendered.append(

View File

@ -0,0 +1,181 @@
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase
from dcim.models import DeviceRole, Platform, Region, Site
from extras.constants import *
from extras.filters import *
from extras.models import ConfigContext, ExportTemplate, Graph
from tenancy.models import Tenant, TenantGroup
class GraphTestCase(TestCase):
queryset = Graph.objects.all()
filterset = GraphFilter
@classmethod
def setUpTestData(cls):
graphs = (
Graph(name='Graph 1', type=GRAPH_TYPE_DEVICE, source='http://example.com/1'),
Graph(name='Graph 2', type=GRAPH_TYPE_INTERFACE, source='http://example.com/2'),
Graph(name='Graph 3', type=GRAPH_TYPE_SITE, source='http://example.com/3'),
)
Graph.objects.bulk_create(graphs)
def test_name(self):
params = {'name': 'Graph 1'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_type(self):
params = {'type': GRAPH_TYPE_DEVICE}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
class ExportTemplateTestCase(TestCase):
queryset = ExportTemplate.objects.all()
filterset = ExportTemplateFilter
@classmethod
def setUpTestData(cls):
content_types = ContentType.objects.filter(model__in=['site', 'rack', 'device'])
export_templates = (
ExportTemplate(name='Export Template 1', content_type=content_types[0], template_language=TEMPLATE_LANGUAGE_DJANGO, template_code='TESTING'),
ExportTemplate(name='Export Template 2', content_type=content_types[1], template_language=TEMPLATE_LANGUAGE_JINJA2, template_code='TESTING'),
ExportTemplate(name='Export Template 3', content_type=content_types[2], template_language=TEMPLATE_LANGUAGE_JINJA2, template_code='TESTING'),
)
ExportTemplate.objects.bulk_create(export_templates)
def test_name(self):
params = {'name': 'Export Template 1'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_content_type(self):
params = {'content_type': ContentType.objects.get(model='site').pk}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_template_language(self):
params = {'template_language': TEMPLATE_LANGUAGE_JINJA2}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class ConfigContextTestCase(TestCase):
queryset = ConfigContext.objects.all()
filterset = ConfigContextFilter
@classmethod
def setUpTestData(cls):
regions = (
Region(name='Test Region 1', slug='test-region-1'),
Region(name='Test Region 2', slug='test-region-2'),
Region(name='Test Region 3', slug='test-region-3'),
)
# Can't use bulk_create for models with MPTT fields
for r in regions:
r.save()
sites = (
Site(name='Test Site 1', slug='test-site-1'),
Site(name='Test Site 2', slug='test-site-2'),
Site(name='Test Site 3', slug='test-site-3'),
)
Site.objects.bulk_create(sites)
device_roles = (
DeviceRole(name='Device Role 1', slug='device-role-1'),
DeviceRole(name='Device Role 2', slug='device-role-2'),
DeviceRole(name='Device Role 3', slug='device-role-3'),
)
DeviceRole.objects.bulk_create(device_roles)
platforms = (
Platform(name='Platform 1', slug='platform-1'),
Platform(name='Platform 2', slug='platform-2'),
Platform(name='Platform 3', slug='platform-3'),
)
Platform.objects.bulk_create(platforms)
tenant_groups = (
TenantGroup(name='Tenant Group 1', slug='tenant-group-1'),
TenantGroup(name='Tenant Group 2', slug='tenant-group-2'),
TenantGroup(name='Tenant Group 3', slug='tenant-group-3'),
)
TenantGroup.objects.bulk_create(tenant_groups)
tenants = (
Tenant(name='Tenant 1', slug='tenant-1'),
Tenant(name='Tenant 2', slug='tenant-2'),
Tenant(name='Tenant 3', slug='tenant-3'),
)
Tenant.objects.bulk_create(tenants)
for i in range(0, 3):
is_active = bool(i % 2)
c = ConfigContext.objects.create(
name='Config Context {}'.format(i + 1),
is_active=is_active,
data='{"foo": 123}'
)
c.regions.set([regions[i]])
c.sites.set([sites[i]])
c.roles.set([device_roles[i]])
c.platforms.set([platforms[i]])
c.tenant_groups.set([tenant_groups[i]])
c.tenants.set([tenants[i]])
def test_name(self):
params = {'name': 'Config Context 1'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_is_active(self):
params = {'is_active': True}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'is_active': False}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_role(self):
device_roles = DeviceRole.objects.all()[:2]
params = {'role_id': [device_roles[0].pk, device_roles[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'role': [device_roles[0].slug, device_roles[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_platform(self):
platforms = Platform.objects.all()[:2]
params = {'platform_id': [platforms[0].pk, platforms[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'platform': [platforms[0].slug, platforms[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_tenant_group(self):
tenant_groups = TenantGroup.objects.all()[:2]
params = {'tenant_group_id': [tenant_groups[0].pk, tenant_groups[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'tenant_group': [tenant_groups[0].slug, tenant_groups[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_tenant_(self):
tenants = Tenant.objects.all()[:2]
params = {'tenant_id': [tenants[0].pk, tenants[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'tenant': [tenants[0].slug, tenants[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
# TODO: ObjectChangeFilter test

View File

@ -392,7 +392,7 @@ class ScriptView(PermissionRequiredMixin, View):
def get(self, request, module, name):
script = self._get_script(module, name)
form = script.as_form()
form = script.as_form(initial=request.GET)
return render(request, 'extras/script.html', {
'module': module,

View File

@ -60,5 +60,5 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque
return 'Status {} returned, webhook successfully processed.'.format(response.status_code)
else:
raise requests.exceptions.RequestException(
"Status {} returned, webhook FAILED to process.".format(response.status_code)
"Status {} returned with content '{}', webhook FAILED to process.".format(response.status_code, response.content)
)

View File

@ -13,6 +13,19 @@ from .constants import *
from .models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
__all__ = (
'AggregateFilter',
'IPAddressFilter',
'PrefixFilter',
'RIRFilter',
'RoleFilter',
'ServiceFilter',
'VLANFilter',
'VLANGroupFilter',
'VRFFilter',
)
class VRFFilter(TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
id__in = NumericInFilter(
field_name='id',

View File

@ -933,7 +933,7 @@ class IPAddressBulkEditForm(BootstrapMixin, AddRemoveTagsForm, CustomFieldBulkEd
class IPAddressAssignForm(BootstrapMixin, forms.Form):
vrf = forms.ModelChoiceField(
vrf_id = forms.ModelChoiceField(
queryset=VRF.objects.all(),
required=False,
label='VRF',
@ -942,8 +942,9 @@ class IPAddressAssignForm(BootstrapMixin, forms.Form):
api_url="/api/ipam/vrfs/"
)
)
address = forms.CharField(
label='IP Address'
q = forms.CharField(
required=False,
label='Search',
)

View File

@ -177,6 +177,12 @@ class Aggregate(ChangeLoggedModel, CustomFieldModel):
# Clear host bits from prefix
self.prefix = self.prefix.cidr
# /0 masks are not acceptable
if self.prefix.prefixlen == 0:
raise ValidationError({
'prefix': "Cannot create aggregate with /0 mask."
})
# Ensure that the aggregate being added is not covered by an existing aggregate
covering_aggregates = Aggregate.objects.filter(prefix__net_contains_or_equals=str(self.prefix))
if self.pk:
@ -347,6 +353,12 @@ class Prefix(ChangeLoggedModel, CustomFieldModel):
if self.prefix:
# /0 masks are not acceptable
if self.prefix.prefixlen == 0:
raise ValidationError({
'prefix': "Cannot create prefix with /0 mask."
})
# Disallow host masks
if self.prefix.version == 4 and self.prefix.prefixlen == 32:
raise ValidationError({
@ -622,6 +634,12 @@ class IPAddress(ChangeLoggedModel, CustomFieldModel):
if self.address:
# /0 masks are not acceptable
if self.address.prefixlen == 0:
raise ValidationError({
'address': "Cannot create IP address with /0 mask."
})
# Enforce unique IP space (if applicable)
if self.role not in IPADDRESS_ROLES_NONUNIQUE and ((
self.vrf is None and settings.ENFORCE_GLOBAL_UNIQUE

View File

@ -373,7 +373,7 @@ class IPAddressAssignTable(BaseTable):
class Meta(BaseTable.Meta):
model = IPAddress
fields = ('address', 'vrf', 'status', 'role', 'tenant', 'parent', 'interface', 'description')
fields = ('address', 'dns_name', 'vrf', 'status', 'role', 'tenant', 'parent', 'interface', 'description')
orderable = False

View File

@ -0,0 +1,645 @@
from django.test import TestCase
from dcim.models import Device, DeviceRole, DeviceType, Interface, Manufacturer, Region, Site
from ipam.constants import *
from ipam.filters import *
from ipam.models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
from virtualization.models import Cluster, ClusterType, VirtualMachine
class VRFTestCase(TestCase):
queryset = VRF.objects.all()
filterset = VRFFilter
@classmethod
def setUpTestData(cls):
vrfs = (
VRF(name='VRF 1', rd='65000:100', enforce_unique=False),
VRF(name='VRF 2', rd='65000:200', enforce_unique=False),
VRF(name='VRF 3', rd='65000:300', enforce_unique=False),
VRF(name='VRF 4', rd='65000:400', enforce_unique=True),
VRF(name='VRF 5', rd='65000:500', enforce_unique=True),
VRF(name='VRF 6', rd='65000:600', enforce_unique=True),
)
VRF.objects.bulk_create(vrfs)
def test_name(self):
params = {'name': ['VRF 1', 'VRF 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_rd(self):
params = {'rd': ['65000:100', '65000:200']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_enforce_unique(self):
params = {'enforce_unique': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'enforce_unique': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:3]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
class RIRTestCase(TestCase):
queryset = RIR.objects.all()
filterset = RIRFilter
@classmethod
def setUpTestData(cls):
rirs = (
RIR(name='RIR 1', slug='rir-1', is_private=False),
RIR(name='RIR 2', slug='rir-2', is_private=False),
RIR(name='RIR 3', slug='rir-3', is_private=False),
RIR(name='RIR 4', slug='rir-4', is_private=True),
RIR(name='RIR 5', slug='rir-5', is_private=True),
RIR(name='RIR 6', slug='rir-6', is_private=True),
)
RIR.objects.bulk_create(rirs)
def test_name(self):
params = {'name': ['RIR 1', 'RIR 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['rir-1', 'rir-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_is_private(self):
params = {'is_private': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'is_private': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:3]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
class AggregateTestCase(TestCase):
queryset = Aggregate.objects.all()
filterset = AggregateFilter
@classmethod
def setUpTestData(cls):
rirs = (
RIR(name='RIR 1', slug='rir-1'),
RIR(name='RIR 2', slug='rir-2'),
RIR(name='RIR 3', slug='rir-3'),
)
RIR.objects.bulk_create(rirs)
aggregates = (
Aggregate(family=4, prefix='10.1.0.0/16', rir=rirs[0], date_added='2020-01-01'),
Aggregate(family=4, prefix='10.2.0.0/16', rir=rirs[0], date_added='2020-01-02'),
Aggregate(family=4, prefix='10.3.0.0/16', rir=rirs[1], date_added='2020-01-03'),
Aggregate(family=6, prefix='2001:db8:1::/48', rir=rirs[1], date_added='2020-01-04'),
Aggregate(family=6, prefix='2001:db8:2::/48', rir=rirs[2], date_added='2020-01-05'),
Aggregate(family=6, prefix='2001:db8:3::/48', rir=rirs[2], date_added='2020-01-06'),
)
Aggregate.objects.bulk_create(aggregates)
def test_family(self):
params = {'family': '4'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_date_added(self):
params = {'date_added': ['2020-01-01', '2020-01-02']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
# TODO: Test for multiple values
def test_prefix(self):
params = {'prefix': '10.1.0.0/16'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_rir(self):
rirs = RIR.objects.all()[:2]
params = {'rir_id': [rirs[0].pk, rirs[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'rir': [rirs[0].slug, rirs[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
class RoleTestCase(TestCase):
queryset = Role.objects.all()
filterset = RoleFilter
@classmethod
def setUpTestData(cls):
roles = (
Role(name='Role 1', slug='role-1'),
Role(name='Role 2', slug='role-2'),
Role(name='Role 3', slug='role-3'),
)
Role.objects.bulk_create(roles)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_name(self):
params = {'name': ['Role 1', 'Role 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['role-1', 'role-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class PrefixTestCase(TestCase):
queryset = Prefix.objects.all()
filterset = PrefixFilter
@classmethod
def setUpTestData(cls):
regions = (
Region(name='Test Region 1', slug='test-region-1'),
Region(name='Test Region 2', slug='test-region-2'),
Region(name='Test Region 3', slug='test-region-3'),
)
# Can't use bulk_create for models with MPTT fields
for r in regions:
r.save()
sites = (
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
)
Site.objects.bulk_create(sites)
vrfs = (
VRF(name='VRF 1', rd='65000:100'),
VRF(name='VRF 2', rd='65000:200'),
VRF(name='VRF 3', rd='65000:300'),
)
VRF.objects.bulk_create(vrfs)
vlans = (
VLAN(vid=1, name='VLAN 1'),
VLAN(vid=2, name='VLAN 2'),
VLAN(vid=3, name='VLAN 3'),
)
VLAN.objects.bulk_create(vlans)
roles = (
Role(name='Role 1', slug='role-1'),
Role(name='Role 2', slug='role-2'),
Role(name='Role 3', slug='role-3'),
)
Role.objects.bulk_create(roles)
prefixes = (
Prefix(family=4, prefix='10.0.0.0/24', site=None, vrf=None, vlan=None, role=None, is_pool=True),
Prefix(family=4, prefix='10.0.1.0/24', site=sites[0], vrf=vrfs[0], vlan=vlans[0], role=roles[0]),
Prefix(family=4, prefix='10.0.2.0/24', site=sites[1], vrf=vrfs[1], vlan=vlans[1], role=roles[1], status=PREFIX_STATUS_DEPRECATED),
Prefix(family=4, prefix='10.0.3.0/24', site=sites[2], vrf=vrfs[2], vlan=vlans[2], role=roles[2], status=PREFIX_STATUS_RESERVED),
Prefix(family=6, prefix='2001:db8::/64', site=None, vrf=None, vlan=None, role=None, is_pool=True),
Prefix(family=6, prefix='2001:db8:0:1::/64', site=sites[0], vrf=vrfs[0], vlan=vlans[0], role=roles[0]),
Prefix(family=6, prefix='2001:db8:0:2::/64', site=sites[1], vrf=vrfs[1], vlan=vlans[1], role=roles[1], status=PREFIX_STATUS_DEPRECATED),
Prefix(family=6, prefix='2001:db8:0:3::/64', site=sites[2], vrf=vrfs[2], vlan=vlans[2], role=roles[2], status=PREFIX_STATUS_RESERVED),
Prefix(family=4, prefix='10.0.0.0/16'),
Prefix(family=6, prefix='2001:db8::/32'),
)
Prefix.objects.bulk_create(prefixes)
def test_family(self):
params = {'family': '6'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 5)
def test_is_pool(self):
params = {'is_pool': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'is_pool': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 8)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:3]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_within(self):
params = {'within': '10.0.0.0/16'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_within_include(self):
params = {'within_include': '10.0.0.0/16'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 5)
def test_contains(self):
params = {'contains': '10.0.1.0/24'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'contains': '2001:db8:0:1::/64'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_mask_length(self):
params = {'mask_length': '24'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_vrf(self):
vrfs = VRF.objects.all()[:2]
params = {'vrf_id': [vrfs[0].pk, vrfs[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'vrf': [vrfs[0].rd, vrfs[1].rd]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_vlan(self):
vlans = VLAN.objects.all()[:2]
params = {'vlan_id': [vlans[0].pk, vlans[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
# TODO: Test for multiple values
params = {'vlan_vid': vlans[0].vid}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_role(self):
roles = Role.objects.all()[:2]
params = {'role_id': [roles[0].pk, roles[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'role': [roles[0].slug, roles[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_status(self):
params = {'status': [PREFIX_STATUS_DEPRECATED, PREFIX_STATUS_RESERVED]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
class IPAddressTestCase(TestCase):
queryset = IPAddress.objects.all()
filterset = IPAddressFilter
@classmethod
def setUpTestData(cls):
vrfs = (
VRF(name='VRF 1', rd='65000:100'),
VRF(name='VRF 2', rd='65000:200'),
VRF(name='VRF 3', rd='65000:300'),
)
VRF.objects.bulk_create(vrfs)
site = Site.objects.create(name='Site 1', slug='site-1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(device_type=device_type, name='Device 1', site=site, device_role=device_role),
Device(device_type=device_type, name='Device 2', site=site, device_role=device_role),
Device(device_type=device_type, name='Device 3', site=site, device_role=device_role),
)
Device.objects.bulk_create(devices)
clustertype = ClusterType.objects.create(name='Cluster Type 1', slug='cluster-type-1')
cluster = Cluster.objects.create(type=clustertype, name='Cluster 1')
virtual_machines = (
VirtualMachine(name='Virtual Machine 1', cluster=cluster),
VirtualMachine(name='Virtual Machine 2', cluster=cluster),
VirtualMachine(name='Virtual Machine 3', cluster=cluster),
)
VirtualMachine.objects.bulk_create(virtual_machines)
interfaces = (
Interface(device=devices[0], name='Interface 1'),
Interface(device=devices[1], name='Interface 2'),
Interface(device=devices[2], name='Interface 3'),
Interface(virtual_machine=virtual_machines[0], name='Interface 1'),
Interface(virtual_machine=virtual_machines[1], name='Interface 2'),
Interface(virtual_machine=virtual_machines[2], name='Interface 3'),
)
Interface.objects.bulk_create(interfaces)
ipaddresses = (
IPAddress(family=4, address='10.0.0.1/24', vrf=None, interface=None, status=IPADDRESS_STATUS_ACTIVE, role=None, dns_name='ipaddress-a'),
IPAddress(family=4, address='10.0.0.2/24', vrf=vrfs[0], interface=interfaces[0], status=IPADDRESS_STATUS_ACTIVE, role=None, dns_name='ipaddress-b'),
IPAddress(family=4, address='10.0.0.3/24', vrf=vrfs[1], interface=interfaces[1], status=IPADDRESS_STATUS_RESERVED, role=IPADDRESS_ROLE_VIP, dns_name='ipaddress-c'),
IPAddress(family=4, address='10.0.0.4/24', vrf=vrfs[2], interface=interfaces[2], status=IPADDRESS_STATUS_DEPRECATED, role=IPADDRESS_ROLE_SECONDARY, dns_name='ipaddress-d'),
IPAddress(family=6, address='2001:db8::1/64', vrf=None, interface=None, status=IPADDRESS_STATUS_ACTIVE, role=None, dns_name='ipaddress-a'),
IPAddress(family=6, address='2001:db8::2/64', vrf=vrfs[0], interface=interfaces[3], status=IPADDRESS_STATUS_ACTIVE, role=None, dns_name='ipaddress-b'),
IPAddress(family=6, address='2001:db8::3/64', vrf=vrfs[1], interface=interfaces[4], status=IPADDRESS_STATUS_RESERVED, role=IPADDRESS_ROLE_VIP, dns_name='ipaddress-c'),
IPAddress(family=6, address='2001:db8::4/64', vrf=vrfs[2], interface=interfaces[5], status=IPADDRESS_STATUS_DEPRECATED, role=IPADDRESS_ROLE_SECONDARY, dns_name='ipaddress-d'),
)
IPAddress.objects.bulk_create(ipaddresses)
def test_family(self):
params = {'family': '6'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_dns_name(self):
params = {'dns_name': ['ipaddress-a', 'ipaddress-b']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:3]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_parent(self):
params = {'parent': '10.0.0.0/24'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'parent': '2001:db8::/64'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def filter_address(self):
# Check IPv4 and IPv6, with and without a mask
params = {'address': '10.0.0.1/24'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'address': '10.0.0.1'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'address': '2001:db8::1/64'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'address': '2001:db8::1'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_mask_length(self):
params = {'mask_length': '24'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_vrf(self):
vrfs = VRF.objects.all()[:2]
params = {'vrf_id': [vrfs[0].pk, vrfs[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'vrf': [vrfs[0].rd, vrfs[1].rd]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
# TODO: Test for multiple values
def test_device(self):
device = Device.objects.first()
params = {'device_id': device.pk}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'device': device.name}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_virtual_machine(self):
vms = VirtualMachine.objects.all()[:2]
params = {'virtual_machine_id': [vms[0].pk, vms[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'virtual_machine': [vms[0].name, vms[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_interface(self):
interfaces = Interface.objects.all()[:2]
params = {'interface_id': [interfaces[0].pk, interfaces[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'interface': ['Interface 1', 'Interface 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_assigned_to_interface(self):
params = {'assigned_to_interface': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 6)
params = {'assigned_to_interface': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_status(self):
params = {'status': [PREFIX_STATUS_DEPRECATED, PREFIX_STATUS_RESERVED]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_role(self):
params = {'role': [IPADDRESS_ROLE_SECONDARY, IPADDRESS_ROLE_VIP]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
class VLANGroupTestCase(TestCase):
queryset = VLANGroup.objects.all()
filterset = VLANGroupFilter
@classmethod
def setUpTestData(cls):
regions = (
Region(name='Test Region 1', slug='test-region-1'),
Region(name='Test Region 2', slug='test-region-2'),
Region(name='Test Region 3', slug='test-region-3'),
)
# Can't use bulk_create for models with MPTT fields
for r in regions:
r.save()
sites = (
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
)
Site.objects.bulk_create(sites)
vlan_groups = (
VLANGroup(name='VLAN Group 1', slug='vlan-group-1', site=sites[0]),
VLANGroup(name='VLAN Group 2', slug='vlan-group-2', site=sites[1]),
VLANGroup(name='VLAN Group 3', slug='vlan-group-3', site=sites[2]),
VLANGroup(name='VLAN Group 4', slug='vlan-group-4', site=None),
)
VLANGroup.objects.bulk_create(vlan_groups)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_name(self):
params = {'name': ['VLAN Group 1', 'VLAN Group 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['vlan-group-1', 'vlan-group-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class VLANTestCase(TestCase):
queryset = VLAN.objects.all()
filterset = VLANFilter
@classmethod
def setUpTestData(cls):
regions = (
Region(name='Test Region 1', slug='test-region-1'),
Region(name='Test Region 2', slug='test-region-2'),
Region(name='Test Region 3', slug='test-region-3'),
)
# Can't use bulk_create for models with MPTT fields
for r in regions:
r.save()
sites = (
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
)
Site.objects.bulk_create(sites)
roles = (
Role(name='Role 1', slug='role-1'),
Role(name='Role 2', slug='role-2'),
Role(name='Role 3', slug='role-3'),
)
Role.objects.bulk_create(roles)
groups = (
VLANGroup(name='VLAN Group 1', slug='vlan-group-1', site=sites[0]),
VLANGroup(name='VLAN Group 2', slug='vlan-group-2', site=sites[1]),
VLANGroup(name='VLAN Group 3', slug='vlan-group-3', site=None),
)
VLANGroup.objects.bulk_create(groups)
vlans = (
VLAN(vid=101, name='VLAN 101', site=sites[0], group=groups[0], role=roles[0], status=VLAN_STATUS_ACTIVE),
VLAN(vid=102, name='VLAN 102', site=sites[0], group=groups[0], role=roles[0], status=VLAN_STATUS_ACTIVE),
VLAN(vid=201, name='VLAN 201', site=sites[1], group=groups[1], role=roles[1], status=VLAN_STATUS_DEPRECATED),
VLAN(vid=202, name='VLAN 202', site=sites[1], group=groups[1], role=roles[1], status=VLAN_STATUS_DEPRECATED),
VLAN(vid=301, name='VLAN 301', site=sites[2], group=groups[2], role=roles[2], status=VLAN_STATUS_RESERVED),
VLAN(vid=302, name='VLAN 302', site=sites[2], group=groups[2], role=roles[2], status=VLAN_STATUS_RESERVED),
)
VLAN.objects.bulk_create(vlans)
def test_name(self):
params = {'name': ['VLAN 101', 'VLAN 102']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_rd(self):
params = {'vid': ['101', '201', '301']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:3]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_group(self):
groups = VLANGroup.objects.all()[:2]
params = {'group_id': [groups[0].pk, groups[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'group': [groups[0].slug, groups[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_role(self):
roles = Role.objects.all()[:2]
params = {'role_id': [roles[0].pk, roles[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'role': [roles[0].slug, roles[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_status(self):
params = {'status': [VLAN_STATUS_ACTIVE, VLAN_STATUS_DEPRECATED]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
class ServiceTestCase(TestCase):
queryset = Service.objects.all()
filterset = ServiceFilter
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site-1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(device_type=device_type, name='Device 1', site=site, device_role=device_role),
Device(device_type=device_type, name='Device 2', site=site, device_role=device_role),
Device(device_type=device_type, name='Device 3', site=site, device_role=device_role),
)
Device.objects.bulk_create(devices)
clustertype = ClusterType.objects.create(name='Cluster Type 1', slug='cluster-type-1')
cluster = Cluster.objects.create(type=clustertype, name='Cluster 1')
virtual_machines = (
VirtualMachine(name='Virtual Machine 1', cluster=cluster),
VirtualMachine(name='Virtual Machine 2', cluster=cluster),
VirtualMachine(name='Virtual Machine 3', cluster=cluster),
)
VirtualMachine.objects.bulk_create(virtual_machines)
services = (
Service(device=devices[0], name='Service 1', protocol=IP_PROTOCOL_TCP, port=1001),
Service(device=devices[1], name='Service 2', protocol=IP_PROTOCOL_TCP, port=1002),
Service(device=devices[2], name='Service 3', protocol=IP_PROTOCOL_UDP, port=1003),
Service(virtual_machine=virtual_machines[0], name='Service 4', protocol=IP_PROTOCOL_TCP, port=2001),
Service(virtual_machine=virtual_machines[1], name='Service 5', protocol=IP_PROTOCOL_TCP, port=2002),
Service(virtual_machine=virtual_machines[2], name='Service 6', protocol=IP_PROTOCOL_UDP, port=2003),
)
Service.objects.bulk_create(services)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:3]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_name(self):
params = {'name': ['Service 1', 'Service 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_protocol(self):
params = {'protocol': IP_PROTOCOL_TCP}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_port(self):
params = {'port': ['1001', '1002', '1003']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'device': [devices[0].name, devices[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_virtual_machine(self):
vms = VirtualMachine.objects.all()[:2]
params = {'virtual_machine_id': [vms[0].pk, vms[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'virtual_machine': [vms[0].name, vms[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)

View File

@ -333,7 +333,10 @@ class AggregateView(PermissionRequiredMixin, View):
).annotate_depth(
limit=0
)
child_prefixes = add_available_prefixes(aggregate.prefix, child_prefixes)
# Add available prefixes to the table if requested
if request.GET.get('show_available', 'true') == 'true':
child_prefixes = add_available_prefixes(aggregate.prefix, child_prefixes)
prefix_table = tables.PrefixDetailTable(child_prefixes)
if request.user.has_perm('ipam.change_prefix') or request.user.has_perm('ipam.delete_prefix'):
@ -356,6 +359,7 @@ class AggregateView(PermissionRequiredMixin, View):
'aggregate': aggregate,
'prefix_table': prefix_table,
'permissions': permissions,
'show_available': request.GET.get('show_available', 'true') == 'true',
})
@ -511,8 +515,8 @@ class PrefixPrefixesView(PermissionRequiredMixin, View):
'site', 'vlan', 'role',
).annotate_depth(limit=0)
# Annotate available prefixes
if child_prefixes:
# Add available prefixes to the table if requested
if child_prefixes and request.GET.get('show_available', 'true') == 'true':
child_prefixes = add_available_prefixes(prefix.prefix, child_prefixes)
prefix_table = tables.PrefixDetailTable(child_prefixes)
@ -539,6 +543,7 @@ class PrefixPrefixesView(PermissionRequiredMixin, View):
'permissions': permissions,
'bulk_querystring': 'vrf_id={}&within={}'.format(prefix.vrf.pk if prefix.vrf else '0', prefix.prefix),
'active_tab': 'prefixes',
'show_available': request.GET.get('show_available', 'true') == 'true',
})
@ -553,7 +558,10 @@ class PrefixIPAddressesView(PermissionRequiredMixin, View):
ipaddresses = prefix.get_child_ips().prefetch_related(
'vrf', 'interface__device', 'primary_ip4_for', 'primary_ip6_for'
)
ipaddresses = add_available_ipaddresses(prefix.prefix, ipaddresses, prefix.is_pool)
# Add available IP addresses to the table if requested
if request.GET.get('show_available', 'true') == 'true':
ipaddresses = add_available_ipaddresses(prefix.prefix, ipaddresses, prefix.is_pool)
ip_table = tables.IPAddressTable(ipaddresses)
if request.user.has_perm('ipam.change_ipaddress') or request.user.has_perm('ipam.delete_ipaddress'):
@ -579,6 +587,7 @@ class PrefixIPAddressesView(PermissionRequiredMixin, View):
'permissions': permissions,
'bulk_querystring': 'vrf_id={}&parent={}'.format(prefix.vrf.pk if prefix.vrf else '0', prefix.prefix),
'active_tab': 'ip-addresses',
'show_available': request.GET.get('show_available', 'true') == 'true',
})
@ -677,7 +686,14 @@ class IPAddressView(PermissionRequiredMixin, View):
).filter(
vrf=ipaddress.vrf, address__net_contained_or_equal=str(ipaddress.address)
)
related_ips_table = tables.IPAddressTable(list(related_ips), orderable=False)
related_ips_table = tables.IPAddressTable(related_ips, orderable=False)
paginate = {
'paginator_class': EnhancedPaginator,
'per_page': request.GET.get('per_page', settings.PAGINATE_COUNT)
}
RequestConfig(request, paginate).configure(related_ips_table)
return render(request, 'ipam/ipaddress.html', {
'ipaddress': ipaddress,
@ -740,13 +756,12 @@ class IPAddressAssignView(PermissionRequiredMixin, View):
if form.is_valid():
queryset = IPAddress.objects.prefetch_related(
addresses = IPAddress.objects.prefetch_related(
'vrf', 'tenant', 'interface__device', 'interface__virtual_machine'
).filter(
vrf=form.cleaned_data['vrf'],
address__istartswith=form.cleaned_data['address'],
)[:100] # Limit to 100 results
table = tables.IPAddressAssignTable(queryset)
)
# Limit to 100 results
addresses = filters.IPAddressFilter(request.POST, addresses).qs[:100]
table = tables.IPAddressAssignTable(addresses)
return render(request, 'ipam/ipaddress_assign.html', {
'form': form,

View File

@ -12,7 +12,7 @@ from django.core.exceptions import ImproperlyConfigured
# Environment setup
#
VERSION = '2.6.11'
VERSION = '2.6.12'
# Hostname
HOSTNAME = platform.node()

View File

@ -7,7 +7,7 @@ $(document).ready(function() {
// "Toggle" checkbox for object lists (PK column)
$('input:checkbox.toggle').click(function() {
$(this).closest('table').find('input:checkbox[name=pk]').prop('checked', $(this).prop('checked'));
$(this).closest('table').find('input:checkbox[name=pk]:visible').prop('checked', $(this).prop('checked'));
// Show the "select all" box if present
if ($(this).is(':checked')) {
@ -398,4 +398,43 @@ $(document).ready(function() {
// Account for the header height when hash-scrolling
window.addEventListener('load', headerOffsetScroll);
window.addEventListener('hashchange', headerOffsetScroll);
// Offset between the preview window and the window edges
const IMAGE_PREVIEW_OFFSET_X = 20;
const IMAGE_PREVIEW_OFFSET_Y = 10;
// Preview an image attachment when the link is hovered over
$('a.image-preview').on('mouseover', function(e) {
// Twice the offset to account for all sides of the picture
var maxWidth = window.innerWidth - (e.clientX + (IMAGE_PREVIEW_OFFSET_X * 2));
var maxHeight = window.innerHeight - (e.clientY + (IMAGE_PREVIEW_OFFSET_Y * 2));
var img = $('<img>').attr('id', 'image-preview-window').css({
display: 'none',
position: 'absolute',
maxWidth: maxWidth + 'px',
maxHeight: maxHeight + 'px',
left: e.pageX + IMAGE_PREVIEW_OFFSET_X + 'px',
top: e.pageY + IMAGE_PREVIEW_OFFSET_Y + 'px',
boxShadow: '0 0px 12px 3px rgba(0, 0, 0, 0.4)',
});
// Remove any existing preview windows and add the current one
$('#image-preview-window').remove();
$('body').append(img);
// Once loaded, show the preview if the image is indeed an image
img.on('load', function(e) {
if (e.target.complete && e.target.naturalWidth) {
$('#image-preview-window').fadeIn('fast');
}
});
// Begin loading
img.attr('src', e.target.href);
});
// Fade the image out; it will be deleted when another one is previewed
$('a.image-preview').on('mouseout', function() {
$('#image-preview-window').fadeOut('fast');
});
});

View File

@ -0,0 +1,30 @@
// Toggle the display of IP addresses under interfaces
$('button.toggle-ips').click(function() {
var selected = $(this).attr('selected');
if (selected) {
$('#interfaces_table tr.ipaddresses').hide();
} else {
$('#interfaces_table tr.ipaddresses').show();
}
$(this).attr('selected', !selected);
$(this).children('span').toggleClass('glyphicon-check glyphicon-unchecked');
return false;
});
// Inteface filtering
$('input.interface-filter').on('input', function() {
var filter = new RegExp(this.value);
for (interface of $(this).closest('form').find('tbody > tr')) {
// Slice off 'interface_' at the start of the ID
if (filter && filter.test(interface.id.slice(10))) {
// Match the toggle in case the filter now matches the interface
$(interface).find('input:checkbox[name=pk]').prop('checked', $('input.toggle').prop('checked'));
$(interface).show();
} else {
// Uncheck to prevent actions from including it when it doesn't match
$(interface).find('input:checkbox[name=pk]').prop('checked', false);
$(interface).hide();
}
}
});

View File

@ -7,6 +7,12 @@ from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilte
from .models import Secret, SecretRole
__all__ = (
'SecretFilter',
'SecretRoleFilter',
)
class SecretRoleFilter(NameSlugSearchFilterSet):
class Meta:

View File

@ -0,0 +1,92 @@
from django.test import TestCase
from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Site
from secrets.filters import *
from secrets.models import Secret, SecretRole
class SecretRoleTestCase(TestCase):
queryset = SecretRole.objects.all()
filterset = SecretRoleFilter
@classmethod
def setUpTestData(cls):
roles = (
SecretRole(name='Secret Role 1', slug='secret-role-1'),
SecretRole(name='Secret Role 2', slug='secret-role-2'),
SecretRole(name='Secret Role 3', slug='secret-role-3'),
)
SecretRole.objects.bulk_create(roles)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_name(self):
params = {'name': ['Secret Role 1', 'Secret Role 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['secret-role-1', 'secret-role-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class SecretTestCase(TestCase):
queryset = Secret.objects.all()
filterset = SecretFilter
@classmethod
def setUpTestData(cls):
site = Site.objects.create(name='Site 1', slug='site-1')
manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type 1')
device_role = DeviceRole.objects.create(name='Device Role 1', slug='device-role-1')
devices = (
Device(device_type=device_type, name='Device 1', site=site, device_role=device_role),
Device(device_type=device_type, name='Device 2', site=site, device_role=device_role),
Device(device_type=device_type, name='Device 3', site=site, device_role=device_role),
)
Device.objects.bulk_create(devices)
roles = (
SecretRole(name='Secret Role 1', slug='secret-role-1'),
SecretRole(name='Secret Role 2', slug='secret-role-2'),
SecretRole(name='Secret Role 3', slug='secret-role-3'),
)
SecretRole.objects.bulk_create(roles)
secrets = (
Secret(device=devices[0], role=roles[0], name='Secret 1', plaintext='SECRET DATA'),
Secret(device=devices[1], role=roles[1], name='Secret 2', plaintext='SECRET DATA'),
Secret(device=devices[2], role=roles[2], name='Secret 3', plaintext='SECRET DATA'),
)
# Must call save() to encrypt Secrets
for s in secrets:
s.save()
def test_name(self):
params = {'name': ['Secret 1', 'Secret 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_role(self):
roles = SecretRole.objects.all()[:2]
params = {'role_id': [roles[0].pk, roles[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'role': [roles[0].slug, roles[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_device(self):
devices = Device.objects.all()[:2]
params = {'device_id': [devices[0].pk, devices[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'device': [devices[0].name, devices[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)

View File

@ -125,58 +125,7 @@
<div class="panel-heading">
<strong>Circuits</strong>
</div>
<table class="table table-hover panel-body">
<tr>
<th>Circuit ID</th>
<th>Type</th>
<th>Tenant</th>
<th>A Side</th>
<th>Z Side</th>
<th>Description</th>
</tr>
{% for c in circuits %}
<tr>
<td>
<a href="{% url 'circuits:circuit' pk=c.pk %}">{{ c.cid }}</a>
</td>
<td>
<a href="{% url 'circuits:circuit_list' %}?type={{ c.type.slug }}">{{ c.type }}</a>
</td>
<td>
{% if c.tenant %}
<a href="{% url 'tenancy:tenant' slug=c.tenant.slug %}">{{ c.tenant }}</a>
{% else %}
<span class="text-muted">&mdash;</span>
{% endif %}
</td>
<td>
{% if c.termination_a %}
<a href="{% url 'dcim:site' slug=c.termination_a.site.slug %}">{{ c.termination_a.site }}</a>
{% else %}
<span class="text-muted">&mdash;</span>
{% endif %}
</td>
<td>
{% if c.termination_z %}
<a href="{% url 'dcim:site' slug=c.termination_z.site.slug %}">{{ c.termination_z.site }}</a>
{% else %}
<span class="text-muted">&mdash;</span>
{% endif %}
</td>
<td>
{% if c.description %}
{{ c.description }}
{% else %}
<span class="text-muted">&mdash;</span>
{% endif %}
</td>
</tr>
{% empty %}
<tr>
<td colspan="6" class="text-muted">None</td>
</tr>
{% endfor %}
</table>
{% include 'inc/table.html' with table=circuits_table %}
{% if perms.circuits.add_circuit %}
<div class="panel-footer text-right noprint">
<a href="{% url 'circuits:circuit_add' %}?provider={{ provider.pk }}" class="btn btn-xs btn-primary">
@ -185,6 +134,7 @@
</div>
{% endif %}
</div>
{% include 'inc/paginator.html' with paginator=circuits_table.paginator page=circuits_table.page %}
</div>
</div>
{% include 'inc/modal.html' with modal_name='graphs' %}

View File

@ -10,7 +10,10 @@
<div class="col-md-4 col-md-offset-1 text-center">
<h4>Near End</h4>
</div>
<div class="col-md-4 col-md-offset-3 text-center">
<div class="col-md-3 text-center">
{% if total_length %}<h5>Total length: {{ total_length|floatformat:"-2" }} Meters<h5>{% endif %}
</div>
<div class="col-md-4 text-center">
<h4>Far End</h4>
</div>
</div>

View File

@ -556,6 +556,9 @@
<span class="glyphicon glyphicon-check" aria-hidden="true"></span> Show IPs
</button>
</div>
<div class="col-md-2 pull-right noprint">
<input class="form-control interface-filter" type="text" placeholder="Filter" title="RegEx-enabled" style="height: 23px" />
</div>
</div>
<table id="interfaces_table" class="table table-hover table-headings panel-body component-list">
<thead>
@ -900,19 +903,8 @@ function toggleConnection(elem) {
$(".cable-toggle").click(function() {
return toggleConnection($(this));
});
// Toggle the display of IP addresses under interfaces
$('button.toggle-ips').click(function() {
var selected = $(this).attr('selected');
if (selected) {
$('#interfaces_table tr.ipaddresses').hide();
} else {
$('#interfaces_table tr.ipaddresses').show();
}
$(this).attr('selected', !selected);
$(this).children('span').toggleClass('glyphicon-check glyphicon-unchecked');
return false;
});
</script>
<script src="{% static 'js/interface_toggles.js' %}?v{{ settings.VERSION }}"></script>
<script src="{% static 'js/graphs.js' %}?v{{ settings.VERSION }}"></script>
<script src="{% static 'js/secrets.js' %}?v{{ settings.VERSION }}"></script>
{% endblock %}

View File

@ -112,7 +112,9 @@
{% if utilization %}
<td>
{{ utilization.allocated }}VA / {{ powerfeed.available_power }}VA
{% utilization_graph utilization.allocated|percentage:powerfeed.available_power %}
{% if powerfeed.available_power > 0 %}
{% utilization_graph utilization.allocated|percentage:powerfeed.available_power %}
{% endif %}
</td>
{% else %}
<td class="text-muted">N/A</td>

View File

@ -112,7 +112,7 @@
{% if configcontext.roles.all %}
<ul>
{% for role in configcontext.roles.all %}
<li><a href="{{ role.get_absolute_url }}">{{ role }}</a></li>
<li><a href="{% url 'dcim:device_list' %}?role={{ role.slug }}">{{ role }}</a></li>
{% endfor %}
</ul>
{% else %}

View File

@ -10,7 +10,7 @@
<tr{% if not attachment.size %} class="danger"{% endif %}>
<td>
<i class="fa fa-image"></i>
<a href="{{ attachment.image.url }}" target="_blank">{{ attachment }}</a>
<a class="image-preview" href="{{ attachment.image.url }}" target="_blank">{{ attachment }}</a>
</td>
<td>{{ attachment.size|filesizeformat }}</td>
<td>{{ attachment.created }}</td>

View File

@ -40,6 +40,7 @@
</div>
<h1>{% block title %}{{ aggregate }}{% endblock %}</h1>
{% include 'inc/created_updated.html' with obj=aggregate %}
{% include 'ipam/inc/toggle_available.html' %}
<div class="pull-right noprint">
{% custom_links aggregate %}
</div>

View File

@ -0,0 +1,9 @@
{% load helpers %}
{% if show_available is not None %}
<div class="pull-right">
<div class="btn-group" role="group">
<a href="{{ request.path }}{% querystring request show_available='true' %}" class="btn btn-default{% if show_available %} active disabled{% endif %}">Show available</a>
<a href="{{ request.path }}{% querystring request show_available='false' %}" class="btn btn-default{% if not show_available %} active disabled{% endif %}">Hide available</a>
</div>
</div>
{% endif %}

View File

@ -160,7 +160,7 @@
{% if duplicate_ips_table.rows %}
{% include 'panel_table.html' with table=duplicate_ips_table heading='Duplicate IP Addresses' panel_class='danger' %}
{% endif %}
{% include 'panel_table.html' with table=related_ips_table heading='Related IP Addresses' panel_class='default noprint' %}
{% include 'utilities/obj_table.html' with table=related_ips_table table_template='panel_table.html' heading='Related IP Addresses' panel_class='default noprint' %}
</div>
</div>
{% endblock %}

View File

@ -24,8 +24,8 @@
<div class="panel panel-default">
<div class="panel-heading"><strong>Select IP Address</strong></div>
<div class="panel-body">
{% render_field form.vrf %}
{% render_field form.address %}
{% render_field form.vrf_id %}
{% render_field form.q %}
</div>
</div>
</div>

View File

@ -53,6 +53,7 @@
</div>
<h1>{% block title %}{{ prefix }}{% endblock %}</h1>
{% include 'inc/created_updated.html' with obj=prefix %}
{% include 'ipam/inc/toggle_available.html' %}
<div class="pull-right noprint">
{% custom_links prefix %}
</div>

View File

@ -1,5 +1,6 @@
{% extends '_base.html' %}
{% load custom_links %}
{% load static %}
{% load helpers %}
{% block header %}
@ -253,6 +254,9 @@
<span class="glyphicon glyphicon-check" aria-hidden="true"></span> Show IPs
</button>
</div>
<div class="col-md-2 pull-right noprint">
<input class="form-control interface-filter" type="text" placeholder="Filter" title="RegEx-enabled" style="height: 23px" />
</div>
</div>
<table id="interfaces_table" class="table table-hover table-headings panel-body component-list">
<thead>
@ -312,18 +316,5 @@
{% endblock %}
{% block javascript %}
<script type="text/javascript">
// Toggle the display of IP addresses under interfaces
$('button.toggle-ips').click(function() {
var selected = $(this).attr('selected');
if (selected) {
$('#interfaces_table tr.ipaddresses').hide();
} else {
$('#interfaces_table tr.ipaddresses').show();
}
$(this).attr('selected', !selected);
$(this).children('span').toggleClass('glyphicon-check glyphicon-unchecked');
return false;
});
</script>
<script src="{% static 'js/interface_toggles.js' %}?v{{ settings.VERSION }}"></script>
{% endblock %}

View File

@ -6,6 +6,12 @@ from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilte
from .models import Tenant, TenantGroup
__all__ = (
'TenantFilter',
'TenantGroupFilter',
)
class TenantGroupFilter(NameSlugSearchFilterSet):
class Meta:

View File

@ -0,0 +1,74 @@
from django.test import TestCase
from tenancy.filters import *
from tenancy.models import Tenant, TenantGroup
class TenantGroupTestCase(TestCase):
queryset = TenantGroup.objects.all()
filterset = TenantGroupFilter
@classmethod
def setUpTestData(cls):
groups = (
TenantGroup(name='Tenant Group 1', slug='tenant-group-1'),
TenantGroup(name='Tenant Group 2', slug='tenant-group-2'),
TenantGroup(name='Tenant Group 3', slug='tenant-group-3'),
)
TenantGroup.objects.bulk_create(groups)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_name(self):
params = {'name': ['Tenant Group 1', 'Tenant Group 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['tenant-group-1', 'tenant-group-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class TenantTestCase(TestCase):
queryset = Tenant.objects.all()
filterset = TenantFilter
@classmethod
def setUpTestData(cls):
groups = (
TenantGroup(name='Tenant Group 1', slug='tenant-group-1'),
TenantGroup(name='Tenant Group 2', slug='tenant-group-2'),
TenantGroup(name='Tenant Group 3', slug='tenant-group-3'),
)
TenantGroup.objects.bulk_create(groups)
tenants = (
Tenant(name='Tenant 1', slug='tenant-1', group=groups[0]),
Tenant(name='Tenant 2', slug='tenant-2', group=groups[1]),
Tenant(name='Tenant 3', slug='tenant-3', group=groups[2]),
)
Tenant.objects.bulk_create(tenants)
def test_name(self):
params = {'name': ['Tenant 1', 'Tenant 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['tenant-1', 'tenant-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_group(self):
group = TenantGroup.objects.all()[:2]
params = {'group_id': [group[0].pk, group[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'group': [group[0].slug, group[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)

View File

@ -60,8 +60,16 @@ def parse_alphanumeric_range(string):
for n in list(range(int(begin), int(end) + 1)):
values.append(n)
else:
for n in list(range(ord(begin), ord(end) + 1)):
values.append(chr(n))
# Value-based
if begin == end:
values.append(begin)
# Range-based
else:
# Not a valid range (more than a single character)
if not len(begin) == len(end) == 1:
raise forms.ValidationError('Range "{}" is invalid.'.format(dash_range))
for n in list(range(ord(begin), ord(end) + 1)):
values.append(chr(n))
return values
@ -481,6 +489,7 @@ class ExpandableNameField(forms.CharField):
'Mixed cases and types within a single range are not supported.<br />' \
'Examples:<ul><li><code>ge-0/0/[0-23,25,30]</code></li>' \
'<li><code>e[0-3][a-d,f]</code></li>' \
'<li><code>[xe,ge]-0/0/0</code></li>' \
'<li><code>e[0-3,a-d,f]</code></li></ul>'
def to_python(self, value):

View File

@ -0,0 +1,283 @@
from django import forms
from django.test import TestCase
from utilities.forms import *
class ExpandIPAddress(TestCase):
"""
Validate the operation of expand_ipaddress_pattern().
"""
def test_ipv4_range(self):
input = '1.2.3.[9-10]/32'
output = sorted([
'1.2.3.9/32',
'1.2.3.10/32',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 4)), output)
def test_ipv4_set(self):
input = '1.2.3.[4,44]/32'
output = sorted([
'1.2.3.4/32',
'1.2.3.44/32',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 4)), output)
def test_ipv4_multiple_ranges(self):
input = '1.[9-10].3.[9-11]/32'
output = sorted([
'1.9.3.9/32',
'1.9.3.10/32',
'1.9.3.11/32',
'1.10.3.9/32',
'1.10.3.10/32',
'1.10.3.11/32',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 4)), output)
def test_ipv4_multiple_sets(self):
input = '1.[2,22].3.[4,44]/32'
output = sorted([
'1.2.3.4/32',
'1.2.3.44/32',
'1.22.3.4/32',
'1.22.3.44/32',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 4)), output)
def test_ipv4_set_and_range(self):
input = '1.[2,22].3.[9-11]/32'
output = sorted([
'1.2.3.9/32',
'1.2.3.10/32',
'1.2.3.11/32',
'1.22.3.9/32',
'1.22.3.10/32',
'1.22.3.11/32',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 4)), output)
def test_ipv6_range(self):
input = 'fec::abcd:[9-b]/64'
output = sorted([
'fec::abcd:9/64',
'fec::abcd:a/64',
'fec::abcd:b/64',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 6)), output)
def test_ipv6_range_multichar_field(self):
input = 'fec::abcd:[f-11]/64'
output = sorted([
'fec::abcd:f/64',
'fec::abcd:10/64',
'fec::abcd:11/64',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 6)), output)
def test_ipv6_set(self):
input = 'fec::abcd:[9,ab]/64'
output = sorted([
'fec::abcd:9/64',
'fec::abcd:ab/64',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 6)), output)
def test_ipv6_multiple_ranges(self):
input = 'fec::[1-2]bcd:[9-b]/64'
output = sorted([
'fec::1bcd:9/64',
'fec::1bcd:a/64',
'fec::1bcd:b/64',
'fec::2bcd:9/64',
'fec::2bcd:a/64',
'fec::2bcd:b/64',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 6)), output)
def test_ipv6_multiple_sets(self):
input = 'fec::[a,f]bcd:[9,ab]/64'
output = sorted([
'fec::abcd:9/64',
'fec::abcd:ab/64',
'fec::fbcd:9/64',
'fec::fbcd:ab/64',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 6)), output)
def test_ipv6_set_and_range(self):
input = 'fec::[dead,beaf]:[9-b]/64'
output = sorted([
'fec::dead:9/64',
'fec::dead:a/64',
'fec::dead:b/64',
'fec::beaf:9/64',
'fec::beaf:a/64',
'fec::beaf:b/64',
])
self.assertEqual(sorted(expand_ipaddress_pattern(input, 6)), output)
def test_invalid_address_family(self):
with self.assertRaisesRegex(Exception, 'Invalid IP address family: 5'):
sorted(expand_ipaddress_pattern(None, 5))
def test_invalid_non_pattern(self):
with self.assertRaises(ValueError):
sorted(expand_ipaddress_pattern('1.2.3.4/32', 4))
def test_invalid_range(self):
with self.assertRaises(ValueError):
sorted(expand_ipaddress_pattern('1.2.3.[4-]/32', 4))
with self.assertRaises(ValueError):
sorted(expand_ipaddress_pattern('1.2.3.[-4]/32', 4))
with self.assertRaises(ValueError):
sorted(expand_ipaddress_pattern('1.2.3.[4--5]/32', 4))
def test_invalid_range_bounds(self):
self.assertEqual(sorted(expand_ipaddress_pattern('1.2.3.[4-3]/32', 6)), [])
def test_invalid_set(self):
with self.assertRaises(ValueError):
sorted(expand_ipaddress_pattern('1.2.3.[4]/32', 4))
with self.assertRaises(ValueError):
sorted(expand_ipaddress_pattern('1.2.3.[4,]/32', 4))
with self.assertRaises(ValueError):
sorted(expand_ipaddress_pattern('1.2.3.[,4]/32', 4))
with self.assertRaises(ValueError):
sorted(expand_ipaddress_pattern('1.2.3.[4,,5]/32', 4))
class ExpandAlphanumeric(TestCase):
"""
Validate the operation of expand_alphanumeric_pattern().
"""
def test_range_numberic(self):
input = 'r[9-11]a'
output = sorted([
'r9a',
'r10a',
'r11a',
])
self.assertEqual(sorted(expand_alphanumeric_pattern(input)), output)
def test_range_alpha(self):
input = '[r-t]1a'
output = sorted([
'r1a',
's1a',
't1a',
])
self.assertEqual(sorted(expand_alphanumeric_pattern(input)), output)
def test_set(self):
input = '[r,t]1a'
output = sorted([
'r1a',
't1a',
])
self.assertEqual(sorted(expand_alphanumeric_pattern(input)), output)
def test_set_multichar(self):
input = '[ra,tb]1a'
output = sorted([
'ra1a',
'tb1a',
])
self.assertEqual(sorted(expand_alphanumeric_pattern(input)), output)
def test_multiple_ranges(self):
input = '[r-t]1[a-b]'
output = sorted([
'r1a',
'r1b',
's1a',
's1b',
't1a',
't1b',
])
self.assertEqual(sorted(expand_alphanumeric_pattern(input)), output)
def test_multiple_sets(self):
input = '[ra,tb]1[ax,by]'
output = sorted([
'ra1ax',
'ra1by',
'tb1ax',
'tb1by',
])
self.assertEqual(sorted(expand_alphanumeric_pattern(input)), output)
def test_set_and_range(self):
input = '[ra,tb]1[a-c]'
output = sorted([
'ra1a',
'ra1b',
'ra1c',
'tb1a',
'tb1b',
'tb1c',
])
self.assertEqual(sorted(expand_alphanumeric_pattern(input)), output)
def test_invalid_non_pattern(self):
with self.assertRaises(ValueError):
sorted(expand_alphanumeric_pattern('r9a'))
def test_invalid_range(self):
with self.assertRaises(ValueError):
sorted(expand_alphanumeric_pattern('r[8-]a'))
with self.assertRaises(ValueError):
sorted(expand_alphanumeric_pattern('r[-8]a'))
with self.assertRaises(ValueError):
sorted(expand_alphanumeric_pattern('r[8--9]a'))
def test_invalid_range_alphanumeric(self):
self.assertEqual(sorted(expand_alphanumeric_pattern('r[9-a]a')), [])
self.assertEqual(sorted(expand_alphanumeric_pattern('r[a-9]a')), [])
def test_invalid_range_bounds(self):
self.assertEqual(sorted(expand_alphanumeric_pattern('r[9-8]a')), [])
self.assertEqual(sorted(expand_alphanumeric_pattern('r[b-a]a')), [])
def test_invalid_range_len(self):
with self.assertRaises(forms.ValidationError):
sorted(expand_alphanumeric_pattern('r[a-bb]a'))
def test_invalid_set(self):
with self.assertRaises(ValueError):
sorted(expand_alphanumeric_pattern('r[a]a'))
with self.assertRaises(ValueError):
sorted(expand_alphanumeric_pattern('r[a,]a'))
with self.assertRaises(ValueError):
sorted(expand_alphanumeric_pattern('r[,a]a'))
with self.assertRaises(ValueError):
sorted(expand_alphanumeric_pattern('r[a,,b]a'))

View File

@ -1,10 +1,8 @@
import django_filters
from django.db.models import Q
from netaddr import EUI
from netaddr.core import AddrFormatError
from dcim.models import DeviceRole, Interface, Platform, Region, Site
from extras.filters import CustomFieldFilterSet, CreatedUpdatedFilterSet
from extras.filters import CustomFieldFilterSet, CreatedUpdatedFilterSet, LocalConfigContextFilter
from tenancy.filtersets import TenancyFilterSet
from utilities.filters import (
MultiValueMACAddressFilter, NameSlugSearchFilterSet, NumericInFilter, TagFilter, TreeNodeMultipleChoiceFilter,
@ -13,6 +11,15 @@ from .constants import *
from .models import Cluster, ClusterGroup, ClusterType, VirtualMachine
__all__ = (
'ClusterFilter',
'ClusterGroupFilter',
'ClusterTypeFilter',
'InterfaceFilter',
'VirtualMachineFilter',
)
class ClusterTypeFilter(NameSlugSearchFilterSet):
class Meta:
@ -92,7 +99,7 @@ class ClusterFilter(CustomFieldFilterSet, CreatedUpdatedFilterSet):
)
class VirtualMachineFilter(TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
class VirtualMachineFilter(LocalConfigContextFilter, TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
id__in = NumericInFilter(
field_name='id',
lookup_expr='in'
@ -208,8 +215,7 @@ class InterfaceFilter(django_filters.FilterSet):
to_field_name='name',
label='Virtual machine',
)
mac_address = django_filters.CharFilter(
method='_mac_address',
mac_address = MultiValueMACAddressFilter(
label='MAC address',
)
@ -217,16 +223,6 @@ class InterfaceFilter(django_filters.FilterSet):
model = Interface
fields = ['id', 'name', 'enabled', 'mtu']
def _mac_address(self, queryset, name, value):
value = value.strip()
if not value:
return queryset
try:
mac = EUI(value.strip())
return queryset.filter(mac_address=mac)
except AddrFormatError:
return queryset.none()
def search(self, queryset, name, value):
if not value.strip():
return queryset

View File

@ -0,0 +1,373 @@
from django.test import TestCase
from dcim.models import DeviceRole, Interface, Platform, Region, Site
from virtualization.constants import *
from virtualization.filters import *
from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine
class ClusterTypeTestCase(TestCase):
queryset = ClusterType.objects.all()
filterset = ClusterTypeFilter
@classmethod
def setUpTestData(cls):
cluster_types = (
ClusterType(name='Cluster Type 1', slug='cluster-type-1'),
ClusterType(name='Cluster Type 2', slug='cluster-type-2'),
ClusterType(name='Cluster Type 3', slug='cluster-type-3'),
)
ClusterType.objects.bulk_create(cluster_types)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_name(self):
params = {'name': ['Cluster Type 1', 'Cluster Type 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['cluster-type-1', 'cluster-type-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class ClusterGroupTestCase(TestCase):
queryset = ClusterGroup.objects.all()
filterset = ClusterGroupFilter
@classmethod
def setUpTestData(cls):
cluster_groups = (
ClusterGroup(name='Cluster Group 1', slug='cluster-group-1'),
ClusterGroup(name='Cluster Group 2', slug='cluster-group-2'),
ClusterGroup(name='Cluster Group 3', slug='cluster-group-3'),
)
ClusterGroup.objects.bulk_create(cluster_groups)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_name(self):
params = {'name': ['Cluster Group 1', 'Cluster Group 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_slug(self):
params = {'slug': ['cluster-group-1', 'cluster-group-2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class ClusterTestCase(TestCase):
queryset = Cluster.objects.all()
filterset = ClusterFilter
@classmethod
def setUpTestData(cls):
cluster_types = (
ClusterType(name='Cluster Type 1', slug='cluster-type-1'),
ClusterType(name='Cluster Type 2', slug='cluster-type-2'),
ClusterType(name='Cluster Type 3', slug='cluster-type-3'),
)
ClusterType.objects.bulk_create(cluster_types)
cluster_groups = (
ClusterGroup(name='Cluster Group 1', slug='cluster-group-1'),
ClusterGroup(name='Cluster Group 2', slug='cluster-group-2'),
ClusterGroup(name='Cluster Group 3', slug='cluster-group-3'),
)
ClusterGroup.objects.bulk_create(cluster_groups)
regions = (
Region(name='Test Region 1', slug='test-region-1'),
Region(name='Test Region 2', slug='test-region-2'),
Region(name='Test Region 3', slug='test-region-3'),
)
# Can't use bulk_create for models with MPTT fields
for r in regions:
r.save()
sites = (
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
)
Site.objects.bulk_create(sites)
clusters = (
Cluster(name='Cluster 1', type=cluster_types[0], group=cluster_groups[0], site=sites[0]),
Cluster(name='Cluster 2', type=cluster_types[1], group=cluster_groups[1], site=sites[1]),
Cluster(name='Cluster 3', type=cluster_types[2], group=cluster_groups[2], site=sites[2]),
)
Cluster.objects.bulk_create(clusters)
def test_name(self):
params = {'name': ['Cluster 1', 'Cluster 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_group(self):
groups = ClusterGroup.objects.all()[:2]
params = {'group_id': [groups[0].pk, groups[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'group': [groups[0].slug, groups[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_type(self):
types = ClusterType.objects.all()[:2]
params = {'type_id': [types[0].pk, types[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'type': [types[0].slug, types[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class VirtualMachineTestCase(TestCase):
queryset = VirtualMachine.objects.all()
filterset = VirtualMachineFilter
@classmethod
def setUpTestData(cls):
cluster_types = (
ClusterType(name='Cluster Type 1', slug='cluster-type-1'),
ClusterType(name='Cluster Type 2', slug='cluster-type-2'),
ClusterType(name='Cluster Type 3', slug='cluster-type-3'),
)
ClusterType.objects.bulk_create(cluster_types)
cluster_groups = (
ClusterGroup(name='Cluster Group 1', slug='cluster-group-1'),
ClusterGroup(name='Cluster Group 2', slug='cluster-group-2'),
ClusterGroup(name='Cluster Group 3', slug='cluster-group-3'),
)
ClusterGroup.objects.bulk_create(cluster_groups)
regions = (
Region(name='Test Region 1', slug='test-region-1'),
Region(name='Test Region 2', slug='test-region-2'),
Region(name='Test Region 3', slug='test-region-3'),
)
# Can't use bulk_create for models with MPTT fields
for r in regions:
r.save()
sites = (
Site(name='Test Site 1', slug='test-site-1', region=regions[0]),
Site(name='Test Site 2', slug='test-site-2', region=regions[1]),
Site(name='Test Site 3', slug='test-site-3', region=regions[2]),
)
Site.objects.bulk_create(sites)
clusters = (
Cluster(name='Cluster 1', type=cluster_types[0], group=cluster_groups[0], site=sites[0]),
Cluster(name='Cluster 2', type=cluster_types[1], group=cluster_groups[1], site=sites[1]),
Cluster(name='Cluster 3', type=cluster_types[2], group=cluster_groups[2], site=sites[2]),
)
Cluster.objects.bulk_create(clusters)
platforms = (
Platform(name='Platform 1', slug='platform-1'),
Platform(name='Platform 2', slug='platform-2'),
Platform(name='Platform 3', slug='platform-3'),
)
Platform.objects.bulk_create(platforms)
roles = (
DeviceRole(name='Device Role 1', slug='device-role-1'),
DeviceRole(name='Device Role 2', slug='device-role-2'),
DeviceRole(name='Device Role 3', slug='device-role-3'),
)
DeviceRole.objects.bulk_create(roles)
vms = (
VirtualMachine(name='Virtual Machine 1', cluster=clusters[0], platform=platforms[0], role=roles[0], status=DEVICE_STATUS_ACTIVE, vcpus=1, memory=1, disk=1, local_context_data={"foo": 123}),
VirtualMachine(name='Virtual Machine 2', cluster=clusters[1], platform=platforms[1], role=roles[1], status=DEVICE_STATUS_STAGED, vcpus=2, memory=2, disk=2),
VirtualMachine(name='Virtual Machine 3', cluster=clusters[2], platform=platforms[2], role=roles[2], status=DEVICE_STATUS_OFFLINE, vcpus=3, memory=3, disk=3),
)
VirtualMachine.objects.bulk_create(vms)
interfaces = (
Interface(virtual_machine=vms[0], name='Interface 1', mac_address='00-00-00-00-00-01'),
Interface(virtual_machine=vms[1], name='Interface 2', mac_address='00-00-00-00-00-02'),
Interface(virtual_machine=vms[2], name='Interface 3', mac_address='00-00-00-00-00-03'),
)
Interface.objects.bulk_create(interfaces)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_name(self):
params = {'name': ['Virtual Machine 1', 'Virtual Machine 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_vcpus(self):
params = {'vcpus': [1, 2]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_memory(self):
params = {'memory': [1, 2]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_disk(self):
params = {'disk': [1, 2]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_id__in(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id__in': ','.join([str(id) for id in id_list])}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_status(self):
params = {'status': [DEVICE_STATUS_ACTIVE, DEVICE_STATUS_STAGED]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_cluster_group(self):
groups = ClusterGroup.objects.all()[:2]
params = {'cluster_group_id': [groups[0].pk, groups[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'cluster_group': [groups[0].slug, groups[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_cluster_type(self):
types = ClusterType.objects.all()[:2]
params = {'cluster_type_id': [types[0].pk, types[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'cluster_type': [types[0].slug, types[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_cluster(self):
clusters = Cluster.objects.all()[:2]
params = {'cluster_id': [clusters[0].pk, clusters[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
# TODO: 'cluster' should match on name
# params = {'cluster': [clusters[0].name, clusters[1].name]}
# self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_region(self):
regions = Region.objects.all()[:2]
params = {'region_id': [regions[0].pk, regions[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'region': [regions[0].slug, regions[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_role(self):
roles = DeviceRole.objects.all()[:2]
params = {'role_id': [roles[0].pk, roles[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'role': [roles[0].slug, roles[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_platform(self):
platforms = Platform.objects.all()[:2]
params = {'platform_id': [platforms[0].pk, platforms[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'platform': [platforms[0].slug, platforms[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_mac_address(self):
params = {'mac_address': ['00-00-00-00-00-01', '00-00-00-00-00-02']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_local_context_data(self):
params = {'local_context_data': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'local_context_data': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class InterfaceTestCase(TestCase):
queryset = Interface.objects.all()
filterset = InterfaceFilter
@classmethod
def setUpTestData(cls):
cluster_types = (
ClusterType(name='Cluster Type 1', slug='cluster-type-1'),
ClusterType(name='Cluster Type 2', slug='cluster-type-2'),
ClusterType(name='Cluster Type 3', slug='cluster-type-3'),
)
ClusterType.objects.bulk_create(cluster_types)
clusters = (
Cluster(name='Cluster 1', type=cluster_types[0]),
Cluster(name='Cluster 2', type=cluster_types[1]),
Cluster(name='Cluster 3', type=cluster_types[2]),
)
Cluster.objects.bulk_create(clusters)
vms = (
VirtualMachine(name='Virtual Machine 1', cluster=clusters[0]),
VirtualMachine(name='Virtual Machine 2', cluster=clusters[1]),
VirtualMachine(name='Virtual Machine 3', cluster=clusters[2]),
)
VirtualMachine.objects.bulk_create(vms)
interfaces = (
Interface(virtual_machine=vms[0], name='Interface 1', enabled=True, mtu=100, mac_address='00-00-00-00-00-01'),
Interface(virtual_machine=vms[1], name='Interface 2', enabled=True, mtu=200, mac_address='00-00-00-00-00-02'),
Interface(virtual_machine=vms[2], name='Interface 3', enabled=False, mtu=300, mac_address='00-00-00-00-00-03'),
)
Interface.objects.bulk_create(interfaces)
def test_id(self):
id_list = self.queryset.values_list('id', flat=True)[:2]
params = {'id': [str(id) for id in id_list]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_name(self):
params = {'name': ['Interface 1', 'Interface 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_assigned_to_interface(self):
params = {'enabled': 'true'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'enabled': 'false'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_mtu(self):
params = {'mtu': [100, 200]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_virtual_machine(self):
vms = VirtualMachine.objects.all()[:2]
params = {'virtual_machine_id': [vms[0].pk, vms[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'virtual_machine': [vms[0].name, vms[1].name]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_mac_address(self):
params = {'mac_address': ['00-00-00-00-00-01', '00-00-00-00-00-02']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)