7848 Tasks

This commit is contained in:
Arthur Hanson 2024-11-01 13:36:07 -07:00
parent 1ab38c3a45
commit c8f13ad2a5
4 changed files with 133 additions and 2 deletions

View File

@ -1,10 +1,37 @@
from rest_framework import serializers
__all__ = (
'BackgroundTaskSerializer',
'BackgroundQueueSerializer',
)
class BackgroundTaskSerializer(serializers.Serializer):
id = serializers.CharField()
description = serializers.CharField()
origin = serializers.CharField()
enqueued_at = serializers.CharField()
started_at = serializers.CharField()
ended_at = serializers.DictField()
worker_name = serializers.DictField()
position = serializers.SerializerMethodField()
status = serializers.SerializerMethodField()
is_finished = serializers.BooleanField()
is_queued = serializers.BooleanField()
is_failed = serializers.BooleanField()
is_started = serializers.BooleanField()
is_deferred = serializers.BooleanField()
is_canceled = serializers.BooleanField()
is_scheduled = serializers.BooleanField()
is_stopped = serializers.BooleanField()
def get_position(self, obj):
return obj.get_position()
def get_status(self, obj):
return obj.get_status()
class BackgroundQueueSerializer(serializers.Serializer):
name = serializers.CharField()
jobs = serializers.IntegerField()

View File

@ -9,7 +9,12 @@ router.register('data-sources', views.DataSourceViewSet)
router.register('data-files', views.DataFileViewSet)
router.register('jobs', views.JobViewSet)
router.register('object-changes', views.ObjectChangeViewSet)
router.register('background-queues', views.BackgroundQueueViewSet, basename='RQ')
router.register('background-queues', views.BackgroundQueueViewSet, basename='BackgroundQueues')
router.register('background-tasks/(?P<queue_name>[\w-]+)', views.BackgroundTaskViewSet, basename='BackgroundTasks')
router.register('background-tasks/(?P<queue_name>[\w-]+)/deferred', views.BackgroundTaskDeferredViewSet, basename='BackgroundTaskDeferred')
router.register('background-tasks/(?P<queue_name>[\w-]+)/failed', views.BackgroundTaskFailedViewSet, basename='BackgroundTaskFailed')
router.register('background-tasks/(?P<queue_name>[\w-]+)/finished', views.BackgroundTaskFinishedViewSet, basename='BackgroundTaskFinished')
router.register('background-tasks/(?P<queue_name>[\w-]+)/started', views.BackgroundTaskStartedViewSet, basename='BackgroundTaskStarted')
app_name = 'core-api'
urlpatterns = router.urls

View File

@ -12,6 +12,8 @@ from core import filtersets
from core.choices import DataSourceStatusChoices
from core.jobs import SyncDataSourceJob
from core.models import *
from core.utils import get_rq_jobs_from_status
from django_rq.queues import get_queue
from django_rq.utils import get_statistics
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
@ -82,7 +84,7 @@ class BackgroundQueueViewSet(ViewSet):
permission_classes = [IsAdminUser]
def get_view_name(self):
return "RQ"
return "BackgroundQueueViewSet"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request):
@ -92,3 +94,56 @@ class BackgroundQueueViewSet(ViewSet):
data = get_statistics(run_maintenance_tasks=True)["queues"]
serializer = serializers.BackgroundQueueSerializer(data, many=True)
return Response(serializer.data)
class BackgroundTaskViewSet(ViewSet):
serializer_class = serializers.BackgroundTaskSerializer
permission_classes = [IsAdminUser]
def get_view_name(self):
return "BackgroundTaskViewSet"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request, queue_name):
"""
Return the UserConfig for the currently authenticated User.
"""
queue = get_queue(queue_name)
data = queue.get_jobs()
serializer = serializers.BackgroundTaskSerializer(data, many=True)
return Response(serializer.data)
class BaseBackgroundTaskViewSet(ViewSet):
serializer_class = serializers.BackgroundTaskSerializer
permission_classes = [IsAdminUser]
registry = None
def get_view_name(self):
return "BackgroundTaskViewSet"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def list(self, request, queue_name):
"""
Return the UserConfig for the currently authenticated User.
"""
queue = get_queue(queue_name)
data = get_rq_jobs_from_status(queue, self.registry)
serializer = serializers.BackgroundTaskSerializer(data, many=True)
return Response(serializer.data)
class BackgroundTaskDeferredViewSet(BaseBackgroundTaskViewSet):
registry = "deferred"
class BackgroundTaskFailedViewSet(BaseBackgroundTaskViewSet):
registry = "failed"
class BackgroundTaskFinishedViewSet(BaseBackgroundTaskViewSet):
registry = "finished"
class BackgroundTaskStartedViewSet(BaseBackgroundTaskViewSet):
registry = "started"

44
netbox/core/utils.py Normal file
View File

@ -0,0 +1,44 @@
from django.http import Http404
from django_rq.utils import get_jobs
from rq.exceptions import NoSuchJobError
from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
from rq.registry import (
DeferredJobRegistry,
FailedJobRegistry,
FinishedJobRegistry,
ScheduledJobRegistry,
StartedJobRegistry,
)
def get_rq_jobs_from_status(queue, status):
jobs = []
try:
registry_cls = {
RQJobStatus.STARTED: StartedJobRegistry,
RQJobStatus.DEFERRED: DeferredJobRegistry,
RQJobStatus.FINISHED: FinishedJobRegistry,
RQJobStatus.FAILED: FailedJobRegistry,
RQJobStatus.SCHEDULED: ScheduledJobRegistry,
}[status]
except KeyError:
raise Http404
registry = registry_cls(queue.name, queue.connection)
job_ids = registry.get_job_ids()
if status != RQJobStatus.DEFERRED:
jobs = get_jobs(queue, job_ids, registry)
else:
# Deferred jobs require special handling
for job_id in job_ids:
try:
jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
except NoSuchJobError:
pass
if jobs and status == RQJobStatus.SCHEDULED:
for job in jobs:
job.scheduled_at = registry.get_scheduled_time(job)
return jobs