diff --git a/netbox/core/api/views.py b/netbox/core/api/views.py index ff488e3cd..b3a024c02 100644 --- a/netbox/core/api/views.py +++ b/netbox/core/api/views.py @@ -7,6 +7,8 @@ from rest_framework.routers import APIRootView from rest_framework.viewsets import ReadOnlyModelViewSet from core import filtersets +from core.choices import DataSourceStatusChoices +from core.jobs import SyncDataSourceJob from core.models import * from netbox.api.metadata import ContentTypeMetadata from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet @@ -36,7 +38,11 @@ class DataSourceViewSet(NetBoxModelViewSet): if not request.user.has_perm('core.sync_datasource', obj=datasource): raise PermissionDenied(_("This user does not have permission to synchronize this data source.")) - datasource.enqueue_sync_job(request) + # Enqueue the sync job & update the DataSource's status + SyncDataSourceJob.enqueue(instance=datasource, user=request.user) + datasource.status = DataSourceStatusChoices.QUEUED + DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status) + serializer = serializers.DataSourceSerializer(datasource, context={'request': request}) return Response(serializer.data) diff --git a/netbox/core/migrations/0012_job_object_type_optional.py b/netbox/core/migrations/0012_job_object_type_optional.py new file mode 100644 index 000000000..3c6664afc --- /dev/null +++ b/netbox/core/migrations/0012_job_object_type_optional.py @@ -0,0 +1,24 @@ +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('contenttypes', '0002_remove_content_type_name'), + ('core', '0011_move_objectchange'), + ] + + operations = [ + migrations.AlterField( + model_name='job', + name='object_type', + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name='jobs', + to='contenttypes.contenttype' + ), + ), + ] diff --git a/netbox/core/models/data.py b/netbox/core/models/data.py index 5785d7276..a8e90ec3f 100644 --- a/netbox/core/models/data.py +++ b/netbox/core/models/data.py @@ -1,10 +1,10 @@ import hashlib import logging import os -import yaml from fnmatch import fnmatchcase from urllib.parse import urlparse +import yaml from django.conf import settings from django.contrib.contenttypes.fields import GenericForeignKey from django.core.exceptions import ValidationError @@ -12,7 +12,6 @@ from django.core.validators import RegexValidator from django.db import models from django.urls import reverse from django.utils import timezone -from django.utils.module_loading import import_string from django.utils.translation import gettext as _ from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED @@ -23,7 +22,6 @@ from utilities.querysets import RestrictedQuerySet from ..choices import * from ..exceptions import SyncError from ..signals import post_sync, pre_sync -from .jobs import Job __all__ = ( 'AutoSyncRecord', @@ -153,21 +151,6 @@ class DataSource(JobsMixin, PrimaryModel): return objectchange - def enqueue_sync_job(self, request): - """ - Enqueue a background job to synchronize the DataSource by calling sync(). - """ - # Set the status to "syncing" - self.status = DataSourceStatusChoices.QUEUED - DataSource.objects.filter(pk=self.pk).update(status=self.status) - - # Enqueue a sync job - SyncDataSourceJob = import_string('core.jobs.SyncDataSourceJob') - return SyncDataSourceJob.enqueue( - instance=self, - user=request.user - ) - def get_backend(self): backend_params = self.parameters or {} return self.backend_class(self.source_url, **backend_params) diff --git a/netbox/core/models/jobs.py b/netbox/core/models/jobs.py index 14f32f447..822d9b201 100644 --- a/netbox/core/models/jobs.py +++ b/netbox/core/models/jobs.py @@ -31,6 +31,8 @@ class Job(models.Model): to='contenttypes.ContentType', related_name='jobs', on_delete=models.CASCADE, + blank=True, + null=True ) object_id = models.PositiveBigIntegerField( blank=True, @@ -197,27 +199,34 @@ class Job(models.Model): job_end.send(self) @classmethod - def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, run_now=False, **kwargs): + def enqueue(cls, func, instance=None, name='', user=None, schedule_at=None, interval=None, immediate=False, **kwargs): """ Create a Job instance and enqueue a job using the given callable Args: func: The callable object to be enqueued for execution - instance: The NetBox object to which this job pertains + instance: The NetBox object to which this job pertains (optional) name: Name for the job (optional) user: The user responsible for running the job schedule_at: Schedule the job to be executed at the passed date and time interval: Recurrence interval (in minutes) - run_now: Run the job immediately without scheduling it in the background. Should be used for interactive + immediate: Run the job immediately without scheduling it in the background. Should be used for interactive management commands only. """ - object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) + if schedule_at and immediate: + raise ValueError("enqueue() cannot be called with values for both schedule_at and immediate.") + + if instance: + object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) + object_id = instance.pk + else: + object_type = object_id = None rq_queue_name = get_queue_for_model(object_type.model) queue = django_rq.get_queue(rq_queue_name) status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING job = Job.objects.create( object_type=object_type, - object_id=instance.pk, + object_id=object_id, name=name, status=status, scheduled=schedule_at, @@ -226,14 +235,16 @@ class Job(models.Model): job_id=uuid.uuid4() ) - # Optionally, the job can be run immediately without being scheduled to run in the background. - if run_now: + # Run the job immediately, rather than enqueuing it as a background task. Note that this is a synchronous + # (blocking) operation, and execution will pause until the job completes. + if immediate: func(job_id=str(job.job_id), job=job, **kwargs) - return job - # Schedule the job to run asynchronously in the background. - if schedule_at: + # Schedule the job to run at a specific date & time. + elif schedule_at: queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs) + + # Schedule the job to run asynchronously at this first available opportunity. else: queue.enqueue(func, job_id=str(job.job_id), job=job, **kwargs) diff --git a/netbox/core/views.py b/netbox/core/views.py index 508401585..8fcdc12f3 100644 --- a/netbox/core/views.py +++ b/netbox/core/views.py @@ -35,6 +35,8 @@ from utilities.htmx import htmx_partial from utilities.query import count_related from utilities.views import ContentTypePermissionRequiredMixin, GetRelatedModelsMixin, register_model_view from . import filtersets, forms, tables +from .choices import DataSourceStatusChoices +from .jobs import SyncDataSourceJob from .models import * @@ -75,7 +77,11 @@ class DataSourceSyncView(BaseObjectView): def post(self, request, pk): datasource = get_object_or_404(self.queryset, pk=pk) - job = datasource.enqueue_sync_job(request) + + # Enqueue the sync job & update the DataSource's status + job = SyncDataSourceJob.enqueue(instance=datasource, user=request.user) + datasource.status = DataSourceStatusChoices.QUEUED + DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status) messages.success(request, f"Queued job #{job.pk} to sync {datasource}") return redirect(datasource.get_absolute_url()) diff --git a/netbox/extras/events.py b/netbox/extras/events.py index a98ebf8e9..fc68d0f5f 100644 --- a/netbox/extras/events.py +++ b/netbox/extras/events.py @@ -1,4 +1,5 @@ import logging +from collections import defaultdict from django.conf import settings from django.contrib.auth import get_user_model @@ -8,9 +9,7 @@ from django.utils.module_loading import import_string from django.utils.translation import gettext as _ from django_rq import get_queue -from core.choices import ObjectChangeActionChoices from core.events import * -from core.models import Job from netbox.config import get_config from netbox.constants import RQ_QUEUE_DEFAULT from netbox.registry import registry @@ -126,7 +125,7 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non script = event_rule.action_object.python_class() # Enqueue a Job to record the script's execution - ScriptJob = import_string("extras.jobs.ScriptJob") + from extras.jobs import ScriptJob ScriptJob.enqueue( instance=event_rule.action_object, name=script.name, diff --git a/netbox/extras/management/commands/runscript.py b/netbox/extras/management/commands/runscript.py index ad828971c..e33ef6ec8 100644 --- a/netbox/extras/management/commands/runscript.py +++ b/netbox/extras/management/commands/runscript.py @@ -89,7 +89,7 @@ class Command(BaseCommand): instance=script_obj, name=script.name, user=user, - run_now=True, + immediate=True, data=data, request=NetBoxFakeRequest({ 'META': {}, diff --git a/netbox/utilities/jobs.py b/netbox/utilities/jobs.py index 07e663cdc..7691f01d5 100644 --- a/netbox/utilities/jobs.py +++ b/netbox/utilities/jobs.py @@ -74,16 +74,20 @@ class BackgroundJob(ABC): ) @classmethod - def get_jobs(cls, instance): + def get_jobs(cls, instance=None): """ Get all jobs of this `BackgroundJob` related to a specific instance. """ - object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) - return Job.objects.filter( - object_type=object_type, - object_id=instance.pk, - name=cls.name, - ) + jobs = Job.objects.filter(name=cls.name) + + if instance: + object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) + jobs = jobs.filter( + object_type=object_type, + object_id=instance.pk, + ) + + return jobs @classmethod def enqueue(cls, *args, **kwargs): @@ -97,7 +101,7 @@ class BackgroundJob(ABC): @classmethod @advisory_lock(ADVISORY_LOCK_KEYS['job-schedules']) - def enqueue_once(cls, instance, interval=None, *args, **kwargs): + def enqueue_once(cls, instance=None, interval=None, *args, **kwargs): """ Enqueue a new `BackgroundJob` once, i.e. skip duplicate jobs. @@ -112,7 +116,7 @@ class BackgroundJob(ABC): For additional parameters see `enqueue()`. Args: - instance: The NetBox object to which this `BackgroundJob` pertains + instance: The NetBox object to which this `BackgroundJob` pertains (optional) interval: Recurrence interval (in minutes) """ job = cls.get_jobs(instance).filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES).first() @@ -137,30 +141,11 @@ class SystemJob(BackgroundJob): for system background tasks. The main use case for this method is to schedule jobs programmatically instead of using user events, e.g. to start - jobs when the plugin is loaded in NetBox. For this purpose, the `setup()` method can be used to setup a new schedule - outside of the request-response cycle. It will register the new schedule right after all plugins are loaded and the - database is connected. Then `schedule()` will take care of scheduling a single job at a time. + jobs when the plugin is loaded in NetBox. For this purpose, the `setup()` method can be used to set up a new + schedule outside the request-response cycle. It will register the new schedule right after all plugins are loaded + and the database is connected. Then `schedule()` will take care of scheduling a single job at a time. """ - @classmethod - def enqueue(cls, *args, **kwargs): - kwargs.pop('instance', None) - return super().enqueue(instance=Job(), *args, **kwargs) - - @classmethod - def enqueue_once(cls, *args, **kwargs): - kwargs.pop('instance', None) - return super().enqueue_once(instance=Job(), *args, **kwargs) - - @classmethod - def handle(cls, job, *args, **kwargs): - # A job requires a related object to be handled, or internal methods will fail. To avoid adding an extra model - # for this, the existing job object is used as a reference. This is not ideal, but it works for this purpose. - job.object = job - job.object_id = None # Hide changes from UI - - super().handle(job, *args, **kwargs) - @classmethod def setup(cls, *args, **kwargs): """