7848 Task enqueue, requeue, stop

This commit is contained in:
Arthur Hanson 2024-11-06 13:49:36 -08:00
parent 088ab820cd
commit afb18d4ea9
4 changed files with 163 additions and 101 deletions

View File

@ -13,20 +13,6 @@ router.register('data-files', views.DataFileViewSet)
router.register('jobs', views.JobViewSet) router.register('jobs', views.JobViewSet)
router.register('object-changes', views.ObjectChangeViewSet) router.register('object-changes', views.ObjectChangeViewSet)
# Background Tasks
"""
router.register('background-queues/', views.BackgroundQueueListView.as_view(), name='background_queue_list'),
router.register('background-queues/<int:queue_index>/<str:status>/', views.BackgroundTaskListView.as_view(), name='background_task_list'),
router.register('background-tasks/<str:job_id>/', views.BackgroundTaskView.as_view(), name='background_task'),
router.register('background-tasks/<str:job_id>/delete/', views.BackgroundTaskDeleteView.as_view(), name='background_task_delete'),
router.register('background-tasks/<str:job_id>/requeue/', views.BackgroundTaskRequeueView.as_view(), name='background_task_requeue'),
router.register('background-tasks/<str:job_id>/enqueue/', views.BackgroundTaskEnqueueView.as_view(), name='background_task_enqueue'),
router.register('background-tasks/<str:job_id>/stop/', views.BackgroundTaskStopView.as_view(), name='background_task_stop'),
router.register('background-workers/<int:queue_index>/', views.WorkerListView.as_view(), name='worker_list'),
router.register('background-workers/<str:key>/', views.WorkerView.as_view(), name='worker'),
"""
urlpatterns = ( urlpatterns = (
path('background-queues/', views.QueueListView.as_view()), path('background-queues/', views.QueueListView.as_view()),
path('background-workers/', views.WorkerListView.as_view()), path('background-workers/', views.WorkerListView.as_view()),
@ -38,7 +24,10 @@ urlpatterns = (
path('background-tasks/<str:queue_name>/started/', views.StartedTaskListView.as_view()), path('background-tasks/<str:queue_name>/started/', views.StartedTaskListView.as_view()),
path('background-tasks/<str:queue_name>/queued/', views.QueuedTaskListView.as_view()), path('background-tasks/<str:queue_name>/queued/', views.QueuedTaskListView.as_view()),
path('background-task/<str:task_id>/', views.TaskDetailView.as_view()), path('background-task/<str:task_id>/', views.TaskDetailView.as_view()),
# path('background-tasks/<str:queue_name>/workers', views.BackgroundWorkerListView.as_view()), path('background-task/<str:task_id>/delete/', views.TaskDeleteView.as_view()),
path('background-task/<str:task_id>/requeue/', views.TaskRequeueView.as_view()),
path('background-task/<str:task_id>/enqueue/', views.TaskEnqueueView.as_view()),
path('background-task/<str:task_id>/stop/', views.TaskStopView.as_view()),
path('', include(router.urls)), path('', include(router.urls)),
) )

View File

@ -1,4 +1,4 @@
from django.http import Http404 from django.http import Http404, HttpResponse
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from drf_spectacular.types import OpenApiTypes from drf_spectacular.types import OpenApiTypes
@ -14,7 +14,7 @@ from core import filtersets
from core.choices import DataSourceStatusChoices from core.choices import DataSourceStatusChoices
from core.jobs import SyncDataSourceJob from core.jobs import SyncDataSourceJob
from core.models import * from core.models import *
from core.utils import get_rq_jobs_from_status from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job
from django_rq.queues import get_queue, get_redis_connection from django_rq.queues import get_queue, get_redis_connection
from django_rq.utils import get_statistics from django_rq.utils import get_statistics
from django_rq.settings import QUEUES_LIST from django_rq.settings import QUEUES_LIST
@ -152,7 +152,22 @@ class TaskListView(APIView):
return Response(serializer.data) return Response(serializer.data)
class TaskDetailView(APIView): class BaseTaskView(APIView):
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Task"
def get_task_from_id(self, task_id):
config = QUEUES_LIST[0]
task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config']))
if not task:
raise Http404
return task
class TaskDetailView(BaseTaskView):
permission_classes = [IsAdminUser] permission_classes = [IsAdminUser]
def get_view_name(self): def get_view_name(self):
@ -160,19 +175,65 @@ class TaskDetailView(APIView):
@extend_schema(responses={200: OpenApiTypes.OBJECT}) @extend_schema(responses={200: OpenApiTypes.OBJECT})
def get(self, request, task_id, format=None): def get(self, request, task_id, format=None):
""" task = self.get_task_from_id(task_id)
Return the UserConfig for the currently authenticated User.
"""
config = QUEUES_LIST[0]
task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config']))
if not task:
raise Http404
serializer = serializers.BackgroundTaskSerializer(task) serializer = serializers.BackgroundTaskSerializer(task)
return Response(serializer.data) return Response(serializer.data)
class BaseTaskView(APIView): class TaskDeleteView(APIView):
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Task"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def get(self, request, task_id, format=None):
delete_rq_job(task_id)
return HttpResponse(status=204)
class TaskRequeueView(APIView):
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Task"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def get(self, request, task_id, format=None):
requeue_rq_job(task_id)
return HttpResponse(status=204)
class TaskEnqueueView(APIView):
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Task"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def get(self, request, task_id, format=None):
enqueue_rq_job(task_id)
return HttpResponse(status=204)
class TaskStopView(APIView):
permission_classes = [IsAdminUser]
def get_view_name(self):
return "Background Task"
@extend_schema(responses={200: OpenApiTypes.OBJECT})
def get(self, request, task_id, format=None):
stopped_jobs = stop_rq_job(job_id)
if len(stopped_jobs) == 1:
pass
# messages.success(request, _('Job {id} has been stopped.').format(id=job_id))
else:
pass
# messages.error(request, _('Failed to stop job {id}').format(id=job_id))
class BaseTaskListView(APIView):
permission_classes = [IsAdminUser] permission_classes = [IsAdminUser]
registry = None registry = None
@ -191,35 +252,35 @@ class BaseTaskView(APIView):
return Response(serializer.data) return Response(serializer.data)
class DeferredTaskListView(BaseTaskView): class DeferredTaskListView(BaseTaskListView):
registry = "deferred" registry = "deferred"
def get_view_name(self): def get_view_name(self):
return "Deferred Tasks" return "Deferred Tasks"
class FailedTaskListView(BaseTaskView): class FailedTaskListView(BaseTaskListView):
registry = "failed" registry = "failed"
def get_view_name(self): def get_view_name(self):
return "Failed Tasks" return "Failed Tasks"
class FinishedTaskListView(BaseTaskView): class FinishedTaskListView(BaseTaskListView):
registry = "finished" registry = "finished"
def get_view_name(self): def get_view_name(self):
return "Finished Tasks" return "Finished Tasks"
class StartedTaskListView(BaseTaskView): class StartedTaskListView(BaseTaskListView):
registry = "started" registry = "started"
def get_view_name(self): def get_view_name(self):
return "Started Tasks" return "Started Tasks"
class QueuedTaskListView(BaseTaskView): class QueuedTaskListView(BaseTaskListView):
registry = "queued" registry = "queued"
@extend_schema(responses={200: OpenApiTypes.OBJECT}) @extend_schema(responses={200: OpenApiTypes.OBJECT})

View File

@ -1,5 +1,9 @@
from django.http import Http404 from django.http import Http404
from django_rq.utils import get_jobs from django.utils.translation import gettext_lazy as _
from django_rq.queues import get_queue_by_index, get_redis_connection
from django_rq.settings import QUEUES_MAP, QUEUES_LIST
from django_rq.utils import get_jobs, stop_jobs
from rq import requeue_job
from rq.exceptions import NoSuchJobError from rq.exceptions import NoSuchJobError
from rq.job import Job as RQ_Job, JobStatus as RQJobStatus from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
from rq.registry import ( from rq.registry import (
@ -42,3 +46,73 @@ def get_rq_jobs_from_status(queue, status):
job.scheduled_at = registry.get_scheduled_time(job) job.scheduled_at = registry.get_scheduled_time(job)
return jobs return jobs
def delete_rq_job(job_id):
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
# Remove job id from queue and delete the actual job
queue.connection.lrem(queue.key, 0, job.id)
job.delete()
def requeue_rq_job(job_id):
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {id} not found.").format(id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
def enqueue_rq_job(job_id):
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {id} not found.").format(id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
try:
# _enqueue_job is new in RQ 1.14, this is used to enqueue
# job regardless of its dependencies
queue._enqueue_job(job)
except AttributeError:
queue.enqueue_job(job)
# Remove job from correct registry if needed
if job.get_status() == RQJobStatus.DEFERRED:
registry = DeferredJobRegistry(queue.name, queue.connection)
registry.remove(job)
elif job.get_status() == RQJobStatus.FINISHED:
registry = FinishedJobRegistry(queue.name, queue.connection)
registry.remove(job)
elif job.get_status() == RQJobStatus.SCHEDULED:
registry = ScheduledJobRegistry(queue.name, queue.connection)
registry.remove(job)
def stop_rq_job(job_id):
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
return stop_jobs(queue, job_id)[0]

View File

@ -14,17 +14,13 @@ from django.utils.translation import gettext_lazy as _
from django.views.generic import View from django.views.generic import View
from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection
from django_rq.settings import QUEUES_MAP, QUEUES_LIST from django_rq.settings import QUEUES_MAP, QUEUES_LIST
from django_rq.utils import get_statistics, stop_jobs from django_rq.utils import get_statistics
from rq import requeue_job
from rq.exceptions import NoSuchJobError from rq.exceptions import NoSuchJobError
from rq.job import Job as RQ_Job, JobStatus as RQJobStatus from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
from rq.registry import (
DeferredJobRegistry, FinishedJobRegistry, ScheduledJobRegistry,
)
from rq.worker import Worker from rq.worker import Worker
from rq.worker_registration import clean_worker_registry from rq.worker_registration import clean_worker_registry
from core.utils import get_rq_jobs_from_status from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job
from netbox.config import get_config, PARAMS from netbox.config import get_config, PARAMS
from netbox.views import generic from netbox.views import generic
from netbox.views.generic.base import BaseObjectView from netbox.views.generic.base import BaseObjectView
@ -420,19 +416,7 @@ class BackgroundTaskDeleteView(BaseRQView):
form = ConfirmationForm(request.POST) form = ConfirmationForm(request.POST)
if form.is_valid(): if form.is_valid():
# all the RQ queues should use the same connection delete_rq_job(job_id)
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
# Remove job id from queue and delete the actual job
queue.connection.lrem(queue.key, 0, job.id)
job.delete()
messages.success(request, _('Job {id} has been deleted.').format(id=job_id)) messages.success(request, _('Job {id} has been deleted.').format(id=job_id))
else: else:
messages.error(request, _('Error deleting job {id}: {error}').format(id=job_id, error=form.errors[0])) messages.error(request, _('Error deleting job {id}: {error}').format(id=job_id, error=form.errors[0]))
@ -443,17 +427,7 @@ class BackgroundTaskDeleteView(BaseRQView):
class BackgroundTaskRequeueView(BaseRQView): class BackgroundTaskRequeueView(BaseRQView):
def get(self, request, job_id): def get(self, request, job_id):
# all the RQ queues should use the same connection requeue_rq_job(job_id)
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {id} not found.").format(id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
messages.success(request, _('Job {id} has been re-enqueued.').format(id=job_id)) messages.success(request, _('Job {id} has been re-enqueued.').format(id=job_id))
return redirect(reverse('core:background_task', args=[job_id])) return redirect(reverse('core:background_task', args=[job_id]))
@ -462,33 +436,7 @@ class BackgroundTaskEnqueueView(BaseRQView):
def get(self, request, job_id): def get(self, request, job_id):
# all the RQ queues should use the same connection # all the RQ queues should use the same connection
config = QUEUES_LIST[0] enqueue_rq_job(job_id)
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {id} not found.").format(id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
try:
# _enqueue_job is new in RQ 1.14, this is used to enqueue
# job regardless of its dependencies
queue._enqueue_job(job)
except AttributeError:
queue.enqueue_job(job)
# Remove job from correct registry if needed
if job.get_status() == RQJobStatus.DEFERRED:
registry = DeferredJobRegistry(queue.name, queue.connection)
registry.remove(job)
elif job.get_status() == RQJobStatus.FINISHED:
registry = FinishedJobRegistry(queue.name, queue.connection)
registry.remove(job)
elif job.get_status() == RQJobStatus.SCHEDULED:
registry = ScheduledJobRegistry(queue.name, queue.connection)
registry.remove(job)
messages.success(request, _('Job {id} has been enqueued.').format(id=job_id)) messages.success(request, _('Job {id} has been enqueued.').format(id=job_id))
return redirect(reverse('core:background_task', args=[job_id])) return redirect(reverse('core:background_task', args=[job_id]))
@ -496,17 +444,7 @@ class BackgroundTaskEnqueueView(BaseRQView):
class BackgroundTaskStopView(BaseRQView): class BackgroundTaskStopView(BaseRQView):
def get(self, request, job_id): def get(self, request, job_id):
# all the RQ queues should use the same connection stopped_jobs = stop_rq_job(job_id)
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
stopped_jobs = stop_jobs(queue, job_id)[0]
if len(stopped_jobs) == 1: if len(stopped_jobs) == 1:
messages.success(request, _('Job {id} has been stopped.').format(id=job_id)) messages.success(request, _('Job {id} has been stopped.').format(id=job_id))
else: else: