Closes #16971: Add system jobs (#17716)

* Fix check for existing jobs

If a job is to be enqueued once and no specific scheduled time is
specified, any scheduled time of existing jobs will be valid. Only if a
specific scheduled time is specified for 'enqueue_once()' can it be
evaluated.

* Allow system jobs to be registered

A new registry key allows background system jobs to be registered and
automatically scheduled when rqworker starts.

* Test scheduling of system jobs

* Fix plugins scheduled job documentation

The documentation reflected a non-production state of the JobRunner
framework left over from development. Now a more practical example
demonstrates the usage.

* Allow plugins to register system jobs

* Rename system job metadata

To clarify which meta-attributes belong to system jobs, each of them is
now prefixed with 'system_'.

* Add predefined job interval choices

* Remove 'system_enabled' JobRunner attribute

Previously, the 'system_enabled' attribute was used to control whether a
job should run or not. However, this can also be accomplished by
evaluating the job's interval.

* Fix test

* Use a decorator to register system jobs

* Specify interval when registering system job

* Update documentation

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
This commit is contained in:
Alexander Haase 2024-11-01 19:56:08 +01:00 committed by GitHub
parent 6dc75d8db1
commit 4bba92617d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 147 additions and 14 deletions

View File

@ -29,6 +29,9 @@ class MyTestJob(JobRunner):
You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead. You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead.
!!! tip
A set of predefined intervals is available at `core.choices.JobIntervalChoices` for convenience.
### Attributes ### Attributes
`JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged. `JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.
@ -46,26 +49,52 @@ As described above, jobs can be scheduled for immediate execution or at any late
#### Example #### Example
```python title="models.py"
from django.db import models
from core.choices import JobIntervalChoices
from netbox.models import NetBoxModel
from .jobs import MyTestJob
class MyModel(NetBoxModel):
foo = models.CharField()
def save(self, *args, **kwargs):
MyTestJob.enqueue_once(instance=self, interval=JobIntervalChoices.INTERVAL_HOURLY)
return super().save(*args, **kwargs)
def sync(self):
MyTestJob.enqueue(instance=self)
```
### System Jobs
Some plugins may implement background jobs that are decoupled from the request/response cycle. Typical use cases would be housekeeping tasks or synchronization jobs. These can be registered as _system jobs_ using the `system_job()` decorator. The job interval must be passed as an integer (in minutes) when registering a system job. System jobs are scheduled automatically when the RQ worker (`manage.py rqworker`) is run.
#### Example
```python title="jobs.py" ```python title="jobs.py"
from netbox.jobs import JobRunner from core.choices import JobIntervalChoices
from netbox.jobs import JobRunner, system_job
from .models import MyModel
# Specify a predefined choice or an integer indicating
# the number of minutes between job executions
@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY)
class MyHousekeepingJob(JobRunner): class MyHousekeepingJob(JobRunner):
class Meta: class Meta:
name = "Housekeeping" name = "My Housekeeping Job"
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
# your logic goes here MyModel.objects.filter(foo='bar').delete()
system_jobs = (
MyHousekeepingJob,
)
``` ```
```python title="__init__.py" !!! note
from netbox.plugins import PluginConfig Ensure that any system jobs are imported on initialization. Otherwise, they won't be registered. This can be achieved by extending the PluginConfig's `ready()` method.
class MyPluginConfig(PluginConfig):
def ready(self):
from .jobs import MyHousekeepingJob
MyHousekeepingJob.setup(interval=60)
```
## Task queues ## Task queues

View File

@ -18,6 +18,6 @@ backends = [MyDataBackend]
``` ```
!!! tip !!! tip
The path to the list of search indexes can be modified by setting `data_backends` in the PluginConfig instance. The path to the list of data backends can be modified by setting `data_backends` in the PluginConfig instance.
::: netbox.data_backends.DataBackend ::: netbox.data_backends.DataBackend

View File

@ -72,6 +72,20 @@ class JobStatusChoices(ChoiceSet):
) )
class JobIntervalChoices(ChoiceSet):
INTERVAL_MINUTELY = 1
INTERVAL_HOURLY = 60
INTERVAL_DAILY = 60 * 24
INTERVAL_WEEKLY = 60 * 24 * 7
CHOICES = (
(INTERVAL_MINUTELY, _('Minutely')),
(INTERVAL_HOURLY, _('Hourly')),
(INTERVAL_DAILY, _('Daily')),
(INTERVAL_WEEKLY, _('Weekly')),
)
# #
# ObjectChanges # ObjectChanges
# #

View File

@ -2,6 +2,8 @@ import logging
from django_rq.management.commands.rqworker import Command as _Command from django_rq.management.commands.rqworker import Command as _Command
from netbox.registry import registry
DEFAULT_QUEUES = ('high', 'default', 'low') DEFAULT_QUEUES = ('high', 'default', 'low')
@ -14,6 +16,15 @@ class Command(_Command):
of only the 'default' queue). of only the 'default' queue).
""" """
def handle(self, *args, **options): def handle(self, *args, **options):
# Setup system jobs.
for job, kwargs in registry['system_jobs'].items():
try:
interval = kwargs['interval']
except KeyError:
raise TypeError("System job must specify an interval (in minutes).")
logger.debug(f"Scheduling system job {job.name} (interval={interval})")
job.enqueue_once(**kwargs)
# Run the worker with scheduler functionality # Run the worker with scheduler functionality
options['with_scheduler'] = True options['with_scheduler'] = True

View File

