diff --git a/netbox/core/api/urls.py b/netbox/core/api/urls.py index 1373e0785..8f6eb58e4 100644 --- a/netbox/core/api/urls.py +++ b/netbox/core/api/urls.py @@ -13,20 +13,6 @@ router.register('data-files', views.DataFileViewSet) router.register('jobs', views.JobViewSet) router.register('object-changes', views.ObjectChangeViewSet) - -# Background Tasks -""" -router.register('background-queues/', views.BackgroundQueueListView.as_view(), name='background_queue_list'), -router.register('background-queues///', views.BackgroundTaskListView.as_view(), name='background_task_list'), -router.register('background-tasks//', views.BackgroundTaskView.as_view(), name='background_task'), -router.register('background-tasks//delete/', views.BackgroundTaskDeleteView.as_view(), name='background_task_delete'), -router.register('background-tasks//requeue/', views.BackgroundTaskRequeueView.as_view(), name='background_task_requeue'), -router.register('background-tasks//enqueue/', views.BackgroundTaskEnqueueView.as_view(), name='background_task_enqueue'), -router.register('background-tasks//stop/', views.BackgroundTaskStopView.as_view(), name='background_task_stop'), -router.register('background-workers//', views.WorkerListView.as_view(), name='worker_list'), -router.register('background-workers//', views.WorkerView.as_view(), name='worker'), -""" - urlpatterns = ( path('background-queues/', views.QueueListView.as_view()), path('background-workers/', views.WorkerListView.as_view()), @@ -38,7 +24,10 @@ urlpatterns = ( path('background-tasks//started/', views.StartedTaskListView.as_view()), path('background-tasks//queued/', views.QueuedTaskListView.as_view()), path('background-task//', views.TaskDetailView.as_view()), - # path('background-tasks//workers', views.BackgroundWorkerListView.as_view()), + path('background-task//delete/', views.TaskDeleteView.as_view()), + path('background-task//requeue/', views.TaskRequeueView.as_view()), + path('background-task//enqueue/', views.TaskEnqueueView.as_view()), + path('background-task//stop/', views.TaskStopView.as_view()), path('', include(router.urls)), ) diff --git a/netbox/core/api/views.py b/netbox/core/api/views.py index ea6a381fb..8d28287cb 100644 --- a/netbox/core/api/views.py +++ b/netbox/core/api/views.py @@ -1,4 +1,4 @@ -from django.http import Http404 +from django.http import Http404, HttpResponse from django.shortcuts import get_object_or_404 from django.utils.translation import gettext_lazy as _ from drf_spectacular.types import OpenApiTypes @@ -14,7 +14,7 @@ from core import filtersets from core.choices import DataSourceStatusChoices from core.jobs import SyncDataSourceJob from core.models import * -from core.utils import get_rq_jobs_from_status +from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job from django_rq.queues import get_queue, get_redis_connection from django_rq.utils import get_statistics from django_rq.settings import QUEUES_LIST @@ -152,7 +152,22 @@ class TaskListView(APIView): return Response(serializer.data) -class TaskDetailView(APIView): +class BaseTaskView(APIView): + permission_classes = [IsAdminUser] + + def get_view_name(self): + return "Background Task" + + def get_task_from_id(self, task_id): + config = QUEUES_LIST[0] + task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config'])) + if not task: + raise Http404 + + return task + + +class TaskDetailView(BaseTaskView): permission_classes = [IsAdminUser] def get_view_name(self): @@ -160,19 +175,65 @@ class TaskDetailView(APIView): @extend_schema(responses={200: OpenApiTypes.OBJECT}) def get(self, request, task_id, format=None): - """ - Return the UserConfig for the currently authenticated User. - """ - config = QUEUES_LIST[0] - task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config'])) - if not task: - raise Http404 - + task = self.get_task_from_id(task_id) serializer = serializers.BackgroundTaskSerializer(task) return Response(serializer.data) -class BaseTaskView(APIView): +class TaskDeleteView(APIView): + permission_classes = [IsAdminUser] + + def get_view_name(self): + return "Background Task" + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def get(self, request, task_id, format=None): + delete_rq_job(task_id) + return HttpResponse(status=204) + + +class TaskRequeueView(APIView): + permission_classes = [IsAdminUser] + + def get_view_name(self): + return "Background Task" + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def get(self, request, task_id, format=None): + requeue_rq_job(task_id) + return HttpResponse(status=204) + + +class TaskEnqueueView(APIView): + permission_classes = [IsAdminUser] + + def get_view_name(self): + return "Background Task" + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def get(self, request, task_id, format=None): + enqueue_rq_job(task_id) + return HttpResponse(status=204) + + +class TaskStopView(APIView): + permission_classes = [IsAdminUser] + + def get_view_name(self): + return "Background Task" + + @extend_schema(responses={200: OpenApiTypes.OBJECT}) + def get(self, request, task_id, format=None): + stopped_jobs = stop_rq_job(job_id) + if len(stopped_jobs) == 1: + pass + # messages.success(request, _('Job {id} has been stopped.').format(id=job_id)) + else: + pass + # messages.error(request, _('Failed to stop job {id}').format(id=job_id)) + + +class BaseTaskListView(APIView): permission_classes = [IsAdminUser] registry = None @@ -191,35 +252,35 @@ class BaseTaskView(APIView): return Response(serializer.data) -class DeferredTaskListView(BaseTaskView): +class DeferredTaskListView(BaseTaskListView): registry = "deferred" def get_view_name(self): return "Deferred Tasks" -class FailedTaskListView(BaseTaskView): +class FailedTaskListView(BaseTaskListView): registry = "failed" def get_view_name(self): return "Failed Tasks" -class FinishedTaskListView(BaseTaskView): +class FinishedTaskListView(BaseTaskListView): registry = "finished" def get_view_name(self): return "Finished Tasks" -class StartedTaskListView(BaseTaskView): +class StartedTaskListView(BaseTaskListView): registry = "started" def get_view_name(self): return "Started Tasks" -class QueuedTaskListView(BaseTaskView): +class QueuedTaskListView(BaseTaskListView): registry = "queued" @extend_schema(responses={200: OpenApiTypes.OBJECT}) diff --git a/netbox/core/utils.py b/netbox/core/utils.py index e652c3f69..653287811 100644 --- a/netbox/core/utils.py +++ b/netbox/core/utils.py @@ -1,5 +1,9 @@ from django.http import Http404 -from django_rq.utils import get_jobs +from django.utils.translation import gettext_lazy as _ +from django_rq.queues import get_queue_by_index, get_redis_connection +from django_rq.settings import QUEUES_MAP, QUEUES_LIST +from django_rq.utils import get_jobs, stop_jobs +from rq import requeue_job from rq.exceptions import NoSuchJobError from rq.job import Job as RQ_Job, JobStatus as RQJobStatus from rq.registry import ( @@ -42,3 +46,73 @@ def get_rq_jobs_from_status(queue, status): job.scheduled_at = registry.get_scheduled_time(job) return jobs + + +def delete_rq_job(job_id): + config = QUEUES_LIST[0] + try: + job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) + except NoSuchJobError: + raise Http404(_("Job {job_id} not found").format(job_id=job_id)) + + queue_index = QUEUES_MAP[job.origin] + queue = get_queue_by_index(queue_index) + + # Remove job id from queue and delete the actual job + queue.connection.lrem(queue.key, 0, job.id) + job.delete() + + +def requeue_rq_job(job_id): + config = QUEUES_LIST[0] + try: + job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) + except NoSuchJobError: + raise Http404(_("Job {id} not found.").format(id=job_id)) + + queue_index = QUEUES_MAP[job.origin] + queue = get_queue_by_index(queue_index) + + requeue_job(job_id, connection=queue.connection, serializer=queue.serializer) + + +def enqueue_rq_job(job_id): + config = QUEUES_LIST[0] + try: + job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) + except NoSuchJobError: + raise Http404(_("Job {id} not found.").format(id=job_id)) + + queue_index = QUEUES_MAP[job.origin] + queue = get_queue_by_index(queue_index) + + try: + # _enqueue_job is new in RQ 1.14, this is used to enqueue + # job regardless of its dependencies + queue._enqueue_job(job) + except AttributeError: + queue.enqueue_job(job) + + # Remove job from correct registry if needed + if job.get_status() == RQJobStatus.DEFERRED: + registry = DeferredJobRegistry(queue.name, queue.connection) + registry.remove(job) + elif job.get_status() == RQJobStatus.FINISHED: + registry = FinishedJobRegistry(queue.name, queue.connection) + registry.remove(job) + elif job.get_status() == RQJobStatus.SCHEDULED: + registry = ScheduledJobRegistry(queue.name, queue.connection) + registry.remove(job) + + +def stop_rq_job(job_id): + config = QUEUES_LIST[0] + try: + job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) + except NoSuchJobError: + raise Http404(_("Job {job_id} not found").format(job_id=job_id)) + + queue_index = QUEUES_MAP[job.origin] + queue = get_queue_by_index(queue_index) + + return stop_jobs(queue, job_id)[0] diff --git a/netbox/core/views.py b/netbox/core/views.py index 34b93ab58..66b6f901d 100644 --- a/netbox/core/views.py +++ b/netbox/core/views.py @@ -14,17 +14,13 @@ from django.utils.translation import gettext_lazy as _ from django.views.generic import View from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection from django_rq.settings import QUEUES_MAP, QUEUES_LIST -from django_rq.utils import get_statistics, stop_jobs -from rq import requeue_job +from django_rq.utils import get_statistics from rq.exceptions import NoSuchJobError from rq.job import Job as RQ_Job, JobStatus as RQJobStatus -from rq.registry import ( - DeferredJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, -) from rq.worker import Worker from rq.worker_registration import clean_worker_registry -from core.utils import get_rq_jobs_from_status +from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job from netbox.config import get_config, PARAMS from netbox.views import generic from netbox.views.generic.base import BaseObjectView @@ -420,19 +416,7 @@ class BackgroundTaskDeleteView(BaseRQView): form = ConfirmationForm(request.POST) if form.is_valid(): - # all the RQ queues should use the same connection - config = QUEUES_LIST[0] - try: - job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) - except NoSuchJobError: - raise Http404(_("Job {job_id} not found").format(job_id=job_id)) - - queue_index = QUEUES_MAP[job.origin] - queue = get_queue_by_index(queue_index) - - # Remove job id from queue and delete the actual job - queue.connection.lrem(queue.key, 0, job.id) - job.delete() + delete_rq_job(job_id) messages.success(request, _('Job {id} has been deleted.').format(id=job_id)) else: messages.error(request, _('Error deleting job {id}: {error}').format(id=job_id, error=form.errors[0])) @@ -443,17 +427,7 @@ class BackgroundTaskDeleteView(BaseRQView): class BackgroundTaskRequeueView(BaseRQView): def get(self, request, job_id): - # all the RQ queues should use the same connection - config = QUEUES_LIST[0] - try: - job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) - except NoSuchJobError: - raise Http404(_("Job {id} not found.").format(id=job_id)) - - queue_index = QUEUES_MAP[job.origin] - queue = get_queue_by_index(queue_index) - - requeue_job(job_id, connection=queue.connection, serializer=queue.serializer) + requeue_rq_job(job_id) messages.success(request, _('Job {id} has been re-enqueued.').format(id=job_id)) return redirect(reverse('core:background_task', args=[job_id])) @@ -462,33 +436,7 @@ class BackgroundTaskEnqueueView(BaseRQView): def get(self, request, job_id): # all the RQ queues should use the same connection - config = QUEUES_LIST[0] - try: - job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) - except NoSuchJobError: - raise Http404(_("Job {id} not found.").format(id=job_id)) - - queue_index = QUEUES_MAP[job.origin] - queue = get_queue_by_index(queue_index) - - try: - # _enqueue_job is new in RQ 1.14, this is used to enqueue - # job regardless of its dependencies - queue._enqueue_job(job) - except AttributeError: - queue.enqueue_job(job) - - # Remove job from correct registry if needed - if job.get_status() == RQJobStatus.DEFERRED: - registry = DeferredJobRegistry(queue.name, queue.connection) - registry.remove(job) - elif job.get_status() == RQJobStatus.FINISHED: - registry = FinishedJobRegistry(queue.name, queue.connection) - registry.remove(job) - elif job.get_status() == RQJobStatus.SCHEDULED: - registry = ScheduledJobRegistry(queue.name, queue.connection) - registry.remove(job) - + enqueue_rq_job(job_id) messages.success(request, _('Job {id} has been enqueued.').format(id=job_id)) return redirect(reverse('core:background_task', args=[job_id])) @@ -496,17 +444,7 @@ class BackgroundTaskEnqueueView(BaseRQView): class BackgroundTaskStopView(BaseRQView): def get(self, request, job_id): - # all the RQ queues should use the same connection - config = QUEUES_LIST[0] - try: - job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),) - except NoSuchJobError: - raise Http404(_("Job {job_id} not found").format(job_id=job_id)) - - queue_index = QUEUES_MAP[job.origin] - queue = get_queue_by_index(queue_index) - - stopped_jobs = stop_jobs(queue, job_id)[0] + stopped_jobs = stop_rq_job(job_id) if len(stopped_jobs) == 1: messages.success(request, _('Job {id} has been stopped.').format(id=job_id)) else: