netbox/netbox/ipam/filters.py
John Anderson ade844f7a7 fixes #3428 - caching invalidation issues
Mitigate invalidation issues by using prefetch_related instead of select_related.
Also use invalidated_update instead of just update.
2019-08-19 01:53:39 -04:00

483 lines
14 KiB
Python

import django_filters
import netaddr
from django.core.exceptions import ValidationError
from django.db.models import Q
from netaddr.core import AddrFormatError
from dcim.models import Site, Device, Interface
from extras.filters import CustomFieldFilterSet
from tenancy.filtersets import TenancyFilterSet
from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilter
from virtualization.models import VirtualMachine
from .constants import IPADDRESS_ROLE_CHOICES, IPADDRESS_STATUS_CHOICES, PREFIX_STATUS_CHOICES, VLAN_STATUS_CHOICES
from .models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
class VRFFilter(TenancyFilterSet, CustomFieldFilterSet):
    """
    Filter set for VRF objects.

    Declares an ID-list filter (``id__in``), a free-text search (``q``) and a
    tag filter in addition to the model fields listed in ``Meta.fields``.
    """
    id__in = NumericInFilter(field_name='id', lookup_expr='in')
    q = django_filters.CharFilter(method='search', label='Search')
    tag = TagFilter()

    class Meta:
        model = VRF
        fields = ['name', 'rd', 'enforce_unique']

    def search(self, queryset, name, value):
        """
        Restrict ``queryset`` to VRFs whose name, RD or description contains ``value``.

        The comparison uses case-insensitive ``icontains`` lookups. A value that is
        empty after stripping whitespace leaves the queryset unchanged; otherwise the
        value is matched exactly as given (it is not stripped before matching).
        """
        if value.strip():
            text_match = (
                Q(name__icontains=value)
                | Q(rd__icontains=value)
                | Q(description__icontains=value)
            )
            return queryset.filter(text_match)
        return queryset
class RIRFilter(NameSlugSearchFilterSet):
    """
    Filter set for RIR objects.

    Adds an ID-list filter (``id__in``) to the model fields listed in ``Meta.fields``.
    """
    id__in = NumericInFilter(field_name='id', lookup_expr='in')

    class Meta:
        model = RIR
        fields = ['name', 'slug', 'is_private']
class AggregateFilter(CustomFieldFilterSet):
    """
    Filter set for Aggregate objects.

    Supports filtering by ID list, free-text search (``q``), exact prefix, RIR
    (by ID or slug) and tag, plus the model fields listed in ``Meta.fields``.
    """
    id__in = NumericInFilter(
        field_name='id',
        lookup_expr='in'
    )
    q = django_filters.CharFilter(
        method='search',
        label='Search',
    )
    prefix = django_filters.CharFilter(
        method='filter_prefix',
        label='Prefix',
    )
    rir_id = django_filters.ModelMultipleChoiceFilter(
        queryset=RIR.objects.all(),
        label='RIR (ID)',
    )
    rir = django_filters.ModelMultipleChoiceFilter(
        field_name='rir__slug',
        queryset=RIR.objects.all(),
        to_field_name='slug',
        label='RIR (slug)',
    )
    tag = TagFilter()

    class Meta:
        model = Aggregate
        fields = ['family', 'date_added']

    def search(self, queryset, name, value):
        """
        Match aggregates by description substring or by network.

        The description is compared with a case-insensitive ``icontains`` lookup.
        If ``value`` also parses as an IP network, aggregates matching the
        ``net_contains_or_equals`` lookup for its normalized CIDR are included too.
        A blank value leaves the queryset unchanged.
        """
        if not value.strip():
            return queryset
        qs_filter = Q(description__icontains=value)
        try:
            prefix = str(netaddr.IPNetwork(value.strip()).cidr)
            qs_filter |= Q(prefix__net_contains_or_equals=prefix)
        except (AddrFormatError, ValueError):
            # Not a network: fall back to the description match only.
            pass
        return queryset.filter(qs_filter)

    def filter_prefix(self, queryset, name, value):
        """
        Restrict ``queryset`` to aggregates whose prefix equals ``value``.

        ``value`` is normalized to its true CIDR before comparison. A blank value
        leaves the queryset unchanged; a value that cannot be parsed as an IP
        network yields an empty queryset.
        """
        value = value.strip()
        if not value:
            return queryset
        try:
            query = str(netaddr.IPNetwork(value).cidr)
            return queryset.filter(prefix=query)
        except (AddrFormatError, ValueError, ValidationError):
            # netaddr signals malformed input with AddrFormatError/ValueError;
            # ValidationError is still caught in case the model field rejects the value.
            return queryset.none()
