diff --git a/netbox/core/api/serializers_/tasks.py b/netbox/core/api/serializers_/tasks.py index 7537902e8..8ec546ff0 100644 --- a/netbox/core/api/serializers_/tasks.py +++ b/netbox/core/api/serializers_/tasks.py @@ -3,6 +3,7 @@ from rest_framework import serializers __all__ = ( 'BackgroundTaskSerializer', 'BackgroundQueueSerializer', + 'BackgroundWorkerSerializer', ) @@ -10,12 +11,22 @@ class BackgroundTaskSerializer(serializers.Serializer): id = serializers.CharField() description = serializers.CharField() origin = serializers.CharField() + func_name = serializers.CharField() + args = serializers.ListField(child=serializers.CharField()) + kwargs = serializers.DictField() + result = serializers.CharField() + timeout = serializers.IntegerField() + result_ttl = serializers.IntegerField() + created_at = serializers.CharField() enqueued_at = serializers.CharField() started_at = serializers.CharField() ended_at = serializers.DictField() worker_name = serializers.DictField() position = serializers.SerializerMethodField() status = serializers.SerializerMethodField() + meta = serializers.DictField() + last_heartbeat = serializers.CharField() + is_finished = serializers.BooleanField() is_queued = serializers.BooleanField() is_failed = serializers.BooleanField() @@ -45,3 +56,17 @@ class BackgroundQueueSerializer(serializers.Serializer): deferred_jobs = serializers.IntegerField() failed_jobs = serializers.IntegerField() scheduled_jobs = serializers.IntegerField() + + +class BackgroundWorkerSerializer(serializers.Serializer): + name = serializers.CharField() + state = serializers.SerializerMethodField() + birth_date = serializers.CharField() + queue_names = serializers.ListField(child=serializers.CharField()) + pid = serializers.CharField() + successful_job_count = serializers.IntegerField() + failed_job_count = serializers.IntegerField() + total_working_time = serializers.IntegerField() + + def get_state(self, obj): + return obj.get_state() diff --git a/netbox/core/api/urls.py b/netbox/core/api/urls.py index a29e2b393..19b7f4978 100644 --- a/netbox/core/api/urls.py +++ b/netbox/core/api/urls.py @@ -9,6 +9,7 @@ router.register('data-sources', views.DataSourceViewSet) router.register('data-files', views.DataFileViewSet) router.register('jobs', views.JobViewSet) router.register('object-changes', views.ObjectChangeViewSet) +router.register('background-workers', views.BackgroundWorkerViewSet, basename='BackgroundWorkers') router.register('background-queues', views.BackgroundQueueViewSet, basename='BackgroundQueues') router.register('background-tasks/(?P[\w-]+)', views.BackgroundTaskViewSet, basename='BackgroundTasks') router.register('background-tasks/(?P[\w-]+)/deferred', views.BackgroundTaskDeferredViewSet, basename='BackgroundTaskDeferred') @@ -16,6 +17,7 @@ router.register('background-tasks/(?P[\w-]+)/failed', views.Backgrou router.register('background-tasks/(?P[\w-]+)/finished', views.BackgroundTaskFinishedViewSet, basename='BackgroundTaskFinished') router.register('background-tasks/(?P[\w-]+)/started', views.BackgroundTaskStartedViewSet, basename='BackgroundTaskStarted') router.register('background-tasks/(?P[\w-]+)/queued', views.BackgroundTaskQueuedViewSet, basename='BackgroundTaskQueued') +router.register('background-tasks/(?P[\w-]+)/workers', views.BackgroundTaskWorkerViewSet, basename='BackgroundTaskWorkers') app_name = 'core-api' urlpatterns = router.urls diff --git a/netbox/core/api/views.py b/netbox/core/api/views.py index a19a7120a..a67ce1946 100644 --- a/netbox/core/api/views.py +++ b/netbox/core/api/views.py @@ -13,11 +13,14 @@ from core.choices import DataSourceStatusChoices from core.jobs import SyncDataSourceJob from core.models import * from core.utils import get_rq_jobs_from_status -from django_rq.queues import get_queue +from django_rq.queues import get_queue, get_redis_connection from django_rq.utils import get_statistics +from django_rq.settings import QUEUES_LIST from netbox.api.metadata import ContentTypeMetadata from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet from rest_framework.permissions import IsAdminUser +from rq.worker import Worker +from rq.worker_registration import clean_worker_registry from . import serializers @@ -88,14 +91,27 @@ class BackgroundQueueViewSet(ViewSet): @extend_schema(responses={200: OpenApiTypes.OBJECT}) def list(self, request): - """ - Return the UserConfig for the currently authenticated User. - """ data = get_statistics(run_maintenance_tasks=True)["queues"] serializer = serializers.BackgroundQueueSerializer(data, many=True) return Response(serializer.data) +class BackgroundWorkerViewSet(ViewSet): + serializer_class = serializers.BackgroundQueueSerializer + permission_classes = [IsAdminUser] + + def get_view_name(self): + return "Background Workers" + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def list(self, request): + # all the RQ queues should use the same connection + config = QUEUES_LIST[0] + workers = Worker.all(get_redis_connection(config['connection_config'])) + serializer = serializers.BackgroundWorkerSerializer(workers, many=True) + return Response(serializer.data) + + class BackgroundTaskViewSet(ViewSet): serializer_class = serializers.BackgroundTaskSerializer permission_classes = [IsAdminUser] @@ -120,13 +136,10 @@ class BaseBackgroundTaskViewSet(ViewSet): registry = None def get_view_name(self): - return "BackgroundTaskViewSet" + return "Background Tasks" @extend_schema(responses={200: OpenApiTypes.OBJECT}) def list(self, request, queue_name): - """ - Return the UserConfig for the currently authenticated User. - """ queue = get_queue(queue_name) data = get_rq_jobs_from_status(queue, self.registry) serializer = serializers.BackgroundTaskSerializer(data, many=True) @@ -136,18 +149,30 @@ class BaseBackgroundTaskViewSet(ViewSet): class BackgroundTaskDeferredViewSet(BaseBackgroundTaskViewSet): registry = "deferred" + def get_view_name(self): + return "Deferred Tasks" + class BackgroundTaskFailedViewSet(BaseBackgroundTaskViewSet): registry = "failed" + def get_view_name(self): + return "Failed Tasks" + class BackgroundTaskFinishedViewSet(BaseBackgroundTaskViewSet): registry = "finished" + def get_view_name(self): + return "Finished Tasks" + class BackgroundTaskStartedViewSet(BaseBackgroundTaskViewSet): registry = "started" + def get_view_name(self): + return "Started Tasks" + class BackgroundTaskQueuedViewSet(BaseBackgroundTaskViewSet): registry = "queued" @@ -161,3 +186,20 @@ class BackgroundTaskQueuedViewSet(BaseBackgroundTaskViewSet): data = queue.get_jobs() serializer = serializers.BackgroundTaskSerializer(data, many=True) return Response(serializer.data) + + +class BackgroundTaskWorkerViewSet(ViewSet): + serializer_class = serializers.BackgroundQueueSerializer + permission_classes = [IsAdminUser] + + def get_view_name(self): + return "Background Workers" + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def list(self, request, queue_name): + queue = get_queue(queue_name) + clean_worker_registry(queue) + all_workers = Worker.all(queue.connection) + workers = [worker for worker in all_workers if queue.name in worker.queue_names()] + serializer = serializers.BackgroundWorkerSerializer(workers, many=True) + return Response(serializer.data) diff --git a/netbox/core/views.py b/netbox/core/views.py index 6d15efed4..34b93ab58 100644 --- a/netbox/core/views.py +++ b/netbox/core/views.py @@ -17,7 +17,7 @@ from django_rq.settings import QUEUES_MAP, QUEUES_LIST from django_rq.utils import get_statistics, stop_jobs from rq import requeue_job from rq.exceptions import NoSuchJobError -from rq.job import Job as RQ_Job +from rq.job import Job as RQ_Job, JobStatus as RQJobStatus from rq.registry import ( DeferredJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, )