diff --git a/netbox/core/tests/test_views.py b/netbox/core/tests/test_views.py index 16d07f376..de23b3c87 100644 --- a/netbox/core/tests/test_views.py +++ b/netbox/core/tests/test_views.py @@ -1,6 +1,25 @@ +import uuid +from datetime import datetime +from django.contrib.auth.models import User from django.utils import timezone +from django_rq import get_queue +from django.test import override_settings +from django.test.client import Client +from django_rq.settings import QUEUES_MAP, QUEUES_LIST +from django_rq.workers import get_worker +from django.urls import reverse +from rq import get_current_job +from rq.job import Job as RQ_Job +from rq.registry import ( + DeferredJobRegistry, + FailedJobRegistry, + FinishedJobRegistry, + ScheduledJobRegistry, + StartedJobRegistry, +) + +from utilities.testing import TestCase, ViewTestCases, create_tags -from utilities.testing import ViewTestCases, create_tags from ..models import * @@ -87,3 +106,148 @@ class DataFileTestCase( ), ) DataFile.objects.bulk_create(data_files) + + +def test_job_default(): + return 'TestNBJob' + + +def test_job_high(): + return 'TestNBJob' + + +class BackgroundTaskTestCase(TestCase): + user_permissions = () + + def setUp(self): + super().setUp() + self.user.is_staff = True + self.user.is_active = True + self.user.save() + get_queue('default').connection.flushall() + get_queue('high').connection.flushall() + get_queue('low').connection.flushall() + + def test_background_queue_list(self): + response = self.client.get(reverse('core:background_queue_list')) + self.assertEqual(response.status_code, 200) + self.assertIn('default', str(response.content)) + self.assertIn('high', str(response.content)) + self.assertIn('low', str(response.content)) + + def test_background_queue_list_no_perm(self): + self.user.is_staff = False + self.user.save() + response = self.client.get(reverse('core:background_queue_list')) + self.assertEqual(response.status_code, 403) + self.user.is_staff = True + self.user.save() + + def test_background_tasks_list_default(self): + queue = get_queue('default') + job = queue.enqueue(test_job_default) + queue_index = QUEUES_MAP['default'] + + response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued'])) + self.assertEqual(response.status_code, 200) + self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + + def test_background_tasks_list_high(self): + queue = get_queue('high') + job = queue.enqueue(test_job_high) + queue_index = QUEUES_MAP['high'] + + response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued'])) + self.assertEqual(response.status_code, 200) + self.assertIn('core.tests.test_views.test_job_high', str(response.content)) + + def test_background_tasks_list_finished(self): + queue = get_queue('default') + job = queue.enqueue(test_job_default) + queue_index = QUEUES_MAP['default'] + + registry = FinishedJobRegistry(queue.name, queue.connection) + registry.add(job, 2) + response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'finished'])) + self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + + def test_background_tasks_list_failed(self): + queue = get_queue('default') + job = queue.enqueue(test_job_default) + queue_index = QUEUES_MAP['default'] + + registry = FailedJobRegistry(queue.name, queue.connection) + registry.add(job, 2) + response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'failed'])) + self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + + def test_background_tasks_scheduled(self): + queue = get_queue('default') + job = queue.enqueue_at(datetime.now(), test_job_default) + queue_index = QUEUES_MAP['default'] + + response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'scheduled'])) + self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + + def test_background_tasks_list_deferred(self): + queue = get_queue('default') + job = queue.enqueue(test_job_default) + queue_index = QUEUES_MAP['default'] + + registry = DeferredJobRegistry(queue.name, queue.connection) + registry.add(job, 2) + response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'deferred'])) + self.assertIn('core.tests.test_views.test_job_default', str(response.content)) + + def test_background_task(self): + queue = get_queue('default') + job = queue.enqueue(test_job_default) + queue_index = QUEUES_MAP['default'] + + response = self.client.get(reverse('core:background_task', args=[job.id])) + self.assertEqual(response.status_code, 200) + self.assertIn('Background Tasks', str(response.content)) + self.assertIn(str(job.id), str(response.content)) + self.assertIn('Callable', str(response.content)) + self.assertIn('Meta', str(response.content)) + self.assertIn('Kwargs', str(response.content)) + + def test_background_task_delete(self): + queue = get_queue('default') + job = queue.enqueue(test_job_default) + queue_index = QUEUES_MAP['default'] + + response = self.client.post(reverse('core:background_task_delete', args=[job.id]), {'confirm': True}) + self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection)) + self.assertNotIn(job.id, queue.job_ids) + + def test_background_task_requeue(self): + pass + + def test_background_task_enqueue(self): + pass + + def test_background_task_stop(self): + pass + + def test_worker_list(self): + worker1 = get_worker('default', name=uuid.uuid4().hex) + worker1.register_birth() + + worker2 = get_worker('high') + worker2.register_birth() + + queue_index = QUEUES_MAP['default'] + response = self.client.get(reverse('core:worker_list', args=[queue_index])) + self.assertIn(str(worker1.name), str(response.content)) + self.assertNotIn(str(worker2.name), str(response.content)) + + def test_worker(self): + worker1 = get_worker('default', name=uuid.uuid4().hex) + worker1.register_birth() + + queue_index = QUEUES_MAP['default'] + response = self.client.get(reverse('core:worker', args=[worker1.name])) + self.assertIn(str(worker1.name), str(response.content)) + self.assertIn('Birth', str(response.content)) + self.assertIn('Total working time (seconds)', str(response.content)) diff --git a/netbox/core/views.py b/netbox/core/views.py index b40d0c7af..776b502ce 100644 --- a/netbox/core/views.py +++ b/netbox/core/views.py @@ -311,7 +311,7 @@ class BackgroundTaskListView(BaseTaskListView): for job_id in job_ids: try: - jobs.append(Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer)) + jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer)) except NoSuchJobError: pass