diff --git a/netbox/core/api/serializers_/tasks.py b/netbox/core/api/serializers_/tasks.py index 5f99bdaba..7537902e8 100644 --- a/netbox/core/api/serializers_/tasks.py +++ b/netbox/core/api/serializers_/tasks.py @@ -1,10 +1,37 @@ from rest_framework import serializers __all__ = ( + 'BackgroundTaskSerializer', 'BackgroundQueueSerializer', ) +class BackgroundTaskSerializer(serializers.Serializer): + id = serializers.CharField() + description = serializers.CharField() + origin = serializers.CharField() + enqueued_at = serializers.CharField() + started_at = serializers.CharField() + ended_at = serializers.DictField() + worker_name = serializers.DictField() + position = serializers.SerializerMethodField() + status = serializers.SerializerMethodField() + is_finished = serializers.BooleanField() + is_queued = serializers.BooleanField() + is_failed = serializers.BooleanField() + is_started = serializers.BooleanField() + is_deferred = serializers.BooleanField() + is_canceled = serializers.BooleanField() + is_scheduled = serializers.BooleanField() + is_stopped = serializers.BooleanField() + + def get_position(self, obj): + return obj.get_position() + + def get_status(self, obj): + return obj.get_status() + + class BackgroundQueueSerializer(serializers.Serializer): name = serializers.CharField() jobs = serializers.IntegerField() diff --git a/netbox/core/api/urls.py b/netbox/core/api/urls.py index c75da9625..7ccfd3517 100644 --- a/netbox/core/api/urls.py +++ b/netbox/core/api/urls.py @@ -9,7 +9,12 @@ router.register('data-sources', views.DataSourceViewSet) router.register('data-files', views.DataFileViewSet) router.register('jobs', views.JobViewSet) router.register('object-changes', views.ObjectChangeViewSet) -router.register('background-queues', views.BackgroundQueueViewSet, basename='RQ') +router.register('background-queues', views.BackgroundQueueViewSet, basename='BackgroundQueues') +router.register('background-tasks/(?P[\w-]+)', views.BackgroundTaskViewSet, basename='BackgroundTasks') +router.register('background-tasks/(?P[\w-]+)/deferred', views.BackgroundTaskDeferredViewSet, basename='BackgroundTaskDeferred') +router.register('background-tasks/(?P[\w-]+)/failed', views.BackgroundTaskFailedViewSet, basename='BackgroundTaskFailed') +router.register('background-tasks/(?P[\w-]+)/finished', views.BackgroundTaskFinishedViewSet, basename='BackgroundTaskFinished') +router.register('background-tasks/(?P[\w-]+)/started', views.BackgroundTaskStartedViewSet, basename='BackgroundTaskStarted') app_name = 'core-api' urlpatterns = router.urls diff --git a/netbox/core/api/views.py b/netbox/core/api/views.py index b05325ea5..b2b82ff3a 100644 --- a/netbox/core/api/views.py +++ b/netbox/core/api/views.py @@ -12,6 +12,8 @@ from core import filtersets from core.choices import DataSourceStatusChoices from core.jobs import SyncDataSourceJob from core.models import * +from core.utils import get_rq_jobs_from_status +from django_rq.queues import get_queue from django_rq.utils import get_statistics from netbox.api.metadata import ContentTypeMetadata from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet @@ -82,7 +84,7 @@ class BackgroundQueueViewSet(ViewSet): permission_classes = [IsAdminUser] def get_view_name(self): - return "RQ" + return "BackgroundQueueViewSet" @extend_schema(responses={200: OpenApiTypes.OBJECT}) def list(self, request): @@ -92,3 +94,56 @@ class BackgroundQueueViewSet(ViewSet): data = get_statistics(run_maintenance_tasks=True)["queues"] serializer = serializers.BackgroundQueueSerializer(data, many=True) return Response(serializer.data) + + +class BackgroundTaskViewSet(ViewSet): + serializer_class = serializers.BackgroundTaskSerializer + permission_classes = [IsAdminUser] + + def get_view_name(self): + return "BackgroundTaskViewSet" + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def list(self, request, queue_name): + """ + Return the UserConfig for the currently authenticated User. + """ + queue = get_queue(queue_name) + data = queue.get_jobs() + serializer = serializers.BackgroundTaskSerializer(data, many=True) + return Response(serializer.data) + + +class BaseBackgroundTaskViewSet(ViewSet): + serializer_class = serializers.BackgroundTaskSerializer + permission_classes = [IsAdminUser] + registry = None + + def get_view_name(self): + return "BackgroundTaskViewSet" + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def list(self, request, queue_name): + """ + Return the UserConfig for the currently authenticated User. + """ + queue = get_queue(queue_name) + data = get_rq_jobs_from_status(queue, self.registry) + serializer = serializers.BackgroundTaskSerializer(data, many=True) + return Response(serializer.data) + + +class BackgroundTaskDeferredViewSet(BaseBackgroundTaskViewSet): + registry = "deferred" + + +class BackgroundTaskFailedViewSet(BaseBackgroundTaskViewSet): + registry = "failed" + + +class BackgroundTaskFinishedViewSet(BaseBackgroundTaskViewSet): + registry = "finished" + + +class BackgroundTaskStartedViewSet(BaseBackgroundTaskViewSet): + registry = "started" diff --git a/netbox/core/utils.py b/netbox/core/utils.py new file mode 100644 index 000000000..e652c3f69 --- /dev/null +++ b/netbox/core/utils.py @@ -0,0 +1,44 @@ +from django.http import Http404 +from django_rq.utils import get_jobs +from rq.exceptions import NoSuchJobError +from rq.job import Job as RQ_Job, JobStatus as RQJobStatus +from rq.registry import ( + DeferredJobRegistry, + FailedJobRegistry, + FinishedJobRegistry, + ScheduledJobRegistry, + StartedJobRegistry, +) + + +def get_rq_jobs_from_status(queue, status): + jobs = [] + + try: + registry_cls = { + RQJobStatus.STARTED: StartedJobRegistry, + RQJobStatus.DEFERRED: DeferredJobRegistry, + RQJobStatus.FINISHED: FinishedJobRegistry, + RQJobStatus.FAILED: FailedJobRegistry, + RQJobStatus.SCHEDULED: ScheduledJobRegistry, + }[status] + except KeyError: + raise Http404 + registry = registry_cls(queue.name, queue.connection) + + job_ids = registry.get_job_ids() + if status != RQJobStatus.DEFERRED: + jobs = get_jobs(queue, job_ids, registry) + else: + # Deferred jobs require special handling + for job_id in job_ids: + try: + jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer)) + except NoSuchJobError: + pass + + if jobs and status == RQJobStatus.SCHEDULED: + for job in jobs: + job.scheduled_at = registry.get_scheduled_time(job) + + return jobs