7848 add worker support

This commit is contained in:
Arthur Hanson 2024-11-05 14:50:13 -08:00
parent a327c916c0
commit 21a61dcb66
4 changed files with 78 additions and 9 deletions

View File

@ -3,6 +3,7 @@ from rest_framework import serializers
__all__ = (
'BackgroundTaskSerializer',
'BackgroundQueueSerializer',
'BackgroundWorkerSerializer',
)
@ -10,12 +11,22 @@ class BackgroundTaskSerializer(serializers.Serializer):
id = serializers.CharField()
description = serializers.CharField()
origin = serializers.CharField()
func_name = serializers.CharField()
args = serializers.ListField(child=serializers.CharField())
kwargs = serializers.DictField()
result = serializers.CharField()
timeout = serializers.IntegerField()
result_ttl = serializers.IntegerField()
created_at = serializers.CharField()
enqueued_at = serializers.CharField()
started_at = serializers.CharField()
ended_at = serializers.DictField()
worker_name = serializers.DictField()
position = serializers.SerializerMethodField()
status = serializers.SerializerMethodField()
meta = serializers.DictField()
last_heartbeat = serializers.CharField()
is_finished = serializers.BooleanField()
is_queued = serializers.BooleanField()
is_failed = serializers.BooleanField()
@ -45,3 +56,17 @@ class BackgroundQueueSerializer(serializers.Serializer):
deferred_jobs = serializers.IntegerField()
failed_jobs = serializers.IntegerField()
scheduled_jobs = serializers.IntegerField()
class BackgroundWorkerSerializer(serializers.Serializer):
name = serializers.CharField()
state = serializers.SerializerMethodField()
birth_date = serializers.CharField()
queue_names = serializers.ListField(child=serializers.CharField())
pid = serializers.CharField()
successful_job_count = serializers.IntegerField()
failed_job_count = serializers.IntegerField()
total_working_time = serializers.IntegerField()
def get_state(self, obj):
return obj.get_state()

View File

@ -9,6 +9,7 @@ router.register('data-sources', views.DataSourceViewSet)
router.register('data-files', views.DataFileViewSet)
router.register('jobs', views.JobViewSet)
router.register('object-changes', views.ObjectChangeViewSet)
router.register('background-workers', views.BackgroundWorkerViewSet, basename='BackgroundWorkers')
router.register('background-queues', views.BackgroundQueueViewSet, basename='BackgroundQueues')
router.register('background-tasks/(?P<queue_name>[\w-]+)', views.BackgroundTaskViewSet, basename='BackgroundTasks')
router.register('background-tasks/(?P<queue_name>[\w-]+)/deferred', views.BackgroundTaskDeferredViewSet, basename='BackgroundTaskDeferred')
@ -16,6 +17,7 @@ router.register('background-tasks/(?P<queue_name>[\w-]+)/failed', views.Backgrou
router.register('background-tasks/(?P<queue_name>[\w-]+)/finished', views.BackgroundTaskFinishedViewSet, basename='BackgroundTaskFinished')
router.register('background-tasks/(?P<queue_name>[\w-]+)/started', views.BackgroundTaskStartedViewSet, basename='BackgroundTaskStarted')
router.register('background-tasks/(?P<queue_name>[\w-]+)/queued', views.BackgroundTaskQueuedViewSet, basename='BackgroundTaskQueued')
router.register('background-tasks/(?P<queue_name>[\w-]+)/workers', views.BackgroundTaskWorkerViewSet, basename='BackgroundTaskWorkers')
app_name = 'core-api'
urlpatterns = router.urls

View File

@ -13,11 +13,14 @@ from core.choices import DataSourceStatusChoices
from core.jobs import SyncDataSourceJob
from core.models import *
from core.utils import get_rq_jobs_from_status
from django_rq.queues import get_queue
from django_rq.queues import get_queue, get_redis_connection
from django_rq.utils import get_statistics
from django_rq.settings import QUEUES_LIST
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
from rest_framework.permissions import IsAdminUser
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry
from . import serializers
@ -88,14 +91,27 @@ class BackgroundQueueViewSet(ViewSet):
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request):
"""
Return the UserConfig for the currently authenticated User.
"""
data = get_statistics(run_maintenance_tasks=True)["queues"]
serializer = serializers.BackgroundQueueSerializer(data, many=True)
return Response(serializer.data)
class BackgroundWorkerViewSet(ViewSet):
serializer_class = serializers.BackgroundQueueSerializer
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Workers"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
workers = Worker.all(get_redis_connection(config['connection_config']))
serializer = serializers.BackgroundWorkerSerializer(workers, many=True)
return Response(serializer.data)
class BackgroundTaskViewSet(ViewSet):
serializer_class = serializers.BackgroundTaskSerializer
permission_classes = [IsAdminUser]
@ -120,13 +136,10 @@ class BaseBackgroundTaskViewSet(ViewSet):
registry = None
def get_view_name(self):
return "BackgroundTaskViewSet"
return "Background Tasks"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request, queue_name):
"""
Return the UserConfig for the currently authenticated User.
"""
queue = get_queue(queue_name)
data = get_rq_jobs_from_status(queue, self.registry)
serializer = serializers.BackgroundTaskSerializer(data, many=True)
@ -136,18 +149,30 @@ class BaseBackgroundTaskViewSet(ViewSet):
class BackgroundTaskDeferredViewSet(BaseBackgroundTaskViewSet):
registry = "deferred"
def get_view_name(self):
return "Deferred Tasks"
class BackgroundTaskFailedViewSet(BaseBackgroundTaskViewSet):
registry = "failed"
def get_view_name(self):
return "Failed Tasks"
class BackgroundTaskFinishedViewSet(BaseBackgroundTaskViewSet):
registry = "finished"
def get_view_name(self):
return "Finished Tasks"
class BackgroundTaskStartedViewSet(BaseBackgroundTaskViewSet):
registry = "started"
def get_view_name(self):
return "Started Tasks"
class BackgroundTaskQueuedViewSet(BaseBackgroundTaskViewSet):
registry = "queued"
@ -161,3 +186,20 @@ class BackgroundTaskQueuedViewSet(BaseBackgroundTaskViewSet):
data = queue.get_jobs()
serializer = serializers.BackgroundTaskSerializer(data, many=True)
return Response(serializer.data)
class BackgroundTaskWorkerViewSet(ViewSet):
serializer_class = serializers.BackgroundQueueSerializer
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Workers"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request, queue_name):
queue = get_queue(queue_name)
clean_worker_registry(queue)
all_workers = Worker.all(queue.connection)
workers = [worker for worker in all_workers if queue.name in worker.queue_names()]
serializer = serializers.BackgroundWorkerSerializer(workers, many=True)
return Response(serializer.data)

View File

@ -17,7 +17,7 @@ from django_rq.settings import QUEUES_MAP, QUEUES_LIST
from django_rq.utils import get_statistics, stop_jobs
from rq import requeue_job
from rq.exceptions import NoSuchJobError
from rq.job import Job as RQ_Job
from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
from rq.registry import (
DeferredJobRegistry, FinishedJobRegistry, ScheduledJobRegistry,
)