class RoleFilter(NameSlugSearchFilterSet):
    """
    Filter set for Role objects.

    Declares a free-text ``q`` filter that dispatches to the ``search`` method;
    no ``search`` method is defined in this class, so it is expected to come
    from the base class.
    """
    q = django_filters.CharFilter(method='search', label='Search')

    class Meta:
        model = Role
        fields = ['id', 'name', 'slug']
class PrefixFilter(TenancyFilterSet, CustomFieldFilterSet):
    """
    Filter set for Prefix objects.

    Supports filtering by ID list, free-text search (``q``), exact prefix,
    containment relationships (``within``, ``within_include``, ``contains``),
    mask length, VRF, site, VLAN, role, status and tag, plus the model fields
    listed in ``Meta.fields``.
    """
    id__in = NumericInFilter(
        field_name='id',
        lookup_expr='in'
    )
    q = django_filters.CharFilter(
        method='search',
        label='Search',
    )
    prefix = django_filters.CharFilter(
        method='filter_prefix',
        label='Prefix',
    )
    within = django_filters.CharFilter(
        method='search_within',
        label='Within prefix',
    )
    within_include = django_filters.CharFilter(
        method='search_within_include',
        label='Within and including prefix',
    )
    contains = django_filters.CharFilter(
        method='search_contains',
        label='Prefixes which contain this prefix or IP',
    )
    mask_length = django_filters.NumberFilter(
        method='filter_mask_length',
        label='Mask length',
    )
    vrf_id = django_filters.ModelMultipleChoiceFilter(
        queryset=VRF.objects.all(),
        label='VRF',
    )
    vrf = django_filters.ModelMultipleChoiceFilter(
        field_name='vrf__rd',
        queryset=VRF.objects.all(),
        to_field_name='rd',
        label='VRF (RD)',
    )
    site_id = django_filters.ModelMultipleChoiceFilter(
        queryset=Site.objects.all(),
        label='Site (ID)',
    )
    site = django_filters.ModelMultipleChoiceFilter(
        field_name='site__slug',
        queryset=Site.objects.all(),
        to_field_name='slug',
        label='Site (slug)',
    )
    vlan_id = django_filters.ModelMultipleChoiceFilter(
        queryset=VLAN.objects.all(),
        label='VLAN (ID)',
    )
    vlan_vid = django_filters.NumberFilter(
        field_name='vlan__vid',
        label='VLAN number (1-4095)',
    )
    role_id = django_filters.ModelMultipleChoiceFilter(
        queryset=Role.objects.all(),
        label='Role (ID)',
    )
    role = django_filters.ModelMultipleChoiceFilter(
        field_name='role__slug',
        queryset=Role.objects.all(),
        to_field_name='slug',
        label='Role (slug)',
    )
    status = django_filters.MultipleChoiceFilter(
        choices=PREFIX_STATUS_CHOICES,
        null_value=None
    )
    tag = TagFilter()

    class Meta:
        model = Prefix
        fields = ['family', 'is_pool']

    def search(self, queryset, name, value):
        """
        Match prefixes by description substring or by network.

        The description is compared with a case-insensitive ``icontains`` lookup.
        If ``value`` also parses as an IP network, prefixes matching the
        ``net_contains_or_equals`` lookup for its normalized CIDR are included too.
        A blank value leaves the queryset unchanged.
        """
        if not value.strip():
            return queryset
        qs_filter = Q(description__icontains=value)
        try:
            prefix = str(netaddr.IPNetwork(value.strip()).cidr)
            qs_filter |= Q(prefix__net_contains_or_equals=prefix)
        except (AddrFormatError, ValueError):
            # Not a network: fall back to the description match only.
            pass
        return queryset.filter(qs_filter)

    def filter_prefix(self, queryset, name, value):
        """
        Restrict ``queryset`` to prefixes equal to ``value``.

        ``value`` is normalized to its true CIDR before comparison. A blank value
        leaves the queryset unchanged; a value that cannot be parsed as an IP
        network yields an empty queryset.
        """
        value = value.strip()
        if not value:
            return queryset
        try:
            query = str(netaddr.IPNetwork(value).cidr)
            return queryset.filter(prefix=query)
        except (AddrFormatError, ValueError, ValidationError):
            # netaddr signals malformed input with AddrFormatError/ValueError;
            # ValidationError is still caught in case the model field rejects the value.
            return queryset.none()

    def search_within(self, queryset, name, value):
        """
        Apply the ``net_contained`` lookup for the network given by ``value``.

        Blank input leaves the queryset unchanged; unparseable input yields an
        empty queryset.
        """
        value = value.strip()
        if not value:
            return queryset
        try:
            query = str(netaddr.IPNetwork(value).cidr)
            return queryset.filter(prefix__net_contained=query)
        except (AddrFormatError, ValueError):
            return queryset.none()

    def search_within_include(self, queryset, name, value):
        """
        Apply the ``net_contained_or_equal`` lookup for the network given by ``value``.

        Blank input leaves the queryset unchanged; unparseable input yields an
        empty queryset.
        """
        value = value.strip()
        if not value:
            return queryset
        try:
            query = str(netaddr.IPNetwork(value).cidr)
            return queryset.filter(prefix__net_contained_or_equal=query)
        except (AddrFormatError, ValueError):
            return queryset.none()

    def search_contains(self, queryset, name, value):
        """
        Restrict ``queryset`` to prefixes which contain the given prefix or IP.

        A value containing ``/`` is treated as a network and matched with
        ``net_contains_or_equals``; anything else is treated as a single IP address
        and matched with ``net_contains``. Blank input leaves the queryset
        unchanged; unparseable input yields an empty queryset.
        """
        value = value.strip()
        if not value:
            return queryset
        try:
            # Searching by prefix
            if '/' in value:
                return queryset.filter(prefix__net_contains_or_equals=str(netaddr.IPNetwork(value).cidr))
            # Searching by IP address
            return queryset.filter(prefix__net_contains=str(netaddr.IPAddress(value)))
        except (AddrFormatError, ValueError):
            return queryset.none()

    def filter_mask_length(self, queryset, name, value):
        """
        Restrict ``queryset`` to prefixes whose mask length equals ``value``.

        Only a missing value (``None``) disables the filter; a mask length of 0 is
        a legitimate value and is applied like any other.
        """
        if value is None:
            return queryset
        return queryset.filter(prefix__net_mask_length=value)
