Introduce reusable ScheduledJob

A new abstract class can be used to implement job function classes that
specialize in scheduling. These use the same logic as regular
BackgroundJobs, but ensure that they are only scheduled once at any given
time.
This commit is contained in:
Alexander Haase 2024-07-01 15:04:49 +02:00
parent 212262dc7d
commit 9dc6099eaf
3 changed files with 138 additions and 1 deletions

View File

@ -27,6 +27,9 @@ ADVISORY_LOCK_KEYS = {
'wirelesslangroup': 105600,
'inventoryitem': 105700,
'inventoryitemtemplate': 105800,
# Jobs
'job-schedules': 110100,
}
# Default view action permission mapping

View File

@ -2,13 +2,16 @@ import logging
from abc import ABC, abstractmethod
from datetime import timedelta
from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException
from core.choices import JobStatusChoices
from core.models import Job
from core.models import Job, ObjectType
from netbox.constants import ADVISORY_LOCK_KEYS
__all__ = (
'BackgroundJob',
'ScheduledJob',
)
@ -70,3 +73,76 @@ class BackgroundJob(ABC):
parameters.
"""
return Job.enqueue(cls.handle, *args, **kwargs)
class ScheduledJob(BackgroundJob):
"""
A periodic `BackgroundJob` that is scheduled only once for each configuration.
This class can be used to schedule a `BackgroundJob` with a specific configuration. However, it will ensure that
this job is scheduled exactly once in the queue of scheduled jobs, i.e. it will be skipped if an instance of this
job is already scheduled. Like a regular `BackgroundJob`, this class also accepts intervals.
The purpose of this class is to decouple jobs from the usual request-based approach. A practical example of this is
to schedule a periodic synchronization job for a particular object. All that matters is that the job is scheduled
and executed periodically. However, a new periodic job does not need to be scheduled every time the object is saved.
Calling the `schedule()` method of this class will ensure that the job's schedule is set up no matter how often the
method is called.
"""
ENQUEUED_STATUS = [
JobStatusChoices.STATUS_PENDING,
JobStatusChoices.STATUS_SCHEDULED,
JobStatusChoices.STATUS_RUNNING,
]
@classmethod
def get_jobs(cls, instance):
"""
Get all jobs of this schedule related to a specific instance.
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
return Job.objects.filter(
object_type=object_type,
object_id=instance.pk,
name=cls.__name__,
)
@classmethod
def enqueue(cls, *args, **kwargs):
"""
Enqueue a new `BackgroundJob`.
This method is a wrapper of `Job.enqueue()` using `handle()` as function callback. See its documentation for
parameters. Note that specifying a custom `name` is not supported, as a `ScheduledJob` is identified by the job
class `__name__` automatically.
"""
kwargs.pop('name', None)
return super().enqueue(name=cls.__name__, *args, **kwargs)
@classmethod
@advisory_lock(ADVISORY_LOCK_KEYS['job-schedules'])
def schedule(cls, instance, interval=None, *args, **kwargs):
"""
Schedule a `ScheduledJob`.
This method adds a new `ScheduledJob` to the job queue. If the job schedule identified by its `instance` and
name is already active, the existing job will be updated if needed. However, this doesn't forbid running
additional jobs using the `enqueue()` method, e.g. to schedule an immediate synchronization job in addition to
periodic synchronization scheduled by this method.
For additional parameters see `enqueue()`.
Args:
instance: The NetBox object to which this `ScheduledJob` pertains
interval: Recurrence interval (in minutes)
"""
job = cls.get_jobs(instance).filter(status__in=cls.ENQUEUED_STATUS).first()
if job:
# If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise,
# delete the existing job and schedule a new job instead.
if job.interval == interval:
return job
job.delete()
return cls.enqueue(instance=instance, interval=interval, *args, **kwargs)

View File

@ -0,0 +1,58 @@
from datetime import timedelta
from django.test import TestCase
from django.utils import timezone
from django_rq import get_queue
from ..jobs import ScheduledJob
from core.models import Job
class BackgroundJobTestCase(TestCase):
def tearDown(self):
super().tearDown()
# Clear all queues after running each test
get_queue('default').connection.flushall()
get_queue('high').connection.flushall()
get_queue('low').connection.flushall()
@staticmethod
def get_schedule_at():
# Schedule jobs a week in advance to avoid accidentally running jobs on worker nodes used for testing.
return timezone.now() + timedelta(weeks=1)
class ScheduledJobTest(BackgroundJobTestCase):
"""
Test internal logic of `ScheduledJob`.
"""
class TestScheduledJob(ScheduledJob):
@classmethod
def run(cls, *args, **kwargs):
pass
def test_schedule(self):
job = self.TestScheduledJob.schedule(instance=Job(), schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(job.name, self.TestScheduledJob.__name__)
def test_schedule_twice_same(self):
instance = Job()
job1 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at())
job2 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at())
self.assertEqual(job1, job2)
self.assertEqual(self.TestScheduledJob.get_jobs(instance).count(), 1)
def test_schedule_twice_different(self):
instance = Job()
job1 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at())
job2 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at(), interval=60)
self.assertNotEqual(job1, job2)
self.assertEqual(job1.interval, None)
self.assertEqual(job2.interval, 60)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)