diff --git a/netbox/netbox/constants.py b/netbox/netbox/constants.py index e797f4f29..d4b825d99 100644 --- a/netbox/netbox/constants.py +++ b/netbox/netbox/constants.py @@ -27,6 +27,9 @@ ADVISORY_LOCK_KEYS = { 'wirelesslangroup': 105600, 'inventoryitem': 105700, 'inventoryitemtemplate': 105800, + + # Jobs + 'job-schedules': 110100, } # Default view action permission mapping diff --git a/netbox/utilities/jobs.py b/netbox/utilities/jobs.py index f47b13b4b..eafd5ce5b 100644 --- a/netbox/utilities/jobs.py +++ b/netbox/utilities/jobs.py @@ -2,13 +2,16 @@ import logging from abc import ABC, abstractmethod from datetime import timedelta +from django_pglocks import advisory_lock from rq.timeouts import JobTimeoutException from core.choices import JobStatusChoices -from core.models import Job +from core.models import Job, ObjectType +from netbox.constants import ADVISORY_LOCK_KEYS __all__ = ( 'BackgroundJob', + 'ScheduledJob', ) @@ -70,3 +73,76 @@ class BackgroundJob(ABC): parameters. """ return Job.enqueue(cls.handle, *args, **kwargs) + + +class ScheduledJob(BackgroundJob): + """ + A periodic `BackgroundJob` that is scheduled only once for each configuration. + + This class can be used to schedule a `BackgroundJob` with a specific configuration. However, it will ensure that + this job is scheduled exactly once in the queue of scheduled jobs, i.e. it will be skipped if an instance of this + job is already scheduled. Like a regular `BackgroundJob`, this class also accepts intervals. + + The purpose of this class is to decouple jobs from the usual request-based approach. A practical example of this is + to schedule a periodic synchronization job for a particular object. All that matters is that the job is scheduled + and executed periodically. However, a new periodic job does not need to be scheduled every time the object is saved. + Calling the `schedule()` method of this class will ensure that the job's schedule is set up no matter how often the + method is called. + """ + + ENQUEUED_STATUS = [ + JobStatusChoices.STATUS_PENDING, + JobStatusChoices.STATUS_SCHEDULED, + JobStatusChoices.STATUS_RUNNING, + ] + + @classmethod + def get_jobs(cls, instance): + """ + Get all jobs of this schedule related to a specific instance. + """ + object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) + return Job.objects.filter( + object_type=object_type, + object_id=instance.pk, + name=cls.__name__, + ) + + @classmethod + def enqueue(cls, *args, **kwargs): + """ + Enqueue a new `BackgroundJob`. + + This method is a wrapper of `Job.enqueue()` using `handle()` as function callback. See its documentation for + parameters. Note that specifying a custom `name` is not supported, as a `ScheduledJob` is identified by the job + class `__name__` automatically. + """ + kwargs.pop('name', None) + return super().enqueue(name=cls.__name__, *args, **kwargs) + + @classmethod + @advisory_lock(ADVISORY_LOCK_KEYS['job-schedules']) + def schedule(cls, instance, interval=None, *args, **kwargs): + """ + Schedule a `ScheduledJob`. + + This method adds a new `ScheduledJob` to the job queue. If the job schedule identified by its `instance` and + name is already active, the existing job will be updated if needed. However, this doesn't forbid running + additional jobs using the `enqueue()` method, e.g. to schedule an immediate synchronization job in addition to + periodic synchronization scheduled by this method. + + For additional parameters see `enqueue()`. + + Args: + instance: The NetBox object to which this `ScheduledJob` pertains + interval: Recurrence interval (in minutes) + """ + job = cls.get_jobs(instance).filter(status__in=cls.ENQUEUED_STATUS).first() + if job: + # If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise, + # delete the existing job and schedule a new job instead. + if job.interval == interval: + return job + job.delete() + + return cls.enqueue(instance=instance, interval=interval, *args, **kwargs) diff --git a/netbox/utilities/tests/test_jobs.py b/netbox/utilities/tests/test_jobs.py new file mode 100644 index 000000000..89a5b9b92 --- /dev/null +++ b/netbox/utilities/tests/test_jobs.py @@ -0,0 +1,58 @@ +from datetime import timedelta + +from django.test import TestCase +from django.utils import timezone +from django_rq import get_queue + +from ..jobs import ScheduledJob +from core.models import Job + + +class BackgroundJobTestCase(TestCase): + def tearDown(self): + super().tearDown() + + # Clear all queues after running each test + get_queue('default').connection.flushall() + get_queue('high').connection.flushall() + get_queue('low').connection.flushall() + + @staticmethod + def get_schedule_at(): + # Schedule jobs a week in advance to avoid accidentally running jobs on worker nodes used for testing. + return timezone.now() + timedelta(weeks=1) + + +class ScheduledJobTest(BackgroundJobTestCase): + """ + Test internal logic of `ScheduledJob`. + """ + + class TestScheduledJob(ScheduledJob): + @classmethod + def run(cls, *args, **kwargs): + pass + + def test_schedule(self): + job = self.TestScheduledJob.schedule(instance=Job(), schedule_at=self.get_schedule_at()) + + self.assertIsInstance(job, Job) + self.assertEqual(job.name, self.TestScheduledJob.__name__) + + def test_schedule_twice_same(self): + instance = Job() + job1 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at()) + job2 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at()) + + self.assertEqual(job1, job2) + self.assertEqual(self.TestScheduledJob.get_jobs(instance).count(), 1) + + def test_schedule_twice_different(self): + instance = Job() + job1 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at()) + job2 = self.TestScheduledJob.schedule(instance, schedule_at=self.get_schedule_at(), interval=60) + + self.assertNotEqual(job1, job2) + self.assertEqual(job1.interval, None) + self.assertEqual(job2.interval, 60) + self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)