mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-29 11:56:25 -06:00
Specify interval when registering system job
This commit is contained in:
parent
566201507d
commit
6b192c9ae8
@ -17,11 +17,13 @@ class Command(_Command):
|
||||
"""
|
||||
def handle(self, *args, **options):
|
||||
# Setup system jobs.
|
||||
for job in registry['system_jobs'].values():
|
||||
interval = getattr(job.Meta, 'system_interval', 0)
|
||||
if interval:
|
||||
logger.debug(f"Scheduling system job {job.name}")
|
||||
job.enqueue_once(interval=interval)
|
||||
for job, kwargs in registry['system_jobs'].items():
|
||||
try:
|
||||
interval = kwargs['interval']
|
||||
except KeyError:
|
||||
raise TypeError("System job must specify an interval (in minutes).")
|
||||
logger.debug(f"Scheduling system job {job.name} (interval={interval})")
|
||||
job.enqueue_once(**kwargs)
|
||||
|
||||
# Run the worker with scheduler functionality
|
||||
options['with_scheduler'] = True
|
||||
|
@ -2,6 +2,7 @@ import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import timedelta
|
||||
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.utils.functional import classproperty
|
||||
from django_pglocks import advisory_lock
|
||||
from rq.timeouts import JobTimeoutException
|
||||
@ -17,12 +18,17 @@ __all__ = (
|
||||
)
|
||||
|
||||
|
||||
def system_job():
|
||||
def system_job(interval):
|
||||
"""
|
||||
Decorator for registering a `JobRunner` class as system background job.
|
||||
"""
|
||||
if type(interval) is not int:
|
||||
raise ImproperlyConfigured("System job interval must be an integer (minutes).")
|
||||
|
||||
def _wrapper(cls):
|
||||
registry['system_jobs'][cls.name] = cls
|
||||
registry['system_jobs'][cls] = {
|
||||
'interval': interval
|
||||
}
|
||||
return cls
|
||||
|
||||
return _wrapper
|
||||
|
@ -2,15 +2,8 @@ from core.choices import JobIntervalChoices
|
||||
from netbox.jobs import JobRunner, system_job
|
||||
|
||||
|
||||
@system_job()
|
||||
@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY)
|
||||
class DummySystemJob(JobRunner):
|
||||
class Meta:
|
||||
system_interval = JobIntervalChoices.INTERVAL_HOURLY
|
||||
|
||||
def run(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
system_jobs = (
|
||||
DummySystemJob,
|
||||
)
|
||||
|
@ -5,6 +5,7 @@ from django.core.exceptions import ImproperlyConfigured
|
||||
from django.test import Client, TestCase, override_settings
|
||||
from django.urls import reverse
|
||||
|
||||
from core.choices import JobIntervalChoices
|
||||
from netbox.tests.dummy_plugin import config as dummy_config
|
||||
from netbox.tests.dummy_plugin.data_backends import DummyBackend
|
||||
from netbox.tests.dummy_plugin.jobs import DummySystemJob
|
||||
@ -135,8 +136,8 @@ class PluginTest(TestCase):
|
||||
"""
|
||||
Check registered system jobs.
|
||||
"""
|
||||
self.assertIn(DummySystemJob.name, registry['system_jobs'])
|
||||
self.assertIs(registry['system_jobs'][DummySystemJob.name], DummySystemJob)
|
||||
self.assertIn(DummySystemJob, registry['system_jobs'])
|
||||
self.assertEqual(registry['system_jobs'][DummySystemJob]['interval'], JobIntervalChoices.INTERVAL_HOURLY)
|
||||
|
||||
def test_queues(self):
|
||||
"""
|
||||
|
Loading…
Reference in New Issue
Block a user