From 9dc6099eaf3be9c6e808610b93b060104bd4eec4 Mon Sep 17 00:00:00 2001 From: Alexander Haase Date: Mon, 1 Jul 2024 15:04:49 +0200 Subject: [PATCH] Introduce reusable ScheduledJob A new abstract class can be used to implement job function classes that specialize in scheduling. These use the same logic as regular BackgroundJobs, but ensure that they are only scheduled once at any given time. --- netbox/netbox/constants.py | 3 ++ netbox/utilities/jobs.py | 78 ++++++++++++++++++++++++++++- netbox/utilities/tests/test_jobs.py | 58 +++++++++++++++++++++ 3 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 netbox/utilities/tests/test_jobs.py diff --git a/netbox/netbox/constants.py b/netbox/netbox/constants.py index e797f4f29..d4b825d99 100644 --- a/netbox/netbox/constants.py +++ b/netbox/netbox/constants.py @@ -27,6 +27,9 @@ ADVISORY_LOCK_KEYS = { 'wirelesslangroup': 105600, 'inventoryitem': 105700, 'inventoryitemtemplate': 105800, + + # Jobs + 'job-schedules': 110100, } # Default view action permission mapping diff --git a/netbox/utilities/jobs.py b/netbox/utilities/jobs.py index f47b13b4b..eafd5ce5b 100644 --- a/netbox/utilities/jobs.py +++ b/netbox/utilities/jobs.py @@ -2,13 +2,16 @@ import logging from abc import ABC, abstractmethod from datetime import timedelta +from django_pglocks import advisory_lock from rq.timeouts import JobTimeoutException from core.choices import JobStatusChoices -from core.models import Job +from core.models import Job, ObjectType +from netbox.constants import ADVISORY_LOCK_KEYS __all__ = ( 'BackgroundJob', + 'ScheduledJob', ) @@ -70,3 +73,76 @@ class BackgroundJob(ABC): parameters. """ return Job.enqueue(cls.handle, *args, **kwargs) + + +class ScheduledJob(BackgroundJob): + """ + A periodic `BackgroundJob` that is scheduled only once for each configuration. + + This class can be used to schedule a `BackgroundJob` with a specific configuration. However, it will ensure that + this job is scheduled exactly once in the queue of scheduled jobs, i.e. it will be skipped if an instance of this + job is already scheduled. Like a regular `BackgroundJob`, this class also accepts intervals. + + The purpose of this class is to decouple jobs from the usual request-based approach. A practical example of this is + to schedule a periodic synchronization job for a particular object. All that matters is that the job is scheduled + and executed periodically. However, a new periodic job does not need to be scheduled every time the object is saved. + Calling the `schedule()` method of this class will ensure that the job's schedule is set up no matter how often the + method is called. + """ + + ENQUEUED_STATUS = [ + JobStatusChoices.STATUS_PENDING, + JobStatusChoices.STATUS_SCHEDULED, + JobStatusChoices.STATUS_RUNNING, + ] + + @classmethod + def get_jobs(cls, instance): + """ + Get all jobs of this schedule related to a specific instance. + """ + object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) + return Job.objects.filter( + object_type=object_type, + object_id=instance.pk, + name=cls.__name__, + ) + + @classmethod + def enqueue(cls, *args, **kwargs): + """ + Enqueue a new `BackgroundJob`. + + This method is a wrapper of `Job.enqueue()` using `handle()` as function callback. See its documentation for + parameters. Note that specifying a custom `name` is not supported, as a `ScheduledJob` is identified by the job + class `__name__` automatically. + """ + kwargs.pop('name', None) + return super().enqueue(name=cls.__name__, *args, **kwargs) + + @classmethod + @advisory_lock(ADVISORY_LOCK_KEYS['job-schedules']) + def schedule(cls, instance, interval=None, *args, **kwargs): + """ + Schedule a `ScheduledJob`. + + This method adds a new `ScheduledJob` to the job queue. If the job schedule identified by its `instance` and + name is already active, the existing job will be updated if needed. However, this doesn't forbid running + additional jobs using the `enqueue()` method, e.g. to schedule an immediate synchronization job in addition to + periodic synchronization scheduled by this method. + + For additional parameters see `enqueue()`. + + Args: + instance: The NetBox object to which this `ScheduledJob` pertains + interval: Recurrence interval (in minutes) + """ + job = cls.get_jobs(instance).filter(status__in=cls.ENQUEUED_STATUS).first() + if job: + # If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise, + # delete the existing job and schedule a new job instead. + if job.interval == interval: + return job + job.delete() + + return cls.enqueue(instance=instance, interval=interval, *args, **kwargs) diff --git a/netbox/utilities/tests/test_jobs.py b/netbox/utilities/tests/test_jobs.py new file mode 100644 index 000000000..89a5b9b92 --- /dev/null +++ b/netbox/utilities/tests/test_jobs.py @@ -0,0 +1,58 @@ +from datetime import timedelta + +from django.test import TestCase +from django.utils import timezone +from django_rq import get_queue + +from ..jobs import ScheduledJob +from core.models import Job + + +class BackgroundJobTestCase(TestCase): + def tearDown(self): + super().tearDown() + + # Clear all queues after running each test + get_queue('default').connection.flushall() + get_queue('high').connection.flushall() + get_queue('low').connection.flushall() + + @staticmethod + def get_schedule_at(): + # Schedule jobs a week in advance to avoid accidentally running jobs on worker nodes used for testing. + return timezone.now() + timedelta(weeks=1) + + +class ScheduledJobTest(BackgroundJobTestCase): + """ + Test internal logic of `ScheduledJob`. + """ + + class TestScheduledJob(ScheduledJob): + @classmethod + def run(cls, *args, **kwargs): + pass + + def test_schedule(self): + job = self.TestScheduledJob.schedule(instance=Job(), schedule_at=self.get_schedule_at()) + + self.assertIsInstance(job, Job) + self.assertEqual(job.name, self.TestScheduledJob.__name__) + + def test_schedule_twice_same(self): + instance = Job() + job1 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at()) + job2 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at()) + + self.assertEqual(job1, job2) + self.assertEqual(self.TestScheduledJob.get_jobs(instance).count(), 1) + + def test_schedule_twice_different(self): + instance = Job() + job1 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at()) + job2 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at(), interval=60) + + self.assertNotEqual(job1, job2) + self.assertEqual(job1.interval, None) + self.assertEqual(job2.interval, 60) + self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)