Closes #14438: Database representation of scripts

- Introduces the Script model to represent individual Python classes within a ScriptModule file
- Automatically migrates jobs & event rules

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
This commit is contained in:
Arthur Hanson 2024-02-23 05:27:37 -08:00 committed by GitHub
parent 7e7e5d5eb0
commit ca2ee436a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 569 additions and 336 deletions

View File

@ -44,9 +44,6 @@ __all__ = (
'ImageAttachmentSerializer',
'JournalEntrySerializer',
'ObjectChangeSerializer',
'ReportDetailSerializer',
'ReportSerializer',
'ReportInputSerializer',
'SavedFilterSerializer',
'ScriptDetailSerializer',
'ScriptInputSerializer',
@ -85,9 +82,9 @@ class EventRuleSerializer(NetBoxModelSerializer):
context = {'request': self.context['request']}
# We need to manually instantiate the serializer for scripts
if instance.action_type == EventRuleActionChoices.SCRIPT:
script_name = instance.action_parameters['script_name']
script = instance.action_object.scripts[script_name]()
return NestedScriptSerializer(script, context=context).data
script = instance.action_object
instance = script.python_class() if script.python_class else None
return NestedScriptSerializer(instance, context=context).data
else:
serializer = get_serializer_for_model(
model=instance.action_object_type.model_class(),
@ -512,79 +509,54 @@ class ConfigTemplateSerializer(TaggableModelSerializer, ValidatedModelSerializer
]
#
# Reports
#
class ReportSerializer(serializers.Serializer):
url = serializers.HyperlinkedIdentityField(
view_name='extras-api:report-detail',
lookup_field='full_name',
lookup_url_kwarg='pk'
)
id = serializers.CharField(read_only=True, source="full_name")
module = serializers.CharField(max_length=255)
name = serializers.CharField(max_length=255)
description = serializers.CharField(max_length=255, required=False)
test_methods = serializers.ListField(child=serializers.CharField(max_length=255), read_only=True)
result = NestedJobSerializer()
display = serializers.SerializerMethodField(read_only=True)
@extend_schema_field(serializers.CharField())
def get_display(self, obj):
return f'{obj.name} ({obj.module})'
class ReportDetailSerializer(ReportSerializer):
result = JobSerializer()
class ReportInputSerializer(serializers.Serializer):
schedule_at = serializers.DateTimeField(required=False, allow_null=True)
interval = serializers.IntegerField(required=False, allow_null=True)
def validate_schedule_at(self, value):
if value and not self.context['report'].scheduling_enabled:
raise serializers.ValidationError(_("Scheduling is not enabled for this report."))
return value
def validate_interval(self, value):
if value and not self.context['report'].scheduling_enabled:
raise serializers.ValidationError(_("Scheduling is not enabled for this report."))
return value
#
# Scripts
#
class ScriptSerializer(serializers.Serializer):
url = serializers.HyperlinkedIdentityField(
view_name='extras-api:script-detail',
lookup_field='full_name',
lookup_url_kwarg='pk'
)
id = serializers.CharField(read_only=True, source="full_name")
module = serializers.CharField(max_length=255)
name = serializers.CharField(read_only=True)
description = serializers.CharField(read_only=True)
class ScriptSerializer(ValidatedModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='extras-api:script-detail')
description = serializers.SerializerMethodField(read_only=True)
vars = serializers.SerializerMethodField(read_only=True)
result = NestedJobSerializer()
display = serializers.SerializerMethodField(read_only=True)
result = NestedJobSerializer(read_only=True)
class Meta:
model = Script
fields = [
'id', 'url', 'module', 'name', 'description', 'vars', 'result', 'display', 'is_executable',
]
@extend_schema_field(serializers.JSONField(allow_null=True))
def get_vars(self, instance):
return {
k: v.__class__.__name__ for k, v in instance._get_vars().items()
}
def get_vars(self, obj):
if obj.python_class:
return {
k: v.__class__.__name__ for k, v in obj.python_class()._get_vars().items()
}
else:
return {}
@extend_schema_field(serializers.CharField())
def get_display(self, obj):
return f'{obj.name} ({obj.module})'
@extend_schema_field(serializers.CharField())
def get_description(self, obj):
if obj.python_class:
return obj.python_class().description
else:
return None
class ScriptDetailSerializer(ScriptSerializer):
result = JobSerializer()
result = serializers.SerializerMethodField(read_only=True)
@extend_schema_field(JobSerializer())
def get_result(self, obj):
job = obj.jobs.all().order_by('-created').first()
context = {
'request': self.context['request']
}
data = JobSerializer(job, context=context).data
return data
class ScriptInputSerializer(serializers.Serializer):

View File

@ -1,5 +1,4 @@
from django.contrib.contenttypes.models import ContentType
from django.http import Http404
from django.shortcuts import get_object_or_404
from django_rq.queues import get_connection
from rest_framework import status
@ -9,14 +8,13 @@ from rest_framework.generics import RetrieveUpdateDestroyAPIView
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.routers import APIRootView
from rest_framework.viewsets import ReadOnlyModelViewSet, ViewSet
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet
from rq import Worker
from core.choices import JobStatusChoices
from core.models import Job
from extras import filtersets
from extras.models import *
from extras.scripts import get_module_and_script, run_script
from extras.scripts import run_script
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
@ -209,66 +207,30 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
# Scripts
#
class ScriptViewSet(ViewSet):
class ScriptViewSet(ModelViewSet):
permission_classes = [IsAuthenticatedOrLoginNotRequired]
queryset = Script.objects.prefetch_related('jobs')
serializer_class = serializers.ScriptSerializer
filterset_class = filtersets.ScriptFilterSet
_ignore_model_permissions = True
schema = None
lookup_value_regex = '[^/]+' # Allow dots
def _get_script(self, pk):
try:
module_name, script_name = pk.split('.', maxsplit=1)
except ValueError:
raise Http404
module, script = get_module_and_script(module_name, script_name)
if script is None:
raise Http404
return module, script
def list(self, request):
results = {
job.name: job
for job in Job.objects.filter(
object_type=ContentType.objects.get(app_label='extras', model='scriptmodule'),
status__in=JobStatusChoices.TERMINAL_STATE_CHOICES
).order_by('name', '-created').distinct('name').defer('data')
}
script_list = []
for script_module in ScriptModule.objects.restrict(request.user):
script_list.extend(script_module.scripts.values())
# Attach Job objects to each script (if any)
for script in script_list:
script.result = results.get(script.class_name, None)
serializer = serializers.ScriptSerializer(script_list, many=True, context={'request': request})
return Response({'count': len(script_list), 'results': serializer.data})
def retrieve(self, request, pk):
module, script = self._get_script(pk)
object_type = ContentType.objects.get(app_label='extras', model='scriptmodule')
script.result = Job.objects.filter(
object_type=object_type,
name=script.class_name,
status__in=JobStatusChoices.TERMINAL_STATE_CHOICES
).first()
script = get_object_or_404(self.queryset, pk=pk)
serializer = serializers.ScriptDetailSerializer(script, context={'request': request})
return Response(serializer.data)
def post(self, request, pk):
"""
Run a Script identified as "<module>.<script>" and return the pending Job as the result
Run a Script identified by the id and return the pending Job as the result
"""
if not request.user.has_perm('extras.run_script'):
raise PermissionDenied("This user does not have permission to run scripts.")
module, script = self._get_script(pk)
script = get_object_or_404(self.queryset, pk=pk)
input_serializer = serializers.ScriptInputSerializer(
data=request.data,
context={'script': script}
@ -281,13 +243,13 @@ class ScriptViewSet(ViewSet):
if input_serializer.is_valid():
script.result = Job.enqueue(
run_script,
instance=module,
name=script.class_name,
instance=script.module,
name=script.python_class.class_name,
user=request.user,
data=input_serializer.data['data'],
request=copy_safe_request(request),
commit=input_serializer.data['commit'],
job_timeout=script.job_timeout,
job_timeout=script.python_class.job_timeout,
schedule_at=input_serializer.validated_data.get('schedule_at'),
interval=input_serializer.validated_data.get('interval')
)

View File

@ -116,15 +116,13 @@ def process_event_rules(event_rules, model_name, event, data, username=None, sna
# Scripts
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
# Resolve the script from action parameters
script_module = event_rule.action_object
script_name = event_rule.action_parameters['script_name']
script = script_module.scripts[script_name]()
script = event_rule.action_object.python_class()
# Enqueue a Job to record the script's execution
Job.enqueue(
"extras.scripts.run_script",
instance=script_module,
name=script.class_name,
instance=script.module,
name=script.name,
user=user,
data=data
)

View File

@ -29,11 +29,32 @@ __all__ = (
'LocalConfigContextFilterSet',
'ObjectChangeFilterSet',
'SavedFilterFilterSet',
'ScriptFilterSet',
'TagFilterSet',
'WebhookFilterSet',
)
class ScriptFilterSet(BaseFilterSet):
q = django_filters.CharFilter(
method='search',
label=_('Search'),
)
class Meta:
model = Script
fields = [
'id', 'name',
]
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(name__icontains=value)
)
class WebhookFilterSet(NetBoxModelFilterSet):
q = django_filters.CharFilter(
method='search',

View File

@ -212,11 +212,8 @@ class EventRuleImportForm(NetBoxModelImportForm):
module, script = get_module_and_script(module_name, script_name)
except ObjectDoesNotExist:
raise forms.ValidationError(_("Script {name} not found").format(name=action_object))
self.instance.action_object = module
self.instance.action_object_type = ContentType.objects.get_for_model(module, for_concrete_model=False)
self.instance.action_parameters = {
'script_name': script_name,
}
self.instance.action_object = script
self.instance.action_object_type = ContentType.objects.get_for_model(script, for_concrete_model=False)
class TagImportForm(CSVModelForm):

View File

@ -297,20 +297,16 @@ class EventRuleForm(NetBoxModelForm):
}
def init_script_choice(self):
choices = []
for module in ScriptModule.objects.all():
scripts = []
for script_name in module.scripts.keys():
name = f"{str(module.pk)}:{script_name}"
scripts.append((name, script_name))
if scripts:
choices.append((str(module), scripts))
self.fields['action_choice'].choices = choices
if self.instance.action_type == EventRuleActionChoices.SCRIPT and self.instance.action_parameters:
scriptmodule_id = self.instance.action_object_id
script_name = self.instance.action_parameters.get('script_name')
self.fields['action_choice'].initial = f'{scriptmodule_id}:{script_name}'
initial = None
if self.instance.action_type == EventRuleActionChoices.SCRIPT:
script_id = get_field_value(self, 'action_object_id')
initial = Script.objects.get(pk=script_id) if script_id else None
self.fields['action_choice'] = DynamicModelChoiceField(
label=_('Script'),
queryset=Script.objects.all(),
required=True,
initial=initial
)
def init_webhook_choice(self):
initial = None
@ -348,26 +344,13 @@ class EventRuleForm(NetBoxModelForm):
# Script
elif self.cleaned_data.get('action_type') == EventRuleActionChoices.SCRIPT:
self.cleaned_data['action_object_type'] = ContentType.objects.get_for_model(
ScriptModule,
Script,
for_concrete_model=False
)
module_id, script_name = action_choice.split(":", maxsplit=1)
self.cleaned_data['action_object_id'] = module_id
self.cleaned_data['action_object_id'] = action_choice.id
return self.cleaned_data
def save(self, *args, **kwargs):
# Set action_parameters on the instance
if self.cleaned_data['action_type'] == EventRuleActionChoices.SCRIPT:
module_id, script_name = self.cleaned_data.get('action_choice').split(":", maxsplit=1)
self.instance.action_parameters = {
'script_name': script_name,
}
else:
self.instance.action_parameters = None
return super().save(*args, **kwargs)
class TagForm(forms.ModelForm):
slug = SlugField()

View File

@ -0,0 +1,159 @@
import inspect
import os
from importlib.machinery import SourceFileLoader
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
#
# Note: This has a couple dependencies on the codebase if doing future modifications:
# There are imports from extras.scripts and extras.reports as well as expecting
# settings.SCRIPTS_ROOT and settings.REPORTS_ROOT to be in settings
#
ROOT_PATHS = {
'scripts': settings.SCRIPTS_ROOT,
'reports': settings.REPORTS_ROOT,
}
def get_full_path(scriptmodule):
"""
Return the full path to a ScriptModule's file on disk.
"""
root_path = ROOT_PATHS[scriptmodule.file_root]
return os.path.join(root_path, scriptmodule.file_path)
def get_python_name(scriptmodule):
"""
Return the Python name of a ScriptModule's file on disk.
"""
path, filename = os.path.split(scriptmodule.file_path)
return os.path.splitext(filename)[0]
def is_script(obj):
"""
Returns True if the passed Python object is a Script or Report.
"""
from extras.scripts import Script
from extras.reports import Report
try:
if issubclass(obj, Report) and obj != Report:
return True
if issubclass(obj, Script) and obj != Script:
return True
except TypeError:
pass
return False
def get_module_scripts(scriptmodule):
"""
Return a dictionary mapping of name and script class inside the passed ScriptModule.
"""
def get_name(cls):
# For child objects in submodules use the full import path w/o the root module as the name
return cls.full_name.split(".", maxsplit=1)[1]
loader = SourceFileLoader(get_python_name(scriptmodule), get_full_path(scriptmodule))
module = loader.load_module()
scripts = {}
ordered = getattr(module, 'script_order', [])
for cls in ordered:
scripts[get_name(cls)] = cls
for name, cls in inspect.getmembers(module, is_script):
if cls not in ordered:
scripts[get_name(cls)] = cls
return scripts
def update_scripts(apps, schema_editor):
"""
Create a new Script object for each script inside each existing ScriptModule, and update any related jobs to
reference the new Script object.
"""
ContentType = apps.get_model('contenttypes', 'ContentType')
Script = apps.get_model('extras', 'Script')
ScriptModule = apps.get_model('extras', 'ScriptModule')
Job = apps.get_model('core', 'Job')
script_ct = ContentType.objects.get_for_model(Script)
scriptmodule_ct = ContentType.objects.get_for_model(ScriptModule)
for module in ScriptModule.objects.all():
for script_name in get_module_scripts(module):
script = Script.objects.create(
name=script_name,
module=module,
)
# Update all Jobs associated with this ScriptModule & script name to point to the new Script object
Job.objects.filter(
object_type=scriptmodule_ct,
object_id=module.pk,
name=script_name
).update(object_type=script_ct, object_id=script.pk)
def update_event_rules(apps, schema_editor):
"""
Update any existing EventRules for scripts. Change action_object_type from ScriptModule to Script, and populate
the ID of the related Script object.
"""
ContentType = apps.get_model('contenttypes', 'ContentType')
Script = apps.get_model('extras', 'Script')
ScriptModule = apps.get_model('extras', 'ScriptModule')
EventRule = apps.get_model('extras', 'EventRule')
script_ct = ContentType.objects.get_for_model(Script)
scriptmodule_ct = ContentType.objects.get_for_model(ScriptModule)
for eventrule in EventRule.objects.filter(action_object_type=scriptmodule_ct):
name = eventrule.action_parameters.get('script_name')
obj, created = Script.objects.get_or_create(
module_id=eventrule.action_object_id,
name=name,
defaults={'is_executable': False}
)
EventRule.objects.filter(pk=eventrule.pk).update(action_object_type=script_ct, action_object_id=obj.id)
class Migration(migrations.Migration):
dependencies = [
('extras', '0108_convert_reports_to_scripts'),
]
operations = [
migrations.CreateModel(
name='Script',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False)),
('name', models.CharField(editable=False, max_length=79)),
('module', models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='scripts', to='extras.scriptmodule')),
('is_executable', models.BooleanField(editable=False, default=True))
],
options={
'ordering': ('module', 'name'),
},
),
migrations.AddConstraint(
model_name='script',
constraint=models.UniqueConstraint(fields=('name', 'module'), name='extras_script_unique_name_module'),
),
migrations.RunPython(
code=update_scripts,
reverse_code=migrations.RunPython.noop
),
migrations.RunPython(
code=update_event_rules,
reverse_code=migrations.RunPython.noop
),
]

View File

@ -0,0 +1,15 @@
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('extras', '0109_script_model'),
]
operations = [
migrations.RemoveField(
model_name='eventrule',
name='action_parameters',
),
]

View File

@ -115,10 +115,6 @@ class EventRule(CustomFieldsMixin, ExportTemplatesMixin, TagsMixin, ChangeLogged
ct_field='action_object_type',
fk_field='action_object_id'
)
action_parameters = models.JSONField(
blank=True,
null=True
)
action_data = models.JSONField(
verbose_name=_('data'),
blank=True,

View File

@ -2,8 +2,11 @@ import inspect
import logging
from functools import cached_property
from django.contrib.contenttypes.fields import GenericRelation
from django.db import models
from django.db.models import Q
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
@ -22,12 +25,63 @@ __all__ = (
logger = logging.getLogger('netbox.data_backends')
class Script(EventRulesMixin, models.Model):
"""
Dummy model used to generate permissions for custom scripts. Does not exist in the database.
"""
class Script(EventRulesMixin, JobsMixin):
name = models.CharField(
verbose_name=_('name'),
max_length=79, # Maximum length for a Python class name
editable=False,
)
module = models.ForeignKey(
to='extras.ScriptModule',
on_delete=models.CASCADE,
related_name='scripts',
editable=False
)
is_executable = models.BooleanField(
default=True,
verbose_name=_('is executable'),
editable=False
)
events = GenericRelation(
'extras.EventRule',
content_type_field='action_object_type',
object_id_field='action_object_id'
)
def __str__(self):
return self.name
objects = RestrictedQuerySet.as_manager()
class Meta:
managed = False
ordering = ('module', 'name')
constraints = (
models.UniqueConstraint(
fields=('name', 'module'),
name='extras_script_unique_name_module'
),
)
verbose_name = _('script')
verbose_name_plural = _('scripts')
def get_absolute_url(self):
return reverse('extras:script', args=[self.pk])
@property
def result(self):
return self.jobs.all().order_by('-created').first()
@cached_property
def python_class(self):
return self.module.module_scripts.get(self.name)
def delete(self, soft_delete=False, **kwargs):
if soft_delete and self.jobs.exists():
self.is_executable = False
self.save()
else:
super().delete(**kwargs)
self.id = None
class ScriptModuleManager(models.Manager.from_queryset(RestrictedQuerySet)):
@ -55,7 +109,7 @@ class ScriptModule(PythonModuleMixin, JobsMixin, ManagedFile):
return self.python_name
@cached_property
def scripts(self):
def module_scripts(self):
def _get_name(cls):
# For child objects in submodules use the full import path w/o the root module as the name
@ -78,6 +132,39 @@ class ScriptModule(PythonModuleMixin, JobsMixin, ManagedFile):
return scripts
def sync_classes(self):
"""
Syncs the file-based module to the database, adding and removing individual Script objects
in the database as needed.
"""
db_classes = {
script.name: script for script in self.scripts.all()
}
db_classes_set = set(db_classes.keys())
module_classes_set = set(self.module_scripts.keys())
# remove any existing db classes if they are no longer in the file
removed = db_classes_set - module_classes_set
for name in removed:
db_classes[name].delete(soft_delete=True)
added = module_classes_set - db_classes_set
for name in added:
Script.objects.create(
module=self,
name=name,
is_executable=True,
)
def sync_data(self):
super().sync_data()
self.sync_classes()
def save(self, *args, **kwargs):
self.file_root = ManagedFileRootPathChoices.SCRIPTS
return super().save(*args, **kwargs)
@receiver(post_save, sender=ScriptModule)
def script_module_post_save_handler(instance, created, **kwargs):
instance.sync_classes()

View File

@ -6,6 +6,7 @@ __all__ = (
)
# Required by extras/migrations/0109_script_models.py
class Report(BaseScript):
#

View File

@ -17,7 +17,7 @@ from django.utils.translation import gettext as _
from core.choices import JobStatusChoices
from core.models import Job
from extras.choices import LogLevelChoices
from extras.models import ScriptModule
from extras.models import ScriptModule, Script as ScriptModel
from extras.signals import clear_events
from ipam.formfields import IPAddressFormField, IPNetworkFormField
from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator, prefix_validator
@ -582,7 +582,7 @@ def is_variable(obj):
def get_module_and_script(module_name, script_name):
module = ScriptModule.objects.get(file_path=f'{module_name}.py')
script = module.scripts.get(script_name)
script = module.scripts.get(name=script_name)
return module, script
@ -599,8 +599,7 @@ def run_script(data, job, request=None, commit=True, **kwargs):
"""
job.start()
module = ScriptModule.objects.get(pk=job.object_id)
script = module.scripts.get(job.name)()
script = ScriptModel.objects.get(pk=job.object_id).python_class()
logger = logging.getLogger(f"netbox.scripts.{script.full_name}")
logger.info(f"Running script (commit={commit})")

View File

@ -11,7 +11,7 @@ from dcim.models import Device, DeviceRole, DeviceType, Manufacturer, Rack, Loca
from extras.choices import *
from extras.models import *
from extras.reports import Report
from extras.scripts import BooleanVar, IntegerVar, Script, StringVar
from extras.scripts import BooleanVar, IntegerVar, Script as PythonClass, StringVar
from utilities.testing import APITestCase, APIViewTestCases
User = get_user_model()
@ -748,7 +748,7 @@ class ConfigTemplateTest(APIViewTestCases.APIViewTestCase):
class ScriptTest(APITestCase):
class TestScript(Script):
class TestScriptClass(PythonClass):
class Meta:
name = "Test script"
@ -767,27 +767,36 @@ class ScriptTest(APITestCase):
@classmethod
def setUpTestData(cls):
ScriptModule.objects.create(
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='/var/tmp/script.py'
)
Script.objects.create(
module=module,
name="Test script",
is_executable=True,
)
def get_test_script(self, *args):
return ScriptModule.objects.first(), self.TestScript
def python_class(self):
return self.TestScriptClass
def setUp(self):
super().setUp()
# Monkey-patch the API viewset's _get_script() method to return our test Script above
# Monkey-patch the Script model to return our TestScriptClass above
from extras.api.views import ScriptViewSet
ScriptViewSet._get_script = self.get_test_script
Script.python_class = self.python_class
def test_get_script(self):
url = reverse('extras-api:script-detail', kwargs={'pk': None})
module = ScriptModule.objects.get(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='/var/tmp/script.py'
)
script = module.scripts.all().first()
url = reverse('extras-api:script-detail', kwargs={'pk': script.pk})
response = self.client.get(url, **self.header)
self.assertEqual(response.data['name'], self.TestScript.Meta.name)
self.assertEqual(response.data['name'], self.TestScriptClass.Meta.name)
self.assertEqual(response.data['vars']['var1'], 'StringVar')
self.assertEqual(response.data['vars']['var2'], 'IntegerVar')
self.assertEqual(response.data['vars']['var3'], 'BooleanVar')

View File

@ -120,10 +120,15 @@ urlpatterns = [
path('scripts/', views.ScriptListView.as_view(), name='script_list'),
path('scripts/add/', views.ScriptModuleCreateView.as_view(), name='scriptmodule_add'),
path('scripts/results/<int:job_pk>/', views.ScriptResultView.as_view(), name='script_result'),
path('scripts/<int:pk>/', include(get_model_urls('extras', 'scriptmodule'))),
path('scripts/<str:module>/<str:name>/', views.ScriptView.as_view(), name='script'),
path('scripts/<str:module>/<str:name>/source/', views.ScriptSourceView.as_view(), name='script_source'),
path('scripts/<str:module>/<str:name>/jobs/', views.ScriptJobsView.as_view(), name='script_jobs'),
path('scripts/<int:pk>/', views.ScriptView.as_view(), name='script'),
path('scripts/<int:pk>/source/', views.ScriptSourceView.as_view(), name='script_source'),
path('scripts/<int:pk>/jobs/', views.ScriptJobsView.as_view(), name='script_jobs'),
path('script-modules/<int:pk>/', include(get_model_urls('extras', 'scriptmodule'))),
# Redirects for legacy script URLs
# TODO: Remove in NetBox v4.1
path('scripts/<str:module>/<str:name>/', views.LegacyScriptRedirectView.as_view()),
path('scripts/<str:module>/<str:name>/<path:path>/', views.LegacyScriptRedirectView.as_view()),
# Markdown
path('render/markdown/', views.RenderMarkdownView.as_view(), name="render_markdown"),

View File

@ -920,7 +920,7 @@ class DashboardWidgetAddView(LoginRequiredMixin, View):
widget = widget_class(**data)
request.user.dashboard.add_widget(widget)
request.user.dashboard.save()
messages.success(request, f'Added widget {widget.id}')
messages.success(request, _('Added widget: ') + str(widget.id))
return HttpResponse(headers={
'HX-Redirect': reverse('home'),
@ -961,7 +961,7 @@ class DashboardWidgetConfigView(LoginRequiredMixin, View):
data['config'] = config_form.cleaned_data
request.user.dashboard.config[str(id)].update(data)
request.user.dashboard.save()
messages.success(request, f'Updated widget {widget.id}')
messages.success(request, _('Updated widget: ') + str(widget.id))
return HttpResponse(headers={
'HX-Redirect': reverse('home'),
@ -997,9 +997,9 @@ class DashboardWidgetDeleteView(LoginRequiredMixin, View):
if form.is_valid():
request.user.dashboard.delete_widget(id)
request.user.dashboard.save()
messages.success(request, f'Deleted widget {id}')
messages.success(request, _('Deleted widget: ') + str(id))
else:
messages.error(request, f'Error deleting widget: {form.errors[0]}')
messages.error(request, _('Error deleting widget: ') + str(form.errors[0]))
return redirect(reverse('home'))
@ -1030,7 +1030,7 @@ class ScriptListView(ContentTypePermissionRequiredMixin, View):
return 'extras.view_script'
def get(self, request):
script_modules = ScriptModule.objects.restrict(request.user)
script_modules = ScriptModule.objects.restrict(request.user).prefetch_related('jobs')
return render(request, 'extras/script_list.html', {
'model': ScriptModule,
@ -1038,123 +1038,122 @@ class ScriptListView(ContentTypePermissionRequiredMixin, View):
})
def get_script_module(module, request):
return get_object_or_404(ScriptModule.objects.restrict(request.user), file_path__regex=f"^{module}\\.")
class ScriptView(generic.ObjectView):
queryset = Script.objects.all()
class ScriptView(ContentTypePermissionRequiredMixin, View):
def get_required_permission(self):
return 'extras.view_script'
def get(self, request, module, name):
module = get_script_module(module, request)
script = module.scripts[name]()
jobs = module.get_jobs(script.class_name)
form = script.as_form(initial=normalize_querydict(request.GET))
def get(self, request, **kwargs):
script = self.get_object(**kwargs)
script_class = script.python_class()
form = script_class.as_form(initial=normalize_querydict(request.GET))
return render(request, 'extras/script.html', {
'job_count': jobs.count(),
'module': module,
'script': script,
'script_class': script_class,
'form': form,
'job_count': script.jobs.count(),
})
def post(self, request, module, name):
if not request.user.has_perm('extras.run_script'):
def post(self, request, **kwargs):
script = self.get_object(**kwargs)
script_class = script.python_class()
if not request.user.has_perm('extras.run_script', obj=script):
return HttpResponseForbidden()
module = get_script_module(module, request)
script = module.scripts[name]()
jobs = module.get_jobs(script.class_name)
form = script.as_form(request.POST, request.FILES)
form = script_class.as_form(request.POST, request.FILES)
# Allow execution only if RQ worker process is running
if not get_workers_for_queue('default'):
messages.error(request, "Unable to run script: RQ worker process not running.")
messages.error(request, _("Unable to run script: RQ worker process not running."))
elif form.is_valid():
job = Job.enqueue(
run_script,
instance=module,
name=script.class_name,
instance=script,
name=script_class.class_name,
user=request.user,
schedule_at=form.cleaned_data.pop('_schedule_at'),
interval=form.cleaned_data.pop('_interval'),
data=form.cleaned_data,
request=copy_safe_request(request),
job_timeout=script.job_timeout,
job_timeout=script.python_class.job_timeout,
commit=form.cleaned_data.pop('_commit')
)
return redirect('extras:script_result', job_pk=job.pk)
return render(request, 'extras/script.html', {
'job_count': jobs.count(),
'module': module,
'script': script,
'script_class': script.python_class(),
'form': form,
'job_count': script.jobs.count(),
})
class ScriptSourceView(ContentTypePermissionRequiredMixin, View):
class ScriptSourceView(generic.ObjectView):
queryset = Script.objects.all()
def get_required_permission(self):
return 'extras.view_script'
def get(self, request, module, name):
module = get_script_module(module, request)
script = module.scripts[name]()
jobs = module.get_jobs(script.class_name)
def get(self, request, **kwargs):
script = self.get_object(**kwargs)
return render(request, 'extras/script/source.html', {
'job_count': jobs.count(),
'module': module,
'script': script,
'script_class': script.python_class(),
'job_count': script.jobs.count(),
'tab': 'source',
})
class ScriptJobsView(ContentTypePermissionRequiredMixin, View):
class ScriptJobsView(generic.ObjectView):
queryset = Script.objects.all()
def get_required_permission(self):
return 'extras.view_script'
def get(self, request, module, name):
module = get_script_module(module, request)
script = module.scripts[name]()
jobs = module.get_jobs(script.class_name)
def get(self, request, **kwargs):
script = self.get_object(**kwargs)
jobs_table = JobTable(
data=jobs,
data=script.jobs.all(),
orderable=False,
user=request.user
)
jobs_table.configure(request)
return render(request, 'extras/script/jobs.html', {
'job_count': jobs.count(),
'module': module,
'script': script,
'table': jobs_table,
'job_count': script.jobs.count(),
'tab': 'jobs',
})
class ScriptResultView(ContentTypePermissionRequiredMixin, View):
class LegacyScriptRedirectView(ContentTypePermissionRequiredMixin, View):
"""
Redirect legacy (pre-v4.0) script URLs. Examples:
/extras/scripts/<module>/<name>/ --> /extras/scripts/<id>/
/extras/scripts/<module>/<name>/source/ --> /extras/scripts/<id>/source/
/extras/scripts/<module>/<name>/jobs/ --> /extras/scripts/<id>/jobs/
"""
def get_required_permission(self):
return 'extras.view_script'
def get(self, request, module, name, path=''):
module = get_object_or_404(ScriptModule.objects.restrict(request.user), file_path__regex=f"^{module}\\.")
script = get_object_or_404(Script.objects.all(), module=module, name=name)
url = reverse('extras:script', kwargs={'pk': script.pk})
return redirect(f'{url}{path}')
class ScriptResultView(generic.ObjectView):
queryset = Job.objects.all()
def get_required_permission(self):
return 'extras.view_script'
def get(self, request, job_pk):
object_type = ContentType.objects.get_by_natural_key(app_label='extras', model='scriptmodule')
job = get_object_or_404(Job.objects.all(), pk=job_pk, object_type=object_type)
module = job.object
script = module.scripts[job.name]()
def get(self, request, **kwargs):
job = get_object_or_404(Job.objects.all(), pk=kwargs.get('job_pk'))
context = {
'script': script,
'script': job.object,
'job': job,
}
if job.data and 'log' in job.data:

View File

@ -156,6 +156,7 @@ REMOTE_AUTH_SUPERUSERS = getattr(configuration, 'REMOTE_AUTH_SUPERUSERS', [])
REMOTE_AUTH_STAFF_GROUPS = getattr(configuration, 'REMOTE_AUTH_STAFF_GROUPS', [])
REMOTE_AUTH_STAFF_USERS = getattr(configuration, 'REMOTE_AUTH_STAFF_USERS', [])
REMOTE_AUTH_GROUP_SEPARATOR = getattr(configuration, 'REMOTE_AUTH_GROUP_SEPARATOR', '|')
# Required by extras/migrations/0109_script_models.py
REPORTS_ROOT = getattr(configuration, 'REPORTS_ROOT', os.path.join(BASE_DIR, 'reports')).rstrip('/')
RQ_DEFAULT_TIMEOUT = getattr(configuration, 'RQ_DEFAULT_TIMEOUT', 300)
RQ_RETRY_INTERVAL = getattr(configuration, 'RQ_RETRY_INTERVAL', 60)

View File

@ -83,13 +83,7 @@
<tr>
<th scope="row">{% trans "Object" %}</th>
<td>
{% if object.action_type == 'script' %}
<a href="{% url 'extras:script' module=object.action_object.python_name name=object.action_parameters.script_name %}">
{{ object.action_object }} / {{ object.action_parameters.script_name }}
</a>
{% else %}
{{ object.action_object|linkify }}
{% endif %}
</td>
</tr>
<tr>

View File

@ -2,6 +2,7 @@
{% load helpers %}
{% load form_helpers %}
{% load log_levels %}
{% load perms %}
{% load i18n %}
{% block content %}
@ -17,7 +18,7 @@
{% csrf_token %}
<div class="field-group my-4">
{# Render grouped fields according to declared fieldsets #}
{% for group, fields in script.get_fieldsets %}
{% for group, fields in script_class.get_fieldsets %}
{% if fields %}
<div class="field-group mb-5">
<div class="row">
@ -32,9 +33,17 @@
{% endif %}
{% endfor %}
</div>
<div class="float-end">
<div class="text-end">
<a href="{% url 'extras:script_list' %}" class="btn btn-outline-secondary">{% trans "Cancel" %}</a>
<button type="submit" name="_run" class="btn btn-primary"{% if not perms.extras.run_script %} disabled="disabled"{% endif %}><i class="mdi mdi-play"></i> {% trans "Run Script" %}</button>
{% if not request.user|can_run:script or not script.is_executable %}
<button class="btn btn-primary" disabled>
<i class="mdi mdi-play"></i> {% trans "Run Script" %}
</button>
{% else %}
<button type="submit" name="_run" class="btn btn-primary">
<i class="mdi mdi-play"></i> {% trans "Run Script" %}
</button>
{% endif %}
</div>
</form>
</div>

View File

@ -12,7 +12,7 @@
{% block breadcrumbs %}
<li class="breadcrumb-item"><a href="{% url 'extras:script_list' %}">{% trans "Scripts" %}</a></li>
<li class="breadcrumb-item"><a href="{% url 'extras:script_list' %}#module{{ module.pk }}">{{ module|bettertitle }}</a></li>
<li class="breadcrumb-item"><a href="{% url 'extras:script_list' %}#module{{ script.module.pk }}">{{ script.module|bettertitle }}</a></li>
{% endblock breadcrumbs %}
{% block subtitle %}
@ -26,13 +26,13 @@
{% block tabs %}
<ul class="nav nav-tabs">
<li class="nav-item" role="presentation">
<a class="nav-link{% if not tab %} active{% endif %}" href="{% url 'extras:script' module=script.module name=script.class_name %}">{% trans "Script" %}</a>
<a class="nav-link{% if not tab %} active{% endif %}{% if not script.is_executable %} disabled{% endif %}" href="{{ script.get_absolute_url }}">{% trans "Script" %}</a>
</li>
<li class="nav-item" role="presentation">
<a class="nav-link{% if tab == 'source' %} active{% endif %}" href="{% url 'extras:script_source' module=script.module name=script.class_name %}">{% trans "Source" %}</a>
<a class="nav-link{% if tab == 'source' %} active{% endif %}{% if not script.is_executable %} disabled{% endif %}" href="{% url 'extras:script_source' script.id %}">{% trans "Source" %}</a>
</li>
<li class="nav-item" role="presentation">
<a class="nav-link{% if tab == 'jobs' %} active{% endif %}" href="{% url 'extras:script_jobs' module=script.module name=script.class_name %}">
<a class="nav-link{% if tab == 'jobs' %} active{% endif %}" href="{% url 'extras:script_jobs' script.id %}">
{% trans "Jobs" %} {% badge job_count %}
</a>
</li>

View File

@ -1,11 +1,24 @@
{% extends 'extras/script/base.html' %}
{% load render_table from django_tables2 %}
{% load i18n %}
{% block content %}
<div class="row mb-3">
<div class="col col-md-12">
<div class="card">
<div class="card-body table-responsive">
{% if not script.is_executable %}
<div class="alert alert-warning" role="alert">
<div class="d-flex justify-content-between">
<div>
<i class="mdi mdi-alert"></i>
{% trans "Script no longer exists in the source file." %}
</div>
</div>
</div>
{% endif %}
{% render_table table 'inc/table.html' %}
{% include 'inc/paginator.html' with paginator=table.paginator page=table.page %}
</div>

View File

@ -1,6 +1,6 @@
{% extends 'extras/script/base.html' %}
{% block content %}
<code class="h6 my-3 d-block">{{ script.filename }}</code>
<pre class="block">{{ script.source }}</pre>
<code class="h6 my-3 d-block">{{ script_class.filename }}</code>
<pre class="block">{{ script_class.source }}</pre>
{% endblock %}

View File

@ -26,11 +26,18 @@
<div>
<i class="mdi mdi-file-document-outline"></i> {{ module }}
</div>
{% if perms.extras.delete_scriptmodule %}
<a href="{% url 'extras:scriptmodule_delete' pk=module.pk %}" class="btn btn-danger btn-sm">
<i class="mdi mdi-trash-can-outline" aria-hidden="true"></i> {% trans "Delete" %}
</a>
{% endif %}
<div>
{% if perms.extras.edit_scriptmodule %}
<a href="{% url 'extras:scriptmodule_edit' pk=module.pk %}" class="btn btn-warning btn-sm">
<i class="mdi mdi-pencil" aria-hidden="true"></i> {% trans "Edit" %}
</a>
{% endif %}
{% if perms.extras.delete_scriptmodule %}
<a href="{% url 'extras:scriptmodule_delete' pk=module.pk %}" class="btn btn-danger btn-sm">
<i class="mdi mdi-trash-can-outline" aria-hidden="true"></i> {% trans "Delete" %}
</a>
{% endif %}
</div>
</h5>
{% if module.scripts %}
<table class="table table-hover scripts">
@ -44,75 +51,80 @@
</tr>
</thead>
<tbody>
{% with jobs=module.get_latest_jobs %}
{% for script_name, script in module.scripts.items %}
{% with last_job=jobs|get_key:script.class_name %}
<tr>
<td>
<a href="{% url 'extras:script' module=module.python_name name=script.class_name %}" id="{{ script.module }}.{{ script.class_name }}">{{ script.name }}</a>
</td>
<td>{{ script.description|markdown|placeholder }}</td>
{% if last_job %}
<td>
<a href="{% url 'extras:script_result' job_pk=last_job.pk %}">{{ last_job.created|annotated_date }}</a>
</td>
<td>
{% badge last_job.get_status_display last_job.get_status_color %}
</td>
{% for script in module.scripts.all %}
{% with last_job=script.get_latest_jobs|get_key:script.name %}
<tr>
<td>
{% if script.is_executable %}
<a href="{% url 'extras:script' script.pk %}" id="{{ script.module }}.{{ script.class_name }}">{{ script.name }}</a>
{% else %}
<td class="text-muted">{% trans "Never" %}</td>
<td>{{ ''|placeholder }}</td>
<a href="{% url 'extras:script_jobs' script.pk %}" id="{{ script.module }}.{{ script.class_name }}">{{ script.name }}</a>
<span class="text-danger">
<i class="mdi mdi-alert" title="{% trans "Script is no longer present in the source file" %}"></i>
</span>
{% endif %}
<td>
{% if perms.extras.run_script %}
<div class="float-end d-print-none">
<form action="{% url 'extras:script' module=script.module name=script.class_name %}" method="post">
{% csrf_token %}
<button type="submit" name="_run" class="btn btn-primary btn-sm">
{% if last_job %}
<i class="mdi mdi-replay"></i> {% trans "Run Again" %}
{% else %}
<i class="mdi mdi-play"></i> {% trans "Run Script" %}
{% endif %}
</button>
</form>
</div>
{% endif %}
</td>
</tr>
</td>
<td>{{ script.description|markdown|placeholder }}</td>
{% if last_job %}
{% for test_name, data in last_job.data.tests.items %}
<tr>
<td colspan="4" class="method">
<span class="ps-3">{{ test_name }}</span>
</td>
<td class="text-end text-nowrap script-stats">
<span class="badge text-bg-success">{{ data.success }}</span>
<span class="badge text-bg-info">{{ data.info }}</span>
<span class="badge text-bg-warning">{{ data.warning }}</span>
<span class="badge text-bg-danger">{{ data.failure }}</span>
</td>
</tr>
{% endfor %}
{% elif not last_job.data.log %}
{# legacy #}
{% for method, stats in last_job.data.items %}
<tr>
<td colspan="4" class="method">
<span class="ps-3">{{ method }}</span>
</td>
<td class="text-end text-nowrap report-stats">
<span class="badge bg-success">{{ stats.success }}</span>
<span class="badge bg-info">{{ stats.info }}</span>
<span class="badge bg-warning">{{ stats.warning }}</span>
<span class="badge bg-danger">{{ stats.failure }}</span>
</td>
</tr>
{% endfor %}
<td>
<a href="{% url 'extras:script_result' job_pk=last_job.pk %}">{{ last_job.created|annotated_date }}</a>
</td>
<td>
{% badge last_job.get_status_display last_job.get_status_color %}
</td>
{% else %}
<td class="text-muted">{% trans "Never" %}</td>
<td>{{ ''|placeholder }}</td>
{% endif %}
{% endwith %}
{% endfor %}
{% endwith %}
<td>
{% if request.user|can_run:script and script.is_executable %}
<div class="float-end d-print-none">
<form action="{% url 'extras:script' script.pk %}" method="post">
{% csrf_token %}
<button type="submit" name="_run" class="btn btn-primary btn-sm">
{% if last_job %}
<i class="mdi mdi-replay"></i> {% trans "Run Again" %}
{% else %}
<i class="mdi mdi-play"></i> {% trans "Run Script" %}
{% endif %}
</button>
</form>
</div>
{% endif %}
</td>
</tr>
{% if last_job %}
{% for test_name, data in last_job.data.tests.items %}
<tr>
<td colspan="4" class="method">
<span class="ps-3">{{ test_name }}</span>
</td>
<td class="text-end text-nowrap script-stats">
<span class="badge text-bg-success">{{ data.success }}</span>
<span class="badge text-bg-info">{{ data.info }}</span>
<span class="badge text-bg-warning">{{ data.warning }}</span>
<span class="badge text-bg-danger">{{ data.failure }}</span>
</td>
</tr>
{% endfor %}
{% elif not last_job.data.log %}
{# legacy #}
{% for method, stats in last_job.data.items %}
<tr>
<td colspan="4" class="method">
<span class="ps-3">{{ method }}</span>
</td>
<td class="text-end text-nowrap report-stats">
<span class="badge bg-success">{{ stats.success }}</span>
<span class="badge bg-info">{{ stats.info }}</span>
<span class="badge bg-warning">{{ stats.warning }}</span>
<span class="badge bg-danger">{{ stats.failure }}</span>
</td>
</tr>
{% endfor %}
{% endif %}
{% endwith %}
{% endfor %}
</tbody>
</table>
{% else %}

View File

@ -16,7 +16,7 @@
<ol class="breadcrumb">
<li class="breadcrumb-item"><a href="{% url 'extras:script_list' %}">{% trans "Scripts" %}</a></li>
<li class="breadcrumb-item"><a href="{% url 'extras:script_list' %}#module.{{ script.module }}">{{ script.module|bettertitle }}</a></li>
<li class="breadcrumb-item"><a href="{% url 'extras:script' module=script.module name=script.class_name %}">{{ script }}</a></li>
<li class="breadcrumb-item"><a href="{{ script.get_absolute_url }}">{{ script }}</a></li>
<li class="breadcrumb-item">{{ job.created|annotated_date }}</li>
</ol>
</nav>

View File

@ -376,11 +376,6 @@ class ObjectPermissionForm(forms.ModelForm):
for ct in object_types:
model = ct.model_class()
if model._meta.model_name in ['script', 'report']:
raise forms.ValidationError({
'constraints': _('Constraints are not supported for this object type.')
})
try:
tokens = {
CONSTRAINT_TOKEN_USER: 0, # Replace token with a null user ID

View File

@ -6,6 +6,7 @@ __all__ = (
'can_add',
'can_change',
'can_delete',
'can_run',
'can_sync',
'can_view',
)
@ -42,3 +43,8 @@ def can_delete(user, instance):
@register.filter()
def can_sync(user, instance):
return _check_permission(user, instance, 'sync')
@register.filter()
def can_run(user, instance):
return _check_permission(user, instance, 'run')