Merge ScheduledJob into BackgroundJob

Instead of using separate classes, the logic of ScheduledJob is now
merged into the generic BackgroundJob class. This allows reusing the
same logic, but dynamically deciding whether to enqueue the same job
once or multiple times.
This commit is contained in:
Alexander Haase 2024-07-24 14:37:30 +02:00
parent 257976d815
commit 58089c726a
3 changed files with 48 additions and 67 deletions

View File

@ -27,11 +27,9 @@ You can schedule the background job from within your code (e.g. from a model's `
::: core.models.Job.enqueue
### Scheduled Job
#### Scheduled Jobs
During execution, a scheduled job behaves like a background job and is therefore implemented in the same way, but must be subclassed from NetBox's `ScheduledJob` class.
However, for management purposes, a `schedule()` method allows a schedule to be set exactly once to avoid duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, respecting thread safety. An example use case would be to schedule a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `schedule()` method are identical to those of `enqueue()`. Note that this class doesn't allow you to pass the `name` parameter for both methods, but uses a generic name instead.
As described above, jobs can be scheduled for immediate execution or at any later time using the `enqueue()` method. However, for management purposes, the `enqueue_once()` method allows a job to be scheduled exactly once avoiding duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, respecting thread safety. An example use case would be to schedule a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `enqueue_once()` method are identical to those of `enqueue()`. Note that this class doesn't allow you to pass the `name` parameter, but uses a generic name instead.
!!! tip
It is not forbidden to `enqueue()` additional jobs while an interval schedule is active. An example use of this would be to schedule a periodic daily synchronization, but also trigger additional synchronizations on demand when the user presses a button.

View File

@ -12,7 +12,6 @@ from netbox.constants import ADVISORY_LOCK_KEYS
__all__ = (
'BackgroundJob',
'ScheduledJob',
'SystemJob',
)
@ -66,36 +65,10 @@ class BackgroundJob(ABC):
**kwargs,
)
@classmethod
def enqueue(cls, *args, **kwargs):
"""
Enqueue a new `BackgroundJob`.
This method is a wrapper of `Job.enqueue()` using `handle()` as function callback. See its documentation for
parameters.
"""
return Job.enqueue(cls.handle, *args, **kwargs)
class ScheduledJob(BackgroundJob):
"""
A periodic `BackgroundJob` that is scheduled only once for each configuration.
This class can be used to schedule a `BackgroundJob` with a specific configuration. However, it will ensure that
this job is scheduled exactly once in the queue of scheduled jobs, i.e. it will be skipped if an instance of this
job is already scheduled. Like a regular `BackgroundJob`, this class also accepts intervals.
The purpose of this class is to decouple jobs from the usual request-based approach. A practical example of this is
to schedule a periodic synchronization job for a particular object. All that matters is that the job is scheduled
and executed periodically. However, a new periodic job does not need to be scheduled every time the object is saved.
Calling the `schedule()` method of this class will ensure that the job's schedule is set up no matter how often the
method is called.
"""
@classmethod
def get_jobs(cls, instance):
"""
Get all jobs of this schedule related to a specific instance.
Get all jobs of this `BackgroundJob` related to a specific instance.
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
return Job.objects.filter(
@ -110,27 +83,28 @@ class ScheduledJob(BackgroundJob):
Enqueue a new `BackgroundJob`.
This method is a wrapper of `Job.enqueue()` using `handle()` as function callback. See its documentation for
parameters. Note that specifying a custom `name` is not supported, as a `ScheduledJob` is identified by the job
class `__name__` automatically.
parameters.
"""
kwargs.pop('name', None)
return super().enqueue(name=cls.__name__, *args, **kwargs)
return Job.enqueue(cls.handle, *args, **kwargs)
@classmethod
@advisory_lock(ADVISORY_LOCK_KEYS['job-schedules'])
def schedule(cls, instance, interval=None, *args, **kwargs):
def enqueue_once(cls, instance, interval=None, *args, **kwargs):
"""
Schedule a `ScheduledJob`.
Enqueue a new `BackgroundJob` once, i.e. skip duplicate jobs.
This method adds a new `ScheduledJob` to the job queue. If the job schedule identified by its `instance` and
name is already active, the existing job will be updated if needed. However, this doesn't forbid running
additional jobs using the `enqueue()` method, e.g. to schedule an immediate synchronization job in addition to
periodic synchronization scheduled by this method.
Like `enqueue()`, this method adds a new `BackgroundJob` to the job queue. However, if there's already a
`BackgroundJob` of this class scheduled for `instance`, the existing job will be updated if necessary. This
ensures that a particular schedule is only set up once at any given time, i.e. multiple calls to this method are
idempotent.
Note that this does not forbid running additional jobs with the `enqueue()` method, e.g. to schedule an
immediate synchronization job in addition to a periodic synchronization schedule.
For additional parameters see `enqueue()`.
Args:
instance: The NetBox object to which this `ScheduledJob` pertains
instance: The NetBox object to which this `BackgroundJob` pertains
interval: Recurrence interval (in minutes)
"""
job = cls.get_jobs(instance).filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES).first()
@ -141,10 +115,10 @@ class ScheduledJob(BackgroundJob):
return job
job.delete()
return cls.enqueue(instance=instance, interval=interval, *args, **kwargs)
return cls.enqueue(instance=instance, name=cls.__name__, interval=interval, *args, **kwargs)
class SystemJob(ScheduledJob):
class SystemJob(BackgroundJob):
"""
A `ScheduledJob` not being bound to any particular NetBox object.
@ -166,9 +140,9 @@ class SystemJob(ScheduledJob):
return super().enqueue(instance=Job(), *args, **kwargs)
@classmethod
def schedule(cls, *args, **kwargs):
def enqueue_once(cls, *args, **kwargs):
kwargs.pop('instance', None)
return super().schedule(instance=Job(), *args, **kwargs)
return super().enqueue_once(instance=Job(), *args, **kwargs)
@classmethod
def handle(cls, job, *args, **kwargs):
@ -185,6 +159,6 @@ class SystemJob(ScheduledJob):
Setup a new `SystemJob` during plugin initialization.
This method should be called from the plugins `ready()` function to setup the schedule as early as possible. For
interactive setup of schedules (e.g. on user requests), either use `schedule()` or `enqueue()` instead.
interactive setup of schedules (e.g. on user requests), either use `enqueue()` or `enqueue_once()` instead.
"""
connection_created.connect(lambda sender, **signal_kwargs: cls.schedule(*args, **kwargs))
connection_created.connect(lambda sender, **signal_kwargs: cls.enqueue_once(*args, **kwargs))

