From 53a4420bc3c7a2c42f793c5f36c8f446928f1b0b Mon Sep 17 00:00:00 2001 From: Alexander Haase Date: Sat, 22 Jun 2024 00:36:12 +0200 Subject: [PATCH] Implement BackgroundJob for running scripts The independent implementations of interactive and background script execution have been merged into a single BackgroundJob implementation. --- netbox/extras/api/views.py | 6 +- netbox/extras/events.py | 5 +- netbox/extras/jobs.py | 105 +++++++++++++++ .../extras/management/commands/runscript.py | 107 ++++------------ netbox/extras/scripts.py | 120 +----------------- netbox/extras/views.py | 6 +- 6 files changed, 141 insertions(+), 208 deletions(-) create mode 100644 netbox/extras/jobs.py diff --git a/netbox/extras/api/views.py b/netbox/extras/api/views.py index 34565384b..a82099335 100644 --- a/netbox/extras/api/views.py +++ b/netbox/extras/api/views.py @@ -1,5 +1,6 @@ from django.http import Http404 from django.shortcuts import get_object_or_404 +from django.utils.module_loading import import_string from django_rq.queues import get_connection from rest_framework import status from rest_framework.decorators import action @@ -14,7 +15,6 @@ from rq import Worker from core.models import Job, ObjectType from extras import filtersets from extras.models import * -from extras.scripts import run_script from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired from netbox.api.features import SyncedDataMixin from netbox.api.metadata import ContentTypeMetadata @@ -252,8 +252,8 @@ class ScriptViewSet(ModelViewSet): raise RQWorkerNotRunningException() if input_serializer.is_valid(): - Job.enqueue( - run_script, + ScriptJob = import_string("extras.jobs.ScriptJob") + ScriptJob.enqueue( instance=script, name=script.python_class.class_name, user=request.user, diff --git a/netbox/extras/events.py b/netbox/extras/events.py index dae3f29cf..be7a18fc5 100644 --- a/netbox/extras/events.py +++ b/netbox/extras/events.py @@ -9,7 +9,6 @@ from django.utils.translation import gettext as _ from django_rq import get_queue from core.choices import ObjectChangeActionChoices -from core.models import Job from netbox.config import get_config from netbox.constants import RQ_QUEUE_DEFAULT from netbox.registry import registry @@ -125,8 +124,8 @@ def process_event_rules(event_rules, model_name, event, data, username=None, sna script = event_rule.action_object.python_class() # Enqueue a Job to record the script's execution - Job.enqueue( - "extras.scripts.run_script", + ScriptJob = import_string("extras.jobs.ScriptJob") + ScriptJob.enqueue( instance=event_rule.action_object, name=script.name, user=user, diff --git a/netbox/extras/jobs.py b/netbox/extras/jobs.py new file mode 100644 index 000000000..5ee2c351b --- /dev/null +++ b/netbox/extras/jobs.py @@ -0,0 +1,105 @@ +import logging +import traceback +from contextlib import nullcontext + +from django.db import transaction +from django.utils.translation import gettext as _ + +from extras.models import Script as ScriptModel +from extras.signals import clear_events +from netbox.context_managers import event_tracking +from utilities.exceptions import AbortScript, AbortTransaction +from utilities.jobs import BackgroundJob +from .utils import is_report + + +class ScriptJob(BackgroundJob): + """ + Script execution job. + + A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It + exists outside the Script class to ensure it cannot be overridden by a script author. + """ + + @staticmethod + def run_script(script, job, request, data, commit): + """ + Core script execution task. We capture this within a method to allow for conditionally wrapping it with the + event_tracking context manager (which is bypassed if commit == False). + + Args: + job: The Job associated with this execution + request: The WSGI request associated with this execution (if any) + data: A dictionary of data to be passed to the script upon execution + commit: Passed through to Script.run() + """ + try: + try: + with transaction.atomic(): + script.output = script.run(data, commit) + if not commit: + raise AbortTransaction() + except AbortTransaction: + script.log_info(message=_("Database changes have been reverted automatically.")) + if script.failed: + logger.warning(f"Script failed") + raise + + except Exception as e: + if type(e) is AbortScript: + msg = _("Script aborted with error: ") + str(e) + if is_report(type(script)): + script.log_failure(message=msg) + else: + script.log_failure(msg) + logger.error(f"Script aborted with error: {e}") + + else: + stacktrace = traceback.format_exc() + script.log_failure( + message=_("An exception occurred: ") + f"`{type(e).__name__}: {e}`\n```\n{stacktrace}\n```" + ) + logger.error(f"Exception raised during script execution: {e}") + + if type(e) is not AbortTransaction: + script.log_info(message=_("Database changes have been reverted due to error.")) + + # Clear all pending events. Job termination (including setting the status) is handled by the job framework. + if request: + clear_events.send(request) + raise + + # Update the job data regardless of the execution status of the job. Successes should be reported as well as + # failures. + finally: + job.data = script.get_job_data() + + @classmethod + def run(cls, job, data, request=None, commit=True, **kwargs): + """ + Run the script. + + Args: + job: The Job associated with this execution + data: A dictionary of data to be passed to the script upon execution + request: The WSGI request associated with this execution (if any) + commit: Passed through to Script.run() + """ + script = ScriptModel.objects.get(pk=job.object_id).python_class() + + logger = logging.getLogger(f"netbox.scripts.{script.full_name}") + logger.info(f"Running script (commit={commit})") + + # Add files to form data + if request: + files = request.FILES + for field_name, fileobj in files.items(): + data[field_name] = fileobj + + # Add the current request as a property of the script + script.request = request + + # Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process + # change logging, event rules, etc. + with event_tracking(request) if commit else nullcontext(): + cls.run_script(script, job, request, data, commit) diff --git a/netbox/extras/management/commands/runscript.py b/netbox/extras/management/commands/runscript.py index dbfbb40d9..ad828971c 100644 --- a/netbox/extras/management/commands/runscript.py +++ b/netbox/extras/management/commands/runscript.py @@ -1,19 +1,13 @@ import json import logging import sys -import traceback import uuid from django.contrib.auth import get_user_model from django.core.management.base import BaseCommand, CommandError -from django.db import transaction +from django.utils.module_loading import import_string -from core.choices import JobStatusChoices -from core.models import Job from extras.scripts import get_module_and_script -from extras.signals import clear_events -from netbox.context_managers import event_tracking -from utilities.exceptions import AbortTransaction from utilities.request import NetBoxFakeRequest @@ -33,44 +27,6 @@ class Command(BaseCommand): parser.add_argument('script', help="Script to run") def handle(self, *args, **options): - - def _run_script(): - """ - Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with - the event_tracking context manager (which is bypassed if commit == False). - """ - try: - try: - with transaction.atomic(): - script.output = script.run(data=data, commit=commit) - if not commit: - raise AbortTransaction() - except AbortTransaction: - script.log_info("Database changes have been reverted automatically.") - clear_events.send(request) - job.data = script.get_job_data() - job.terminate() - except Exception as e: - stacktrace = traceback.format_exc() - script.log_failure( - f"An exception occurred: `{type(e).__name__}: {e}`\n```\n{stacktrace}\n```" - ) - script.log_info("Database changes have been reverted due to error.") - logger.error(f"Exception raised during script execution: {e}") - clear_events.send(request) - job.data = script.get_job_data() - job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e)) - - # Print any test method results - for test_name, attrs in job.data['tests'].items(): - self.stdout.write( - "\t{}: {} success, {} info, {} warning, {} failure".format( - test_name, attrs['success'], attrs['info'], attrs['warning'], attrs['failure'] - ) - ) - - logger.info(f"Script completed in {job.duration}") - User = get_user_model() # Params @@ -84,8 +40,8 @@ class Command(BaseCommand): data = {} module_name, script_name = script.split('.', 1) - module, script = get_module_and_script(module_name, script_name) - script = script.python_class + module, script_obj = get_module_and_script(module_name, script_name) + script = script_obj.python_class # Take user from command line if provided and exists, other if options['user']: @@ -120,40 +76,31 @@ class Command(BaseCommand): # Initialize the script form script = script() form = script.as_form(data, None) - - # Create the job - job = Job.objects.create( - object=module, - name=script.class_name, - user=User.objects.filter(is_superuser=True).order_by('pk')[0], - job_id=uuid.uuid4() - ) - - request = NetBoxFakeRequest({ - 'META': {}, - 'POST': data, - 'GET': {}, - 'FILES': {}, - 'user': user, - 'path': '', - 'id': job.job_id - }) - - if form.is_valid(): - job.status = JobStatusChoices.STATUS_RUNNING - job.save() - - logger.info(f"Running script (commit={commit})") - script.request = request - - # Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process - # change logging, webhooks, etc. - with event_tracking(request): - _run_script() - else: + if not form.is_valid(): logger.error('Data is not valid:') for field, errors in form.errors.get_json_data().items(): for error in errors: logger.error(f'\t{field}: {error.get("message")}') - job.status = JobStatusChoices.STATUS_ERRORED - job.save() + raise CommandError() + + # Execute the script. + ScriptJob = import_string("extras.jobs.ScriptJob") + job = ScriptJob.enqueue( + instance=script_obj, + name=script.name, + user=user, + run_now=True, + data=data, + request=NetBoxFakeRequest({ + 'META': {}, + 'POST': data, + 'GET': {}, + 'FILES': {}, + 'user': user, + 'path': '', + 'id': uuid.uuid4() + }), + commit=commit, + ) + + logger.info(f"Script completed in {job.duration}") diff --git a/netbox/extras/scripts.py b/netbox/extras/scripts.py index b4a1d6de1..47fe44205 100644 --- a/netbox/extras/scripts.py +++ b/netbox/extras/scripts.py @@ -2,32 +2,23 @@ import inspect import json import logging import os -import traceback -from datetime import timedelta import yaml from django import forms from django.conf import settings from django.core.validators import RegexValidator -from django.db import transaction from django.utils import timezone from django.utils.functional import classproperty from django.utils.translation import gettext as _ -from core.choices import JobStatusChoices -from core.models import Job from extras.choices import LogLevelChoices -from extras.models import ScriptModule, Script as ScriptModel -from extras.signals import clear_events +from extras.models import ScriptModule from ipam.formfields import IPAddressFormField, IPNetworkFormField from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator, prefix_validator -from netbox.context_managers import event_tracking -from utilities.exceptions import AbortScript, AbortTransaction from utilities.forms import add_blank_choice from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField from utilities.forms.widgets import DatePicker, DateTimePicker from .forms import ScriptForm -from .utils import is_report __all__ = ( @@ -48,7 +39,6 @@ __all__ = ( 'StringVar', 'TextVar', 'get_module_and_script', - 'run_script', ) @@ -609,111 +599,3 @@ def get_module_and_script(module_name, script_name): module = ScriptModule.objects.get(file_path=f'{module_name}.py') script = module.scripts.get(name=script_name) return module, script - - -def run_script(data, job, request=None, commit=True, **kwargs): - """ - A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It - exists outside the Script class to ensure it cannot be overridden by a script author. - - Args: - data: A dictionary of data to be passed to the script upon execution - job: The Job associated with this execution - request: The WSGI request associated with this execution (if any) - commit: Passed through to Script.run() - """ - job.start() - - script = ScriptModel.objects.get(pk=job.object_id).python_class() - - logger = logging.getLogger(f"netbox.scripts.{script.full_name}") - logger.info(f"Running script (commit={commit})") - - # Add files to form data - if request: - files = request.FILES - for field_name, fileobj in files.items(): - data[field_name] = fileobj - - # Add the current request as a property of the script - script.request = request - - def set_job_data(script): - job.data = { - 'log': script.messages, - 'output': script.output, - 'tests': script.tests, - } - - return job - - def _run_script(job): - """ - Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with - the event_tracking context manager (which is bypassed if commit == False). - """ - try: - try: - with transaction.atomic(): - script.output = script.run(data, commit) - if not commit: - raise AbortTransaction() - except AbortTransaction: - script.log_info(message=_("Database changes have been reverted automatically.")) - if request: - clear_events.send(request) - - job.data = script.get_job_data() - if script.failed: - logger.warning(f"Script failed") - job.terminate(status=JobStatusChoices.STATUS_FAILED) - else: - job.terminate() - - except Exception as e: - if type(e) is AbortScript: - msg = _("Script aborted with error: ") + str(e) - if is_report(type(script)): - script.log_failure(message=msg) - else: - script.log_failure(msg) - - logger.error(f"Script aborted with error: {e}") - else: - stacktrace = traceback.format_exc() - script.log_failure( - message=_("An exception occurred: ") + f"`{type(e).__name__}: {e}`\n```\n{stacktrace}\n```" - ) - logger.error(f"Exception raised during script execution: {e}") - script.log_info(message=_("Database changes have been reverted due to error.")) - - job.data = script.get_job_data() - job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e)) - if request: - clear_events.send(request) - - logger.info(f"Script completed in {job.duration}") - - # Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process - # change logging, event rules, etc. - if commit: - with event_tracking(request): - _run_script(job) - else: - _run_script(job) - - # Schedule the next job if an interval has been set - if job.interval: - new_scheduled_time = job.scheduled + timedelta(minutes=job.interval) - Job.enqueue( - run_script, - instance=job.object, - name=job.name, - user=job.user, - schedule_at=new_scheduled_time, - interval=job.interval, - job_timeout=script.job_timeout, - data=data, - request=request, - commit=commit - ) diff --git a/netbox/extras/views.py b/netbox/extras/views.py index efbf2c73a..a9515d549 100644 --- a/netbox/extras/views.py +++ b/netbox/extras/views.py @@ -6,6 +6,7 @@ from django.db.models import Count, Q from django.http import HttpResponseBadRequest, HttpResponseForbidden, HttpResponse from django.shortcuts import get_object_or_404, redirect, render from django.urls import reverse +from django.utils.module_loading import import_string from django.utils.translation import gettext as _ from django.views.generic import View @@ -31,7 +32,6 @@ from utilities.views import ContentTypePermissionRequiredMixin, get_viewname, re from virtualization.models import VirtualMachine from . import filtersets, forms, tables from .models import * -from .scripts import run_script from .tables import ReportResultsTable, ScriptResultsTable @@ -1030,8 +1030,8 @@ class ScriptView(BaseScriptView): if not get_workers_for_queue('default'): messages.error(request, _("Unable to run script: RQ worker process not running.")) elif form.is_valid(): - job = Job.enqueue( - run_script, + ScriptJob = import_string("extras.jobs.ScriptJob") + job = ScriptJob.enqueue( instance=script, name=script_class.class_name, user=request.user,