Fixes #19806: Introduce JobFailed exception to allow marking background jobs as failed (#19807)

This commit is contained in:
Jeremy Stretch 2025-07-02 15:02:49 -04:00 committed by GitHub
parent ea4c205a37
commit 3b8841ee3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 38 additions and 11 deletions

View File

@ -2,9 +2,9 @@
NetBox includes the ability to execute certain functions as background tasks. These include:
* [Report](../customization/reports.md) execution
* [Custom script](../customization/custom-scripts.md) execution
* Synchronization of [remote data sources](../integrations/synchronized-data.md)
* Housekeeping tasks
Additionally, NetBox plugins can enqueue their own background tasks. This is accomplished using the [Job model](../models/core/job.md). Background tasks are executed by the `rqworker` process(es).

View File

@ -15,7 +15,6 @@ A background job implements a basic [Job](../../models/core/job.md) executor for
```python title="jobs.py"
from netbox.jobs import JobRunner
class MyTestJob(JobRunner):
class Meta:
name = "My Test Job"
@ -25,6 +24,8 @@ class MyTestJob(JobRunner):
# your logic goes here
```
Completed jobs will have their status updated to "completed" by default, or "errored" if an unhandled exception was raised by the `run()` method. To intentionally mark a job as failed, raise the `core.exceptions.JobFailed` exception. (Note that "failed" differs from "errored" in that a failure may be expected under certain conditions, whereas an error is not.)
You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead.
!!! tip

View File

@ -1,9 +1,19 @@
from django.core.exceptions import ImproperlyConfigured
class SyncError(Exception):
pass
__all__ = (
'IncompatiblePluginError',
'JobFailed',
'SyncError',
)
class IncompatiblePluginError(ImproperlyConfigured):
pass
class JobFailed(Exception):
pass
class SyncError(Exception):
pass

View File

@ -187,15 +187,14 @@ class Job(models.Model):
"""
Mark the job as completed, optionally specifying a particular termination status.
"""
valid_statuses = JobStatusChoices.TERMINAL_STATE_CHOICES
if status not in valid_statuses:
if status not in JobStatusChoices.TERMINAL_STATE_CHOICES:
raise ValueError(
_("Invalid status for job termination. Choices are: {choices}").format(
choices=', '.join(valid_statuses)
choices=', '.join(JobStatusChoices.TERMINAL_STATE_CHOICES)
)
)
# Mark the job as completed
# Set the job's status and completion time
self.status = status
if error:
self.error = error

View File

@ -8,6 +8,7 @@ from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException
from core.choices import JobStatusChoices
from core.exceptions import JobFailed
from core.models import Job, ObjectType
from netbox.constants import ADVISORY_LOCK_KEYS
from netbox.registry import registry
@ -73,15 +74,21 @@ class JobRunner(ABC):
This method is called by the Job Scheduler to handle the execution of all job commands. It will maintain the
job's metadata and handle errors. For periodic jobs, a new job is automatically scheduled using its `interval`.
"""
logger = logging.getLogger('netbox.jobs')
try:
job.start()
cls(job).run(*args, **kwargs)
job.terminate()
except JobFailed:
logger.warning(f"Job {job} failed")
job.terminate(status=JobStatusChoices.STATUS_FAILED)
except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
if type(e) is JobTimeoutException:
logging.error(e)
logger.error(e)
# If the executed job is a periodic job, schedule its next execution at the specified interval.
finally:

View File

@ -7,11 +7,15 @@ from django_rq import get_queue
from ..jobs import *
from core.models import DataSource, Job
from core.choices import JobStatusChoices
from core.exceptions import JobFailed
from utilities.testing import disable_warnings
class TestJobRunner(JobRunner):
def run(self, *args, **kwargs):
pass
if kwargs.get('make_fail', False):
raise JobFailed()
class JobRunnerTestCase(TestCase):
@ -49,6 +53,12 @@ class JobRunnerTest(JobRunnerTestCase):
self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)
def test_handle_failed(self):
with disable_warnings('netbox.jobs'):
job = TestJobRunner.enqueue(immediate=True, make_fail=True)
self.assertEqual(job.status, JobStatusChoices.STATUS_FAILED)
def test_handle_errored(self):
class ErroredJobRunner(TestJobRunner):
EXP = Exception('Test error')