Implement BackgroundJob for running scripts

The independent implementations of interactive and background script
execution have been merged into a single BackgroundJob implementation.
Alexander Haase 2024-06-22 00:36:12 +02:00
parent db591d422a
commit 53a4420bc3
6 changed files with 141 additions and 208 deletions
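
The practical effect at each call site is the same: callers no longer hand the run_script function to the generic Job.enqueue(), they resolve and enqueue the new ScriptJob class itself. A minimal before/after sketch of the calling pattern, reduced to the keyword arguments visible in the diffs below (script and request are the calling view's local variables; data, request, commit and the scheduling arguments are passed identically in both forms):

    # Before: a plain callable was queued for the worker to execute.
    from core.models import Job
    from extras.scripts import run_script

    Job.enqueue(
        run_script,
        instance=script,
        name=script.python_class.class_name,
        user=request.user,
    )

    # After: the BackgroundJob subclass is resolved lazily (presumably to avoid a
    # circular import) and enqueued directly.
    from django.utils.module_loading import import_string

    ScriptJob = import_string("extras.jobs.ScriptJob")
    ScriptJob.enqueue(
        instance=script,
        name=script.python_class.class_name,
        user=request.user,
    )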

@@ -1,5 +1,6 @@
 from django.http import Http404
 from django.shortcuts import get_object_or_404
+from django.utils.module_loading import import_string
 from django_rq.queues import get_connection
 from rest_framework import status
 from rest_framework.decorators import action
@@ -14,7 +15,6 @@ from rq import Worker
 from core.models import Job, ObjectType
 from extras import filtersets
 from extras.models import *
-from extras.scripts import run_script
 from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
 from netbox.api.features import SyncedDataMixin
 from netbox.api.metadata import ContentTypeMetadata
@@ -252,8 +252,8 @@ class ScriptViewSet(ModelViewSet):
             raise RQWorkerNotRunningException()
 
         if input_serializer.is_valid():
-            Job.enqueue(
-                run_script,
+            ScriptJob = import_string("extras.jobs.ScriptJob")
+            ScriptJob.enqueue(
                 instance=script,
                 name=script.python_class.class_name,
                 user=request.user,

@@ -9,7 +9,6 @@ from django.utils.translation import gettext as _
 from django_rq import get_queue
 
 from core.choices import ObjectChangeActionChoices
-from core.models import Job
 from netbox.config import get_config
 from netbox.constants import RQ_QUEUE_DEFAULT
 from netbox.registry import registry
@@ -125,8 +124,8 @@ def process_event_rules(event_rules, model_name, event, data, username=None, sna
             script = event_rule.action_object.python_class()
 
             # Enqueue a Job to record the script's execution
-            Job.enqueue(
-                "extras.scripts.run_script",
+            ScriptJob = import_string("extras.jobs.ScriptJob")
+            ScriptJob.enqueue(
                 instance=event_rule.action_object,
                 name=script.name,
                 user=user,

netbox/extras/jobs.py (new file, 105 lines)

@@ -0,0 +1,105 @@
import logging
import traceback
from contextlib import nullcontext

from django.db import transaction
from django.utils.translation import gettext as _

from extras.models import Script as ScriptModel
from extras.signals import clear_events
from netbox.context_managers import event_tracking
from utilities.exceptions import AbortScript, AbortTransaction
from utilities.jobs import BackgroundJob
from .utils import is_report


class ScriptJob(BackgroundJob):
    """
    Script execution job.

    A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
    exists outside the Script class to ensure it cannot be overridden by a script author.
    """

    @staticmethod
    def run_script(script, job, request, data, commit):
        """
        Core script execution task. We capture this within a method to allow for conditionally wrapping it with the
        event_tracking context manager (which is bypassed if commit == False).

        Args:
            job: The Job associated with this execution
            request: The WSGI request associated with this execution (if any)
            data: A dictionary of data to be passed to the script upon execution
            commit: Passed through to Script.run()
        """
        try:
            try:
                with transaction.atomic():
                    script.output = script.run(data, commit)
                    if not commit:
                        raise AbortTransaction()
            except AbortTransaction:
                script.log_info(message=_("Database changes have been reverted automatically."))
                if script.failed:
                    logger.warning(f"Script failed")
                    raise

        except Exception as e:
            if type(e) is AbortScript:
                msg = _("Script aborted with error: ") + str(e)
                if is_report(type(script)):
                    script.log_failure(message=msg)
                else:
                    script.log_failure(msg)
                logger.error(f"Script aborted with error: {e}")
            else:
                stacktrace = traceback.format_exc()
                script.log_failure(
                    message=_("An exception occurred: ") + f"`{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
                )
                logger.error(f"Exception raised during script execution: {e}")

            if type(e) is not AbortTransaction:
                script.log_info(message=_("Database changes have been reverted due to error."))

            # Clear all pending events. Job termination (including setting the status) is handled by the job framework.
            if request:
                clear_events.send(request)
            raise

        # Update the job data regardless of the execution status of the job. Successes should be reported as well as
        # failures.
        finally:
            job.data = script.get_job_data()

    @classmethod
    def run(cls, job, data, request=None, commit=True, **kwargs):
        """
        Run the script.

        Args:
            job: The Job associated with this execution
            data: A dictionary of data to be passed to the script upon execution
            request: The WSGI request associated with this execution (if any)
            commit: Passed through to Script.run()
        """
        script = ScriptModel.objects.get(pk=job.object_id).python_class()

        logger = logging.getLogger(f"netbox.scripts.{script.full_name}")
        logger.info(f"Running script (commit={commit})")

        # Add files to form data
        if request:
            files = request.FILES
            for field_name, fileobj in files.items():
                data[field_name] = fileobj

        # Add the current request as a property of the script
        script.request = request

        # Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
        # change logging, event rules, etc.
        with event_tracking(request) if commit else nullcontext():
            cls.run_script(script, job, request, data, commit)

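The BackgroundJob base class imported from utilities.jobs is not part of this commit, so the contract it provides can only be inferred here: ScriptJob relies on enqueue() creating the core Job record (optionally running it immediately when run_now=True is passed, as the runscript command below does) and on the framework invoking run(job, ...) and setting the job's terminal status itself, which is what the comment about job termination above refers to. A rough, hypothetical sketch of that assumed interface; apart from enqueue, run and run_now, the names and signatures are invented for illustration:

    # Hypothetical sketch only; utilities/jobs.py is not shown in this commit.
    class BackgroundJob:

        @classmethod
        def enqueue(cls, instance=None, name=None, user=None, run_now=False, **kwargs):
            # Create a core.models.Job record bound to this class, then either run it
            # immediately (run_now=True) or schedule it on the RQ queue.
            ...

        @classmethod
        def handle(cls, job, **kwargs):
            # Framework entry point: mark the job as started, call cls.run(job, **kwargs),
            # then record the terminal status (completed, errored, failed).
            ...

        @classmethod
        def run(cls, job, *args, **kwargs):
            # Hook implemented by subclasses such as ScriptJob above.
            raise NotImplementedError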
@@ -1,19 +1,13 @@
 import json
 import logging
 import sys
-import traceback
 import uuid
 
 from django.contrib.auth import get_user_model
 from django.core.management.base import BaseCommand, CommandError
-from django.db import transaction
+from django.utils.module_loading import import_string
 
-from core.choices import JobStatusChoices
-from core.models import Job
 from extras.scripts import get_module_and_script
-from extras.signals import clear_events
-from netbox.context_managers import event_tracking
-from utilities.exceptions import AbortTransaction
 from utilities.request import NetBoxFakeRequest
@@ -33,44 +27,6 @@ class Command(BaseCommand):
         parser.add_argument('script', help="Script to run")
 
     def handle(self, *args, **options):
-
-        def _run_script():
-            """
-            Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with
-            the event_tracking context manager (which is bypassed if commit == False).
-            """
-            try:
-                try:
-                    with transaction.atomic():
-                        script.output = script.run(data=data, commit=commit)
-                        if not commit:
-                            raise AbortTransaction()
-                except AbortTransaction:
-                    script.log_info("Database changes have been reverted automatically.")
-                    clear_events.send(request)
-                job.data = script.get_job_data()
-                job.terminate()
-            except Exception as e:
-                stacktrace = traceback.format_exc()
-                script.log_failure(
-                    f"An exception occurred: `{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
-                )
-                script.log_info("Database changes have been reverted due to error.")
-                logger.error(f"Exception raised during script execution: {e}")
-                clear_events.send(request)
-                job.data = script.get_job_data()
-                job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
-
-            # Print any test method results
-            for test_name, attrs in job.data['tests'].items():
-                self.stdout.write(
-                    "\t{}: {} success, {} info, {} warning, {} failure".format(
-                        test_name, attrs['success'], attrs['info'], attrs['warning'], attrs['failure']
-                    )
-                )
-            logger.info(f"Script completed in {job.duration}")
-
         User = get_user_model()
 
         # Params
@@ -84,8 +40,8 @@ class Command(BaseCommand):
             data = {}
 
         module_name, script_name = script.split('.', 1)
-        module, script = get_module_and_script(module_name, script_name)
-        script = script.python_class
+        module, script_obj = get_module_and_script(module_name, script_name)
+        script = script_obj.python_class
 
         # Take user from command line if provided and exists, other
         if options['user']:
@@ -120,15 +76,21 @@
         # Initialize the script form
         script = script()
         form = script.as_form(data, None)
 
-        # Create the job
-        job = Job.objects.create(
-            object=module,
-            name=script.class_name,
-            user=User.objects.filter(is_superuser=True).order_by('pk')[0],
-            job_id=uuid.uuid4()
-        )
-
-        request = NetBoxFakeRequest({
-            'META': {},
-            'POST': data,
+        if not form.is_valid():
+            logger.error('Data is not valid:')
+            for field, errors in form.errors.get_json_data().items():
+                for error in errors:
+                    logger.error(f'\t{field}: {error.get("message")}')
+            raise CommandError()
+
+        # Execute the script.
+        ScriptJob = import_string("extras.jobs.ScriptJob")
+        job = ScriptJob.enqueue(
+            instance=script_obj,
+            name=script.name,
+            user=user,
+            run_now=True,
+            data=data,
+            request=NetBoxFakeRequest({
+                'META': {},
+                'POST': data,
@@ -136,24 +98,9 @@
-            'FILES': {},
-            'user': user,
-            'path': '',
-            'id': job.job_id
-        })
-
-        if form.is_valid():
-            job.status = JobStatusChoices.STATUS_RUNNING
-            job.save()
-
-            logger.info(f"Running script (commit={commit})")
-            script.request = request
-
-            # Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
-            # change logging, webhooks, etc.
-            with event_tracking(request):
-                _run_script()
-        else:
-            logger.error('Data is not valid:')
-            for field, errors in form.errors.get_json_data().items():
-                for error in errors:
-                    logger.error(f'\t{field}: {error.get("message")}')
-
-            job.status = JobStatusChoices.STATUS_ERRORED
-            job.save()
+                'FILES': {},
+                'user': user,
+                'path': '',
+                'id': uuid.uuid4()
+            }),
+            commit=commit,
+        )
+
+        logger.info(f"Script completed in {job.duration}")

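The runscript management command now reuses the same ScriptJob rather than carrying its own copy of the execution logic: it validates the form up front and raises CommandError on bad input, then enqueues the job with run_now=True so that, like the inline implementation it replaces, execution presumably happens immediately within the command process rather than waiting for an RQ worker. A condensed sketch of the rewritten flow, with user resolution, data parsing and the NetBoxFakeRequest construction omitted (variable names follow the diff above):

    module, script_obj = get_module_and_script(module_name, script_name)
    script = script_obj.python_class()

    # Validate the submitted data before any job is created.
    form = script.as_form(data, None)
    if not form.is_valid():
        raise CommandError()  # field-level errors are logged first

    # Enqueue the shared ScriptJob and run it synchronously.
    ScriptJob = import_string("extras.jobs.ScriptJob")
    job = ScriptJob.enqueue(
        instance=script_obj,
        name=script.name,
        user=user,
        run_now=True,
        data=data,
        commit=commit,
    )
    logger.info(f"Script completed in {job.duration}")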
@@ -2,32 +2,23 @@ import inspect
 import json
 import logging
 import os
-import traceback
-from datetime import timedelta
 
 import yaml
 from django import forms
 from django.conf import settings
 from django.core.validators import RegexValidator
-from django.db import transaction
 from django.utils import timezone
 from django.utils.functional import classproperty
 from django.utils.translation import gettext as _
 
-from core.choices import JobStatusChoices
-from core.models import Job
 from extras.choices import LogLevelChoices
-from extras.models import ScriptModule, Script as ScriptModel
-from extras.signals import clear_events
+from extras.models import ScriptModule
 from ipam.formfields import IPAddressFormField, IPNetworkFormField
 from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator, prefix_validator
-from netbox.context_managers import event_tracking
-from utilities.exceptions import AbortScript, AbortTransaction
 from utilities.forms import add_blank_choice
 from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField
 from utilities.forms.widgets import DatePicker, DateTimePicker
 from .forms import ScriptForm
-from .utils import is_report
 
 
 __all__ = (
@@ -48,7 +39,6 @@ __all__ = (
     'StringVar',
     'TextVar',
     'get_module_and_script',
-    'run_script',
 )
@@ -609,111 +599,3 @@ def get_module_and_script(module_name, script_name):
     module = ScriptModule.objects.get(file_path=f'{module_name}.py')
     script = module.scripts.get(name=script_name)
     return module, script
-
-
-def run_script(data, job, request=None, commit=True, **kwargs):
-    """
-    A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
-    exists outside the Script class to ensure it cannot be overridden by a script author.
-
-    Args:
-        data: A dictionary of data to be passed to the script upon execution
-        job: The Job associated with this execution
-        request: The WSGI request associated with this execution (if any)
-        commit: Passed through to Script.run()
-    """
-    job.start()
-
-    script = ScriptModel.objects.get(pk=job.object_id).python_class()
-
-    logger = logging.getLogger(f"netbox.scripts.{script.full_name}")
-    logger.info(f"Running script (commit={commit})")
-
-    # Add files to form data
-    if request:
-        files = request.FILES
-        for field_name, fileobj in files.items():
-            data[field_name] = fileobj
-
-    # Add the current request as a property of the script
-    script.request = request
-
-    def set_job_data(script):
-        job.data = {
-            'log': script.messages,
-            'output': script.output,
-            'tests': script.tests,
-        }
-        return job
-
-    def _run_script(job):
-        """
-        Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with
-        the event_tracking context manager (which is bypassed if commit == False).
-        """
-        try:
-            try:
-                with transaction.atomic():
-                    script.output = script.run(data, commit)
-                    if not commit:
-                        raise AbortTransaction()
-            except AbortTransaction:
-                script.log_info(message=_("Database changes have been reverted automatically."))
-                if request:
-                    clear_events.send(request)
-
-            job.data = script.get_job_data()
-            if script.failed:
-                logger.warning(f"Script failed")
-                job.terminate(status=JobStatusChoices.STATUS_FAILED)
-            else:
-                job.terminate()
-
-        except Exception as e:
-            if type(e) is AbortScript:
-                msg = _("Script aborted with error: ") + str(e)
-                if is_report(type(script)):
-                    script.log_failure(message=msg)
-                else:
-                    script.log_failure(msg)
-                logger.error(f"Script aborted with error: {e}")
-            else:
-                stacktrace = traceback.format_exc()
-                script.log_failure(
-                    message=_("An exception occurred: ") + f"`{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
-                )
-                logger.error(f"Exception raised during script execution: {e}")
-            script.log_info(message=_("Database changes have been reverted due to error."))
-
-            job.data = script.get_job_data()
-            job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
-            if request:
-                clear_events.send(request)
-
-        logger.info(f"Script completed in {job.duration}")
-
-    # Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
-    # change logging, event rules, etc.
-    if commit:
-        with event_tracking(request):
-            _run_script(job)
-    else:
-        _run_script(job)
-
-    # Schedule the next job if an interval has been set
-    if job.interval:
-        new_scheduled_time = job.scheduled + timedelta(minutes=job.interval)
-        Job.enqueue(
-            run_script,
-            instance=job.object,
-            name=job.name,
-            user=job.user,
-            schedule_at=new_scheduled_time,
-            interval=job.interval,
-            job_timeout=script.job_timeout,
-            data=data,
-            request=request,
-            commit=commit
-        )

@@ -6,6 +6,7 @@ from django.db.models import Count, Q
 from django.http import HttpResponseBadRequest, HttpResponseForbidden, HttpResponse
 from django.shortcuts import get_object_or_404, redirect, render
 from django.urls import reverse
+from django.utils.module_loading import import_string
 from django.utils.translation import gettext as _
 from django.views.generic import View
@@ -31,7 +32,6 @@ from utilities.views import ContentTypePermissionRequiredMixin, get_viewname, re
 from virtualization.models import VirtualMachine
 from . import filtersets, forms, tables
 from .models import *
-from .scripts import run_script
 from .tables import ReportResultsTable, ScriptResultsTable
@@ -1030,8 +1030,8 @@ class ScriptView(BaseScriptView):
         if not get_workers_for_queue('default'):
             messages.error(request, _("Unable to run script: RQ worker process not running."))
         elif form.is_valid():
-            job = Job.enqueue(
-                run_script,
+            ScriptJob = import_string("extras.jobs.ScriptJob")
+            job = ScriptJob.enqueue(
                 instance=script,
                 name=script_class.class_name,
                 user=request.user,