class IPAddressFilter(TenancyFilterSet, CustomFieldFilterSet):
    """
    Filter set for IPAddress objects.

    Supports filtering by ID list, free-text search (``q``), parent prefix,
    address, mask length, VRF, device, virtual machine, interface, status, role
    and tag, plus the model fields listed in ``Meta.fields``.
    """
    id__in = NumericInFilter(
        field_name='id',
        lookup_expr='in'
    )
    q = django_filters.CharFilter(
        method='search',
        label='Search',
    )
    parent = django_filters.CharFilter(
        method='search_by_parent',
        label='Parent prefix',
    )
    address = django_filters.CharFilter(
        method='filter_address',
        label='Address',
    )
    mask_length = django_filters.NumberFilter(
        method='filter_mask_length',
        label='Mask length',
    )
    vrf_id = django_filters.ModelMultipleChoiceFilter(
        queryset=VRF.objects.all(),
        label='VRF',
    )
    vrf = django_filters.ModelMultipleChoiceFilter(
        field_name='vrf__rd',
        queryset=VRF.objects.all(),
        to_field_name='rd',
        label='VRF (RD)',
    )
    # Both device filters share filter_device(); field_name selects the Device
    # attribute ('name' or 'pk') that the supplied value is looked up by.
    device = django_filters.CharFilter(
        method='filter_device',
        field_name='name',
        label='Device',
    )
    device_id = django_filters.NumberFilter(
        method='filter_device',
        field_name='pk',
        label='Device (ID)',
    )
    virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
        field_name='interface__virtual_machine',
        queryset=VirtualMachine.objects.all(),
        label='Virtual machine (ID)',
    )
    virtual_machine = django_filters.ModelMultipleChoiceFilter(
        field_name='interface__virtual_machine__name',
        queryset=VirtualMachine.objects.all(),
        to_field_name='name',
        label='Virtual machine (name)',
    )
    # This filter matches on the interface name, so it is labelled accordingly
    # (it previously duplicated the 'Interface (ID)' label of interface_id).
    interface = django_filters.ModelMultipleChoiceFilter(
        field_name='interface__name',
        queryset=Interface.objects.all(),
        to_field_name='name',
        label='Interface (name)',
    )
    interface_id = django_filters.ModelMultipleChoiceFilter(
        queryset=Interface.objects.all(),
        label='Interface (ID)',
    )
    status = django_filters.MultipleChoiceFilter(
        choices=IPADDRESS_STATUS_CHOICES,
        null_value=None
    )
    role = django_filters.MultipleChoiceFilter(
        choices=IPADDRESS_ROLE_CHOICES
    )
    tag = TagFilter()

    class Meta:
        model = IPAddress
        fields = ['family', 'dns_name']

    def search(self, queryset, name, value):
        """
        Match IP addresses by DNS name, description or leading address text.

        DNS name and description use case-insensitive ``icontains`` lookups; the
        address uses ``istartswith``. A blank value leaves the queryset unchanged.
        """
        if not value.strip():
            return queryset
        qs_filter = (
            Q(dns_name__icontains=value) |
            Q(description__icontains=value) |
            Q(address__istartswith=value)
        )
        return queryset.filter(qs_filter)

    def search_by_parent(self, queryset, name, value):
        """
        Apply the ``net_host_contained`` lookup for the parent prefix ``value``.

        Blank input leaves the queryset unchanged; input that cannot be parsed as
        an IP network yields an empty queryset.
        """
        value = value.strip()
        if not value:
            return queryset
        try:
            query = str(netaddr.IPNetwork(value).cidr)
            return queryset.filter(address__net_host_contained=query)
        except (AddrFormatError, ValueError):
            return queryset.none()

    def filter_address(self, queryset, name, value):
        """
        Restrict ``queryset`` to the given address.

        A value containing ``/`` is compared against the full address (host and
        mask); otherwise the ``net_host`` lookup is used. Blank input leaves the
        queryset unchanged; a ``ValidationError`` raised while building the filter
        yields an empty queryset.
        """
        if not value.strip():
            return queryset
        try:
            # Match address and subnet mask
            if '/' in value:
                return queryset.filter(address=value)
            return queryset.filter(address__net_host=value)
        except ValidationError:
            return queryset.none()

    def filter_mask_length(self, queryset, name, value):
        """
        Restrict ``queryset`` to addresses whose mask length equals ``value``.

        Only a missing value (``None``) disables the filter; a mask length of 0 is
        applied like any other number.
        """
        if value is None:
            return queryset
        return queryset.filter(address__net_mask_length=value)

    def filter_device(self, queryset, name, value):
        """
        Restrict ``queryset`` to addresses assigned to the device's ``vc_interfaces``.

        ``name`` is the Device field to look the device up by (``'name'`` or
        ``'pk'``, per the filter declarations). If no such device exists, an empty
        queryset is returned.
        """
        try:
            device = Device.objects.prefetch_related('device_type').get(**{name: value})
        except Device.DoesNotExist:
            return queryset.none()
        vc_interface_ids = [i['id'] for i in device.vc_interfaces.values('id')]
        return queryset.filter(interface_id__in=vc_interface_ids)
class VLANGroupFilter(NameSlugSearchFilterSet):
    """
    Filter set for VLANGroup objects.

    Adds site filters (by ID or by slug) to the model fields listed in ``Meta.fields``.
    """
    site_id = django_filters.ModelMultipleChoiceFilter(queryset=Site.objects.all(), label='Site (ID)')
    site = django_filters.ModelMultipleChoiceFilter(
        label='Site (slug)',
        queryset=Site.objects.all(),
        field_name='site__slug',
        to_field_name='slug',
    )

    class Meta:
        model = VLANGroup
        fields = ['id', 'name', 'slug']
class VLANFilter(TenancyFilterSet, CustomFieldFilterSet):
    """
    Filter set for VLAN objects.

    Supports filtering by ID list, free-text search (``q``), site, group, role,
    status and tag, plus the model fields listed in ``Meta.fields``.
    """
    id__in = NumericInFilter(field_name='id', lookup_expr='in')
    q = django_filters.CharFilter(method='search', label='Search')
    site_id = django_filters.ModelMultipleChoiceFilter(queryset=Site.objects.all(), label='Site (ID)')
    site = django_filters.ModelMultipleChoiceFilter(
        label='Site (slug)',
        queryset=Site.objects.all(),
        field_name='site__slug',
        to_field_name='slug',
    )
    group_id = django_filters.ModelMultipleChoiceFilter(queryset=VLANGroup.objects.all(), label='Group (ID)')
    group = django_filters.ModelMultipleChoiceFilter(
        label='Group',
        queryset=VLANGroup.objects.all(),
        field_name='group__slug',
        to_field_name='slug',
    )
    role_id = django_filters.ModelMultipleChoiceFilter(queryset=Role.objects.all(), label='Role (ID)')
    role = django_filters.ModelMultipleChoiceFilter(
        label='Role (slug)',
        queryset=Role.objects.all(),
        field_name='role__slug',
        to_field_name='slug',
    )
    status = django_filters.MultipleChoiceFilter(choices=VLAN_STATUS_CHOICES, null_value=None)
    tag = TagFilter()

    class Meta:
        model = VLAN
        fields = ['vid', 'name']

    def search(self, queryset, name, value):
        """
        Match VLANs by name or description substring, or by numeric VID.

        Name and description use case-insensitive ``icontains`` lookups on the
        value as given. If the stripped value parses as an integer, VLANs with
        that ``vid`` are matched as well. A blank value leaves the queryset
        unchanged.
        """
        term = value.strip()
        if not term:
            return queryset
        criteria = Q(name__icontains=value) | Q(description__icontains=value)
        try:
            vid = int(term)
        except ValueError:
            # Not a number: text matching only.
            pass
        else:
            criteria = criteria | Q(vid=vid)
        return queryset.filter(criteria)
