15692: Introduce background jobs (#16927)
* Introduce reusable BackgroundJob framework: A new abstract class can be used to implement job function classes. It handles the necessary logic for starting and stopping jobs, including exception handling and rescheduling of recurring jobs. This commit also includes the migration of data source jobs to the new framework.
* Restore using import_string for jobs: Using the `import_string()` utility from Django allows the job script class to be simplified, as module imports no longer need to avoid loops. This should make it easier to queue and maintain jobs.
* Use SyncDataSourceJob for management command: Instead of maintaining two separate job execution logics, the same job is now used for both background and interactive execution.
* Implement BackgroundJob for running scripts: The independent implementations of interactive and background script execution have been merged into a single BackgroundJob implementation.
* Fix documentation of model features
* Ensure consistent code style
* Introduce reusable ScheduledJob: A new abstract class can be used to implement job function classes that specialize in scheduling. These use the same logic as regular BackgroundJobs, but ensure that they are only scheduled once at any given time.
* Introduce reusable SystemJob: A new abstract class can be used to implement job function classes that specialize in system background tasks (e.g. synchronization or housekeeping). In addition to the features of the BackgroundJob and ScheduledJob classes, these implement additional logic so that they need not be bound to an existing NetBox object, and they set up job schedules on plugin load instead of on an interactive request.
* Add documentation for jobs framework
* Revert "Use SyncDataSourceJob for management": This partially reverts commit db591d4. The 'run_now' parameter of 'enqueue()' remains, as it is used by the following commits.
* Merge enqueued status into JobStatusChoices
* Fix logger for ScriptJob
* Remove job name for scripts: Because scripts are already linked through the Job Instance field, the name is displayed twice. Removing this reduces redundancy and opens up the possibility of simplifying the BackgroundJob framework in future commits.
* Merge ScheduledJob into BackgroundJob: Instead of using separate classes, the logic of ScheduledJob is now merged into the generic BackgroundJob class. This allows reusing the same logic while dynamically deciding whether to enqueue the same job once or multiple times.
* Add name attribute for BackgroundJob: Instead of defining individual names on enqueue, BackgroundJob classes can now set a job name in their Meta class. This is equivalent to other Django classes and NetBox scripts.
* Drop enqueue_sync_job() method from DataSource
* Import ScriptJob directly
* Relax requirement for Jobs to reference a specific object
* Rename 'run_now' arg on Job.enqueue() to 'immediate'
* Fix queue lookup in Job enqueue
* Collapse SystemJob into BackgroundJob
* Remove legacy JobResultStatusChoices: ChoiceSet was moved to core in 40572b5.
* Use queue 'low' for system jobs by default: System jobs usually perform low-priority background tasks and therefore can use a different queue than 'default', which is used for regular jobs related to specific objects.
* Add test cases for BackgroundJob handling
* Fix enqueue interval jobs: As the job's name is set by enqueue(), it must not be passed in handle() to avoid duplicate kwargs with the same name.
* Honor schedule_at for job's enqueue_once: Not only can a job's interval change, but so can the time at which it is scheduled to run. If a specific scheduled time is set, it is also checked against the current job schedule. If there are any changes, the job is rescheduled with the new time.
* Switch BackgroundJob to regular methods: Instead of using a class method for run(), a regular method is used for this purpose. This opens up the possibility of adding more convenience methods in the future, e.g. for interacting with the job object or for logging, as implemented for scripts.
* Fix background tasks documentation
* Test enqueue in combination with enqueue_once
* Rename background jobs to tasks (to differentiate from RQ)
* Touch up docs
* Revert "Use queue 'low' for system jobs by default": This reverts commit b17b2050df.
* Remove system background job: This commit reverts commits 4880d81 and 0b15ecf. Using the database 'connection_created' signal for job registration feels a little wrong at this point, as it would trigger registration very often. However, the background job framework is prepared for this use case and can be used by plugins once the auto-registration of jobs is solved.
* Fix runscript management command: Defining names for background jobs was disabled with fb75389. The preceding changes in 257976d overlooked the management command.
* Use regular imports for ScriptJob
* Rename BackgroundJob to JobRunner

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
parent dde84b47da
commit d3a3a6ba46
@@ -18,7 +18,7 @@ Depending on its classification, each NetBox model may support various features
| [Custom links](../customization/custom-links.md) | `CustomLinksMixin` | `custom_links` | These models support the assignment of custom links |
| [Custom validation](../customization/custom-validation.md) | `CustomValidationMixin` | - | Supports the enforcement of custom validation rules |
| [Export templates](../customization/export-templates.md) | `ExportTemplatesMixin` | `export_templates` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Background jobs can be scheduled for these models |
| [Journaling](../features/journaling.md) | `JournalingMixin` | `journaling` | These models support persistent historical commentary |
| [Synchronized data](../integrations/synchronized-data.md) | `SyncedDataMixin` | `synced_data` | Certain model data can be automatically synchronized from a remote data source |
| [Tagging](../models/extras/tag.md) | `TagsMixin` | `tags` | The models can be tagged with user-defined tags |
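As a hedged illustration of the `JobsMixin` row above (the `ExampleModel` name is invented; the mixin-plus-base-class pattern mirrors the `DataSource(JobsMixin, PrimaryModel)` declaration appearing later in this diff):

```python
from django.db import models

from netbox.models import PrimaryModel
from netbox.models.features import JobsMixin


class ExampleModel(JobsMixin, PrimaryModel):
    """Hypothetical model; JobsMixin enables the `jobs` relation to Job objects."""
    name = models.CharField(max_length=100)
```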
docs/plugins/development/background-jobs.md (new file, 97 lines)
@@ -0,0 +1,97 @@
# Background Jobs

NetBox plugins can defer certain operations by enqueuing [background jobs](../../features/background-jobs.md), which are executed asynchronously by background workers. This is helpful for decoupling long-running processes from the user-facing request-response cycle.

For example, your plugin might need to fetch data from a remote system. Depending on the amount of data and the responsiveness of the remote server, this could take a few minutes. Deferring this task to a queued job ensures that it can be completed in the background, without interrupting the user. The data it fetches can be made available once the job has completed.

## Job Runners

A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It contains the logic needed to manage the associated job object, reschedule periodic jobs at the given interval, and handle errors. Custom jobs are added by subclassing NetBox's `JobRunner` class.

::: utilities.jobs.JobRunner

#### Example

```python title="jobs.py"
from utilities.jobs import JobRunner


class MyTestJob(JobRunner):
    class Meta:
        name = "My Test Job"

    def run(self, *args, **kwargs):
        obj = self.job.object
        # your logic goes here
```
You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes all arguments through to `Job.enqueue()`. However, no `name` argument should be passed, as the background job name will be used instead.
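As a minimal sketch of this pattern (the `MyModel` model and the `check_remote=True` keyword argument are hypothetical, not part of NetBox; extra keyword arguments are passed through to the job's `run()` method):

```python title="models.py"
from django.db import models

from netbox.models import NetBoxModel

from .jobs import MyTestJob


class MyModel(NetBoxModel):
    name = models.CharField(max_length=100)

    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        # Defer the expensive work to a background worker instead of
        # blocking the request-response cycle.
        MyTestJob.enqueue(instance=self, check_remote=True)
```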
### Attributes

`JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.

#### `name`

This is the human-friendly name of your background job. If omitted, the class name will be used.
### Scheduled Jobs

As described above, jobs can be scheduled for immediate execution or at any later time using the `enqueue()` method. However, for management purposes, the `enqueue_once()` method allows a job to be scheduled exactly once, avoiding duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, in a thread-safe manner. An example use case would be a periodic task that is bound to an instance in general, but not to any particular event of that instance (such as updates). The parameters of the `enqueue_once()` method are identical to those of `enqueue()`.

!!! tip
    It is not forbidden to `enqueue()` additional jobs while an interval schedule is active. For example, you might schedule a periodic daily synchronization, but also trigger additional synchronizations on demand when the user presses a button.
#### Example

```python title="jobs.py"
from utilities.jobs import JobRunner


class MyHousekeepingJob(JobRunner):
    class Meta:
        name = "Housekeeping"

    def run(self, *args, **kwargs):
        # your logic goes here
        ...
```

```python title="__init__.py"
from netbox.plugins import PluginConfig


class MyPluginConfig(PluginConfig):
    def ready(self):
        from .jobs import MyHousekeepingJob
        # Set up the recurring schedule exactly once; enqueue_once() is
        # idempotent for an unchanged schedule (see its documentation above)
        MyHousekeepingJob.enqueue_once(interval=60)
```
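A brief usage sketch of the idempotence described above (the timing values are arbitrary): calling `enqueue_once()` again with unchanged parameters returns the existing job rather than creating a duplicate, while changed parameters replace the existing schedule.

```python
from datetime import timedelta

from django.utils import timezone

from .jobs import MyHousekeepingJob

schedule_at = timezone.now() + timedelta(minutes=5)

job1 = MyHousekeepingJob.enqueue_once(schedule_at=schedule_at, interval=60)
job2 = MyHousekeepingJob.enqueue_once(schedule_at=schedule_at, interval=60)
assert job1 == job2  # unchanged parameters: the existing schedule is kept

# A different interval (or scheduled time) replaces the existing job
job3 = MyHousekeepingJob.enqueue_once(schedule_at=schedule_at, interval=120)
assert job3 != job1
```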
## Task queues

Three task queues of differing priority are defined by default:

* High
* Default
* Low

Any tasks in the "high" queue are completed before the default queue is checked, and any tasks in the default queue are completed before those in the "low" queue.

Plugins can also add custom queues for their own needs by setting the `queues` attribute under the PluginConfig class. An example is included below:

```python
class MyPluginConfig(PluginConfig):
    name = 'myplugin'
    ...
    queues = [
        'foo',
        'bar',
    ]
```

The `PluginConfig` above creates two custom queues with the following names: `my_plugin.foo` and `my_plugin.bar`. (The plugin's name is prepended to each queue to avoid conflicts between plugins.)

!!! warning "Configuring the RQ worker process"
    By default, NetBox's RQ worker process only services the high, default, and low queues. Plugins which introduce custom queues should advise users to either reconfigure the default worker, or run a dedicated worker specifying the necessary queues. For example:

    ```
    python manage.py rqworker my_plugin.foo my_plugin.bar
    ```
@@ -1,30 +0,0 @@
# Background Tasks

NetBox supports the queuing of tasks that need to be performed in the background, decoupled from the request-response cycle, using the [Python RQ](https://python-rq.org/) library. Three task queues of differing priority are defined by default:

* High
* Default
* Low

Any tasks in the "high" queue are completed before the default queue is checked, and any tasks in the default queue are completed before those in the "low" queue.

Plugins can also add custom queues for their own needs by setting the `queues` attribute under the PluginConfig class. An example is included below:

```python
class MyPluginConfig(PluginConfig):
    name = 'myplugin'
    ...
    queues = [
        'foo',
        'bar',
    ]
```

The PluginConfig above creates two custom queues with the following names `my_plugin.foo` and `my_plugin.bar`. (The plugin's name is prepended to each queue to avoid conflicts between plugins.)

!!! warning "Configuring the RQ worker process"
    By default, NetBox's RQ worker process only services the high, default, and low queues. Plugins which introduce custom queues should advise users to either reconfigure the default worker, or run a dedicated worker specifying the necessary queues. For example:

    ```
    python manage.py rqworker my_plugin.foo my_plugin.bar
    ```
@@ -47,6 +47,7 @@ project-name/
  - __init__.py
  - filtersets.py
  - graphql.py
  - jobs.py
  - models.py
  - middleware.py
  - navigation.py
@@ -130,6 +130,8 @@ For more information about database migrations, see the [Django documentation](h

::: netbox.models.features.ExportTemplatesMixin

::: netbox.models.features.JobsMixin

::: netbox.models.features.JournalingMixin

::: netbox.models.features.TagsMixin
@@ -146,7 +146,7 @@ nav:
          - Data Backends: 'plugins/development/data-backends.md'
          - REST API: 'plugins/development/rest-api.md'
          - GraphQL API: 'plugins/development/graphql-api.md'
          - Background Tasks: 'plugins/development/background-tasks.md'
          - Background Jobs: 'plugins/development/background-jobs.md'
          - Dashboard Widgets: 'plugins/development/dashboard-widgets.md'
          - Staged Changes: 'plugins/development/staged-changes.md'
          - Exceptions: 'plugins/development/exceptions.md'
@ -7,6 +7,8 @@ from rest_framework.routers import APIRootView
|
||||
from rest_framework.viewsets import ReadOnlyModelViewSet
|
||||
|
||||
from core import filtersets
|
||||
from core.choices import DataSourceStatusChoices
|
||||
from core.jobs import SyncDataSourceJob
|
||||
from core.models import *
|
||||
from netbox.api.metadata import ContentTypeMetadata
|
||||
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
|
||||
@ -36,7 +38,11 @@ class DataSourceViewSet(NetBoxModelViewSet):
|
||||
if not request.user.has_perm('core.sync_datasource', obj=datasource):
|
||||
raise PermissionDenied(_("This user does not have permission to synchronize this data source."))
|
||||
|
||||
datasource.enqueue_sync_job(request)
|
||||
# Enqueue the sync job & update the DataSource's status
|
||||
SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
|
||||
datasource.status = DataSourceStatusChoices.QUEUED
|
||||
DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status)
|
||||
|
||||
serializer = serializers.DataSourceSerializer(datasource, context={'request': request})
|
||||
|
||||
return Response(serializer.data)
|
||||
|
@@ -59,6 +59,12 @@ class JobStatusChoices(ChoiceSet):
        (STATUS_FAILED, _('Failed'), 'red'),
    )

    ENQUEUED_STATE_CHOICES = (
        STATUS_PENDING,
        STATUS_SCHEDULED,
        STATUS_RUNNING,
    )

    TERMINAL_STATE_CHOICES = (
        STATUS_COMPLETED,
        STATUS_ERRORED,
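The new `ENQUEUED_STATE_CHOICES` grouping is consumed later in this commit by `JobRunner.enqueue_once()` to find jobs that are still active; a minimal sketch of that kind of lookup (illustrative only):

```python
from core.choices import JobStatusChoices
from core.models import Job

# Find jobs that are still pending, scheduled, or running (i.e. not in a
# terminal state), the same filter enqueue_once() applies to detect duplicates.
active_jobs = Job.objects.filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES)
```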
@ -1,33 +1,33 @@
|
||||
import logging
|
||||
|
||||
from netbox.search.backends import search_backend
|
||||
from .choices import *
|
||||
from utilities.jobs import JobRunner
|
||||
from .choices import DataSourceStatusChoices
|
||||
from .exceptions import SyncError
|
||||
from .models import DataSource
|
||||
from rq.timeouts import JobTimeoutException
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def sync_datasource(job, *args, **kwargs):
|
||||
class SyncDataSourceJob(JobRunner):
|
||||
"""
|
||||
Call sync() on a DataSource.
|
||||
"""
|
||||
datasource = DataSource.objects.get(pk=job.object_id)
|
||||
|
||||
try:
|
||||
job.start()
|
||||
datasource.sync()
|
||||
class Meta:
|
||||
name = 'Synchronization'
|
||||
|
||||
# Update the search cache for DataFiles belonging to this source
|
||||
search_backend.cache(datasource.datafiles.iterator())
|
||||
def run(self, *args, **kwargs):
|
||||
datasource = DataSource.objects.get(pk=self.job.object_id)
|
||||
|
||||
job.terminate()
|
||||
try:
|
||||
datasource.sync()
|
||||
|
||||
except Exception as e:
|
||||
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
|
||||
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
|
||||
if type(e) in (SyncError, JobTimeoutException):
|
||||
logging.error(e)
|
||||
else:
|
||||
# Update the search cache for DataFiles belonging to this source
|
||||
search_backend.cache(datasource.datafiles.iterator())
|
||||
|
||||
except Exception as e:
|
||||
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
|
||||
if type(e) is SyncError:
|
||||
logging.error(e)
|
||||
raise e
|
||||
|
netbox/core/migrations/0012_job_object_type_optional.py (new file, 24 lines)
@@ -0,0 +1,24 @@
import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('contenttypes', '0002_remove_content_type_name'),
        ('core', '0011_move_objectchange'),
    ]

    operations = [
        migrations.AlterField(
            model_name='job',
            name='object_type',
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.CASCADE,
                related_name='jobs',
                to='contenttypes.contenttype'
            ),
        ),
    ]
@@ -1,10 +1,10 @@
import hashlib
import logging
import os
import yaml
from fnmatch import fnmatchcase
from urllib.parse import urlparse

import yaml
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.core.exceptions import ValidationError
@@ -12,7 +12,6 @@ from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
@@ -23,7 +22,6 @@ from utilities.querysets import RestrictedQuerySet
from ..choices import *
from ..exceptions import SyncError
from ..signals import post_sync, pre_sync
from .jobs import Job

__all__ = (
    'AutoSyncRecord',
@@ -153,21 +151,6 @@ class DataSource(JobsMixin, PrimaryModel):

        return objectchange

    def enqueue_sync_job(self, request):
        """
        Enqueue a background job to synchronize the DataSource by calling sync().
        """
        # Set the status to "syncing"
        self.status = DataSourceStatusChoices.QUEUED
        DataSource.objects.filter(pk=self.pk).update(status=self.status)

        # Enqueue a sync job
        return Job.enqueue(
            import_string('core.jobs.sync_datasource'),
            instance=self,
            user=request.user
        )

    def get_backend(self):
        backend_params = self.parameters or {}
        return self.backend_class(self.source_url, **backend_params)
@@ -31,6 +31,8 @@ class Job(models.Model):
        to='contenttypes.ContentType',
        related_name='jobs',
        on_delete=models.CASCADE,
        blank=True,
        null=True
    )
    object_id = models.PositiveBigIntegerField(
        blank=True,
@@ -197,25 +199,34 @@ class Job(models.Model):
        job_end.send(self)

    @classmethod
    def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
    def enqueue(cls, func, instance=None, name='', user=None, schedule_at=None, interval=None, immediate=False, **kwargs):
        """
        Create a Job instance and enqueue a job using the given callable

        Args:
            func: The callable object to be enqueued for execution
            instance: The NetBox object to which this job pertains
            instance: The NetBox object to which this job pertains (optional)
            name: Name for the job (optional)
            user: The user responsible for running the job
            schedule_at: Schedule the job to be executed at the passed date and time
            interval: Recurrence interval (in minutes)
            immediate: Run the job immediately without scheduling it in the background. Should be used for interactive
                management commands only.
        """
        object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
        rq_queue_name = get_queue_for_model(object_type.model)
        if schedule_at and immediate:
            raise ValueError("enqueue() cannot be called with values for both schedule_at and immediate.")

        if instance:
            object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
            object_id = instance.pk
        else:
            object_type = object_id = None
        rq_queue_name = get_queue_for_model(object_type.model if object_type else None)
        queue = django_rq.get_queue(rq_queue_name)
        status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING
        job = Job.objects.create(
            object_type=object_type,
            object_id=instance.pk,
            object_id=object_id,
            name=name,
            status=status,
            scheduled=schedule_at,
@@ -224,8 +235,16 @@ class Job(models.Model):
            job_id=uuid.uuid4()
        )

        if schedule_at:
        # Run the job immediately, rather than enqueuing it as a background task. Note that this is a synchronous
        # (blocking) operation, and execution will pause until the job completes.
        if immediate:
            func(job_id=str(job.job_id), job=job, **kwargs)

        # Schedule the job to run at a specific date & time.
        elif schedule_at:
            queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)

        # Schedule the job to run asynchronously at this first available opportunity.
        else:
            queue.enqueue(func, job_id=str(job.job_id), job=job, **kwargs)
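To make the three execution paths above concrete, a hedged sketch of calling the reworked `enqueue()` via a `JobRunner` subclass (the `SyncDataSourceJob` class comes from elsewhere in this commit; the `datasource` and `request` variables and the timing values are assumed context, as in the view code below):

```python
from datetime import timedelta

from django.utils import timezone

from core.jobs import SyncDataSourceJob

# Enqueue for the next available worker (job status: pending)
SyncDataSourceJob.enqueue(instance=datasource, user=request.user)

# Schedule for a specific time, recurring hourly (job status: scheduled)
SyncDataSourceJob.enqueue(
    instance=datasource,
    user=request.user,
    schedule_at=timezone.now() + timedelta(minutes=5),
    interval=60,
)

# Run synchronously in the current process; intended for interactive
# management commands only (passing schedule_at as well would raise ValueError)
SyncDataSourceJob.enqueue(instance=datasource, user=request.user, immediate=True)
```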
@@ -34,6 +34,8 @@ from utilities.htmx import htmx_partial
from utilities.query import count_related
from utilities.views import ContentTypePermissionRequiredMixin, GetRelatedModelsMixin, register_model_view
from . import filtersets, forms, tables
from .choices import DataSourceStatusChoices
from .jobs import SyncDataSourceJob
from .models import *
from .plugins import get_plugins
from .tables import CatalogPluginTable, PluginVersionTable
@@ -76,7 +78,11 @@ class DataSourceSyncView(BaseObjectView):

    def post(self, request, pk):
        datasource = get_object_or_404(self.queryset, pk=pk)
        job = datasource.enqueue_sync_job(request)

        # Enqueue the sync job & update the DataSource's status
        job = SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
        datasource.status = DataSourceStatusChoices.QUEUED
        DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status)

        messages.success(request, f"Queued job #{job.pk} to sync {datasource}")
        return redirect(datasource.get_absolute_url())
@ -1,5 +1,6 @@
|
||||
from django.http import Http404
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.utils.module_loading import import_string
|
||||
from django_rq.queues import get_connection
|
||||
from rest_framework import status
|
||||
from rest_framework.decorators import action
|
||||
@ -11,10 +12,10 @@ from rest_framework.routers import APIRootView
|
||||
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet
|
||||
from rq import Worker
|
||||
|
||||
from core.models import Job, ObjectType
|
||||
from core.models import ObjectType
|
||||
from extras import filtersets
|
||||
from extras.models import *
|
||||
from extras.scripts import run_script
|
||||
from extras.jobs import ScriptJob
|
||||
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
|
||||
from netbox.api.features import SyncedDataMixin
|
||||
from netbox.api.metadata import ContentTypeMetadata
|
||||
@ -273,10 +274,8 @@ class ScriptViewSet(ModelViewSet):
|
||||
raise RQWorkerNotRunningException()
|
||||
|
||||
if input_serializer.is_valid():
|
||||
Job.enqueue(
|
||||
run_script,
|
||||
ScriptJob.enqueue(
|
||||
instance=script,
|
||||
name=script.python_class.class_name,
|
||||
user=request.user,
|
||||
data=input_serializer.data['data'],
|
||||
request=copy_safe_request(request),
|
||||
|
@ -191,35 +191,6 @@ class DurationChoices(ChoiceSet):
|
||||
)
|
||||
|
||||
|
||||
#
|
||||
# Job results
|
||||
#
|
||||
|
||||
class JobResultStatusChoices(ChoiceSet):
|
||||
|
||||
STATUS_PENDING = 'pending'
|
||||
STATUS_SCHEDULED = 'scheduled'
|
||||
STATUS_RUNNING = 'running'
|
||||
STATUS_COMPLETED = 'completed'
|
||||
STATUS_ERRORED = 'errored'
|
||||
STATUS_FAILED = 'failed'
|
||||
|
||||
CHOICES = (
|
||||
(STATUS_PENDING, _('Pending'), 'cyan'),
|
||||
(STATUS_SCHEDULED, _('Scheduled'), 'gray'),
|
||||
(STATUS_RUNNING, _('Running'), 'blue'),
|
||||
(STATUS_COMPLETED, _('Completed'), 'green'),
|
||||
(STATUS_ERRORED, _('Errored'), 'red'),
|
||||
(STATUS_FAILED, _('Failed'), 'red'),
|
||||
)
|
||||
|
||||
TERMINAL_STATE_CHOICES = (
|
||||
STATUS_COMPLETED,
|
||||
STATUS_ERRORED,
|
||||
STATUS_FAILED,
|
||||
)
|
||||
|
||||
|
||||
#
|
||||
# Webhooks
|
||||
#
|
||||
|
@ -1,5 +1,6 @@
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
@ -10,7 +11,6 @@ from django.utils.translation import gettext as _
|
||||
from django_rq import get_queue
|
||||
|
||||
from core.events import *
|
||||
from core.models import Job
|
||||
from netbox.config import get_config
|
||||
from netbox.constants import RQ_QUEUE_DEFAULT
|
||||
from netbox.registry import registry
|
||||
@ -126,8 +126,8 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
|
||||
script = event_rule.action_object.python_class()
|
||||
|
||||
# Enqueue a Job to record the script's execution
|
||||
Job.enqueue(
|
||||
"extras.scripts.run_script",
|
||||
from extras.jobs import ScriptJob
|
||||
ScriptJob.enqueue(
|
||||
instance=event_rule.action_object,
|
||||
name=script.name,
|
||||
user=user,
|
||||
|
netbox/extras/jobs.py (new file, 107 lines)
@ -0,0 +1,107 @@
|
||||
import logging
|
||||
import traceback
|
||||
from contextlib import nullcontext
|
||||
|
||||
from django.db import transaction
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from extras.models import Script as ScriptModel
|
||||
from extras.signals import clear_events
|
||||
from netbox.context_managers import event_tracking
|
||||
from utilities.exceptions import AbortScript, AbortTransaction
|
||||
from utilities.jobs import JobRunner
|
||||
from .utils import is_report
|
||||
|
||||
|
||||
class ScriptJob(JobRunner):
|
||||
"""
|
||||
Script execution job.
|
||||
|
||||
A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
|
||||
exists outside the Script class to ensure it cannot be overridden by a script author.
|
||||
"""
|
||||
|
||||
class Meta:
|
||||
# An explicit job name is not set because it doesn't make sense in this context. Currently, there's no scenario
|
||||
# where jobs other than this one are used. Therefore, it is hidden, resulting in a cleaner job table overview.
|
||||
name = ''
|
||||
|
||||
def run_script(self, script, request, data, commit):
|
||||
"""
|
||||
Core script execution task. We capture this within a method to allow for conditionally wrapping it with the
|
||||
event_tracking context manager (which is bypassed if commit == False).
|
||||
|
||||
Args:
|
||||
request: The WSGI request associated with this execution (if any)
|
||||
data: A dictionary of data to be passed to the script upon execution
|
||||
commit: Passed through to Script.run()
|
||||
"""
|
||||
logger = logging.getLogger(f"netbox.scripts.{script.full_name}")
|
||||
logger.info(f"Running script (commit={commit})")
|
||||
|
||||
try:
|
||||
try:
|
||||
with transaction.atomic():
|
||||
script.output = script.run(data, commit)
|
||||
if not commit:
|
||||
raise AbortTransaction()
|
||||
except AbortTransaction:
|
||||
script.log_info(message=_("Database changes have been reverted automatically."))
|
||||
if script.failed:
|
||||
logger.warning(f"Script failed")
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
if type(e) is AbortScript:
|
||||
msg = _("Script aborted with error: ") + str(e)
|
||||
if is_report(type(script)):
|
||||
script.log_failure(message=msg)
|
||||
else:
|
||||
script.log_failure(msg)
|
||||
logger.error(f"Script aborted with error: {e}")
|
||||
|
||||
else:
|
||||
stacktrace = traceback.format_exc()
|
||||
script.log_failure(
|
||||
message=_("An exception occurred: ") + f"`{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
|
||||
)
|
||||
logger.error(f"Exception raised during script execution: {e}")
|
||||
|
||||
if type(e) is not AbortTransaction:
|
||||
script.log_info(message=_("Database changes have been reverted due to error."))
|
||||
|
||||
# Clear all pending events. Job termination (including setting the status) is handled by the job framework.
|
||||
if request:
|
||||
clear_events.send(request)
|
||||
raise
|
||||
|
||||
# Update the job data regardless of the execution status of the job. Successes should be reported as well as
|
||||
# failures.
|
||||
finally:
|
||||
self.job.data = script.get_job_data()
|
||||
|
||||
def run(self, data, request=None, commit=True, **kwargs):
|
||||
"""
|
||||
Run the script.
|
||||
|
||||
Args:
|
||||
job: The Job associated with this execution
|
||||
data: A dictionary of data to be passed to the script upon execution
|
||||
request: The WSGI request associated with this execution (if any)
|
||||
commit: Passed through to Script.run()
|
||||
"""
|
||||
script = ScriptModel.objects.get(pk=self.job.object_id).python_class()
|
||||
|
||||
# Add files to form data
|
||||
if request:
|
||||
files = request.FILES
|
||||
for field_name, fileobj in files.items():
|
||||
data[field_name] = fileobj
|
||||
|
||||
# Add the current request as a property of the script
|
||||
script.request = request
|
||||
|
||||
# Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
|
||||
# change logging, event rules, etc.
|
||||
with event_tracking(request) if commit else nullcontext():
|
||||
self.run_script(script, request, data, commit)
|
@ -1,19 +1,14 @@
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.db import transaction
|
||||
from django.utils.module_loading import import_string
|
||||
|
||||
from core.choices import JobStatusChoices
|
||||
from core.models import Job
|
||||
from extras.jobs import ScriptJob
|
||||
from extras.scripts import get_module_and_script
|
||||
from extras.signals import clear_events
|
||||
from netbox.context_managers import event_tracking
|
||||
from utilities.exceptions import AbortTransaction
|
||||
from utilities.request import NetBoxFakeRequest
|
||||
|
||||
|
||||
@ -33,44 +28,6 @@ class Command(BaseCommand):
|
||||
parser.add_argument('script', help="Script to run")
|
||||
|
||||
def handle(self, *args, **options):
|
||||
|
||||
def _run_script():
|
||||
"""
|
||||
Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with
|
||||
the event_tracking context manager (which is bypassed if commit == False).
|
||||
"""
|
||||
try:
|
||||
try:
|
||||
with transaction.atomic():
|
||||
script.output = script.run(data=data, commit=commit)
|
||||
if not commit:
|
||||
raise AbortTransaction()
|
||||
except AbortTransaction:
|
||||
script.log_info("Database changes have been reverted automatically.")
|
||||
clear_events.send(request)
|
||||
job.data = script.get_job_data()
|
||||
job.terminate()
|
||||
except Exception as e:
|
||||
stacktrace = traceback.format_exc()
|
||||
script.log_failure(
|
||||
f"An exception occurred: `{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
|
||||
)
|
||||
script.log_info("Database changes have been reverted due to error.")
|
||||
logger.error(f"Exception raised during script execution: {e}")
|
||||
clear_events.send(request)
|
||||
job.data = script.get_job_data()
|
||||
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
|
||||
|
||||
# Print any test method results
|
||||
for test_name, attrs in job.data['tests'].items():
|
||||
self.stdout.write(
|
||||
"\t{}: {} success, {} info, {} warning, {} failure".format(
|
||||
test_name, attrs['success'], attrs['info'], attrs['warning'], attrs['failure']
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"Script completed in {job.duration}")
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
# Params
|
||||
@ -84,8 +41,8 @@ class Command(BaseCommand):
|
||||
data = {}
|
||||
|
||||
module_name, script_name = script.split('.', 1)
|
||||
module, script = get_module_and_script(module_name, script_name)
|
||||
script = script.python_class
|
||||
module, script_obj = get_module_and_script(module_name, script_name)
|
||||
script = script_obj.python_class
|
||||
|
||||
# Take user from command line if provided and exists, other
|
||||
if options['user']:
|
||||
@ -120,40 +77,29 @@ class Command(BaseCommand):
|
||||
# Initialize the script form
|
||||
script = script()
|
||||
form = script.as_form(data, None)
|
||||
|
||||
# Create the job
|
||||
job = Job.objects.create(
|
||||
object=module,
|
||||
name=script.class_name,
|
||||
user=User.objects.filter(is_superuser=True).order_by('pk')[0],
|
||||
job_id=uuid.uuid4()
|
||||
)
|
||||
|
||||
request = NetBoxFakeRequest({
|
||||
'META': {},
|
||||
'POST': data,
|
||||
'GET': {},
|
||||
'FILES': {},
|
||||
'user': user,
|
||||
'path': '',
|
||||
'id': job.job_id
|
||||
})
|
||||
|
||||
if form.is_valid():
|
||||
job.status = JobStatusChoices.STATUS_RUNNING
|
||||
job.save()
|
||||
|
||||
logger.info(f"Running script (commit={commit})")
|
||||
script.request = request
|
||||
|
||||
# Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
|
||||
# change logging, webhooks, etc.
|
||||
with event_tracking(request):
|
||||
_run_script()
|
||||
else:
|
||||
if not form.is_valid():
|
||||
logger.error('Data is not valid:')
|
||||
for field, errors in form.errors.get_json_data().items():
|
||||
for error in errors:
|
||||
logger.error(f'\t{field}: {error.get("message")}')
|
||||
job.status = JobStatusChoices.STATUS_ERRORED
|
||||
job.save()
|
||||
raise CommandError()
|
||||
|
||||
# Execute the script.
|
||||
job = ScriptJob.enqueue(
|
||||
instance=script_obj,
|
||||
user=user,
|
||||
immediate=True,
|
||||
data=data,
|
||||
request=NetBoxFakeRequest({
|
||||
'META': {},
|
||||
'POST': data,
|
||||
'GET': {},
|
||||
'FILES': {},
|
||||
'user': user,
|
||||
'path': '',
|
||||
'id': uuid.uuid4()
|
||||
}),
|
||||
commit=commit,
|
||||
)
|
||||
|
||||
logger.info(f"Script completed in {job.duration}")
|
||||
|
@ -2,32 +2,23 @@ import inspect
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import traceback
|
||||
from datetime import timedelta
|
||||
|
||||
import yaml
|
||||
from django import forms
|
||||
from django.conf import settings
|
||||
from django.core.validators import RegexValidator
|
||||
from django.db import transaction
|
||||
from django.utils import timezone
|
||||
from django.utils.functional import classproperty
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from core.choices import JobStatusChoices
|
||||
from core.models import Job
|
||||
from extras.choices import LogLevelChoices
|
||||
from extras.models import ScriptModule, Script as ScriptModel
|
||||
from extras.signals import clear_events
|
||||
from extras.models import ScriptModule
|
||||
from ipam.formfields import IPAddressFormField, IPNetworkFormField
|
||||
from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator, prefix_validator
|
||||
from netbox.context_managers import event_tracking
|
||||
from utilities.exceptions import AbortScript, AbortTransaction
|
||||
from utilities.forms import add_blank_choice
|
||||
from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField
|
||||
from utilities.forms.widgets import DatePicker, DateTimePicker
|
||||
from .forms import ScriptForm
|
||||
from .utils import is_report
|
||||
|
||||
|
||||
__all__ = (
|
||||
@ -48,7 +39,6 @@ __all__ = (
|
||||
'StringVar',
|
||||
'TextVar',
|
||||
'get_module_and_script',
|
||||
'run_script',
|
||||
)
|
||||
|
||||
|
||||
@ -613,111 +603,3 @@ def get_module_and_script(module_name, script_name):
|
||||
module = ScriptModule.objects.get(file_path=f'{module_name}.py')
|
||||
script = module.scripts.get(name=script_name)
|
||||
return module, script
|
||||
|
||||
|
||||
def run_script(data, job, request=None, commit=True, **kwargs):
|
||||
"""
|
||||
A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
|
||||
exists outside the Script class to ensure it cannot be overridden by a script author.
|
||||
|
||||
Args:
|
||||
data: A dictionary of data to be passed to the script upon execution
|
||||
job: The Job associated with this execution
|
||||
request: The WSGI request associated with this execution (if any)
|
||||
commit: Passed through to Script.run()
|
||||
"""
|
||||
job.start()
|
||||
|
||||
script = ScriptModel.objects.get(pk=job.object_id).python_class()
|
||||
|
||||
logger = logging.getLogger(f"netbox.scripts.{script.full_name}")
|
||||
logger.info(f"Running script (commit={commit})")
|
||||
|
||||
# Add files to form data
|
||||
if request:
|
||||
files = request.FILES
|
||||
for field_name, fileobj in files.items():
|
||||
data[field_name] = fileobj
|
||||
|
||||
# Add the current request as a property of the script
|
||||
script.request = request
|
||||
|
||||
def set_job_data(script):
|
||||
job.data = {
|
||||
'log': script.messages,
|
||||
'output': script.output,
|
||||
'tests': script.tests,
|
||||
}
|
||||
|
||||
return job
|
||||
|
||||
def _run_script(job):
|
||||
"""
|
||||
Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with
|
||||
the event_tracking context manager (which is bypassed if commit == False).
|
||||
"""
|
||||
try:
|
||||
try:
|
||||
with transaction.atomic():
|
||||
script.output = script.run(data, commit)
|
||||
if not commit:
|
||||
raise AbortTransaction()
|
||||
except AbortTransaction:
|
||||
script.log_info(message=_("Database changes have been reverted automatically."))
|
||||
if request:
|
||||
clear_events.send(request)
|
||||
|
||||
job.data = script.get_job_data()
|
||||
if script.failed:
|
||||
logger.warning(f"Script failed")
|
||||
job.terminate(status=JobStatusChoices.STATUS_FAILED)
|
||||
else:
|
||||
job.terminate()
|
||||
|
||||
except Exception as e:
|
||||
if type(e) is AbortScript:
|
||||
msg = _("Script aborted with error: ") + str(e)
|
||||
if is_report(type(script)):
|
||||
script.log_failure(message=msg)
|
||||
else:
|
||||
script.log_failure(msg)
|
||||
|
||||
logger.error(f"Script aborted with error: {e}")
|
||||
else:
|
||||
stacktrace = traceback.format_exc()
|
||||
script.log_failure(
|
||||
message=_("An exception occurred: ") + f"`{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
|
||||
)
|
||||
logger.error(f"Exception raised during script execution: {e}")
|
||||
script.log_info(message=_("Database changes have been reverted due to error."))
|
||||
|
||||
job.data = script.get_job_data()
|
||||
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
|
||||
if request:
|
||||
clear_events.send(request)
|
||||
|
||||
logger.info(f"Script completed in {job.duration}")
|
||||
|
||||
# Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
|
||||
# change logging, event rules, etc.
|
||||
if commit:
|
||||
with event_tracking(request):
|
||||
_run_script(job)
|
||||
else:
|
||||
_run_script(job)
|
||||
|
||||
# Schedule the next job if an interval has been set
|
||||
if job.interval:
|
||||
new_scheduled_time = job.scheduled + timedelta(minutes=job.interval)
|
||||
Job.enqueue(
|
||||
run_script,
|
||||
instance=job.object,
|
||||
name=job.name,
|
||||
user=job.user,
|
||||
schedule_at=new_scheduled_time,
|
||||
interval=job.interval,
|
||||
job_timeout=script.job_timeout,
|
||||
data=data,
|
||||
request=request,
|
||||
commit=commit
|
||||
)
|
||||
|
@ -6,6 +6,7 @@ from django.db.models import Count, Q
|
||||
from django.http import HttpResponseBadRequest, HttpResponseForbidden, HttpResponse
|
||||
from django.shortcuts import get_object_or_404, redirect, render
|
||||
from django.urls import reverse
|
||||
from django.utils.module_loading import import_string
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import gettext as _
|
||||
from django.views.generic import View
|
||||
@ -35,7 +36,6 @@ from virtualization.models import VirtualMachine
|
||||
from . import filtersets, forms, tables
|
||||
from .constants import LOG_LEVEL_RANK
|
||||
from .models import *
|
||||
from .scripts import run_script
|
||||
from .tables import ReportResultsTable, ScriptResultsTable
|
||||
|
||||
|
||||
@ -1175,10 +1175,9 @@ class ScriptView(BaseScriptView):
|
||||
if not get_workers_for_queue('default'):
|
||||
messages.error(request, _("Unable to run script: RQ worker process not running."))
|
||||
elif form.is_valid():
|
||||
job = Job.enqueue(
|
||||
run_script,
|
||||
ScriptJob = import_string("extras.jobs.ScriptJob")
|
||||
job = ScriptJob.enqueue(
|
||||
instance=script,
|
||||
name=script_class.class_name,
|
||||
user=request.user,
|
||||
schedule_at=form.cleaned_data.pop('_schedule_at'),
|
||||
interval=form.cleaned_data.pop('_interval'),
|
||||
|
@@ -23,6 +23,9 @@ ADVISORY_LOCK_KEYS = {
    'wirelesslangroup': 105600,
    'inventoryitem': 105700,
    'inventoryitemtemplate': 105800,

    # Jobs
    'job-schedules': 110100,
}

# Default view action permission mapping
netbox/utilities/jobs.py (new file, 133 lines)
@@ -0,0 +1,133 @@
import logging
from abc import ABC, abstractmethod
from datetime import timedelta

from django.utils.functional import classproperty
from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException

from core.choices import JobStatusChoices
from core.models import Job, ObjectType
from netbox.constants import ADVISORY_LOCK_KEYS

__all__ = (
    'JobRunner',
)


class JobRunner(ABC):
    """
    Background Job helper class.

    This class handles the execution of a background job. It is responsible for maintaining its state, reporting
    errors, and scheduling recurring jobs.
    """

    class Meta:
        pass

    def __init__(self, job):
        """
        Args:
            job: The specific `Job` this `JobRunner` is executing.
        """
        self.job = job

    @classproperty
    def name(cls):
        return getattr(cls.Meta, 'name', cls.__name__)

    @abstractmethod
    def run(self, *args, **kwargs):
        """
        Run the job.

        A `JobRunner` class needs to implement this method to execute all commands of the job.
        """
        pass

    @classmethod
    def handle(cls, job, *args, **kwargs):
        """
        Handle the execution of a `Job`.

        This method is called by the Job Scheduler to handle the execution of all job commands. It will maintain the
        job's metadata and handle errors. For periodic jobs, a new job is automatically scheduled using its `interval`.
        """
        try:
            job.start()
            cls(job).run(*args, **kwargs)
            job.terminate()

        except Exception as e:
            job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
            if type(e) is JobTimeoutException:
                logging.error(e)

        # If the executed job is a periodic job, schedule its next execution at the specified interval.
        finally:
            if job.interval:
                new_scheduled_time = (job.scheduled or job.started) + timedelta(minutes=job.interval)
                cls.enqueue(
                    instance=job.object,
                    user=job.user,
                    schedule_at=new_scheduled_time,
                    interval=job.interval,
                    **kwargs,
                )

    @classmethod
    def get_jobs(cls, instance=None):
        """
        Get all jobs of this `JobRunner` related to a specific instance.
        """
        jobs = Job.objects.filter(name=cls.name)

        if instance:
            object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
            jobs = jobs.filter(
                object_type=object_type,
                object_id=instance.pk,
            )

        return jobs

    @classmethod
    def enqueue(cls, *args, **kwargs):
        """
        Enqueue a new `Job`.

        This method is a wrapper of `Job.enqueue()` using `handle()` as function callback. See its documentation for
        parameters.
        """
        return Job.enqueue(cls.handle, name=cls.name, *args, **kwargs)

    @classmethod
    @advisory_lock(ADVISORY_LOCK_KEYS['job-schedules'])
    def enqueue_once(cls, instance=None, schedule_at=None, interval=None, *args, **kwargs):
        """
        Enqueue a new `Job` once, i.e. skip duplicate jobs.

        Like `enqueue()`, this method adds a new `Job` to the job queue. However, if there's already a job of this
        class scheduled for `instance`, the existing job will be updated if necessary. This ensures that a particular
        schedule is only set up once at any given time, i.e. multiple calls to this method are idempotent.

        Note that this does not forbid running additional jobs with the `enqueue()` method, e.g. to schedule an
        immediate synchronization job in addition to a periodic synchronization schedule.

        For additional parameters see `enqueue()`.

        Args:
            instance: The NetBox object to which this job pertains (optional)
            schedule_at: Schedule the job to be executed at the passed date and time
            interval: Recurrence interval (in minutes)
        """
        job = cls.get_jobs(instance).filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES).first()
        if job:
            # If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise,
            # delete the existing job and schedule a new job instead.
            if (schedule_at and job.scheduled == schedule_at) and (job.interval == interval):
                return job
            job.delete()

        return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs)
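To make the control flow above concrete, a minimal sketch of a `JobRunner` subclass (the `PruneStaleRecords` name and its body are invented for illustration): `enqueue()` wraps `Job.enqueue()` with `cls.handle` as the callable, so the RQ worker invokes `handle()`, which instantiates the runner, calls `run()`, and reschedules the job if an interval is set.

```python
from utilities.jobs import JobRunner


class PruneStaleRecords(JobRunner):
    """Hypothetical recurring job; name and logic are illustrative only."""

    class Meta:
        name = 'Prune stale records'

    def run(self, *args, **kwargs):
        # self.job is the core.models.Job record created by enqueue();
        # self.job.object is the related instance, if one was passed.
        obj = self.job.object
        # pruning logic would go here


# Enqueue a recurring run every 24 hours; handle() schedules the next
# execution after each run completes.
PruneStaleRecords.enqueue(interval=24 * 60)
```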
netbox/utilities/tests/test_jobs.py (new file, 129 lines)
@@ -0,0 +1,129 @@
from datetime import timedelta

from django.test import TestCase
from django.utils import timezone
from django_rq import get_queue

from ..jobs import *
from core.models import Job
from core.choices import JobStatusChoices


class TestJobRunner(JobRunner):
    def run(self, *args, **kwargs):
        pass


class JobRunnerTestCase(TestCase):
    def tearDown(self):
        super().tearDown()

        # Clear all queues after running each test
        get_queue('default').connection.flushall()
        get_queue('high').connection.flushall()
        get_queue('low').connection.flushall()

    @staticmethod
    def get_schedule_at(offset=1):
        # Schedule jobs a week in advance to avoid accidentally running jobs on worker nodes used for testing.
        return timezone.now() + timedelta(weeks=offset)


class JobRunnerTest(JobRunnerTestCase):
    """
    Test internal logic of `JobRunner`.
    """

    def test_name_default(self):
        self.assertEqual(TestJobRunner.name, TestJobRunner.__name__)

    def test_name_set(self):
        class NamedJobRunner(TestJobRunner):
            class Meta:
                name = 'TestName'

        self.assertEqual(NamedJobRunner.name, 'TestName')

    def test_handle(self):
        job = TestJobRunner.enqueue(immediate=True)

        self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)

    def test_handle_errored(self):
        class ErroredJobRunner(TestJobRunner):
            EXP = Exception('Test error')

            def run(self, *args, **kwargs):
                raise self.EXP

        job = ErroredJobRunner.enqueue(immediate=True)

        self.assertEqual(job.status, JobStatusChoices.STATUS_ERRORED)
        self.assertEqual(job.error, repr(ErroredJobRunner.EXP))


class EnqueueTest(JobRunnerTestCase):
    """
    Test enqueuing of `JobRunner`.
    """

    def test_enqueue(self):
        instance = Job()
        for i in range(1, 3):
            job = TestJobRunner.enqueue(instance, schedule_at=self.get_schedule_at())

            self.assertIsInstance(job, Job)
            self.assertEqual(TestJobRunner.get_jobs(instance).count(), i)

    def test_enqueue_once(self):
        job = TestJobRunner.enqueue_once(instance=Job(), schedule_at=self.get_schedule_at())

        self.assertIsInstance(job, Job)
        self.assertEqual(job.name, TestJobRunner.__name__)

    def test_enqueue_once_twice_same(self):
        instance = Job()
        schedule_at = self.get_schedule_at()
        job1 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)
        job2 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)

        self.assertEqual(job1, job2)
        self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)

    def test_enqueue_once_twice_different_schedule_at(self):
        instance = Job()
        job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at())
        job2 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at(2))

        self.assertNotEqual(job1, job2)
        self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
        self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)

    def test_enqueue_once_twice_different_interval(self):
        instance = Job()
        schedule_at = self.get_schedule_at()
        job1 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)
        job2 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at, interval=60)

        self.assertNotEqual(job1, job2)
        self.assertEqual(job1.interval, None)
        self.assertEqual(job2.interval, 60)
        self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
        self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)

    def test_enqueue_once_with_enqueue(self):
        instance = Job()
        job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
        job2 = TestJobRunner.enqueue(instance, schedule_at=self.get_schedule_at())

        self.assertNotEqual(job1, job2)
        self.assertEqual(TestJobRunner.get_jobs(instance).count(), 2)

    def test_enqueue_once_after_enqueue(self):
        instance = Job()
        job1 = TestJobRunner.enqueue(instance, schedule_at=self.get_schedule_at())
        job2 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at(2))

        self.assertNotEqual(job1, job2)
        self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
        self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)