From a24576f12607a73f24d2c75d9a2c97194a3066d6 Mon Sep 17 00:00:00 2001 From: Arthur Hanson Date: Tue, 26 Nov 2024 07:01:06 -0800 Subject: [PATCH] 7848 Add RQ API (#17938) * 7848 Add Background Tasks (RQ) to API * 7848 Tasks * 7848 cleanup * 7848 add worker support * 7848 switch to APIView * 7848 Task detail view * 7848 Task enqueue, requeue, stop * 7848 Task enqueue, requeue, stop * 7848 Task enqueue, requeue, stop * 7848 tests * 7848 tests * 7848 OpenAPI doc generation * 7848 OpenAPI doc generation * 7848 review changes * 7848 viewset * 7848 viewset * 7848 fix tests * 7848 more viewsets * 7848 fix docstring * 7848 review comments * 7848 review comments - get all tasks * 7848 queue detail view * 7848 cleanup * 7848 cleanup * 7848 cleanup * 7848 cleanup * Rename viewsets for consistency w/serializers * Misc cleanup * 7848 review changes * 7848 review changes * 7848 add test * 7848 queue detail view * 7848 fix tests * 7848 fix the spectacular test failure * 7848 fix the spectacular test failure * Misc cleanup --------- Co-authored-by: Jeremy Stretch --- netbox/core/api/schema.py | 3 + netbox/core/api/serializers.py | 1 + netbox/core/api/serializers_/tasks.py | 87 +++++++++++++ netbox/core/api/urls.py | 5 +- netbox/core/api/views.py | 161 ++++++++++++++++++++++++ netbox/core/tests/test_api.py | 170 +++++++++++++++++++++++++- netbox/core/utils.py | 155 +++++++++++++++++++++++ netbox/core/views.py | 104 ++-------------- netbox/netbox/api/pagination.py | 25 ++++ 9 files changed, 612 insertions(+), 99 deletions(-) create mode 100644 netbox/core/api/serializers_/tasks.py create mode 100644 netbox/core/utils.py diff --git a/netbox/core/api/schema.py b/netbox/core/api/schema.py index fad907ac1..663ee2899 100644 --- a/netbox/core/api/schema.py +++ b/netbox/core/api/schema.py @@ -158,6 +158,9 @@ class NetBoxAutoSchema(AutoSchema): fields = {} if hasattr(serializer, 'child') else serializer.fields remove_fields = [] + # If you get a failure here for "AttributeError: 'cached_property' object has no attribute 'items'" + # it is probably because you are using a viewsets.ViewSet for the API View and are defining a + # serializer_class. You will also need to define a get_serializer() method like for GenericAPIView. for child_name, child in fields.items(): # read_only fields don't need to be in writable (write only) serializers if 'read_only' in dir(child) and child.read_only: diff --git a/netbox/core/api/serializers.py b/netbox/core/api/serializers.py index 2dde6be9f..9a6d4d726 100644 --- a/netbox/core/api/serializers.py +++ b/netbox/core/api/serializers.py @@ -1,3 +1,4 @@ from .serializers_.change_logging import * from .serializers_.data import * from .serializers_.jobs import * +from .serializers_.tasks import * diff --git a/netbox/core/api/serializers_/tasks.py b/netbox/core/api/serializers_/tasks.py new file mode 100644 index 000000000..53f2b5126 --- /dev/null +++ b/netbox/core/api/serializers_/tasks.py @@ -0,0 +1,87 @@ +from rest_framework import serializers +from rest_framework.reverse import reverse + +__all__ = ( + 'BackgroundTaskSerializer', + 'BackgroundQueueSerializer', + 'BackgroundWorkerSerializer', +) + + +class BackgroundTaskSerializer(serializers.Serializer): + id = serializers.CharField() + url = serializers.HyperlinkedIdentityField( + view_name='core-api:rqtask-detail', + lookup_field='id', + lookup_url_kwarg='pk' + ) + description = serializers.CharField() + origin = serializers.CharField() + func_name = serializers.CharField() + args = serializers.ListField(child=serializers.CharField()) + kwargs = serializers.DictField() + result = serializers.CharField() + timeout = serializers.IntegerField() + result_ttl = serializers.IntegerField() + created_at = serializers.DateTimeField() + enqueued_at = serializers.DateTimeField() + started_at = serializers.DateTimeField() + ended_at = serializers.DateTimeField() + worker_name = serializers.CharField() + position = serializers.SerializerMethodField() + status = serializers.SerializerMethodField() + meta = serializers.DictField() + last_heartbeat = serializers.CharField() + + is_finished = serializers.BooleanField() + is_queued = serializers.BooleanField() + is_failed = serializers.BooleanField() + is_started = serializers.BooleanField() + is_deferred = serializers.BooleanField() + is_canceled = serializers.BooleanField() + is_scheduled = serializers.BooleanField() + is_stopped = serializers.BooleanField() + + def get_position(self, obj) -> int: + return obj.get_position() + + def get_status(self, obj) -> str: + return obj.get_status() + + +class BackgroundQueueSerializer(serializers.Serializer): + name = serializers.CharField() + url = serializers.SerializerMethodField() + jobs = serializers.IntegerField() + oldest_job_timestamp = serializers.CharField() + index = serializers.IntegerField() + scheduler_pid = serializers.CharField() + workers = serializers.IntegerField() + finished_jobs = serializers.IntegerField() + started_jobs = serializers.IntegerField() + deferred_jobs = serializers.IntegerField() + failed_jobs = serializers.IntegerField() + scheduled_jobs = serializers.IntegerField() + + def get_url(self, obj): + return reverse('core-api:rqqueue-detail', args=[obj['name']], request=self.context.get("request")) + + +class BackgroundWorkerSerializer(serializers.Serializer): + name = serializers.CharField() + url = serializers.HyperlinkedIdentityField( + view_name='core-api:rqworker-detail', + lookup_field='name' + ) + state = serializers.SerializerMethodField() + birth_date = serializers.DateTimeField() + queue_names = serializers.ListField( + child=serializers.CharField() + ) + pid = serializers.CharField() + successful_job_count = serializers.IntegerField() + failed_job_count = serializers.IntegerField() + total_working_time = serializers.IntegerField() + + def get_state(self, obj): + return obj.get_state() diff --git a/netbox/core/api/urls.py b/netbox/core/api/urls.py index 95ee1896e..3c22f1cf4 100644 --- a/netbox/core/api/urls.py +++ b/netbox/core/api/urls.py @@ -1,6 +1,7 @@ from netbox.api.routers import NetBoxRouter from . import views +app_name = 'core-api' router = NetBoxRouter() router.APIRootView = views.CoreRootView @@ -9,6 +10,8 @@ router.register('data-sources', views.DataSourceViewSet) router.register('data-files', views.DataFileViewSet) router.register('jobs', views.JobViewSet) router.register('object-changes', views.ObjectChangeViewSet) +router.register('background-queues', views.BackgroundQueueViewSet, basename='rqqueue') +router.register('background-workers', views.BackgroundWorkerViewSet, basename='rqworker') +router.register('background-tasks', views.BackgroundTaskViewSet, basename='rqtask') -app_name = 'core-api' urlpatterns = router.urls diff --git a/netbox/core/api/views.py b/netbox/core/api/views.py index b3a024c02..4e5b148fc 100644 --- a/netbox/core/api/views.py +++ b/netbox/core/api/views.py @@ -1,5 +1,8 @@ +from django.http import Http404, HttpResponse from django.shortcuts import get_object_or_404 from django.utils.translation import gettext_lazy as _ +from drf_spectacular.types import OpenApiTypes +from drf_spectacular.utils import extend_schema from rest_framework.decorators import action from rest_framework.exceptions import PermissionDenied from rest_framework.response import Response @@ -10,8 +13,17 @@ from core import filtersets from core.choices import DataSourceStatusChoices from core.jobs import SyncDataSourceJob from core.models import * +from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs, requeue_rq_job, stop_rq_job +from django_rq.queues import get_redis_connection +from django_rq.utils import get_statistics +from django_rq.settings import QUEUES_LIST from netbox.api.metadata import ContentTypeMetadata +from netbox.api.pagination import LimitOffsetListPagination from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet +from rest_framework import viewsets +from rest_framework.permissions import IsAdminUser +from rq.job import Job as RQ_Job +from rq.worker import Worker from . import serializers @@ -71,3 +83,152 @@ class ObjectChangeViewSet(ReadOnlyModelViewSet): queryset = ObjectChange.objects.valid_models() serializer_class = serializers.ObjectChangeSerializer filterset_class = filtersets.ObjectChangeFilterSet + + +class BaseRQViewSet(viewsets.ViewSet): + """ + Base class for RQ view sets. Provides a list() method. Subclasses must implement get_data(). + """ + permission_classes = [IsAdminUser] + serializer_class = None + + def get_data(self): + raise NotImplementedError() + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def list(self, request): + data = self.get_data() + paginator = LimitOffsetListPagination() + data = paginator.paginate_list(data, request) + + serializer = self.serializer_class(data, many=True, context={'request': request}) + return paginator.get_paginated_response(serializer.data) + + def get_serializer(self, *args, **kwargs): + """ + Return the serializer instance that should be used for validating and + deserializing input, and for serializing output. + """ + serializer_class = self.get_serializer_class() + kwargs['context'] = self.get_serializer_context() + return serializer_class(*args, **kwargs) + + +class BackgroundQueueViewSet(BaseRQViewSet): + """ + Retrieve a list of RQ Queues. + Note: Queue names are not URL safe so not returning a detail view. + """ + serializer_class = serializers.BackgroundQueueSerializer + lookup_field = 'name' + lookup_value_regex = r'[\w.@+-]+' + + def get_view_name(self): + return "Background Queues" + + def get_data(self): + return get_statistics(run_maintenance_tasks=True)["queues"] + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def retrieve(self, request, name): + data = self.get_data() + if not data: + raise Http404 + + for queue in data: + if queue['name'] == name: + serializer = self.serializer_class(queue, context={'request': request}) + return Response(serializer.data) + + raise Http404 + + +class BackgroundWorkerViewSet(BaseRQViewSet): + """ + Retrieve a list of RQ Workers. + """ + serializer_class = serializers.BackgroundWorkerSerializer + lookup_field = 'name' + + def get_view_name(self): + return "Background Workers" + + def get_data(self): + config = QUEUES_LIST[0] + return Worker.all(get_redis_connection(config['connection_config'])) + + def retrieve(self, request, name): + # all the RQ queues should use the same connection + config = QUEUES_LIST[0] + workers = Worker.all(get_redis_connection(config['connection_config'])) + worker = next((item for item in workers if item.name == name), None) + if not worker: + raise Http404 + + serializer = serializers.BackgroundWorkerSerializer(worker, context={'request': request}) + return Response(serializer.data) + + +class BackgroundTaskViewSet(BaseRQViewSet): + """ + Retrieve a list of RQ Tasks. + """ + serializer_class = serializers.BackgroundTaskSerializer + + def get_view_name(self): + return "Background Tasks" + + def get_data(self): + return get_rq_jobs() + + def get_task_from_id(self, task_id): + config = QUEUES_LIST[0] + task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config'])) + if not task: + raise Http404 + + return task + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def retrieve(self, request, pk): + """ + Retrieve the details of the specified RQ Task. + """ + task = self.get_task_from_id(pk) + serializer = self.serializer_class(task, context={'request': request}) + return Response(serializer.data) + + @action(methods=["POST"], detail=True) + def delete(self, request, pk): + """ + Delete the specified RQ Task. + """ + delete_rq_job(pk) + return HttpResponse(status=200) + + @action(methods=["POST"], detail=True) + def requeue(self, request, pk): + """ + Requeues the specified RQ Task. + """ + requeue_rq_job(pk) + return HttpResponse(status=200) + + @action(methods=["POST"], detail=True) + def enqueue(self, request, pk): + """ + Enqueues the specified RQ Task. + """ + enqueue_rq_job(pk) + return HttpResponse(status=200) + + @action(methods=["POST"], detail=True) + def stop(self, request, pk): + """ + Stops the specified RQ Task. + """ + stopped_jobs = stop_rq_job(pk) + if len(stopped_jobs) == 1: + return HttpResponse(status=200) + else: + return HttpResponse(status=204) diff --git a/netbox/core/tests/test_api.py b/netbox/core/tests/test_api.py index eeb3bd9c4..d8fb8fd83 100644 --- a/netbox/core/tests/test_api.py +++ b/netbox/core/tests/test_api.py @@ -1,7 +1,14 @@ +import uuid + +from django_rq import get_queue +from django_rq.workers import get_worker from django.urls import reverse from django.utils import timezone +from rq.job import Job as RQ_Job, JobStatus +from rq.registry import FailedJobRegistry, StartedJobRegistry -from utilities.testing import APITestCase, APIViewTestCases +from users.models import Token, User +from utilities.testing import APITestCase, APIViewTestCases, TestCase from ..models import * @@ -91,3 +98,164 @@ class DataFileTest( ), ) DataFile.objects.bulk_create(data_files) + + +class BackgroundTaskTestCase(TestCase): + user_permissions = () + + @staticmethod + def dummy_job_default(): + return "Job finished" + + @staticmethod + def dummy_job_failing(): + raise Exception("Job failed") + + def setUp(self): + """ + Create a user and token for API calls. + """ + # Create the test user and assign permissions + self.user = User.objects.create_user(username='testuser') + self.user.is_staff = True + self.user.is_active = True + self.user.save() + self.token = Token.objects.create(user=self.user) + self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'} + + # Clear all queues prior to running each test + get_queue('default').connection.flushall() + get_queue('high').connection.flushall() + get_queue('low').connection.flushall() + + def test_background_queue_list(self): + url = reverse('core-api:rqqueue-list') + + # Attempt to load view without permission + self.user.is_staff = False + self.user.save() + response = self.client.get(url, **self.header) + self.assertEqual(response.status_code, 403) + + # Load view with permission + self.user.is_staff = True + self.user.save() + response = self.client.get(url, **self.header) + self.assertEqual(response.status_code, 200) + self.assertIn('default', str(response.content)) + self.assertIn('high', str(response.content)) + self.assertIn('low', str(response.content)) + + def test_background_queue(self): + response = self.client.get(reverse('core-api:rqqueue-detail', args=['default']), **self.header) + self.assertEqual(response.status_code, 200) + self.assertIn('default', str(response.content)) + self.assertIn('oldest_job_timestamp', str(response.content)) + self.assertIn('scheduled_jobs', str(response.content)) + + def test_background_task_list(self): + queue = get_queue('default') + queue.enqueue(self.dummy_job_default) + + response = self.client.get(reverse('core-api:rqtask-list'), **self.header) + self.assertEqual(response.status_code, 200) + self.assertIn('origin', str(response.content)) + self.assertIn('core.tests.test_api.BackgroundTaskTestCase.dummy_job_default()', str(response.content)) + + def test_background_task(self): + queue = get_queue('default') + job = queue.enqueue(self.dummy_job_default) + + response = self.client.get(reverse('core-api:rqtask-detail', args=[job.id]), **self.header) + self.assertEqual(response.status_code, 200) + self.assertIn(str(job.id), str(response.content)) + self.assertIn('origin', str(response.content)) + self.assertIn('meta', str(response.content)) + self.assertIn('kwargs', str(response.content)) + + def test_background_task_delete(self): + queue = get_queue('default') + job = queue.enqueue(self.dummy_job_default) + + response = self.client.post(reverse('core-api:rqtask-delete', args=[job.id]), **self.header) + self.assertEqual(response.status_code, 200) + self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection)) + queue = get_queue('default') + self.assertNotIn(job.id, queue.job_ids) + + def test_background_task_requeue(self): + queue = get_queue('default') + + # Enqueue & run a job that will fail + job = queue.enqueue(self.dummy_job_failing) + worker = get_worker('default') + worker.work(burst=True) + self.assertTrue(job.is_failed) + + # Re-enqueue the failed job and check that its status has been reset + response = self.client.post(reverse('core-api:rqtask-requeue', args=[job.id]), **self.header) + self.assertEqual(response.status_code, 200) + job = RQ_Job.fetch(job.id, queue.connection) + self.assertFalse(job.is_failed) + + def test_background_task_enqueue(self): + queue = get_queue('default') + + # Enqueue some jobs that each depends on its predecessor + job = previous_job = None + for _ in range(0, 3): + job = queue.enqueue(self.dummy_job_default, depends_on=previous_job) + previous_job = job + + # Check that the last job to be enqueued has a status of deferred + self.assertIsNotNone(job) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + self.assertIsNone(job.enqueued_at) + + # Force-enqueue the deferred job + response = self.client.post(reverse('core-api:rqtask-enqueue', args=[job.id]), **self.header) + self.assertEqual(response.status_code, 200) + + # Check that job's status is updated correctly + job = queue.fetch_job(job.id) + self.assertEqual(job.get_status(), JobStatus.QUEUED) + self.assertIsNotNone(job.enqueued_at) + + def test_background_task_stop(self): + queue = get_queue('default') + + worker = get_worker('default') + job = queue.enqueue(self.dummy_job_default) + worker.prepare_job_execution(job) + + self.assertEqual(job.get_status(), JobStatus.STARTED) + response = self.client.post(reverse('core-api:rqtask-stop', args=[job.id]), **self.header) + self.assertEqual(response.status_code, 200) + worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started + started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection) + self.assertEqual(len(started_job_registry), 0) + + canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection) + self.assertEqual(len(canceled_job_registry), 1) + self.assertIn(job.id, canceled_job_registry) + + def test_worker_list(self): + worker1 = get_worker('default', name=uuid.uuid4().hex) + worker1.register_birth() + + worker2 = get_worker('high') + worker2.register_birth() + + response = self.client.get(reverse('core-api:rqworker-list'), **self.header) + self.assertEqual(response.status_code, 200) + self.assertIn(str(worker1.name), str(response.content)) + + def test_worker(self): + worker1 = get_worker('default', name=uuid.uuid4().hex) + worker1.register_birth() + + response = self.client.get(reverse('core-api:rqworker-detail', args=[worker1.name]), **self.header) + self.assertEqual(response.status_code, 200) + self.assertIn(str(worker1.name), str(response.content)) + self.assertIn('birth_date', str(response.content)) + self.assertIn('total_working_time', str(response.content)) diff --git a/netbox/core/utils.py b/netbox/core/utils.py new file mode 100644 index 000000000..26adfdfa2 --- /dev/null +++ b/netbox/core/utils.py @@ -0,0 +1,155 @@ +from django.http import Http404 +from django.utils.translation import gettext_lazy as _ +from django_rq.queues import get_queue, get_queue_by_index, get_redis_connection +from django_rq.settings import QUEUES_MAP, QUEUES_LIST +from django_rq.utils import get_jobs, stop_jobs +from rq import requeue_job +from rq.exceptions import NoSuchJobError +from rq.job import Job as RQ_Job, JobStatus as RQJobStatus +from rq.registry import ( + DeferredJobRegistry, + FailedJobRegistry, + FinishedJobRegistry, + ScheduledJobRegistry, + StartedJobRegistry, +) + +__all__ = ( + 'delete_rq_job', + 'enqueue_rq_job', + 'get_rq_jobs', + 'get_rq_jobs_from_status', + 'requeue_rq_job', + 'stop_rq_job', +) + + +def get_rq_jobs(): + """ + Return a list of all RQ jobs. + """ + jobs = set() + + for queue in QUEUES_LIST: + queue = get_queue(queue['name']) + jobs.update(queue.get_jobs()) + + return list(jobs) + + +def get_rq_jobs_from_status(queue, status): + """ + Return the RQ jobs with the given status. + """ + jobs = [] + + try: + registry_cls = { + RQJobStatus.STARTED: StartedJobRegistry, + RQJobStatus.DEFERRED: DeferredJobRegistry, + RQJobStatus.FINISHED: FinishedJobRegistry, + RQJobStatus.FAILED: FailedJobRegistry, + RQJobStatus.SCHEDULED: ScheduledJobRegistry, + }[status] + except KeyError: + raise Http404 + registry = registry_cls(queue.name, queue.connection) + + job_ids = registry.get_job_ids() + if status != RQJobStatus.DEFERRED: + jobs = get_jobs(queue, job_ids, registry) + else: + # Deferred jobs require special handling + for job_id in job_ids: + try: + jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer)) + except NoSuchJobError: + pass + + if jobs and status == RQJobStatus.SCHEDULED: + for job in jobs: + job.scheduled_at = registry.get_scheduled_time(job) + + return jobs + + +def delete_rq_job(job_id): + """ + Delete the specified RQ job. + """ + config = QUEUES_LIST[0] + try: + job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) + except NoSuchJobError: + raise Http404(_("Job {job_id} not found").format(job_id=job_id)) + + queue_index = QUEUES_MAP[job.origin] + queue = get_queue_by_index(queue_index) + + # Remove job id from queue and delete the actual job + queue.connection.lrem(queue.key, 0, job.id) + job.delete() + + +def requeue_rq_job(job_id): + """ + Requeue the specified RQ job. + """ + config = QUEUES_LIST[0] + try: + job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) + except NoSuchJobError: + raise Http404(_("Job {id} not found.").format(id=job_id)) + + queue_index = QUEUES_MAP[job.origin] + queue = get_queue_by_index(queue_index) + + requeue_job(job_id, connection=queue.connection, serializer=queue.serializer) + + +def enqueue_rq_job(job_id): + """ + Enqueue the specified RQ job. + """ + config = QUEUES_LIST[0] + try: + job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) + except NoSuchJobError: + raise Http404(_("Job {id} not found.").format(id=job_id)) + + queue_index = QUEUES_MAP[job.origin] + queue = get_queue_by_index(queue_index) + + try: + # _enqueue_job is new in RQ 1.14, this is used to enqueue + # job regardless of its dependencies + queue._enqueue_job(job) + except AttributeError: + queue.enqueue_job(job) + + # Remove job from correct registry if needed + if job.get_status() == RQJobStatus.DEFERRED: + registry = DeferredJobRegistry(queue.name, queue.connection) + registry.remove(job) + elif job.get_status() == RQJobStatus.FINISHED: + registry = FinishedJobRegistry(queue.name, queue.connection) + registry.remove(job) + elif job.get_status() == RQJobStatus.SCHEDULED: + registry = ScheduledJobRegistry(queue.name, queue.connection) + registry.remove(job) + + +def stop_rq_job(job_id): + """ + Stop the specified RQ job. + """ + config = QUEUES_LIST[0] + try: + job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) + except NoSuchJobError: + raise Http404(_("Job {job_id} not found").format(job_id=job_id)) + + queue_index = QUEUES_MAP[job.origin] + queue = get_queue_by_index(queue_index) + + return stop_jobs(queue, job_id)[0] diff --git a/netbox/core/views.py b/netbox/core/views.py index a9ec5d70a..713807a82 100644 --- a/netbox/core/views.py +++ b/netbox/core/views.py @@ -14,16 +14,13 @@ from django.utils.translation import gettext_lazy as _ from django.views.generic import View from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection from django_rq.settings import QUEUES_MAP, QUEUES_LIST -from django_rq.utils import get_jobs, get_statistics, stop_jobs -from rq import requeue_job +from django_rq.utils import get_statistics from rq.exceptions import NoSuchJobError from rq.job import Job as RQ_Job, JobStatus as RQJobStatus -from rq.registry import ( - DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, StartedJobRegistry, -) from rq.worker import Worker from rq.worker_registration import clean_worker_registry +from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job from netbox.config import get_config, PARAMS from netbox.views import generic from netbox.views.generic.base import BaseObjectView @@ -363,41 +360,12 @@ class BackgroundTaskListView(TableMixin, BaseRQView): table = tables.BackgroundTaskTable def get_table_data(self, request, queue, status): - jobs = [] # Call get_jobs() to returned queued tasks if status == RQJobStatus.QUEUED: return queue.get_jobs() - # For other statuses, determine the registry to list (or raise a 404 for invalid statuses) - try: - registry_cls = { - RQJobStatus.STARTED: StartedJobRegistry, - RQJobStatus.DEFERRED: DeferredJobRegistry, - RQJobStatus.FINISHED: FinishedJobRegistry, - RQJobStatus.FAILED: FailedJobRegistry, - RQJobStatus.SCHEDULED: ScheduledJobRegistry, - }[status] - except KeyError: - raise Http404 - registry = registry_cls(queue.name, queue.connection) - - job_ids = registry.get_job_ids() - if status != RQJobStatus.DEFERRED: - jobs = get_jobs(queue, job_ids, registry) - else: - # Deferred jobs require special handling - for job_id in job_ids: - try: - jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer)) - except NoSuchJobError: - pass - - if jobs and status == RQJobStatus.SCHEDULED: - for job in jobs: - job.scheduled_at = registry.get_scheduled_time(job) - - return jobs + return get_rq_jobs_from_status(queue, status) def get(self, request, queue_index, status): queue = get_queue_by_index(queue_index) @@ -463,19 +431,7 @@ class BackgroundTaskDeleteView(BaseRQView): form = ConfirmationForm(request.POST) if form.is_valid(): - # all the RQ queues should use the same connection - config = QUEUES_LIST[0] - try: - job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) - except NoSuchJobError: - raise Http404(_("Job {job_id} not found").format(job_id=job_id)) - - queue_index = QUEUES_MAP[job.origin] - queue = get_queue_by_index(queue_index) - - # Remove job id from queue and delete the actual job - queue.connection.lrem(queue.key, 0, job.id) - job.delete() + delete_rq_job(job_id) messages.success(request, _('Job {id} has been deleted.').format(id=job_id)) else: messages.error(request, _('Error deleting job {id}: {error}').format(id=job_id, error=form.errors[0])) @@ -486,17 +442,7 @@ class BackgroundTaskDeleteView(BaseRQView): class BackgroundTaskRequeueView(BaseRQView): def get(self, request, job_id): - # all the RQ queues should use the same connection - config = QUEUES_LIST[0] - try: - job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) - except NoSuchJobError: - raise Http404(_("Job {id} not found.").format(id=job_id)) - - queue_index = QUEUES_MAP[job.origin] - queue = get_queue_by_index(queue_index) - - requeue_job(job_id, connection=queue.connection, serializer=queue.serializer) + requeue_rq_job(job_id) messages.success(request, _('Job {id} has been re-enqueued.').format(id=job_id)) return redirect(reverse('core:background_task', args=[job_id])) @@ -505,33 +451,7 @@ class BackgroundTaskEnqueueView(BaseRQView): def get(self, request, job_id): # all the RQ queues should use the same connection - config = QUEUES_LIST[0] - try: - job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) - except NoSuchJobError: - raise Http404(_("Job {id} not found.").format(id=job_id)) - - queue_index = QUEUES_MAP[job.origin] - queue = get_queue_by_index(queue_index) - - try: - # _enqueue_job is new in RQ 1.14, this is used to enqueue - # job regardless of its dependencies - queue._enqueue_job(job) - except AttributeError: - queue.enqueue_job(job) - - # Remove job from correct registry if needed - if job.get_status() == RQJobStatus.DEFERRED: - registry = DeferredJobRegistry(queue.name, queue.connection) - registry.remove(job) - elif job.get_status() == RQJobStatus.FINISHED: - registry = FinishedJobRegistry(queue.name, queue.connection) - registry.remove(job) - elif job.get_status() == RQJobStatus.SCHEDULED: - registry = ScheduledJobRegistry(queue.name, queue.connection) - registry.remove(job) - + enqueue_rq_job(job_id) messages.success(request, _('Job {id} has been enqueued.').format(id=job_id)) return redirect(reverse('core:background_task', args=[job_id])) @@ -539,17 +459,7 @@ class BackgroundTaskEnqueueView(BaseRQView): class BackgroundTaskStopView(BaseRQView): def get(self, request, job_id): - # all the RQ queues should use the same connection - config = QUEUES_LIST[0] - try: - job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) - except NoSuchJobError: - raise Http404(_("Job {job_id} not found").format(job_id=job_id)) - - queue_index = QUEUES_MAP[job.origin] - queue = get_queue_by_index(queue_index) - - stopped_jobs = stop_jobs(queue, job_id)[0] + stopped_jobs = stop_rq_job(job_id) if len(stopped_jobs) == 1: messages.success(request, _('Job {id} has been stopped.').format(id=job_id)) else: diff --git a/netbox/netbox/api/pagination.py b/netbox/netbox/api/pagination.py index 5ecade264..f47434ebd 100644 --- a/netbox/netbox/api/pagination.py +++ b/netbox/netbox/api/pagination.py @@ -83,3 +83,28 @@ class StripCountAnnotationsPaginator(OptionalLimitOffsetPagination): cloned_queryset.query.annotations.clear() return cloned_queryset.count() + + +class LimitOffsetListPagination(LimitOffsetPagination): + """ + DRF LimitOffset Paginator but for list instead of queryset + """ + count = 0 + offset = 0 + + def paginate_list(self, data, request, view=None): + self.request = request + self.limit = self.get_limit(request) + self.count = len(data) + self.offset = self.get_offset(request) + + if self.limit is None: + self.limit = self.count + + if self.count == 0 or self.offset > self.count: + return [] + + if self.count > self.limit and self.template is not None: + self.display_page_controls = True + + return data[self.offset:self.offset + self.limit]