7848 review comments - get all tasks

This commit is contained in:
Arthur Hanson 2024-11-19 09:13:39 -08:00
parent 3ec3bef5fa
commit c6482d88e2
4 changed files with 59 additions and 59 deletions

View File

@ -9,6 +9,7 @@ __all__ = (
class BackgroundTaskSerializer(serializers.Serializer):
id = serializers.CharField()
url = serializers.HyperlinkedIdentityField(view_name='core-api:rqtask-detail', lookup_field='id', lookup_url_kwarg='pk')
description = serializers.CharField()
origin = serializers.CharField()
func_name = serializers.CharField()
@ -59,8 +60,8 @@ class BackgroundQueueSerializer(serializers.Serializer):
class BackgroundWorkerSerializer(serializers.Serializer):
name = serializers.HyperlinkedIdentityField(view_name='core-api:rqworker-detail', lookup_field='name')
name = serializers.CharField()
url = serializers.HyperlinkedIdentityField(view_name='core-api:rqworker-detail', lookup_field='name')
state = serializers.SerializerMethodField()
birth_date = serializers.CharField()
queue_names = serializers.ListField(child=serializers.CharField())

View File

@ -12,7 +12,7 @@ router.register('jobs', views.JobViewSet)
router.register('object-changes', views.ObjectChangeViewSet)
router.register('background-queues', views.QueueViewSet, basename='rqqueue')
router.register('background-workers', views.WorkerViewSet, basename='rqworker')
router.register('background-tasks/(?P<queue_name>.+)/', views.TaskViewSet, basename='rqtask')
router.register('background-task', views.TaskDetailViewSet, basename='rqtaskdetail')
# router.register('background-tasks/(?P<queue_name>.+)/', views.TaskViewSet, basename='rqtask')
router.register('background-tasks', views.TaskViewSet, basename='rqtask')
urlpatterns = router.urls

View File

@ -13,7 +13,7 @@ from core import filtersets
from core.choices import DataSourceStatusChoices
from core.jobs import SyncDataSourceJob
from core.models import *
from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job
from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job
from django_rq.queues import get_queue, get_redis_connection
from django_rq.utils import get_statistics
from django_rq.settings import QUEUES_LIST
@ -92,6 +92,9 @@ class BaseRQListView(viewsets.ViewSet):
permission_classes = [IsAdminUser]
serializer_class = None
def get_data(self):
raise NotImplementedError()
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request):
data = self.get_data()
@ -115,6 +118,17 @@ class QueueViewSet(BaseRQListView):
def get_data(self):
return get_statistics(run_maintenance_tasks=True)["queues"]
def retrieve(self, request, name):
queues = self.get_data()
if not queues:
raise Http404
for queue in queues:
if queue.name == name:
return Response(self.serializer_class(data, context={'request': request}))
raise Http404
class WorkerViewSet(BaseRQListView):
"""
@ -142,14 +156,40 @@ class WorkerViewSet(BaseRQListView):
return Response(serializer.data)
class TaskDetailViewSet(viewsets.ViewSet):
class TaskViewSet(viewsets.ViewSet):
"""
Retrieve the details of the specified RQ Task.
"""
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Task"
return "Background Tasks"
def get_response(self, request, queue_name, status=None):
try:
queue = get_queue(queue_name)
except KeyError:
raise Http404
if status:
data = get_rq_jobs_from_status(queue, status)
else:
data = queue.get_jobs()
paginator = LimitOffsetListPagination()
data = paginator.paginate_list(data, request)
serializer = serializers.BackgroundTaskSerializer(data, many=True, context={'request': request})
return Response(serializer.data)
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request):
data = get_rq_jobs()
paginator = LimitOffsetListPagination()
data = paginator.paginate_list(data, request)
serializer = serializers.BackgroundTaskSerializer(data, many=True, context={'request': request})
return Response(serializer.data)
def get_task_from_id(self, task_id):
config = QUEUES_LIST[0]
@ -187,54 +227,3 @@ class TaskDetailViewSet(viewsets.ViewSet):
return HttpResponse(status=200)
else:
return HttpResponse(status=204)
class TaskViewSet(viewsets.ViewSet):
"""
Background Task API.
"""
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Tasks"
def get_response(self, request, queue_name, status=None):
try:
queue = get_queue(queue_name)
except KeyError:
raise Http404
if status:
data = get_rq_jobs_from_status(queue, status)
else:
data = queue.get_jobs()
paginator = LimitOffsetListPagination()
data = paginator.paginate_list(data, request)
serializer = serializers.BackgroundTaskSerializer(data, many=True, context={'request': request})
return Response(serializer.data)
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request, queue_name):
return self.get_response(request, queue_name, None)
@action(methods=["GET"], detail=False)
def deferred(self, request, queue_name):
return self.get_response(request, queue_name, "deferred")
@action(methods=["GET"], detail=False)
def failed(self, request, queue_name):
return self.get_response(request, queue_name, "failed")
@action(methods=["GET"], detail=False)
def finished(self, request, queue_name):
return self.get_response(request, queue_name, "finished")
@action(methods=["GET"], detail=False)
def started(self, request, queue_name):
return self.get_response(request, queue_name, "started")
@action(methods=["GET"], detail=False)
def queued(self, request, queue_name):
return self.get_response(request, queue_name, None)

View File

@ -1,6 +1,6 @@
from django.http import Http404
from django.utils.translation import gettext_lazy as _
from django_rq.queues import get_queue_by_index, get_redis_connection
from django_rq.queues import get_queue, get_queue_by_index, get_redis_connection
from django_rq.settings import QUEUES_MAP, QUEUES_LIST
from django_rq.utils import get_jobs, stop_jobs
from rq import requeue_job
@ -51,6 +51,16 @@ def get_rq_jobs_from_status(queue, status):
return jobs
def get_rq_jobs():
jobs = set()
for queue in QUEUES_LIST:
queue = get_queue(queue['name'])
jobs.update(queue.get_jobs())
return list(jobs)
def delete_rq_job(job_id):
"""
Deletes the specified RQ job.