From 4bba92617d88f1eb63e9fc8894d54a928a34d9f2 Mon Sep 17 00:00:00 2001 From: Alexander Haase Date: Fri, 1 Nov 2024 19:56:08 +0100 Subject: [PATCH] Closes #16971: Add system jobs (#17716) * Fix check for existing jobs If a job is to be enqueued once and no specific scheduled time is specified, any scheduled time of existing jobs will be valid. Only if a specific scheduled time is specified for 'enqueue_once()' can it be evaluated. * Allow system jobs to be registered A new registry key allows background system jobs to be registered and automatically scheduled when rqworker starts. * Test scheduling of system jobs * Fix plugins scheduled job documentation The documentation reflected a non-production state of the JobRunner framework left over from development. Now a more practical example demonstrates the usage. * Allow plugins to register system jobs * Rename system job metadata To clarify which meta-attributes belong to system jobs, each of them is now prefixed with 'system_'. * Add predefined job interval choices * Remove 'system_enabled' JobRunner attribute Previously, the 'system_enabled' attribute was used to control whether a job should run or not. However, this can also be accomplished by evaluating the job's interval. * Fix test * Use a decorator to register system jobs * Specify interval when registering system job * Update documentation --------- Co-authored-by: Jeremy Stretch --- docs/plugins/development/background-jobs.md | 53 +++++++++++++++----- docs/plugins/development/data-backends.md | 2 +- netbox/core/choices.py | 14 ++++++ netbox/core/management/commands/rqworker.py | 11 ++++ netbox/netbox/jobs.py | 21 +++++++- netbox/netbox/registry.py | 1 + netbox/netbox/tests/dummy_plugin/__init__.py | 5 ++ netbox/netbox/tests/dummy_plugin/jobs.py | 9 ++++ netbox/netbox/tests/test_jobs.py | 36 +++++++++++++ netbox/netbox/tests/test_plugins.py | 9 ++++ 10 files changed, 147 insertions(+), 14 deletions(-) create mode 100644 netbox/netbox/tests/dummy_plugin/jobs.py diff --git a/docs/plugins/development/background-jobs.md b/docs/plugins/development/background-jobs.md index 873390a58..d51981b9e 100644 --- a/docs/plugins/development/background-jobs.md +++ b/docs/plugins/development/background-jobs.md @@ -29,6 +29,9 @@ class MyTestJob(JobRunner): You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead. +!!! tip + A set of predefined intervals is available at `core.choices.JobIntervalChoices` for convenience. + ### Attributes `JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged. @@ -46,26 +49,52 @@ As described above, jobs can be scheduled for immediate execution or at any late #### Example +```python title="models.py" +from django.db import models +from core.choices import JobIntervalChoices +from netbox.models import NetBoxModel +from .jobs import MyTestJob + +class MyModel(NetBoxModel): + foo = models.CharField() + + def save(self, *args, **kwargs): + MyTestJob.enqueue_once(instance=self, interval=JobIntervalChoices.INTERVAL_HOURLY) + return super().save(*args, **kwargs) + + def sync(self): + MyTestJob.enqueue(instance=self) +``` + + +### System Jobs + +Some plugins may implement background jobs that are decoupled from the request/response cycle. Typical use cases would be housekeeping tasks or synchronization jobs. These can be registered as _system jobs_ using the `system_job()` decorator. The job interval must be passed as an integer (in minutes) when registering a system job. System jobs are scheduled automatically when the RQ worker (`manage.py rqworker`) is run. + +#### Example + ```python title="jobs.py" -from netbox.jobs import JobRunner - +from core.choices import JobIntervalChoices +from netbox.jobs import JobRunner, system_job +from .models import MyModel +# Specify a predefined choice or an integer indicating +# the number of minutes between job executions +@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY) class MyHousekeepingJob(JobRunner): class Meta: - name = "Housekeeping" + name = "My Housekeeping Job" def run(self, *args, **kwargs): - # your logic goes here + MyModel.objects.filter(foo='bar').delete() + +system_jobs = ( + MyHousekeepingJob, +) ``` -```python title="__init__.py" -from netbox.plugins import PluginConfig - -class MyPluginConfig(PluginConfig): - def ready(self): - from .jobs import MyHousekeepingJob - MyHousekeepingJob.setup(interval=60) -``` +!!! note + Ensure that any system jobs are imported on initialization. Otherwise, they won't be registered. This can be achieved by extending the PluginConfig's `ready()` method. ## Task queues diff --git a/docs/plugins/development/data-backends.md b/docs/plugins/development/data-backends.md index 8b7226a41..0c6d44d95 100644 --- a/docs/plugins/development/data-backends.md +++ b/docs/plugins/development/data-backends.md @@ -18,6 +18,6 @@ backends = [MyDataBackend] ``` !!! tip - The path to the list of search indexes can be modified by setting `data_backends` in the PluginConfig instance. + The path to the list of data backends can be modified by setting `data_backends` in the PluginConfig instance. ::: netbox.data_backends.DataBackend diff --git a/netbox/core/choices.py b/netbox/core/choices.py index 01a072ce1..442acc26b 100644 --- a/netbox/core/choices.py +++ b/netbox/core/choices.py @@ -72,6 +72,20 @@ class JobStatusChoices(ChoiceSet): ) +class JobIntervalChoices(ChoiceSet): + INTERVAL_MINUTELY = 1 + INTERVAL_HOURLY = 60 + INTERVAL_DAILY = 60 * 24 + INTERVAL_WEEKLY = 60 * 24 * 7 + + CHOICES = ( + (INTERVAL_MINUTELY, _('Minutely')), + (INTERVAL_HOURLY, _('Hourly')), + (INTERVAL_DAILY, _('Daily')), + (INTERVAL_WEEKLY, _('Weekly')), + ) + + # # ObjectChanges # diff --git a/netbox/core/management/commands/rqworker.py b/netbox/core/management/commands/rqworker.py index e1fb6fd11..b2879c3d8 100644 --- a/netbox/core/management/commands/rqworker.py +++ b/netbox/core/management/commands/rqworker.py @@ -2,6 +2,8 @@ import logging from django_rq.management.commands.rqworker import Command as _Command +from netbox.registry import registry + DEFAULT_QUEUES = ('high', 'default', 'low') @@ -14,6 +16,15 @@ class Command(_Command): of only the 'default' queue). """ def handle(self, *args, **options): + # Setup system jobs. + for job, kwargs in registry['system_jobs'].items(): + try: + interval = kwargs['interval'] + except KeyError: + raise TypeError("System job must specify an interval (in minutes).") + logger.debug(f"Scheduling system job {job.name} (interval={interval})") + job.enqueue_once(**kwargs) + # Run the worker with scheduler functionality options['with_scheduler'] = True diff --git a/netbox/netbox/jobs.py b/netbox/netbox/jobs.py index ae8f2f109..965ebc9e9 100644 --- a/netbox/netbox/jobs.py +++ b/netbox/netbox/jobs.py @@ -2,6 +2,7 @@ import logging from abc import ABC, abstractmethod from datetime import timedelta +from django.core.exceptions import ImproperlyConfigured from django.utils.functional import classproperty from django_pglocks import advisory_lock from rq.timeouts import JobTimeoutException @@ -9,12 +10,30 @@ from rq.timeouts import JobTimeoutException from core.choices import JobStatusChoices from core.models import Job, ObjectType from netbox.constants import ADVISORY_LOCK_KEYS +from netbox.registry import registry __all__ = ( 'JobRunner', + 'system_job', ) +def system_job(interval): + """ + Decorator for registering a `JobRunner` class as system background job. + """ + if type(interval) is not int: + raise ImproperlyConfigured("System job interval must be an integer (minutes).") + + def _wrapper(cls): + registry['system_jobs'][cls] = { + 'interval': interval + } + return cls + + return _wrapper + + class JobRunner(ABC): """ Background Job helper class. @@ -129,7 +148,7 @@ class JobRunner(ABC): if job: # If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise, # delete the existing job and schedule a new job instead. - if (schedule_at and job.scheduled == schedule_at) and (job.interval == interval): + if (not schedule_at or job.scheduled == schedule_at) and (job.interval == interval): return job job.delete() diff --git a/netbox/netbox/registry.py b/netbox/netbox/registry.py index 0920cbccf..48d7921f2 100644 --- a/netbox/netbox/registry.py +++ b/netbox/netbox/registry.py @@ -30,6 +30,7 @@ registry = Registry({ 'models': collections.defaultdict(set), 'plugins': dict(), 'search': dict(), + 'system_jobs': dict(), 'tables': collections.defaultdict(dict), 'views': collections.defaultdict(dict), 'widgets': dict(), diff --git a/netbox/netbox/tests/dummy_plugin/__init__.py b/netbox/netbox/tests/dummy_plugin/__init__.py index 6ab62d638..2ca7c290c 100644 --- a/netbox/netbox/tests/dummy_plugin/__init__.py +++ b/netbox/netbox/tests/dummy_plugin/__init__.py @@ -21,5 +21,10 @@ class DummyPluginConfig(PluginConfig): 'netbox.tests.dummy_plugin.events.process_events_queue' ] + def ready(self): + super().ready() + + from . import jobs # noqa: F401 + config = DummyPluginConfig diff --git a/netbox/netbox/tests/dummy_plugin/jobs.py b/netbox/netbox/tests/dummy_plugin/jobs.py new file mode 100644 index 000000000..3b9dc7a5f --- /dev/null +++ b/netbox/netbox/tests/dummy_plugin/jobs.py @@ -0,0 +1,9 @@ +from core.choices import JobIntervalChoices +from netbox.jobs import JobRunner, system_job + + +@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY) +class DummySystemJob(JobRunner): + + def run(self, *args, **kwargs): + pass diff --git a/netbox/netbox/tests/test_jobs.py b/netbox/netbox/tests/test_jobs.py index 52a7bd97a..e3e24a235 100644 --- a/netbox/netbox/tests/test_jobs.py +++ b/netbox/netbox/tests/test_jobs.py @@ -90,6 +90,15 @@ class EnqueueTest(JobRunnerTestCase): self.assertEqual(job1, job2) self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1) + def test_enqueue_once_twice_same_no_schedule_at(self): + instance = DataSource() + schedule_at = self.get_schedule_at() + job1 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at) + job2 = TestJobRunner.enqueue_once(instance) + + self.assertEqual(job1, job2) + self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1) + def test_enqueue_once_twice_different_schedule_at(self): instance = DataSource() job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at()) @@ -127,3 +136,30 @@ class EnqueueTest(JobRunnerTestCase): self.assertNotEqual(job1, job2) self.assertRaises(Job.DoesNotExist, job1.refresh_from_db) self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1) + + +class SystemJobTest(JobRunnerTestCase): + """ + Test that system jobs can be scheduled. + + General functionality already tested by `JobRunnerTest` and `EnqueueTest`. + """ + + def test_scheduling(self): + # Can job be enqueued? + job = TestJobRunner.enqueue(schedule_at=self.get_schedule_at()) + self.assertIsInstance(job, Job) + self.assertEqual(TestJobRunner.get_jobs().count(), 1) + + # Can job be deleted again? + job.delete() + self.assertRaises(Job.DoesNotExist, job.refresh_from_db) + self.assertEqual(TestJobRunner.get_jobs().count(), 0) + + def test_enqueue_once(self): + schedule_at = self.get_schedule_at() + job1 = TestJobRunner.enqueue_once(schedule_at=schedule_at) + job2 = TestJobRunner.enqueue_once(schedule_at=schedule_at) + + self.assertEqual(job1, job2) + self.assertEqual(TestJobRunner.get_jobs().count(), 1) diff --git a/netbox/netbox/tests/test_plugins.py b/netbox/netbox/tests/test_plugins.py index 16778667d..db82d0a75 100644 --- a/netbox/netbox/tests/test_plugins.py +++ b/netbox/netbox/tests/test_plugins.py @@ -5,8 +5,10 @@ from django.core.exceptions import ImproperlyConfigured from django.test import Client, TestCase, override_settings from django.urls import reverse +from core.choices import JobIntervalChoices from netbox.tests.dummy_plugin import config as dummy_config from netbox.tests.dummy_plugin.data_backends import DummyBackend +from netbox.tests.dummy_plugin.jobs import DummySystemJob from netbox.plugins.navigation import PluginMenu from netbox.plugins.utils import get_plugin_config from netbox.graphql.schema import Query @@ -130,6 +132,13 @@ class PluginTest(TestCase): self.assertIn('dummy', registry['data_backends']) self.assertIs(registry['data_backends']['dummy'], DummyBackend) + def test_system_jobs(self): + """ + Check registered system jobs. + """ + self.assertIn(DummySystemJob, registry['system_jobs']) + self.assertEqual(registry['system_jobs'][DummySystemJob]['interval'], JobIntervalChoices.INTERVAL_HOURLY) + def test_queues(self): """ Check that plugin queues are registered with the accurate name.