Merge v2.6.3

This commit is contained in:
Jeremy Stretch
2019-09-04 16:45:33 -04:00
88 changed files with 2742 additions and 1105 deletions

View File

@@ -230,6 +230,6 @@ class ObjectChangeViewSet(ReadOnlyModelViewSet):
"""
Retrieve a list of recent changes.
"""
queryset = ObjectChange.objects.select_related('user')
queryset = ObjectChange.objects.prefetch_related('user')
serializer_class = serializers.ObjectChangeSerializer
filterset_class = filters.ObjectChangeFilter

View File

@@ -8,9 +8,10 @@ from taggit.forms import TagField
from dcim.models import DeviceRole, Platform, Region, Site
from tenancy.models import Tenant, TenantGroup
from utilities.constants import COLOR_CHOICES
from utilities.forms import (
add_blank_choice, APISelectMultiple, BootstrapMixin, BulkEditForm, BulkEditNullBooleanSelect, CommentField,
ContentTypeSelect, FilterChoiceField, LaxURLField, JSONField, SlugField,
add_blank_choice, APISelectMultiple, BootstrapMixin, BulkEditForm, BulkEditNullBooleanSelect, ColorSelect,
CommentField, ContentTypeSelect, FilterChoiceField, LaxURLField, JSONField, SlugField,
)
from .constants import (
CF_FILTER_DISABLED, CF_TYPE_BOOLEAN, CF_TYPE_DATE, CF_TYPE_INTEGER, CF_TYPE_SELECT, CF_TYPE_URL,
@@ -111,8 +112,10 @@ class CustomFieldForm(forms.ModelForm):
# If editing an existing object, initialize values for all custom fields
if self.instance.pk:
existing_values = CustomFieldValue.objects.filter(obj_type=self.obj_type, obj_id=self.instance.pk)\
.select_related('field')
existing_values = CustomFieldValue.objects.filter(
obj_type=self.obj_type,
obj_id=self.instance.pk
).prefetch_related('field')
for cfv in existing_values:
self.initial['cf_{}'.format(str(cfv.field.name))] = cfv.serialized_value
@@ -120,9 +123,11 @@ class CustomFieldForm(forms.ModelForm):
for field_name in self.custom_fields:
try:
cfv = CustomFieldValue.objects.select_related('field').get(field=self.fields[field_name].model,
obj_type=self.obj_type,
obj_id=self.instance.pk)
cfv = CustomFieldValue.objects.prefetch_related('field').get(
field=self.fields[field_name].model,
obj_type=self.obj_type,
obj_id=self.instance.pk
)
except CustomFieldValue.DoesNotExist:
# Skip this field if none exists already and its value is empty
if self.cleaned_data[field_name] in [None, '']:
@@ -215,6 +220,21 @@ class TagFilterForm(BootstrapMixin, forms.Form):
)
class TagBulkEditForm(BootstrapMixin, BulkEditForm):
pk = forms.ModelMultipleChoiceField(
queryset=Tag.objects.all(),
widget=forms.MultipleHiddenInput
)
color = forms.CharField(
max_length=6,
required=False,
widget=ColorSelect()
)
class Meta:
nullable_fields = []
#
# Config contexts
#
@@ -380,3 +400,34 @@ class ObjectChangeFilterForm(BootstrapMixin, forms.Form):
widget=ContentTypeSelect(),
label='Object Type'
)
#
# Scripts
#
class ScriptForm(BootstrapMixin, forms.Form):
_commit = forms.BooleanField(
required=False,
initial=True,
label="Commit changes",
help_text="Commit changes to the database (uncheck for a dry-run)"
)
def __init__(self, vars, *args, **kwargs):
super().__init__(*args, **kwargs)
# Dynamically populate fields for variables
for name, var in vars.items():
self.fields[name] = var.as_field()
# Move _commit to the end of the form
self.fields.move_to_end('_commit', True)
@property
def requires_input(self):
"""
A boolean indicating whether the form requires user input (ignore the _commit field).
"""
return bool(len(self.fields) > 1)

View File

@@ -5,6 +5,7 @@ import sys
from django import get_version
from django.apps import apps
from django.conf import settings
from django.contrib.auth.models import User
from django.core.management.base import BaseCommand
APPS = ['circuits', 'dcim', 'extras', 'ipam', 'secrets', 'tenancy', 'users', 'virtualization']
@@ -50,6 +51,9 @@ class Command(BaseCommand):
except KeyError:
pass
# Additional objects to include
namespace['User'] = User
# Load convenience commands
namespace.update({
'lsmodels': self._lsmodels,

View File

@@ -6,39 +6,46 @@ from datetime import timedelta
from django.conf import settings
from django.db.models.signals import post_delete, post_save
from django.utils import timezone
from django.utils.functional import curry
from django_prometheus.models import model_deletes, model_inserts, model_updates
from extras.webhooks import enqueue_webhooks
from .constants import (
OBJECTCHANGE_ACTION_CREATE, OBJECTCHANGE_ACTION_DELETE, OBJECTCHANGE_ACTION_UPDATE,
)
from .models import ObjectChange
from .signals import purge_changelog
from .webhooks import enqueue_webhooks
_thread_locals = threading.local()
def cache_changed_object(instance, **kwargs):
action = OBJECTCHANGE_ACTION_CREATE if kwargs['created'] else OBJECTCHANGE_ACTION_UPDATE
# Cache the object for further processing was the response has completed.
_thread_locals.changed_objects.append(
(instance, action)
)
def cache_changed_object(sender, instance, **kwargs):
"""
Cache an object being created or updated for the changelog.
"""
if hasattr(instance, 'to_objectchange'):
action = OBJECTCHANGE_ACTION_CREATE if kwargs['created'] else OBJECTCHANGE_ACTION_UPDATE
objectchange = instance.to_objectchange(action)
_thread_locals.changed_objects.append(
(instance, objectchange)
)
def _record_object_deleted(request, instance, **kwargs):
def cache_deleted_object(sender, instance, **kwargs):
"""
Cache an object being deleted for the changelog.
"""
if hasattr(instance, 'to_objectchange'):
objectchange = instance.to_objectchange(OBJECTCHANGE_ACTION_DELETE)
_thread_locals.changed_objects.append(
(instance, objectchange)
)
# Record that the object was deleted
if hasattr(instance, 'log_change'):
instance.log_change(request.user, request.id, OBJECTCHANGE_ACTION_DELETE)
# Enqueue webhooks
enqueue_webhooks(instance, request.user, request.id, OBJECTCHANGE_ACTION_DELETE)
# Increment metric counters
model_deletes.labels(instance._meta.model_name).inc()
def purge_objectchange_cache(sender, **kwargs):
"""
Delete any queued object changes waiting to be written.
"""
_thread_locals.changed_objects = []
class ObjectChangeMiddleware(object):
@@ -67,34 +74,42 @@ class ObjectChangeMiddleware(object):
# the same request.
request.id = uuid.uuid4()
# Signals don't include the request context, so we're currying it into the post_delete function ahead of time.
record_object_deleted = curry(_record_object_deleted, request)
# Connect our receivers to the post_save and post_delete signals.
post_save.connect(cache_changed_object, dispatch_uid='record_object_saved')
post_delete.connect(record_object_deleted, dispatch_uid='record_object_deleted')
post_save.connect(cache_changed_object, dispatch_uid='cache_changed_object')
post_delete.connect(cache_deleted_object, dispatch_uid='cache_deleted_object')
# Provide a hook for purging the change cache
purge_changelog.connect(purge_objectchange_cache)
# Process the request
response = self.get_response(request)
# If the change cache is empty, there's nothing more we need to do.
if not _thread_locals.changed_objects:
return response
# Create records for any cached objects that were created/updated.
for obj, action in _thread_locals.changed_objects:
for obj, objectchange in _thread_locals.changed_objects:
# Record the change
if hasattr(obj, 'log_change'):
obj.log_change(request.user, request.id, action)
objectchange.user = request.user
objectchange.request_id = request.id
objectchange.save()
# Enqueue webhooks
enqueue_webhooks(obj, request.user, request.id, action)
enqueue_webhooks(obj, request.user, request.id, objectchange.action)
# Increment metric counters
if action == OBJECTCHANGE_ACTION_CREATE:
if objectchange.action == OBJECTCHANGE_ACTION_CREATE:
model_inserts.labels(obj._meta.model_name).inc()
elif action == OBJECTCHANGE_ACTION_UPDATE:
elif objectchange.action == OBJECTCHANGE_ACTION_UPDATE:
model_updates.labels(obj._meta.model_name).inc()
elif objectchange.action == OBJECTCHANGE_ACTION_DELETE:
model_deletes.labels(obj._meta.model_name).inc()
# Housekeeping: 1% chance of clearing out expired ObjectChanges
if _thread_locals.changed_objects and settings.CHANGELOG_RETENTION and random.randint(1, 100) == 1:
# Housekeeping: 1% chance of clearing out expired ObjectChanges. This applies only to requests which result in
# one or more changes being logged.
if settings.CHANGELOG_RETENTION and random.randint(1, 100) == 1:
cutoff = timezone.now() - timedelta(days=settings.CHANGELOG_RETENTION)
purged_count, _ = ObjectChange.objects.filter(
time__lt=cutoff

View File

@@ -0,0 +1,23 @@
# Generated by Django 2.2 on 2019-08-12 15:28
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('extras', '0023_fix_tag_sequences'),
]
operations = [
migrations.CreateModel(
name='Script',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False)),
],
options={
'permissions': (('run_script', 'Can run script'),),
'managed': False,
},
),
]

View File

@@ -0,0 +1,18 @@
# Generated by Django 2.2 on 2019-08-28 14:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('extras', '0024_scripts'),
]
operations = [
migrations.AlterField(
model_name='objectchange',
name='time',
field=models.DateTimeField(auto_now_add=True, db_index=True),
),
]

View File

@@ -675,6 +675,21 @@ class ConfigContextModel(models.Model):
return data
#
# Custom scripts
#
class Script(models.Model):
"""
Dummy model used to generate permissions for custom scripts. Does not exist in the database.
"""
class Meta:
managed = False
permissions = (
('run_script', 'Can run script'),
)
#
# Report results
#
@@ -716,7 +731,8 @@ class ObjectChange(models.Model):
"""
time = models.DateTimeField(
auto_now_add=True,
editable=False
editable=False,
db_index=True
)
user = models.ForeignKey(
to=User,
@@ -787,8 +803,10 @@ class ObjectChange(models.Model):
def save(self, *args, **kwargs):
# Record the user's name and the object's representation as static strings
self.user_name = self.user.username
self.object_repr = str(self.changed_object)
if not self.user_name:
self.user_name = self.user.username
if not self.object_repr:
self.object_repr = str(self.changed_object)
return super().save(*args, **kwargs)

343
netbox/extras/scripts.py Normal file
View File

@@ -0,0 +1,343 @@
from collections import OrderedDict
import inspect
import json
import os
import pkgutil
import time
import traceback
import yaml
from django import forms
from django.conf import settings
from django.core.validators import RegexValidator
from django.db import transaction
from mptt.forms import TreeNodeChoiceField
from mptt.models import MPTTModel
from ipam.formfields import IPFormField
from utilities.exceptions import AbortTransaction
from .constants import LOG_DEFAULT, LOG_FAILURE, LOG_INFO, LOG_SUCCESS, LOG_WARNING
from .forms import ScriptForm
from .signals import purge_changelog
__all__ = [
'BaseScript',
'BooleanVar',
'FileVar',
'IntegerVar',
'IPNetworkVar',
'ObjectVar',
'Script',
'StringVar',
'TextVar',
]
#
# Script variables
#
class ScriptVariable:
"""
Base model for script variables
"""
form_field = forms.CharField
def __init__(self, label='', description='', default=None, required=True):
# Default field attributes
self.field_attrs = {
'help_text': description,
'required': required
}
if label:
self.field_attrs['label'] = label
if default:
self.field_attrs['initial'] = default
def as_field(self):
"""
Render the variable as a Django form field.
"""
form_field = self.form_field(**self.field_attrs)
form_field.widget.attrs['class'] = 'form-control'
return form_field
class StringVar(ScriptVariable):
"""
Character string representation. Can enforce minimum/maximum length and/or regex validation.
"""
def __init__(self, min_length=None, max_length=None, regex=None, *args, **kwargs):
super().__init__(*args, **kwargs)
# Optional minimum/maximum lengths
if min_length:
self.field_attrs['min_length'] = min_length
if max_length:
self.field_attrs['max_length'] = max_length
# Optional regular expression validation
if regex:
self.field_attrs['validators'] = [
RegexValidator(
regex=regex,
message='Invalid value. Must match regex: {}'.format(regex),
code='invalid'
)
]
class TextVar(ScriptVariable):
"""
Free-form text data. Renders as a <textarea>.
"""
form_field = forms.CharField
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.field_attrs['widget'] = forms.Textarea
class IntegerVar(ScriptVariable):
"""
Integer representation. Can enforce minimum/maximum values.
"""
form_field = forms.IntegerField
def __init__(self, min_value=None, max_value=None, *args, **kwargs):
super().__init__(*args, **kwargs)
# Optional minimum/maximum values
if min_value:
self.field_attrs['min_value'] = min_value
if max_value:
self.field_attrs['max_value'] = max_value
class BooleanVar(ScriptVariable):
"""
Boolean representation (true/false). Renders as a checkbox.
"""
form_field = forms.BooleanField
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Boolean fields cannot be required
self.field_attrs['required'] = False
class ObjectVar(ScriptVariable):
"""
NetBox object representation. The provided QuerySet will determine the choices available.
"""
form_field = forms.ModelChoiceField
def __init__(self, queryset, *args, **kwargs):
super().__init__(*args, **kwargs)
# Queryset for field choices
self.field_attrs['queryset'] = queryset
# Update form field for MPTT (nested) objects
if issubclass(queryset.model, MPTTModel):
self.form_field = TreeNodeChoiceField
class FileVar(ScriptVariable):
"""
An uploaded file.
"""
form_field = forms.FileField
class IPNetworkVar(ScriptVariable):
"""
An IPv4 or IPv6 prefix.
"""
form_field = IPFormField
#
# Scripts
#
class BaseScript:
"""
Base model for custom scripts. User classes should inherit from this model if they want to extend Script
functionality for use in other subclasses.
"""
class Meta:
pass
def __init__(self):
# Initiate the log
self.log = []
# Grab some info about the script
self.filename = inspect.getfile(self.__class__)
self.source = inspect.getsource(self.__class__)
def __str__(self):
return getattr(self.Meta, 'name', self.__class__.__name__)
def _get_vars(self):
vars = OrderedDict()
# Infer order from Meta.field_order (Python 3.5 and lower)
field_order = getattr(self.Meta, 'field_order', [])
for name in field_order:
vars[name] = getattr(self, name)
# Default to order of declaration on class
for name, attr in self.__class__.__dict__.items():
if name not in vars and issubclass(attr.__class__, ScriptVariable):
vars[name] = attr
return vars
def run(self, data):
raise NotImplementedError("The script must define a run() method.")
def as_form(self, data=None, files=None):
"""
Return a Django form suitable for populating the context data required to run this Script.
"""
vars = self._get_vars()
form = ScriptForm(vars, data, files)
return form
# Logging
def log_debug(self, message):
self.log.append((LOG_DEFAULT, message))
def log_success(self, message):
self.log.append((LOG_SUCCESS, message))
def log_info(self, message):
self.log.append((LOG_INFO, message))
def log_warning(self, message):
self.log.append((LOG_WARNING, message))
def log_failure(self, message):
self.log.append((LOG_FAILURE, message))
# Convenience functions
def load_yaml(self, filename):
"""
Return data from a YAML file
"""
file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
with open(file_path, 'r') as datafile:
data = yaml.load(datafile)
return data
def load_json(self, filename):
"""
Return data from a JSON file
"""
file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
with open(file_path, 'r') as datafile:
data = json.load(datafile)
return data
class Script(BaseScript):
"""
Classes which inherit this model will appear in the list of available scripts.
"""
pass
#
# Functions
#
def is_script(obj):
"""
Returns True if the object is a Script.
"""
try:
return issubclass(obj, Script) and obj != Script
except TypeError:
return False
def is_variable(obj):
"""
Returns True if the object is a ScriptVariable.
"""
return isinstance(obj, ScriptVariable)
def run_script(script, data, files, commit=True):
"""
A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
exists outside of the Script class to ensure it cannot be overridden by a script author.
"""
output = None
start_time = None
end_time = None
# Add files to form data
for field_name, fileobj in files.items():
data[field_name] = fileobj
try:
with transaction.atomic():
start_time = time.time()
output = script.run(data)
end_time = time.time()
if not commit:
raise AbortTransaction()
except AbortTransaction:
pass
except Exception as e:
stacktrace = traceback.format_exc()
script.log_failure(
"An exception occurred: `{}: {}`\n```\n{}\n```".format(type(e).__name__, e, stacktrace)
)
commit = False
finally:
if not commit:
# Delete all pending changelog entries
purge_changelog.send(Script)
script.log_info(
"Database changes have been reverted automatically."
)
# Calculate execution time
if end_time is not None:
execution_time = end_time - start_time
else:
execution_time = None
return output, execution_time
def get_scripts():
scripts = OrderedDict()
# Iterate through all modules within the reports path. These are the user-created files in which reports are
# defined.
for importer, module_name, _ in pkgutil.iter_modules([settings.SCRIPTS_ROOT]):
module = importer.find_module(module_name).load_module(module_name)
if hasattr(module, 'name'):
module_name = module.name
module_scripts = OrderedDict()
for name, cls in inspect.getmembers(module, is_script):
module_scripts[name] = cls
scripts[module_name] = module_scripts
return scripts

View File

@@ -1,7 +1,12 @@
from cacheops.signals import cache_invalidated, cache_read
from django.dispatch import Signal
from prometheus_client import Counter
#
# Caching
#
cacheops_cache_hit = Counter('cacheops_cache_hit', 'Number of cache hits')
cacheops_cache_miss = Counter('cacheops_cache_miss', 'Number of cache misses')
cacheops_cache_invalidated = Counter('cacheops_cache_invalidated', 'Number of cache invalidations')
@@ -20,3 +25,10 @@ def cache_invalidated_collector(sender, obj_dict, **kwargs):
cache_read.connect(cache_read_collector)
cache_invalidated.connect(cache_invalidated_collector)
#
# Change logging
#
purge_changelog = Signal()

View File

@@ -0,0 +1,37 @@
from django import template
from extras.constants import LOG_DEFAULT, LOG_FAILURE, LOG_INFO, LOG_SUCCESS, LOG_WARNING
register = template.Library()
@register.inclusion_tag('extras/templatetags/log_level.html')
def log_level(level):
"""
Display a label indicating a syslog severity (e.g. info, warning, etc.).
"""
levels = {
LOG_DEFAULT: {
'name': 'Default',
'class': 'default'
},
LOG_SUCCESS: {
'name': 'Success',
'class': 'success',
},
LOG_INFO: {
'name': 'Info',
'class': 'info'
},
LOG_WARNING: {
'name': 'Warning',
'class': 'warning'
},
LOG_FAILURE: {
'name': 'Failure',
'class': 'danger'
}
}
return levels[level]

View File

@@ -0,0 +1,72 @@
from django.urls import reverse
from rest_framework import status
from dcim.models import Site
from extras.constants import OBJECTCHANGE_ACTION_CREATE, OBJECTCHANGE_ACTION_UPDATE, OBJECTCHANGE_ACTION_DELETE
from extras.models import ObjectChange
from utilities.testing import APITestCase
class ChangeLogTest(APITestCase):
def test_create_object(self):
data = {
'name': 'Test Site 1',
'slug': 'test-site-1',
}
self.assertEqual(ObjectChange.objects.count(), 0)
url = reverse('dcim-api:site-list')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(ObjectChange.objects.count(), 1)
oc = ObjectChange.objects.first()
site = Site.objects.get(pk=response.data['id'])
self.assertEqual(oc.changed_object, site)
self.assertEqual(oc.action, OBJECTCHANGE_ACTION_CREATE)
def test_update_object(self):
site = Site(name='Test Site 1', slug='test-site-1')
site.save()
data = {
'name': 'Test Site X',
'slug': 'test-site-x',
}
self.assertEqual(ObjectChange.objects.count(), 0)
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
response = self.client.put(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(ObjectChange.objects.count(), 1)
site = Site.objects.get(pk=response.data['id'])
self.assertEqual(site.name, data['name'])
oc = ObjectChange.objects.first()
self.assertEqual(oc.changed_object, site)
self.assertEqual(oc.action, OBJECTCHANGE_ACTION_UPDATE)
def test_delete_object(self):
site = Site(name='Test Site 1', slug='test-site-1')
site.save()
self.assertEqual(ObjectChange.objects.count(), 0)
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
self.assertEqual(Site.objects.count(), 0)
oc = ObjectChange.objects.first()
self.assertEqual(oc.changed_object, None)
self.assertEqual(oc.object_repr, site.name)
self.assertEqual(oc.action, OBJECTCHANGE_ACTION_DELETE)

View File

@@ -0,0 +1,157 @@
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase
from netaddr import IPNetwork
from dcim.models import DeviceRole
from extras.scripts import *
class ScriptVariablesTest(TestCase):
def test_stringvar(self):
class TestScript(Script):
var1 = StringVar(
min_length=3,
max_length=3,
regex=r'[a-z]+'
)
# Validate min_length enforcement
data = {'var1': 'xx'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate max_length enforcement
data = {'var1': 'xxxx'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate regex enforcement
data = {'var1': 'ABC'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate valid data
data = {'var1': 'abc'}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], data['var1'])
def test_textvar(self):
class TestScript(Script):
var1 = TextVar()
# Validate valid data
data = {'var1': 'This is a test string'}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], data['var1'])
def test_integervar(self):
class TestScript(Script):
var1 = IntegerVar(
min_value=5,
max_value=10
)
# Validate min_value enforcement
data = {'var1': 4}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate max_value enforcement
data = {'var1': 11}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate valid data
data = {'var1': 7}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], data['var1'])
def test_booleanvar(self):
class TestScript(Script):
var1 = BooleanVar()
# Validate True
data = {'var1': True}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], True)
# Validate False
data = {'var1': False}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], False)
def test_objectvar(self):
class TestScript(Script):
var1 = ObjectVar(
queryset=DeviceRole.objects.all()
)
# Populate some objects
for i in range(1, 6):
DeviceRole(
name='Device Role {}'.format(i),
slug='device-role-{}'.format(i)
).save()
# Validate valid data
data = {'var1': DeviceRole.objects.first().pk}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'].pk, data['var1'])
def test_filevar(self):
class TestScript(Script):
var1 = FileVar()
# Dummy file
testfile = SimpleUploadedFile(
name='test_file.txt',
content=b'This is a dummy file for testing'
)
# Validate valid data
file_data = {'var1': testfile}
form = TestScript().as_form(None, file_data)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], testfile)
def test_ipnetworkvar(self):
class TestScript(Script):
var1 = IPNetworkVar()
# Validate IP network enforcement
data = {'var1': '1.2.3'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate valid data
data = {'var1': '192.0.2.0/24'}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], IPNetwork(data['var1']))

View File

@@ -35,9 +35,9 @@ class TaggedItemTest(APITestCase):
site = Site.objects.create(
name='Test Site',
slug='test-site',
tags=['Foo', 'Bar', 'Baz']
slug='test-site'
)
site.tags.add('Foo', 'Bar', 'Baz')
data = {
'tags': ['Foo', 'Bar', 'New Tag']

View File

@@ -6,6 +6,7 @@ from django.test import Client, TestCase
from django.urls import reverse
from dcim.models import Site
from extras.constants import OBJECTCHANGE_ACTION_UPDATE
from extras.models import ConfigContext, ObjectChange, Tag
from utilities.testing import create_test_user
@@ -82,11 +83,10 @@ class ObjectChangeTestCase(TestCase):
# Create three ObjectChanges
for i in range(1, 4):
site.log_change(
user=user,
request_id=uuid.uuid4(),
action=2
)
oc = site.to_objectchange(action=OBJECTCHANGE_ACTION_UPDATE)
oc.user = user
oc.request_id = uuid.uuid4()
oc.save()
def test_objectchange_list(self):

View File

@@ -9,6 +9,7 @@ urlpatterns = [
# Tags
path(r'tags/', views.TagListView.as_view(), name='tag_list'),
path(r'tags/edit/', views.TagBulkEditView.as_view(), name='tag_bulk_edit'),
path(r'tags/delete/', views.TagBulkDeleteView.as_view(), name='tag_bulk_delete'),
path(r'tags/<slug:slug>/', views.TagView.as_view(), name='tag'),
path(r'tags/<slug:slug>/edit/', views.TagEditView.as_view(), name='tag_edit'),
@@ -28,13 +29,17 @@ urlpatterns = [
path(r'image-attachments/<int:pk>/edit/', views.ImageAttachmentEditView.as_view(), name='imageattachment_edit'),
path(r'image-attachments/<int:pk>/delete/', views.ImageAttachmentDeleteView.as_view(), name='imageattachment_delete'),
# Change logging
path(r'changelog/', views.ObjectChangeListView.as_view(), name='objectchange_list'),
path(r'changelog/<int:pk>/', views.ObjectChangeView.as_view(), name='objectchange'),
# Reports
path(r'reports/', views.ReportListView.as_view(), name='report_list'),
path(r'reports/<str:name>/', views.ReportView.as_view(), name='report'),
path(r'reports/<str:name>/run/', views.ReportRunView.as_view(), name='report_run'),
# Change logging
path(r'changelog/', views.ObjectChangeListView.as_view(), name='objectchange_list'),
path(r'changelog/<int:pk>/', views.ObjectChangeView.as_view(), name='objectchange'),
# Scripts
path(r'scripts/', views.ScriptListView.as_view(), name='script_list'),
path(r'scripts/<str:module>/<str:name>/', views.ScriptView.as_view(), name='script'),
]

View File

@@ -4,7 +4,7 @@ from django.contrib import messages
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count, Q
from django.http import Http404
from django.http import Http404, HttpResponseForbidden
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.safestring import mark_safe
from django.views.generic import View
@@ -13,13 +13,10 @@ from django_tables2 import RequestConfig
from utilities.forms import ConfirmationForm
from utilities.paginator import EnhancedPaginator
from utilities.views import BulkDeleteView, BulkEditView, ObjectDeleteView, ObjectEditView, ObjectListView
from . import filters
from .forms import (
ConfigContextForm, ConfigContextBulkEditForm, ConfigContextFilterForm, ImageAttachmentForm, ObjectChangeFilterForm,
TagFilterForm, TagForm,
)
from . import filters, forms
from .models import ConfigContext, ImageAttachment, ObjectChange, ReportResult, Tag, TaggedItem
from .reports import get_report, get_reports
from .scripts import get_scripts, run_script
from .tables import ConfigContextTable, ObjectChangeTable, TagTable, TaggedItemTable
@@ -35,7 +32,7 @@ class TagListView(PermissionRequiredMixin, ObjectListView):
'name'
)
filter = filters.TagFilter
filter_form = TagFilterForm
filter_form = forms.TagFilterForm
table = TagTable
template_name = 'extras/tag_list.html'
@@ -47,10 +44,8 @@ class TagView(View):
tag = get_object_or_404(Tag, slug=slug)
tagged_items = TaggedItem.objects.filter(
tag=tag
).select_related(
'content_type'
).prefetch_related(
'content_object'
'content_type', 'content_object'
)
# Generate a table of all items tagged with this Tag
@@ -71,7 +66,7 @@ class TagView(View):
class TagEditView(PermissionRequiredMixin, ObjectEditView):
permission_required = 'extras.change_tag'
model = Tag
model_form = TagForm
model_form = forms.TagForm
default_return_url = 'extras:tag_list'
template_name = 'extras/tag_edit.html'
@@ -82,6 +77,19 @@ class TagDeleteView(PermissionRequiredMixin, ObjectDeleteView):
default_return_url = 'extras:tag_list'
class TagBulkEditView(PermissionRequiredMixin, BulkEditView):
permission_required = 'extras.change_tag'
queryset = Tag.objects.annotate(
items=Count('extras_taggeditem_items', distinct=True)
).order_by(
'name'
)
# filter = filters.ProviderFilter
table = TagTable
form = forms.TagBulkEditForm
default_return_url = 'circuits:provider_list'
class TagBulkDeleteView(PermissionRequiredMixin, BulkDeleteView):
permission_required = 'extras.delete_tag'
queryset = Tag.objects.annotate(
@@ -101,7 +109,7 @@ class ConfigContextListView(PermissionRequiredMixin, ObjectListView):
permission_required = 'extras.view_configcontext'
queryset = ConfigContext.objects.all()
filter = filters.ConfigContextFilter
filter_form = ConfigContextFilterForm
filter_form = forms.ConfigContextFilterForm
table = ConfigContextTable
template_name = 'extras/configcontext_list.html'
@@ -121,7 +129,7 @@ class ConfigContextView(PermissionRequiredMixin, View):
class ConfigContextCreateView(PermissionRequiredMixin, ObjectEditView):
permission_required = 'extras.add_configcontext'
model = ConfigContext
model_form = ConfigContextForm
model_form = forms.ConfigContextForm
default_return_url = 'extras:configcontext_list'
template_name = 'extras/configcontext_edit.html'
@@ -135,7 +143,7 @@ class ConfigContextBulkEditView(PermissionRequiredMixin, BulkEditView):
queryset = ConfigContext.objects.all()
filter = filters.ConfigContextFilter
table = ConfigContextTable
form = ConfigContextBulkEditForm
form = forms.ConfigContextBulkEditForm
default_return_url = 'extras:configcontext_list'
@@ -178,9 +186,9 @@ class ObjectConfigContextView(View):
class ObjectChangeListView(PermissionRequiredMixin, ObjectListView):
permission_required = 'extras.view_objectchange'
queryset = ObjectChange.objects.select_related('user', 'changed_object_type')
queryset = ObjectChange.objects.prefetch_related('user', 'changed_object_type')
filter = filters.ObjectChangeFilter
filter_form = ObjectChangeFilterForm
filter_form = forms.ObjectChangeFilterForm
table = ObjectChangeTable
template_name = 'extras/objectchange_list.html'
@@ -217,7 +225,7 @@ class ObjectChangeLogView(View):
# Gather all changes for this object (and its related objects)
content_type = ContentType.objects.get_for_model(model)
objectchanges = ObjectChange.objects.select_related(
objectchanges = ObjectChange.objects.prefetch_related(
'user', 'changed_object_type'
).filter(
Q(changed_object_type=content_type, changed_object_id=obj.pk) |
@@ -259,7 +267,7 @@ class ObjectChangeLogView(View):
class ImageAttachmentEditView(PermissionRequiredMixin, ObjectEditView):
permission_required = 'extras.change_imageattachment'
model = ImageAttachment
model_form = ImageAttachmentForm
model_form = forms.ImageAttachmentForm
def alter_obj(self, imageattachment, request, args, kwargs):
if not imageattachment.pk:
@@ -355,3 +363,62 @@ class ReportRunView(PermissionRequiredMixin, View):
messages.success(request, mark_safe(msg))
return redirect('extras:report', name=report.full_name)
#
# Scripts
#
class ScriptListView(PermissionRequiredMixin, View):
permission_required = 'extras.view_script'
def get(self, request):
return render(request, 'extras/script_list.html', {
'scripts': get_scripts(),
})
class ScriptView(PermissionRequiredMixin, View):
permission_required = 'extras.view_script'
def _get_script(self, module, name):
scripts = get_scripts()
try:
return scripts[module][name]()
except KeyError:
raise Http404
def get(self, request, module, name):
script = self._get_script(module, name)
form = script.as_form()
return render(request, 'extras/script.html', {
'module': module,
'script': script,
'form': form,
})
def post(self, request, module, name):
# Permissions check
if not request.user.has_perm('extras.run_script'):
return HttpResponseForbidden()
script = self._get_script(module, name)
form = script.as_form(request.POST, request.FILES)
output = None
execution_time = None
if form.is_valid():
commit = form.cleaned_data.pop('_commit')
output, execution_time = run_script(script, form.cleaned_data, request.FILES, commit)
return render(request, 'extras/script.html', {
'module': module,
'script': script,
'form': form,
'output': output,
'execution_time': execution_time,
})