netbox/netbox/utilities/jobs.py
Alexander Haase faf67b3437 15692: Introduce background jobs (#16927)
* Introduce reusable BackgroundJob framework

A new abstract class can be used to implement job function classes. It
handles the necessary logic for starting and stopping jobs, including
exception handling and rescheduling of recurring jobs.

This commit also includes the migration of data source jobs to the new
framework.

* Restore using import_string for jobs

Using the 'import_string()' utility from Django allows the job script
class to be simplified, as module imports no longer need to avoid loops.
This should make it easier to queue and maintain jobs.

* Use SyncDataSourceJob for management command

Instead of maintaining two separate job execution logics, the same job
is now used for both background and interactive execution.

* Implement BackgroundJob for running scripts

The independent implementations of interactive and background script
execution have been merged into a single BackgroundJob implementation.

* Fix documentation of model features

* Ensure consistent code style

* Introduce reusable ScheduledJob

A new abstract class can be used to implement job function classes that
specialize in scheduling. These use the same logic as regular
BackgroundJobs, but ensure that they are only scheduled once at any given
time.

* Introduce reusable SystemJob

A new abstract class can be used to implement job function classes that
specialize in system background tasks (e.g. synchronization or
housekeeping). In addition to the features of the BackgroundJob and
ScheduledJob classes, these implement additional logic so that they need
not be bound to an existing NetBox object and can set up job schedules on
plugin load instead of an interactive request.

* Add documentation for jobs framework

* Revert "Use SyncDataSourceJob for management"

This partially reverts commit db591d4. The 'run_now' parameter of
'enqueue()' remains, as it is used by the following commits.

* Merge enqueued status into JobStatusChoices

* Fix logger for ScriptJob

* Remove job name for scripts

Because scripts are already linked through the Job Instance field, the
name is displayed twice. Removing this reduces redundancy and opens up
the possibility of simplifying the BackgroundJob framework in future
commits.

* Merge ScheduledJob into BackgroundJob

Instead of using separate classes, the logic of ScheduledJob is now
merged into the generic BackgroundJob class. This allows reusing the
same logic, but dynamically deciding whether to enqueue the same job
once or multiple times.

* Add name attribute for BackgroundJob

Instead of defining individual names on enqueue, BackgroundJob classes
can now set a job name in their meta class. This is equivalent to other
Django classes and NetBox scripts.

* Drop enqueue_sync_job() method from DataSource

* Import ScriptJob directly

* Relax requirement for Jobs to reference a specific object

* Rename 'run_now' arg on Job.enqueue() to 'immediate'

* Fix queue lookup in Job enqueue

* Collapse SystemJob into BackgroundJob

* Remove legacy JobResultStatusChoices

ChoiceSet was moved to core in 6a8ffd9.

* Use queue 'low' for system jobs by default

System jobs usually perform low-priority background tasks and therefore
can use a different queue than 'default', which is used for regular jobs
related to specific objects.

* Add test cases for BackgroundJob handling

* Fix enqueue interval jobs

As the job's name is set by enqueue(), it must not be passed in handle()
to avoid duplicate kwargs with the same name.

* Honor schedule_at for job's enqueue_once

Not only can a job's interval change, but so can the time at which it is
scheduled to run. If a specific scheduled time is set, it will also be
checked against the current job schedule. If there are any changes, the
job is rescheduled with the new time.

* Switch BackgroundJob to regular methods

Instead of using a class method for run(), a regular method is used for
this purpose. This gives the possibility to add more convenience methods
in the future, e.g. for interacting with the job object or for logging,
as implemented for scripts.

* Fix background tasks documentation

* Test enqueue in combination with enqueue_once

* Rename background jobs to tasks (to differentiate from RQ)

* Touch up docs

* Revert "Use queue 'low' for system jobs by default"

This reverts commit b17b2050df.

* Remove system background job

This commit reverts commits 4880d81 and 0b15ecf. Using the database
'connection_created' signal for job registration feels a little wrong at
this point, as it would trigger registration very often. However, the
background job framework is prepared for this use case and can be used
by plugins once the auto-registration of jobs is solved.

* Fix runscript management command

Defining names for background jobs was disabled with fb75389. The
preceding changes in 257976d forgot the management command.

* Use regular imports for ScriptJob

* Rename BackgroundJob to JobRunner

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
2024-07-30 13:31:21 -04:00

134 lines
4.6 KiB
Python

import logging
from abc import ABC, abstractmethod
from datetime import timedelta

from django.utils.functional import classproperty
from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException

from core.choices import JobStatusChoices
from core.models import Job, ObjectType
from netbox.constants import ADVISORY_LOCK_KEYS

__all__ = (
    'JobRunner',
)


class JobRunner(ABC):
    """
    Background Job helper class.

    This class handles the execution of a background job. It is responsible for maintaining its state, reporting errors,
    and scheduling recurring jobs.
    """

    class Meta:
        pass

    def __init__(self, job):
        """
        Args:
            job: The specific `Job` this `JobRunner` is executing.
        """
        self.job = job

    @classproperty
    def name(cls):
        return getattr(cls.Meta, 'name', cls.__name__)

    @abstractmethod
    def run(self, *args, **kwargs):
        """
        Run the job.

        A `JobRunner` class needs to implement this method to execute all commands of the job.
        """
        pass

    @classmethod
    def handle(cls, job, *args, **kwargs):
        """
        Handle the execution of a `Job`.

        This method is called by the Job Scheduler to handle the execution of all job commands. It will maintain the
        job's metadata and handle errors. For periodic jobs, a new job is automatically scheduled using its `interval`.
        """
        try:
            job.start()
            cls(job).run(*args, **kwargs)
            job.terminate()

        except Exception as e:
            job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
            if type(e) is JobTimeoutException:
                logging.error(e)

        # If the executed job is a periodic job, schedule its next execution at the specified interval.
        finally:
            if job.interval:
                new_scheduled_time = (job.scheduled or job.started) + timedelta(minutes=job.interval)
                cls.enqueue(
                    instance=job.object,
                    user=job.user,
                    schedule_at=new_scheduled_time,
                    interval=job.interval,
                    **kwargs,
                )

    @classmethod
    def get_jobs(cls, instance=None):
        """
        Get all jobs of this `JobRunner` related to a specific instance.
        """
        jobs = Job.objects.filter(name=cls.name)

        if instance:
            object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
            jobs = jobs.filter(
                object_type=object_type,
                object_id=instance.pk,
            )

        return jobs

    @classmethod
    def enqueue(cls, *args, **kwargs):
        """
        Enqueue a new `Job`.

        This method is a wrapper of `Job.enqueue()` using `handle()` as function callback. See its documentation for
        parameters.
        """
        return Job.enqueue(cls.handle, name=cls.name, *args, **kwargs)

    @classmethod
    @advisory_lock(ADVISORY_LOCK_KEYS['job-schedules'])
    def enqueue_once(cls, instance=None, schedule_at=None, interval=None, *args, **kwargs):
        """
        Enqueue a new `Job` once, i.e. skip duplicate jobs.

        Like `enqueue()`, this method adds a new `Job` to the job queue. However, if there's already a job of this
        class scheduled for `instance`, the existing job will be updated if necessary. This ensures that a particular
        schedule is only set up once at any given time, i.e. multiple calls to this method are idempotent.

        Note that this does not forbid running additional jobs with the `enqueue()` method, e.g. to schedule an
        immediate synchronization job in addition to a periodic synchronization schedule.

        For additional parameters see `enqueue()`.

        Args:
            instance: The NetBox object to which this job pertains (optional)
            schedule_at: Schedule the job to be executed at the passed date and time
            interval: Recurrence interval (in minutes)
        """
        job = cls.get_jobs(instance).filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES).first()
        if job:
            # If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise,
            # delete the existing job and schedule a new job instead.
            if (schedule_at and job.scheduled == schedule_at) and (job.interval == interval):
                return job
            job.delete()

        return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs)
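
The start/run/terminate flow of `handle()` and its rescheduling step can be illustrated outside of NetBox with a minimal sketch. Everything here is an assumption made for illustration: `FakeJob`, `SketchRunner`, and the `QUEUED` list are hypothetical stand-ins for `core.models.Job`, `JobRunner`, and the RQ queue, and the status strings are placeholders rather than the real `JobStatusChoices` values.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical stand-in for the job queue: records the next scheduled run times.
QUEUED = []


@dataclass
class FakeJob:
    """Hypothetical stand-in for core.models.Job; it only records state changes."""
    interval: Optional[int] = None        # recurrence interval in minutes
    scheduled: Optional[datetime] = None
    started: Optional[datetime] = None
    status: str = "pending"
    error: str = ""

    def start(self):
        # A fixed timestamp keeps the example repeatable.
        self.started = datetime(2024, 7, 30, 12, 0)
        self.status = "running"

    def terminate(self, status="completed", error=""):
        self.status = status
        self.error = error


class SketchRunner(ABC):
    """Mirrors the control flow of JobRunner.handle() without Django or RQ."""

    def __init__(self, job):
        self.job = job

    @abstractmethod
    def run(self, *args, **kwargs):
        ...

    @classmethod
    def handle(cls, job, *args, **kwargs):
        try:
            job.start()
            cls(job).run(*args, **kwargs)
            job.terminate()
        except Exception as e:
            # Errors are recorded on the job instead of propagating.
            job.terminate(status="errored", error=repr(e))
        finally:
            # Periodic jobs are rescheduled whether or not the run succeeded.
            if job.interval:
                base = job.scheduled or job.started
                QUEUED.append(base + timedelta(minutes=job.interval))


class OkRunner(SketchRunner):
    def run(self, *args, **kwargs):
        pass


class FailingRunner(SketchRunner):
    def run(self, *args, **kwargs):
        raise ValueError("boom")


failing = FakeJob(interval=60)
FailingRunner.handle(failing)   # marked as errored, yet still rescheduled

one_off = FakeJob()
OkRunner.handle(one_off)        # completes; no interval, so nothing is queued
```

Because the rescheduling sits in the `finally` clause, a periodic job that raises is still queued for its next interval; only jobs without an `interval` end after a single run.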