Introduce reusable BackgroundJob framework

A new abstract class can be used to implement job function classes. It
handles the necessary logic for starting and stopping jobs, including
exception handling and rescheduling of recurring jobs.

This commit also includes the migration of data source jobs to the new
framework.
This commit is contained in:
Alexander Haase 2024-06-17 23:02:58 +02:00
parent 388ba3d736
commit 5fab8e4839
3 changed files with 88 additions and 22 deletions

View File

@ -1,33 +1,32 @@
import logging
from netbox.search.backends import search_backend
from .choices import *
from .exceptions import SyncError
from .models import DataSource
from rq.timeouts import JobTimeoutException
from utilities.jobs import BackgroundJob
logger = logging.getLogger(__name__)
def sync_datasource(job, *args, **kwargs):
class SyncDataSourceJob(BackgroundJob):
"""
Call sync() on a DataSource.
"""
datasource = DataSource.objects.get(pk=job.object_id)
try:
job.start()
datasource.sync()
@classmethod
def run(cls, job, *args, **kwargs):
from netbox.search.backends import search_backend
from .choices import DataSourceStatusChoices
from .exceptions import SyncError
from .models import DataSource
# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())
datasource = DataSource.objects.get(pk=job.object_id)
job.terminate()
try:
datasource.sync()
except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) in (SyncError, JobTimeoutException):
logging.error(e)
else:
# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())
except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) is SyncError:
logging.error(e)
raise e

View File

@ -12,7 +12,6 @@ from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _
from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
@ -162,8 +161,8 @@ class DataSource(JobsMixin, PrimaryModel):
DataSource.objects.filter(pk=self.pk).update(status=self.status)
# Enqueue a sync job
return Job.enqueue(
import_string('core.jobs.sync_datasource'),
from ..jobs import SyncDataSourceJob
return SyncDataSourceJob.enqueue(
instance=self,
user=request.user
)

68
netbox/utilities/jobs.py Normal file
View File

@ -0,0 +1,68 @@
import logging
from abc import ABC, abstractmethod
from datetime import timedelta
from rq.timeouts import JobTimeoutException
from core.choices import JobStatusChoices
from core.models import Job
class BackgroundJob(ABC):
"""
Background Job helper class.
This class handles the execution of a background job. It is responsible for maintaining its state, reporting errors,
and scheduling recurring jobs.
"""
@classmethod
@abstractmethod
def run(cls, *args, **kwargs) -> None:
"""
Run the job.
A `BackgroundJob` class needs to implement this method to execute all commands of the job.
"""
pass
@classmethod
def handle(cls, job, *args, **kwargs):
"""
Handle the execution of a `BackgroundJob`.
This method is called by the Job Scheduler to handle the execution of all job commands. It will maintain the
job's metadata and handle errors. For periodic jobs, a new job is automatically scheduled using its `interval'.
"""
try:
job.start()
cls.run(job, *args, **kwargs)
job.terminate()
except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
if type(e) is JobTimeoutException:
logging.error(e)
# If the executed job is a periodic job, schedule its next execution at the specified interval.
finally:
if job.interval:
new_scheduled_time = (job.scheduled or job.started) + timedelta(minutes=job.interval)
cls.enqueue(
instance=job.object,
name=job.name,
user=job.user,
schedule_at=new_scheduled_time,
interval=job.interval,
**kwargs,
)
@classmethod
def enqueue(cls, *args, **kwargs):
"""
Enqueue a new `BackgroundJob`.
This method is a wrapper of `Job.enqueue` using `handle()` as function callback. See its documentation for
parameters.
"""
return Job.enqueue(cls.handle, *args, **kwargs)