@ -2,6 +2,7 @@ import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import timedelta from datetime import timedelta
from django.core.exceptions import ImproperlyConfigured
from django.utils.functional import classproperty from django.utils.functional import classproperty
from django_pglocks import advisory_lock from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException from rq.timeouts import JobTimeoutException
@ -9,12 +10,30 @@ from rq.timeouts import JobTimeoutException
from core.choices import JobStatusChoices from core.choices import JobStatusChoices
from core.models import Job, ObjectType from core.models import Job, ObjectType
from netbox.constants import ADVISORY_LOCK_KEYS from netbox.constants import ADVISORY_LOCK_KEYS
from netbox.registry import registry
__all__ = ( __all__ = (
'JobRunner', 'JobRunner',
'system_job',
) )
def system_job(interval):
"""
Decorator for registering a `JobRunner` class as system background job.
"""
if type(interval) is not int:
raise ImproperlyConfigured("System job interval must be an integer (minutes).")
def _wrapper(cls):
registry['system_jobs'][cls] = {
'interval': interval
}
return cls
return _wrapper
class JobRunner(ABC): class JobRunner(ABC):
""" """
Background Job helper class. Background Job helper class.
@ -129,7 +148,7 @@ class JobRunner(ABC):
if job: if job:
# If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise, # If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise,
# delete the existing job and schedule a new job instead. # delete the existing job and schedule a new job instead.
if (schedule_at and job.scheduled == schedule_at) and (job.interval == interval): if (not schedule_at or job.scheduled == schedule_at) and (job.interval == interval):
return job return job
job.delete() job.delete()

View File

@ -30,6 +30,7 @@ registry = Registry({
'models': collections.defaultdict(set), 'models': collections.defaultdict(set),
'plugins': dict(), 'plugins': dict(),
'search': dict(), 'search': dict(),
'system_jobs': dict(),
'tables': collections.defaultdict(dict), 'tables': collections.defaultdict(dict),
'views': collections.defaultdict(dict), 'views': collections.defaultdict(dict),
'widgets': dict(), 'widgets': dict(),

View File

@ -21,5 +21,10 @@ class DummyPluginConfig(PluginConfig):
'netbox.tests.dummy_plugin.events.process_events_queue' 'netbox.tests.dummy_plugin.events.process_events_queue'
] ]
def ready(self):
super().ready()
from . import jobs # noqa: F401
config = DummyPluginConfig config = DummyPluginConfig

View File

@ -0,0 +1,9 @@
from core.choices import JobIntervalChoices
from netbox.jobs import JobRunner, system_job
@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY)
class DummySystemJob(JobRunner):
def run(self, *args, **kwargs):
pass

View File

@ -90,6 +90,15 @@ class EnqueueTest(JobRunnerTestCase):
self.assertEqual(job1, job2) self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1) self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)
def test_enqueue_once_twice_same_no_schedule_at(self):
instance = DataSource()
schedule_at = self.get_schedule_at()
job1 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestJobRunner.enqueue_once(instance)
self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)
def test_enqueue_once_twice_different_schedule_at(self): def test_enqueue_once_twice_different_schedule_at(self):
instance = DataSource() instance = DataSource()
job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at()) job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at())
@ -127,3 +136,30 @@ class EnqueueTest(JobRunnerTestCase):
self.assertNotEqual(job1, job2) self.assertNotEqual(job1, job2)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db) self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1) self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)
class SystemJobTest(JobRunnerTestCase):
"""
Test that system jobs can be scheduled.
General functionality already tested by `JobRunnerTest` and `EnqueueTest`.
"""
def test_scheduling(self):
# Can job be enqueued?
job = TestJobRunner.enqueue(schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(TestJobRunner.get_jobs().count(), 1)
# Can job be deleted again?
job.delete()
self.assertRaises(Job.DoesNotExist, job.refresh_from_db)
self.assertEqual(TestJobRunner.get_jobs().count(), 0)
def test_enqueue_once(self):
schedule_at = self.get_schedule_at()
job1 = TestJobRunner.enqueue_once(schedule_at=schedule_at)
job2 = TestJobRunner.enqueue_once(schedule_at=schedule_at)
self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs().count(), 1)

View File

@ -5,8 +5,10 @@ from django.core.exceptions import ImproperlyConfigured
from django.test import Client, TestCase, override_settings from django.test import Client, TestCase, override_settings
from django.urls import reverse from django.urls import reverse
from core.choices import JobIntervalChoices
from netbox.tests.dummy_plugin import config as dummy_config from netbox.tests.dummy_plugin import config as dummy_config
from netbox.tests.dummy_plugin.data_backends import DummyBackend from netbox.tests.dummy_plugin.data_backends import DummyBackend
from netbox.tests.dummy_plugin.jobs import DummySystemJob
from netbox.plugins.navigation import PluginMenu from netbox.plugins.navigation import PluginMenu
from netbox.plugins.utils import get_plugin_config from netbox.plugins.utils import get_plugin_config
from netbox.graphql.schema import Query from netbox.graphql.schema import Query
@ -130,6 +132,13 @@ class PluginTest(TestCase):
self.assertIn('dummy', registry['data_backends']) self.assertIn('dummy', registry['data_backends'])
self.assertIs(registry['data_backends']['dummy'], DummyBackend) self.assertIs(registry['data_backends']['dummy'], DummyBackend)
def test_system_jobs(self):
"""
Check registered system jobs.
"""
self.assertIn(DummySystemJob, registry['system_jobs'])
self.assertEqual(registry['system_jobs'][DummySystemJob]['interval'], JobIntervalChoices.INTERVAL_HOURLY)
def test_queues(self): def test_queues(self):
""" """
Check that plugin queues are registered with the accurate name. Check that plugin queues are registered with the accurate name.