class ServiceFilter(django_filters.FilterSet):
    """
    Filter set for Service objects.

    Supports free-text search (``q``) and filtering by device, virtual machine
    and tag, plus the model fields listed in ``Meta.fields``.
    """
    q = django_filters.CharFilter(method='search', label='Search')
    device_id = django_filters.ModelMultipleChoiceFilter(queryset=Device.objects.all(), label='Device (ID)')
    device = django_filters.ModelMultipleChoiceFilter(
        label='Device (name)',
        queryset=Device.objects.all(),
        field_name='device__name',
        to_field_name='name',
    )
    virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
        queryset=VirtualMachine.objects.all(),
        label='Virtual machine (ID)',
    )
    virtual_machine = django_filters.ModelMultipleChoiceFilter(
        label='Virtual machine (name)',
        queryset=VirtualMachine.objects.all(),
        field_name='virtual_machine__name',
        to_field_name='name',
    )
    tag = TagFilter()

    class Meta:
        model = Service
        fields = ['id', 'name', 'protocol', 'port']

    def search(self, queryset, name, value):
        """
        Restrict ``queryset`` to services whose name or description contains ``value``.

        Uses case-insensitive ``icontains`` lookups. A blank (whitespace-only)
        value leaves the queryset unchanged.
        """
        if value.strip():
            text_match = Q(name__icontains=value) | Q(description__icontains=value)
            return queryset.filter(text_match)
        return queryset