View File

@ -8,6 +8,12 @@ from ..jobs import *
from core.models import Job
class TestBackgroundJob(BackgroundJob):
@classmethod
def run(cls, *args, **kwargs):
pass
class BackgroundJobTestCase(TestCase):
def tearDown(self):
super().tearDown()
@ -23,34 +29,37 @@ class BackgroundJobTestCase(TestCase):
return timezone.now() + timedelta(weeks=1)
class ScheduledJobTest(BackgroundJobTestCase):
class EnqueueTest(BackgroundJobTestCase):
"""
Test internal logic of `ScheduledJob`.
Test enqueuing of `BackgroundJob`.
"""
class TestScheduledJob(ScheduledJob):
@classmethod
def run(cls, *args, **kwargs):
pass
def test_enqueue(self):
instance = Job()
for i in range(1, 3):
job = TestBackgroundJob.enqueue(instance, schedule_at=self.get_schedule_at())
def test_schedule(self):
job = self.TestScheduledJob.schedule(instance=Job(), schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(Job.objects.count(), i)
def test_enqueue_once(self):
job = TestBackgroundJob.enqueue_once(instance=Job(), schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(job.name, self.TestScheduledJob.__name__)
self.assertEqual(job.name, TestBackgroundJob.__name__)
def test_schedule_twice_same(self):
def test_enqueue_once_twice_same(self):
instance = Job()
job1 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at())
job2 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at())
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at())
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at())
self.assertEqual(job1, job2)
self.assertEqual(self.TestScheduledJob.get_jobs(instance).count(), 1)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1)
def test_schedule_twice_different(self):
def test_enqueue_once_twice_different(self):
instance = Job()
job1 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at())
job2 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at(), interval=60)
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at())
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at(), interval=60)
self.assertNotEqual(job1, job2)
self.assertEqual(job1.interval, None)
@ -68,8 +77,8 @@ class SystemJobTest(BackgroundJobTestCase):
def run(cls, *args, **kwargs):
pass
def test_schedule(self):
job = self.TestSystemJob.schedule(schedule_at=self.get_schedule_at())
def test_enqueue_once(self):
job = self.TestSystemJob.enqueue_once(schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(job.object, None)