Honor schedule_at for job's enqueue_once

Not only can a job's interval change, but so can the time at which it is
scheduled to run. If a specific scheduled time is set, it will also be
checked against the current job schedule. If there are any changes, the
job is rescheduled with the new time.
This commit is contained in:
Alexander Haase 2024-07-25 15:27:11 +02:00
parent bd4a21cf5c
commit 4c2ba0959f
2 changed files with 22 additions and 9 deletions

View File

@ -99,7 +99,7 @@ class BackgroundJob(ABC):
@classmethod
@advisory_lock(ADVISORY_LOCK_KEYS['job-schedules'])
def enqueue_once(cls, instance=None, interval=None, *args, **kwargs):
def enqueue_once(cls, instance=None, schedule_at=None, interval=None, *args, **kwargs):
"""
Enqueue a new `BackgroundJob` once, i.e. skip duplicate jobs.
@ -115,17 +115,18 @@ class BackgroundJob(ABC):
Args:
instance: The NetBox object to which this `BackgroundJob` pertains (optional)
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
"""
job = cls.get_jobs(instance).filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES).first()
if job:
# If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise,
# delete the existing job and schedule a new job instead.
if job.interval == interval:
if (schedule_at and job.scheduled == schedule_at) and (job.interval == interval):
return job
job.delete()
return cls.enqueue(instance=instance, interval=interval, *args, **kwargs)
return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs)
@classmethod
def setup(cls, *args, **kwargs):

View File

@ -25,9 +25,9 @@ class BackgroundJobTestCase(TestCase):
get_queue('low').connection.flushall()
@staticmethod
def get_schedule_at():
def get_schedule_at(offset=1):
# Schedule jobs a week in advance to avoid accidentally running jobs on worker nodes used for testing.
return timezone.now() + timedelta(weeks=1)
return timezone.now() + timedelta(weeks=offset)
class BackgroundJobTest(BackgroundJobTestCase):
@ -85,21 +85,33 @@ class EnqueueTest(BackgroundJobTestCase):
def test_enqueue_once_twice_same(self):
instance = Job()
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at())
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at())
schedule_at = self.get_schedule_at()
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at)
self.assertEqual(job1, job2)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1)
def test_enqueue_once_twice_different(self):
def test_enqueue_once_twice_different_schedule_at(self):
instance = Job()
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at())
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at(), interval=60)
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
self.assertNotEqual(job1, job2)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1)
def test_enqueue_once_twice_different_interval(self):
instance = Job()
schedule_at = self.get_schedule_at()
job1 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestBackgroundJob.enqueue_once(instance, schedule_at=schedule_at, interval=60)
self.assertNotEqual(job1, job2)
self.assertEqual(job1.interval, None)
self.assertEqual(job2.interval, 60)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
self.assertEqual(TestBackgroundJob.get_jobs(instance).count(), 1)
def test_enqueue_system(self):
job = TestBackgroundJob.enqueue_once(schedule_at=self.get_schedule_at())