Rename BackgroundJob to JobRunner

This commit is contained in:
Jeremy Stretch 2024-07-30 13:14:51 -04:00
parent 7f0a4e3c34
commit 85b9f65115
5 changed files with 62 additions and 60 deletions

View File

@ -2,20 +2,21 @@
NetBox plugins can defer certain operations by enqueuing [background jobs](../../features/background-jobs.md), which are executed asynchronously by background workers. This is helpful for decoupling long-running processes from the user-facing request-response cycle.
For example, your plugin might need to fetch data from a remote system. Depending on the amount of data and the responsiveness of the remote server, this could take a few minutes. Deferring this task to a background job ensures that it can be completed in the background, without interrupting the user. The data it fetches can be made available once the job has completed.
For example, your plugin might need to fetch data from a remote system. Depending on the amount of data and the responsiveness of the remote server, this could take a few minutes. Deferring this task to a queued job ensures that it can be completed in the background, without interrupting the user. The data it fetches can be made available once the job has completed.
## Background Job
## Job Runners
A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It has logic implemented to handle the management of the associated job object, rescheduling of periodic jobs in the given interval and error handling. Adding custom jobs is done by subclassing NetBox's `BackgroundJob` class.
A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It has logic implemented to handle the management of the associated job object, rescheduling of periodic jobs in the given interval and error handling. Adding custom jobs is done by subclassing NetBox's `JobRunner` class.
::: utilities.jobs.BackgroundJob
::: utilities.jobs.JobRunner
#### Example
```python title="jobs.py"
from utilities.jobs import BackgroundJob
from utilities.jobs import JobRunner
class MyTestJob(BackgroundJob):
class MyTestJob(JobRunner):
class Meta:
name = "My Test Job"
@ -26,9 +27,9 @@ class MyTestJob(BackgroundJob):
You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead.
### Job Attributes
### Attributes
Background job attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.
`JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.
#### `name`
@ -44,15 +45,17 @@ As described above, jobs can be scheduled for immediate execution or at any late
#### Example
```python title="jobs.py"
from utilities.jobs import BackgroundJob
from utilities.jobs import JobRunner
class MyHousekeepingJob(BackgroundJob):
class MyHousekeepingJob(JobRunner):
class Meta:
name = "Housekeeping"
def run(self, *args, **kwargs):
# your logic goes here
```
```python title="__init__.py"
from netbox.plugins import PluginConfig

View File

@ -1,7 +1,7 @@
import logging
from netbox.search.backends import search_backend
from utilities.jobs import BackgroundJob
from utilities.jobs import JobRunner
from .choices import DataSourceStatusChoices
from .exceptions import SyncError
from .models import DataSource
@ -9,7 +9,7 @@ from .models import DataSource
logger = logging.getLogger(__name__)
class SyncDataSourceJob(BackgroundJob):
class SyncDataSourceJob(JobRunner):
"""
Call sync() on a DataSource.
"""

View File

@ -9,11 +9,11 @@ from extras.models import Script as ScriptModel
from extras.signals import clear_events
from netbox.context_managers import event_tracking
from utilities.exceptions import AbortScript, AbortTransaction
from utilities.jobs import BackgroundJob
from utilities.jobs import JobRunner
from .utils import is_report
class ScriptJob(BackgroundJob):
class ScriptJob(JobRunner):
"""
Script execution job.

View File

@ -11,11 +11,11 @@ from core.models import Job, ObjectType
from netbox.constants import ADVISORY_LOCK_KEYS
__all__ = (
'BackgroundJob',
'JobRunner',
)
class BackgroundJob(ABC):
class JobRunner(ABC):
"""
Background Job helper class.
@ -29,7 +29,7 @@ class BackgroundJob(ABC):
def __init__(self, job):
"""
Args:
job: The specific `Job` this `BackgroundJob` helper class is executing.
job: The specific `Job` this `JobRunner` is executing.
"""
self.job = job
@ -42,17 +42,17 @@ class BackgroundJob(ABC):
"""
Run the job.
A `BackgroundJob` class needs to implement this method to execute all commands of the job.
A `JobRunner` class needs to implement this method to execute all commands of the job.
"""
pass
@classmethod
def handle(cls, job, *args, **kwargs):
"""
Handle the execution of a `BackgroundJob`.
Handle the execution of a `Job`.
This method is called by the Job Scheduler to handle the execution of all job commands. It will maintain the
job's metadata and handle errors. For periodic jobs, a new job is automatically scheduled using its `interval'.
job's metadata and handle errors. For periodic jobs, a new job is automatically scheduled using its `interval`.
"""
try:
job.start()
@ -79,7 +79,7 @@ class BackgroundJob(ABC):
@classmethod
def get_jobs(cls, instance=None):
"""
Get all jobs of this `BackgroundJob` related to a specific instance.
Get all jobs of this `JobRunner` related to a specific instance.
"""
jobs = Job.objects.filter(name=cls.name)
@ -95,7 +95,7 @@ class BackgroundJob(ABC):
@classmethod
def enqueue(cls, *args, **kwargs):
"""
Enqueue a new `BackgroundJob`.
Enqueue a new `Job`.
This method is a wrapper of `Job.enqueue()` using `handle()` as function callback. See its documentation for
parameters.
@ -106,12 +106,11 @@ class BackgroundJob(ABC):
@advisory_lock(ADVISORY_LOCK_KEYS['job-schedules'])
def enqueue_once(cls, instance=None, schedule_at=None, interval=None, *args, **kwargs):
"""
Enqueue a new `BackgroundJob` once, i.e. skip duplicate jobs.
Enqueue a new `Job` once, i.e. skip duplicate jobs.
Like `enqueue()`, this method adds a new `BackgroundJob` to the job queue. However, if there's already a
`BackgroundJob` of this class scheduled for `instance`, the existing job will be updated if necessary. This
ensures that a particular schedule is only set up once at any given time, i.e. multiple calls to this method are
idempotent.
Like `enqueue()`, this method adds a new `Job` to the job queue. However, if there's already a job of this
class scheduled for `instance`, the existing job will be updated if necessary. This ensures that a particular
schedule is only set up once at any given time, i.e. multiple calls to this method are idempotent.
Note that this does not forbid running additional jobs with the `enqueue()` method, e.g. to schedule an
immediate synchronization job in addition to a periodic synchronization schedule.
@ -119,7 +118,7 @@ class BackgroundJob(ABC):
For additional parameters see `enqueue()`.
Args:
instance: The NetBox object to which this `BackgroundJob` pertains (optional)
instance: The NetBox object to which this job pertains (optional)
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
"""

