7848 tests

This commit is contained in:
Arthur Hanson 2024-11-06 16:25:33 -08:00
parent 38132a55f3
commit b2f367d329
4 changed files with 219 additions and 21 deletions

View File

@ -17,11 +17,11 @@ class BackgroundTaskSerializer(serializers.Serializer):
result = serializers.CharField()
timeout = serializers.IntegerField()
result_ttl = serializers.IntegerField()
created_at = serializers.CharField()
enqueued_at = serializers.CharField()
started_at = serializers.CharField()
ended_at = serializers.DictField()
worker_name = serializers.DictField()
created_at = serializers.DateTimeField()
enqueued_at = serializers.DateTimeField()
started_at = serializers.DateTimeField()
ended_at = serializers.DateTimeField()
worker_name = serializers.CharField()
position = serializers.SerializerMethodField()
status = serializers.SerializerMethodField()
meta = serializers.DictField()

View File

@ -14,20 +14,20 @@ router.register('jobs', views.JobViewSet)
router.register('object-changes', views.ObjectChangeViewSet)
urlpatterns = (
path('background-queues/', views.QueueListView.as_view()),
path('background-workers/', views.WorkerListView.as_view()),
path('background-workers/<str:worker_name>/', views.WorkerDetailView.as_view()),
path('background-tasks/<str:queue_name>/', views.TaskListView.as_view()),
path('background-tasks/<str:queue_name>/deferred/', views.DeferredTaskListView.as_view()),
path('background-tasks/<str:queue_name>/failed/', views.FailedTaskListView.as_view()),
path('background-tasks/<str:queue_name>/finished/', views.FinishedTaskListView.as_view()),
path('background-tasks/<str:queue_name>/started/', views.StartedTaskListView.as_view()),
path('background-tasks/<str:queue_name>/queued/', views.QueuedTaskListView.as_view()),
path('background-task/<str:task_id>/', views.TaskDetailView.as_view()),
path('background-task/<str:task_id>/delete/', views.TaskDeleteView.as_view()),
path('background-task/<str:task_id>/requeue/', views.TaskRequeueView.as_view()),
path('background-task/<str:task_id>/enqueue/', views.TaskEnqueueView.as_view()),
path('background-task/<str:task_id>/stop/', views.TaskStopView.as_view()),
path('background-queues/', views.QueueListView.as_view(), name="background_queue_list"),
path('background-workers/', views.WorkerListView.as_view(), name="background_worker_list"),
path('background-workers/<str:worker_name>/', views.WorkerDetailView.as_view(), name="background_worker_detail"),
path('background-tasks/<str:queue_name>/', views.TaskListView.as_view(), name="background_task_list"),
path('background-tasks/<str:queue_name>/deferred/', views.DeferredTaskListView.as_view(), name="background_tasks_deferred"),
path('background-tasks/<str:queue_name>/failed/', views.FailedTaskListView.as_view(), name="background_tasks_failed"),
path('background-tasks/<str:queue_name>/finished/', views.FinishedTaskListView.as_view(), name="background_tasks_finished"),
path('background-tasks/<str:queue_name>/started/', views.StartedTaskListView.as_view(), name="background_tasks_started"),
path('background-tasks/<str:queue_name>/queued/', views.QueuedTaskListView.as_view(), name="background_tasks_queued"),
path('background-task/<str:task_id>/', views.TaskDetailView.as_view(), name="background_task_detail"),
path('background-task/<str:task_id>/delete/', views.TaskDeleteView.as_view(), name="background_task_delete"),
path('background-task/<str:task_id>/requeue/', views.TaskRequeueView.as_view(), name="background_task_requeue"),
path('background-task/<str:task_id>/enqueue/', views.TaskEnqueueView.as_view(), name="background_task_enqueue"),
path('background-task/<str:task_id>/stop/', views.TaskStopView.as_view(), name="background_task_stop"),
path('', include(router.urls)),
)

View File

@ -221,7 +221,7 @@ class TaskStopView(APIView):
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def get(self, request, task_id, format=None):
stopped_jobs = stop_rq_job(job_id)
stopped_jobs = stop_rq_job(task_id)
if len(stopped_jobs) == 1:
return HttpResponse(status=200)
else:

View File

