14729 add tests

This commit is contained in:
Arthur 2024-01-30 15:05:44 -08:00
parent 74bb0fca01
commit 22ae06acc8
2 changed files with 60 additions and 5 deletions

View File

@ -9,7 +9,7 @@ from django_rq.settings import QUEUES_MAP, QUEUES_LIST
from django_rq.workers import get_worker from django_rq.workers import get_worker
from django.urls import reverse from django.urls import reverse
from rq import get_current_job from rq import get_current_job
from rq.job import Job as RQ_Job from rq.job import Job as RQ_Job, JobStatus
from rq.registry import ( from rq.registry import (
DeferredJobRegistry, DeferredJobRegistry,
FailedJobRegistry, FailedJobRegistry,
@ -116,6 +116,10 @@ def test_job_high():
return 'TestNBJob' return 'TestNBJob'
def test_failing_job():
raise ValueError
class BackgroundTaskTestCase(TestCase): class BackgroundTaskTestCase(TestCase):
user_permissions = () user_permissions = ()
@ -222,13 +226,64 @@ class BackgroundTaskTestCase(TestCase):
self.assertNotIn(job.id, queue.job_ids) self.assertNotIn(job.id, queue.job_ids)
def test_background_task_requeue(self): def test_background_task_requeue(self):
pass queue = get_queue('default')
queue_index = QUEUES_MAP['default']
job = queue.enqueue(test_failing_job)
# do those jobs = fail them
worker = get_worker('default')
worker.work(burst=True)
self.assertTrue(job.is_failed)
# renqueue failed jobs from failed queue
self.client.get(reverse('core:background_task_requeue', args=[job.id]))
self.assertFalse(job.is_failed)
def test_background_task_enqueue(self): def test_background_task_enqueue(self):
pass queue = get_queue('default')
queue_index = QUEUES_MAP['default']
# enqueue some jobs that depends on other
previous_job = None
for _ in range(0, 3):
job = queue.enqueue(test_job_default, depends_on=previous_job)
previous_job = job
# This job is deferred
last_job = job
self.assertEqual(last_job.get_status(), JobStatus.DEFERRED)
self.assertIsNone(last_job.enqueued_at)
# We want to force-enqueue this job
self.client.get(reverse('core:background_task_enqueue', args=[last_job.id]))
# Check that job is updated correctly
last_job = queue.fetch_job(last_job.id)
self.assertEqual(last_job.get_status(), JobStatus.QUEUED)
self.assertIsNotNone(last_job.enqueued_at)
def test_background_task_stop(self): def test_background_task_stop(self):
pass queue = get_queue('default')
queue_index = QUEUES_MAP['default']
worker = get_worker('default')
job = queue.enqueue(test_job_default)
worker.prepare_job_execution(job)
self.assertEqual(job.get_status(), JobStatus.STARTED)
# Stop those jobs using the view
started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(started_job_registry), 1)
self.client.get(reverse('core:background_task_stop', args=[job.id]))
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
self.assertEqual(len(started_job_registry), 0)
canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(canceled_job_registry), 1)
self.assertIn(job.id, canceled_job_registry)
def test_worker_list(self): def test_worker_list(self):
worker1 = get_worker('default', name=uuid.uuid4().hex) worker1 = get_worker('default', name=uuid.uuid4().hex)

View File

@ -19,7 +19,7 @@ from netbox.views.generic.mixins import ActionsMixin, TableMixin
from rq import requeue_job from rq import requeue_job
from rq.exceptions import NoSuchJobError from rq.exceptions import NoSuchJobError
from rq.job import Job as RQ_Job from rq.job import Job as RQ_Job, JobStatus
from rq.registry import ( from rq.registry import (
DeferredJobRegistry, DeferredJobRegistry,
FailedJobRegistry, FailedJobRegistry,