View File

@ -9,12 +9,12 @@ from core.models import Job
from core.choices import JobStatusChoices
class TestBackgroundJob(BackgroundJob):
class TestJobRunner(JobRunner):
def run(self, *args, **kwargs):
pass
class BackgroundJobTestCase(TestCase):
class JobRunnerTestCase(TestCase):
def tearDown(self):
super().tearDown()
@ -29,101 +29,101 @@ class BackgroundJobTestCase(TestCase):
return timezone.now() + timedelta(weeks=offset)
class BackgroundJobTest(BackgroundJobTestCase):
class JobRunnerTest(JobRunnerTestCase):
"""
Test internal logic of `BackgroundJob`.
Test internal logic of `JobRunner`.
"""
def test_name_default(self):
self.assertEqual(TestBackgroundJob.name, TestBackgroundJob.__name__)
self.assertEqual(TestJobRunner.name, TestJobRunner.__name__)
def test_name_set(self):
class NamedBackgroundJob(TestBackgroundJob):
class NamedJobRunner(TestJobRunner):
class Meta:
name = 'TestName'
self.assertEqual(NamedBackgroundJob.name, 'TestName')
self.assertEqual(NamedJobRunner.name, 'TestName')
def test_handle(self):
job = TestBackgroundJob.enqueue(immediate=True)
job = TestJobRunner.enqueue(immediate=True)
self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)
def test_handle_errored(self):
class ErroredBackgroundJob(TestBackgroundJob):
class ErroredJobRunner(TestJobRunner):
EXP = Exception('Test error')
def run(self, *args, **kwargs):
raise self.EXP
job = ErroredBackgroundJob.enqueue(immediate=True)
job = ErroredJobRunner.enqueue(immediate=True)
self.assertEqual(job.status, JobStatusChoices.STATUS_ERRORED)
self.assertEqual(job.error, repr(ErroredBackgroundJob.EXP))
self.assertEqual(job.error, repr(ErroredJobRunner.EXP))
class EnqueueTest(BackgroundJobTestCase):
class EnqueueTest(JobRunnerTestCase):
"""
Test enqueuing of `BackgroundJob`.
Test enqueuing of `JobRunner`.
"""
def test_enqueue(self):
instance = Job()
for i in range(1, 3):
job = TestBackgroundJob.enqueue(instance, schedule_at=self.get_schedule_at())
job = TestJobRunner.enqueue(instance, schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), i)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), i)
def test_enqueue_once(self):
job = TestBackgroundJob.enqueue_once(instance=Job(), schedule_at=self.get_schedule_at())
job = TestJobRunner.enqueue_once(instance=Job(), schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(job.name, TestBackgroundJob.__name__)
self.assertEqual(job.name, TestJobRunner.__name__)
def test_enqueue_once_twice_same(self):
instance = Job()
schedule_at = self.get_schedule_at()
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at)
job1 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)
self.assertEqual(job1, job2)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)
def test_enqueue_once_twice_different_schedule_at(self):
instance = Job()
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at())
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at())
job2 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
self.assertNotEqual(job1, job2)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)
def test_enqueue_once_twice_different_interval(self):
instance = Job()
schedule_at = self.get_schedule_at()
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at, interval=60)
job1 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at, interval=60)
self.assertNotEqual(job1, job2)
self.assertEqual(job1.interval, None)
self.assertEqual(job2.interval, 60)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)
def test_enqueue_once_with_enqueue(self):
instance = Job()
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
job2 = TestBackgroundJob.enqueue(instance, schedule_at=self.get_schedule_at())
job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
job2 = TestJobRunner.enqueue(instance, schedule_at=self.get_schedule_at())
self.assertNotEqual(job1, job2)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 2)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 2)
def test_enqueue_once_after_enqueue(self):
instance = Job()
job1 = TestBackgroundJob.enqueue(instance, schedule_at=self.get_schedule_at())
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
job1 = TestJobRunner.enqueue(instance, schedule_at=self.get_schedule_at())
job2 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
self.assertNotEqual(job1, job2)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)