From 4c2ba0959fd723b458d2fc934f0fbd534360b43e Mon Sep 17 00:00:00 2001 From: Alexander Haase Date: Thu, 25 Jul 2024 15:27:11 +0200 Subject: [PATCH] Honor schedule_at for job's enqueue_once Not only can a job's interval change, but so can the time at which it is scheduled to run. If a specific scheduled time is set, it will also be checked against the current job schedule. If there are any changes, the job is rescheduled with the new time. --- netbox/utilities/jobs.py | 7 ++++--- netbox/utilities/tests/test_jobs.py | 24 ++++++++++++++++++------ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/netbox/utilities/jobs.py b/netbox/utilities/jobs.py index b08dbe760..251eb395c 100644 --- a/netbox/utilities/jobs.py +++ b/netbox/utilities/jobs.py @@ -99,7 +99,7 @@ class BackgroundJob(ABC): @classmethod @advisory_lock(ADVISORY_LOCK_KEYS['job-schedules']) - def enqueue_once(cls, instance=None, interval=None, *args, **kwargs): + def enqueue_once(cls, instance=None, schedule_at=None, interval=None, *args, **kwargs): """ Enqueue a new `BackgroundJob` once, i.e. skip duplicate jobs. @@ -115,17 +115,18 @@ class BackgroundJob(ABC): Args: instance: The NetBox object to which this `BackgroundJob` pertains (optional) + schedule_at: Schedule the job to be executed at the passed date and time interval: Recurrence interval (in minutes) """ job = cls.get_jobs(instance).filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES).first() if job: # If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise, # delete the existing job and schedule a new job instead. - if job.interval == interval: + if (schedule_at and job.scheduled == schedule_at) and (job.interval == interval): return job job.delete() - return cls.enqueue(instance=instance, interval=interval, *args, **kwargs) + return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs) @classmethod def setup(cls, *args, **kwargs): diff --git a/netbox/utilities/tests/test_jobs.py b/netbox/utilities/tests/test_jobs.py index 422265032..cb0acd76a 100644 --- a/netbox/utilities/tests/test_jobs.py +++ b/netbox/utilities/tests/test_jobs.py @@ -25,9 +25,9 @@ class BackgroundJobTestCase(TestCase): get_queue('low').connection.flushall() @staticmethod - def get_schedule_at(): + def get_schedule_at(offset=1): # Schedule jobs a week in advance to avoid accidentally running jobs on worker nodes used for testing. - return timezone.now() + timedelta(weeks=1) + return timezone.now() + timedelta(weeks=offset) class BackgroundJobTest(BackgroundJobTestCase): @@ -85,21 +85,33 @@ class EnqueueTest(BackgroundJobTestCase): def test_enqueue_once_twice_same(self): instance = Job() - job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at()) - job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at()) + schedule_at = self.get_schedule_at() + job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at) + job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at) self.assertEqual(job1, job2) self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1) - def test_enqueue_once_twice_different(self): + def test_enqueue_once_twice_different_schedule_at(self): instance = Job() job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at()) - job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at(), interval=60) + job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at(2)) + + self.assertNotEqual(job1, job2) + self.assertRaises(Job.DoesNotExist, job1.refresh_from_db) + self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1) + + def test_enqueue_once_twice_different_interval(self): + instance = Job() + schedule_at = self.get_schedule_at() + job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at) + job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at, interval=60) self.assertNotEqual(job1, job2) self.assertEqual(job1.interval, None) self.assertEqual(job2.interval, 60) self.assertRaises(Job.DoesNotExist, job1.refresh_from_db) + self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1) def test_enqueue_system(self): job = TestBackgroundJob.enqueue_once(schedule_at=self.get_schedule_at())