diff --git a/netbox/core/tests/test_views.py b/netbox/core/tests/test_views.py index 6546fcbb1..b7a951a0f 100644 --- a/netbox/core/tests/test_views.py +++ b/netbox/core/tests/test_views.py @@ -1,25 +1,16 @@ +import logging import uuid from datetime import datetime -from django.contrib.auth.models import User + +from django.urls import reverse from django.utils import timezone from django_rq import get_queue -from django.test import override_settings -from django.test.client import Client -from django_rq.settings import QUEUES_MAP, QUEUES_LIST +from django_rq.settings import QUEUES_MAP from django_rq.workers import get_worker -from django.urls import reverse -from rq import get_current_job from rq.job import Job as RQ_Job, JobStatus -from rq.registry import ( - DeferredJobRegistry, - FailedJobRegistry, - FinishedJobRegistry, - ScheduledJobRegistry, - StartedJobRegistry, -) +from rq.registry import DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry from utilities.testing import TestCase, ViewTestCases, create_tags - from ..models import * @@ -108,21 +99,22 @@ class DataFileTestCase( DataFile.objects.bulk_create(data_files) -def test_job_default(): - return 'TestNBJob' - - -def test_job_high(): - return 'TestNBJob' - - -def test_failing_job(): - raise ValueError - - class BackgroundTaskTestCase(TestCase): user_permissions = () + # Dummy worker functions + @staticmethod + def dummy_job_default(): + return "Job finished" + + @staticmethod + def dummy_job_high(): + return "Job finished" + + @staticmethod + def dummy_job_failing(): + raise Exception("Job failed") + def setUp(self): super().setUp() self.user.is_staff = True @@ -154,64 +146,67 @@ class BackgroundTaskTestCase(TestCase): def test_background_tasks_list_default(self): queue = get_queue('default') - job = queue.enqueue(test_job_default) + queue.enqueue(self.dummy_job_default) queue_index = QUEUES_MAP['default'] response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued'])) self.assertEqual(response.status_code, 200) - self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content)) def test_background_tasks_list_high(self): queue = get_queue('high') - job = queue.enqueue(test_job_high) + queue.enqueue(self.dummy_job_high) queue_index = QUEUES_MAP['high'] response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued'])) self.assertEqual(response.status_code, 200) - self.assertIn('core.tests.test_views.test_job_high', str(response.content)) + self.assertIn('BackgroundTaskTestCase.dummy_job_high', str(response.content)) def test_background_tasks_list_finished(self): queue = get_queue('default') - job = queue.enqueue(test_job_default) + job = queue.enqueue(self.dummy_job_default) queue_index = QUEUES_MAP['default'] registry = FinishedJobRegistry(queue.name, queue.connection) registry.add(job, 2) response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'finished'])) - self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + self.assertEqual(response.status_code, 200) + self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content)) def test_background_tasks_list_failed(self): queue = get_queue('default') - job = queue.enqueue(test_job_default) + job = queue.enqueue(self.dummy_job_default) queue_index = QUEUES_MAP['default'] registry = FailedJobRegistry(queue.name, queue.connection) registry.add(job, 2) response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'failed'])) - self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + self.assertEqual(response.status_code, 200) + self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content)) def test_background_tasks_scheduled(self): queue = get_queue('default') - job = queue.enqueue_at(datetime.now(), test_job_default) + queue.enqueue_at(datetime.now(), self.dummy_job_default) queue_index = QUEUES_MAP['default'] response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'scheduled'])) - self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + self.assertEqual(response.status_code, 200) + self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content)) def test_background_tasks_list_deferred(self): queue = get_queue('default') - job = queue.enqueue(test_job_default) + job = queue.enqueue(self.dummy_job_default) queue_index = QUEUES_MAP['default'] registry = DeferredJobRegistry(queue.name, queue.connection) registry.add(job, 2) response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'deferred'])) - self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + self.assertEqual(response.status_code, 200) + self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content)) def test_background_task(self): queue = get_queue('default') - job = queue.enqueue(test_job_default) - queue_index = QUEUES_MAP['default'] + job = queue.enqueue(self.dummy_job_default) response = self.client.get(reverse('core:background_task', args=[job.id])) self.assertEqual(response.status_code, 200) @@ -223,58 +218,55 @@ class BackgroundTaskTestCase(TestCase): def test_background_task_delete(self): queue = get_queue('default') - job = queue.enqueue(test_job_default) - queue_index = QUEUES_MAP['default'] + job = queue.enqueue(self.dummy_job_default) response = self.client.post(reverse('core:background_task_delete', args=[job.id]), {'confirm': True}) + self.assertEqual(response.status_code, 302) self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection)) self.assertNotIn(job.id, queue.job_ids) def test_background_task_requeue(self): queue = get_queue('default') - queue_index = QUEUES_MAP['default'] - job = queue.enqueue(test_failing_job) - - # do those jobs = fail them + # Enqueue & run a job that will fail + job = queue.enqueue(self.dummy_job_failing) worker = get_worker('default') worker.work(burst=True) - self.assertTrue(job.is_failed) - # renqueue failed jobs from failed queue - self.client.get(reverse('core:background_task_requeue', args=[job.id])) + # Re-enqueue the failed job and check that its status has been reset + response = self.client.get(reverse('core:background_task_requeue', args=[job.id])) + self.assertEqual(response.status_code, 302) self.assertFalse(job.is_failed) def test_background_task_enqueue(self): queue = get_queue('default') - queue_index = QUEUES_MAP['default'] - # enqueue some jobs that depends on other - previous_job = None + # Enqueue some jobs that each depends on its predecessor + job = previous_job = None for _ in range(0, 3): - job = queue.enqueue(test_job_default, depends_on=previous_job) + job = queue.enqueue(self.dummy_job_default, depends_on=previous_job) previous_job = job - # This job is deferred - last_job = job - self.assertEqual(last_job.get_status(), JobStatus.DEFERRED) - self.assertIsNone(last_job.enqueued_at) + # Check that the last job to be enqueued has a status of deferred + self.assertIsNotNone(job) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + self.assertIsNone(job.enqueued_at) - # We want to force-enqueue this job - self.client.get(reverse('core:background_task_enqueue', args=[last_job.id])) + # Force-enqueue the deferred job + response = self.client.get(reverse('core:background_task_enqueue', args=[job.id])) + self.assertEqual(response.status_code, 302) - # Check that job is updated correctly - last_job = queue.fetch_job(last_job.id) - self.assertEqual(last_job.get_status(), JobStatus.QUEUED) - self.assertIsNotNone(last_job.enqueued_at) + # Check that job's status is updated correctly + job = queue.fetch_job(job.id) + self.assertEqual(job.get_status(), JobStatus.QUEUED) + self.assertIsNotNone(job.enqueued_at) def test_background_task_stop(self): queue = get_queue('default') - queue_index = QUEUES_MAP['default'] worker = get_worker('default') - job = queue.enqueue(test_job_default) + job = queue.enqueue(self.dummy_job_default) worker.prepare_job_execution(job) self.assertEqual(job.get_status(), JobStatus.STARTED) @@ -282,7 +274,8 @@ class BackgroundTaskTestCase(TestCase): # Stop those jobs using the view started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection) self.assertEqual(len(started_job_registry), 1) - self.client.get(reverse('core:background_task_stop', args=[job.id])) + response = self.client.get(reverse('core:background_task_stop', args=[job.id])) + self.assertEqual(response.status_code, 302) worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started self.assertEqual(len(started_job_registry), 0) @@ -299,6 +292,7 @@ class BackgroundTaskTestCase(TestCase): queue_index = QUEUES_MAP['default'] response = self.client.get(reverse('core:worker_list', args=[queue_index])) + self.assertEqual(response.status_code, 200) self.assertIn(str(worker1.name), str(response.content)) self.assertNotIn(str(worker2.name), str(response.content)) @@ -306,8 +300,8 @@ class BackgroundTaskTestCase(TestCase): worker1 = get_worker('default', name=uuid.uuid4().hex) worker1.register_birth() - queue_index = QUEUES_MAP['default'] response = self.client.get(reverse('core:worker', args=[worker1.name])) + self.assertEqual(response.status_code, 200) self.assertIn(str(worker1.name), str(response.content)) self.assertIn('Birth', str(response.content)) self.assertIn('Total working time', str(response.content))