Files
netbox/netbox/core/api/serializers_/tasks.py
Arthur Hanson 49d3ea45dc 7848 Add RQ API (#17938)
* 7848 Add Background Tasks (RQ) to API

* 7848 Tasks

* 7848 cleanup

* 7848 add worker support

* 7848 switch to APIView

* 7848 Task detail view

* 7848 Task enqueue, requeue, stop

* 7848 Task enqueue, requeue, stop

* 7848 Task enqueue, requeue, stop

* 7848 tests

* 7848 tests

* 7848 OpenAPI doc generation

* 7848 OpenAPI doc generation

* 7848 review changes

* 7848 viewset

* 7848 viewset

* 7848 fix tests

* 7848 more viewsets

* 7848 fix docstring

* 7848 review comments

* 7848 review comments - get all tasks

* 7848 queue detail view

* 7848 cleanup

* 7848 cleanup

* 7848 cleanup

* 7848 cleanup

* Rename viewsets for consistency w/serializers

* Misc cleanup

* 7848 review changes

* 7848 review changes

* 7848 add test

* 7848 queue detail view

* 7848 fix tests

* 7848 fix the spectacular test failure

* 7848 fix the spectacular test failure

* Misc cleanup

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
2024-11-26 10:01:06 -05:00

88 lines
3.0 KiB
Python

from rest_framework import serializers
from rest_framework.reverse import reverse
__all__ = (
'BackgroundTaskSerializer',
'BackgroundQueueSerializer',
'BackgroundWorkerSerializer',
)
class BackgroundTaskSerializer(serializers.Serializer):
id = serializers.CharField()
url = serializers.HyperlinkedIdentityField(
view_name='core-api:rqtask-detail',
lookup_field='id',
lookup_url_kwarg='pk'
)
description = serializers.CharField()
origin = serializers.CharField()
func_name = serializers.CharField()
args = serializers.ListField(child=serializers.CharField())
kwargs = serializers.DictField()
result = serializers.CharField()
timeout = serializers.IntegerField()
result_ttl = serializers.IntegerField()
created_at = serializers.DateTimeField()
enqueued_at = serializers.DateTimeField()
started_at = serializers.DateTimeField()
ended_at = serializers.DateTimeField()
worker_name = serializers.CharField()
position = serializers.SerializerMethodField()
status = serializers.SerializerMethodField()
meta = serializers.DictField()
last_heartbeat = serializers.CharField()
is_finished = serializers.BooleanField()
is_queued = serializers.BooleanField()
is_failed = serializers.BooleanField()
is_started = serializers.BooleanField()
is_deferred = serializers.BooleanField()
is_canceled = serializers.BooleanField()
is_scheduled = serializers.BooleanField()
is_stopped = serializers.BooleanField()
def get_position(self, obj) -> int:
return obj.get_position()
def get_status(self, obj) -> str:
return obj.get_status()
class BackgroundQueueSerializer(serializers.Serializer):
name = serializers.CharField()
url = serializers.SerializerMethodField()
jobs = serializers.IntegerField()
oldest_job_timestamp = serializers.CharField()
index = serializers.IntegerField()
scheduler_pid = serializers.CharField()
workers = serializers.IntegerField()
finished_jobs = serializers.IntegerField()
started_jobs = serializers.IntegerField()
deferred_jobs = serializers.IntegerField()
failed_jobs = serializers.IntegerField()
scheduled_jobs = serializers.IntegerField()
def get_url(self, obj):
return reverse('core-api:rqqueue-detail', args=[obj['name']], request=self.context.get("request"))
class BackgroundWorkerSerializer(serializers.Serializer):
name = serializers.CharField()
url = serializers.HyperlinkedIdentityField(
view_name='core-api:rqworker-detail',
lookup_field='name'
)
state = serializers.SerializerMethodField()
birth_date = serializers.DateTimeField()
queue_names = serializers.ListField(
child=serializers.CharField()
)
pid = serializers.CharField()
successful_job_count = serializers.IntegerField()
failed_job_count = serializers.IntegerField()
total_working_time = serializers.IntegerField()
def get_state(self, obj):
return obj.get_state()