@ -1,7 +1,14 @@
import uuid
from django_rq import get_queue
from django_rq.workers import get_worker
from django.urls import reverse
from django.utils import timezone
from rq.job import Job as RQ_Job, JobStatus
from rq.registry import DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry
from utilities.testing import APITestCase, APIViewTestCases
from users.models import Token, User
from utilities.testing import APITestCase, APIViewTestCases, TestCase
from ..models import *
@ -91,3 +98,194 @@ class DataFileTest(
),
)
DataFile.objects.bulk_create(data_files)
class BackgroundTaskTestCase(TestCase):
user_permissions = ()
# Dummy worker functions
@staticmethod
def dummy_job_default():
return "Job finished"
@staticmethod
def dummy_job_high():
return "Job finished"
@staticmethod
def dummy_job_failing():
raise Exception("Job failed")
def setUp(self):
"""
Create a user and token for API calls.
"""
# Create the test user and assign permissions
self.user = User.objects.create_user(username='testuser')
self.user.is_staff = True
self.user.is_active = True
self.user.save()
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
# Clear all queues prior to running each test
get_queue('default').connection.flushall()
get_queue('high').connection.flushall()
get_queue('low').connection.flushall()
def test_background_queue_list(self):
url = reverse('core-api:background_queue_list')
# Attempt to load view without permission
self.user.is_staff = False
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 403)
# Load view with permission
self.user.is_staff = True
self.user.save()
response = self.client.get(url, **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn('default', str(response.content))
self.assertIn('high', str(response.content))
self.assertIn('low', str(response.content))
def test_background_tasks_list_default(self):
queue = get_queue('default')
queue.enqueue(self.dummy_job_default)
response = self.client.get(reverse('core-api:background_task_list', args=["default",]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
def test_background_tasks_list_finished(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
registry = FinishedJobRegistry(queue.name, queue.connection)
registry.add(job, 2)
response = self.client.get(reverse('core-api:background_tasks_finished', args=["default",]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(job.id, str(response.content))
def test_background_tasks_list_failed(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
registry = FailedJobRegistry(queue.name, queue.connection)
registry.add(job, 2)
response = self.client.get(reverse('core-api:background_tasks_failed', args=["default"]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(job.id, str(response.content))
def test_background_tasks_list_deferred(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
registry = DeferredJobRegistry(queue.name, queue.connection)
registry.add(job, 2)
response = self.client.get(reverse('core-api:background_tasks_deferred', args=["default",]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(job.id, str(response.content))
def test_background_task(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
response = self.client.get(reverse('core-api:background_task_detail', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(job.id), str(response.content))
self.assertIn('origin', str(response.content))
self.assertIn('meta', str(response.content))
self.assertIn('kwargs', str(response.content))
def test_background_task_delete(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
response = self.client.get(reverse('core-api:background_task_delete', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
queue = get_queue('default')
self.assertNotIn(job.id, queue.job_ids)
def test_background_task_requeue(self):
queue = get_queue('default')
# Enqueue & run a job that will fail
job = queue.enqueue(self.dummy_job_failing)
worker = get_worker('default')
worker.work(burst=True)
self.assertTrue(job.is_failed)
# Re-enqueue the failed job and check that its status has been reset
response = self.client.get(reverse('core-api:background_task_requeue', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
job = RQ_Job.fetch(job.id, queue.connection)
self.assertFalse(job.is_failed)
def test_background_task_enqueue(self):
queue = get_queue('default')
# Enqueue some jobs that each depends on its predecessor
job = previous_job = None
for _ in range(0, 3):
job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
previous_job = job
# Check that the last job to be enqueued has a status of deferred
self.assertIsNotNone(job)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
self.assertIsNone(job.enqueued_at)
# Force-enqueue the deferred job
response = self.client.get(reverse('core-api:background_task_enqueue', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
# Check that job's status is updated correctly
job = queue.fetch_job(job.id)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
self.assertIsNotNone(job.enqueued_at)
def test_background_task_stop(self):
queue = get_queue('default')
worker = get_worker('default')
job = queue.enqueue(self.dummy_job_default)
worker.prepare_job_execution(job)
self.assertEqual(job.get_status(), JobStatus.STARTED)
# Stop those jobs using the view
started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
# self.assertEqual(len(started_job_registry), 1)
response = self.client.get(reverse('core-api:background_task_stop', args=[job.id]), **self.header)
self.assertEqual(response.status_code, 200)
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
self.assertEqual(len(started_job_registry), 0)
canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
self.assertEqual(len(canceled_job_registry), 1)
self.assertIn(job.id, canceled_job_registry)
def test_worker_list(self):
worker1 = get_worker('default', name=uuid.uuid4().hex)
worker1.register_birth()
worker2 = get_worker('high')
worker2.register_birth()
response = self.client.get(reverse('core-api:background_worker_list'), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(worker1.name), str(response.content))
def test_worker(self):
worker1 = get_worker('default', name=uuid.uuid4().hex)
worker1.register_birth()
response = self.client.get(reverse('core-api:background_worker_detail', args=[worker1.name]), **self.header)
self.assertEqual(response.status_code, 200)
self.assertIn(str(worker1.name), str(response.content))
self.assertIn('birth_date', str(response.content))
self.assertIn('total_working_time', str(response.content))