From 22ae06acc844815dced955400bc84c0242bd84df Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 30 Jan 2024 15:05:44 -0800 Subject: [PATCH] 14729 add tests --- netbox/core/tests/test_views.py | 63 ++++++++++++++++++++++++++++++--- netbox/core/views.py | 2 +- 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/netbox/core/tests/test_views.py b/netbox/core/tests/test_views.py index de23b3c87..dab8a94cd 100644 --- a/netbox/core/tests/test_views.py +++ b/netbox/core/tests/test_views.py @@ -9,7 +9,7 @@ from django_rq.settings import QUEUES_MAP, QUEUES_LIST from django_rq.workers import get_worker from django.urls import reverse from rq import get_current_job -from rq.job import Job as RQ_Job +from rq.job import Job as RQ_Job, JobStatus from rq.registry import ( DeferredJobRegistry, FailedJobRegistry, @@ -116,6 +116,10 @@ def test_job_high(): return 'TestNBJob' +def test_failing_job(): + raise ValueError + + class BackgroundTaskTestCase(TestCase): user_permissions = () @@ -222,13 +226,64 @@ class BackgroundTaskTestCase(TestCase): self.assertNotIn(job.id, queue.job_ids) def test_background_task_requeue(self): - pass + queue = get_queue('default') + queue_index = QUEUES_MAP['default'] + + job = queue.enqueue(test_failing_job) + + # do those jobs = fail them + worker = get_worker('default') + worker.work(burst=True) + + self.assertTrue(job.is_failed) + + # renqueue failed jobs from failed queue + self.client.get(reverse('core:background_task_requeue', args=[job.id])) + self.assertFalse(job.is_failed) def test_background_task_enqueue(self): - pass + queue = get_queue('default') + queue_index = QUEUES_MAP['default'] + + # enqueue some jobs that depends on other + previous_job = None + for _ in range(0, 3): + job = queue.enqueue(test_job_default, depends_on=previous_job) + previous_job = job + + # This job is deferred + last_job = job + self.assertEqual(last_job.get_status(), JobStatus.DEFERRED) + self.assertIsNone(last_job.enqueued_at) + + # We want to force-enqueue this job + self.client.get(reverse('core:background_task_enqueue', args=[last_job.id])) + + # Check that job is updated correctly + last_job = queue.fetch_job(last_job.id) + self.assertEqual(last_job.get_status(), JobStatus.QUEUED) + self.assertIsNotNone(last_job.enqueued_at) def test_background_task_stop(self): - pass + queue = get_queue('default') + queue_index = QUEUES_MAP['default'] + + worker = get_worker('default') + job = queue.enqueue(test_job_default) + worker.prepare_job_execution(job) + + self.assertEqual(job.get_status(), JobStatus.STARTED) + + # Stop those jobs using the view + started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection) + self.assertEqual(len(started_job_registry), 1) + self.client.get(reverse('core:background_task_stop', args=[job.id])) + worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started + self.assertEqual(len(started_job_registry), 0) + + canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection) + self.assertEqual(len(canceled_job_registry), 1) + self.assertIn(job.id, canceled_job_registry) def test_worker_list(self): worker1 = get_worker('default', name=uuid.uuid4().hex) diff --git a/netbox/core/views.py b/netbox/core/views.py index 776b502ce..d60b2fcff 100644 --- a/netbox/core/views.py +++ b/netbox/core/views.py @@ -19,7 +19,7 @@ from netbox.views.generic.mixins import ActionsMixin, TableMixin from rq import requeue_job from rq.exceptions import NoSuchJobError -from rq.job import Job as RQ_Job +from rq.job import Job as RQ_Job, JobStatus from rq.registry import ( DeferredJobRegistry, FailedJobRegistry,