Compare commits

...

3 Commits

Author SHA1 Message Date
Jeremy Stretch
e002c747b0
Merge 87d8403fe4 into 875a641687 2025-07-11 10:50:16 +03:00
Jeremy Stretch
875a641687
Closes #19589: Background job for bulk operations (#19804)
Some checks failed
CI / build (20.x, 3.10) (push) Has been cancelled
CI / build (20.x, 3.11) (push) Has been cancelled
CI / build (20.x, 3.12) (push) Has been cancelled
* Initial work on #19589

* Add tooling for handling background requests

* UI notification should link to enqueued job

* Use an informative name for the job

* Disable background jobs for file uploads
2025-07-10 09:32:35 -05:00
Jeremy Stretch
87d8403fe4 Closes #19829: Move object types REST API endpoint to core app 2025-07-09 12:15:21 -04:00
19 changed files with 205 additions and 83 deletions

View File

@ -1,4 +1,5 @@
from .serializers_.change_logging import * from .serializers_.change_logging import *
from .serializers_.data import * from .serializers_.data import *
from .serializers_.jobs import * from .serializers_.jobs import *
from .serializers_.object_types import *
from .serializers_.tasks import * from .serializers_.tasks import *

View File

@ -10,6 +10,7 @@ router.register('data-sources', views.DataSourceViewSet)
router.register('data-files', views.DataFileViewSet) router.register('data-files', views.DataFileViewSet)
router.register('jobs', views.JobViewSet) router.register('jobs', views.JobViewSet)
router.register('object-changes', views.ObjectChangeViewSet) router.register('object-changes', views.ObjectChangeViewSet)
router.register('object-types', views.ObjectTypeViewSet)
router.register('background-queues', views.BackgroundQueueViewSet, basename='rqqueue') router.register('background-queues', views.BackgroundQueueViewSet, basename='rqqueue')
router.register('background-workers', views.BackgroundWorkerViewSet, basename='rqworker') router.register('background-workers', views.BackgroundWorkerViewSet, basename='rqworker')
router.register('background-tasks', views.BackgroundTaskViewSet, basename='rqtask') router.register('background-tasks', views.BackgroundTaskViewSet, basename='rqtask')

View File

@ -17,6 +17,7 @@ from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs, requeue_rq_jo
from django_rq.queues import get_redis_connection from django_rq.queues import get_redis_connection
from django_rq.utils import get_statistics from django_rq.utils import get_statistics
from django_rq.settings import QUEUES_LIST from django_rq.settings import QUEUES_LIST
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.metadata import ContentTypeMetadata from netbox.api.metadata import ContentTypeMetadata
from netbox.api.pagination import LimitOffsetListPagination from netbox.api.pagination import LimitOffsetListPagination
from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
@ -85,6 +86,16 @@ class ObjectChangeViewSet(ReadOnlyModelViewSet):
filterset_class = filtersets.ObjectChangeFilterSet filterset_class = filtersets.ObjectChangeFilterSet
class ObjectTypeViewSet(ReadOnlyModelViewSet):
"""
Read-only list of ObjectTypes.
"""
permission_classes = [IsAuthenticatedOrLoginNotRequired]
queryset = ObjectType.objects.order_by('app_label', 'model')
serializer_class = serializers.ObjectTypeSerializer
filterset_class = filtersets.ObjectTypeFilterSet
class BaseRQViewSet(viewsets.ViewSet): class BaseRQViewSet(viewsets.ViewSet):
""" """
Base class for RQ view sets. Provides a list() method. Subclasses must implement get_data(). Base class for RQ view sets. Provides a list() method. Subclasses must implement get_data().

View File

@ -1,9 +1,8 @@
import django_filters
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.db.models import Q from django.db.models import Q
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
import django_filters
from netbox.filtersets import BaseFilterSet, ChangeLoggedModelFilterSet, NetBoxModelFilterSet from netbox.filtersets import BaseFilterSet, ChangeLoggedModelFilterSet, NetBoxModelFilterSet
from netbox.utils import get_data_backend_choices from netbox.utils import get_data_backend_choices
from users.models import User from users.models import User
@ -17,6 +16,7 @@ __all__ = (
'DataSourceFilterSet', 'DataSourceFilterSet',
'JobFilterSet', 'JobFilterSet',
'ObjectChangeFilterSet', 'ObjectChangeFilterSet',
'ObjectTypeFilterSet',
) )
@ -134,6 +134,25 @@ class JobFilterSet(BaseFilterSet):
) )
class ObjectTypeFilterSet(django_filters.FilterSet):
q = django_filters.CharFilter(
method='search',
label=_('Search'),
)
class Meta:
model = ObjectType
fields = ('id', 'app_label', 'model')
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(app_label__icontains=value) |
Q(model__icontains=value)
)
class ObjectChangeFilterSet(BaseFilterSet): class ObjectChangeFilterSet(BaseFilterSet):
q = django_filters.CharFilter( q = django_filters.CharFilter(
method='search', method='search',

View File

@ -116,7 +116,7 @@ class Job(models.Model):
verbose_name_plural = _('jobs') verbose_name_plural = _('jobs')
def __str__(self): def __str__(self):
return str(self.job_id) return self.name
def get_absolute_url(self): def get_absolute_url(self):
# TODO: Employ dynamic registration # TODO: Employ dynamic registration

View File

@ -7,6 +7,7 @@ from django.utils import timezone
from rq.job import Job as RQ_Job, JobStatus from rq.job import Job as RQ_Job, JobStatus
from rq.registry import FailedJobRegistry, StartedJobRegistry from rq.registry import FailedJobRegistry, StartedJobRegistry
from rest_framework import status
from users.models import Token, User from users.models import Token, User
from utilities.testing import APITestCase, APIViewTestCases, TestCase from utilities.testing import APITestCase, APIViewTestCases, TestCase
from utilities.testing.utils import disable_logging from utilities.testing.utils import disable_logging
@ -101,6 +102,22 @@ class DataFileTest(
DataFile.objects.bulk_create(data_files) DataFile.objects.bulk_create(data_files)
class ObjectTypeTest(APITestCase):
def test_list_objects(self):
object_type_count = ObjectType.objects.count()
response = self.client.get(reverse('extras-api:objecttype-list'), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], object_type_count)
def test_get_object(self):
object_type = ObjectType.objects.first()
url = reverse('extras-api:objecttype-detail', kwargs={'pk': object_type.pk})
self.assertHttpStatus(self.client.get(url, **self.header), status.HTTP_200_OK)
class BackgroundTaskTestCase(TestCase): class BackgroundTaskTestCase(TestCase):
user_permissions = () user_permissions = ()

View File

@ -1,4 +1,3 @@
from .serializers_.objecttypes import *
from .serializers_.attachments import * from .serializers_.attachments import *
from .serializers_.bookmarks import * from .serializers_.bookmarks import *
from .serializers_.customfields import * from .serializers_.customfields import *

View File

@ -1,5 +1,6 @@
from django.urls import include, path from django.urls import include, path
from core.api.views import ObjectTypeViewSet
from netbox.api.routers import NetBoxRouter from netbox.api.routers import NetBoxRouter
from . import views from . import views
@ -26,7 +27,9 @@ router.register('journal-entries', views.JournalEntryViewSet)
router.register('config-contexts', views.ConfigContextViewSet) router.register('config-contexts', views.ConfigContextViewSet)
router.register('config-templates', views.ConfigTemplateViewSet) router.register('config-templates', views.ConfigTemplateViewSet)
router.register('scripts', views.ScriptViewSet, basename='script') router.register('scripts', views.ScriptViewSet, basename='script')
router.register('object-types', views.ObjectTypeViewSet)
# TODO: Remove in NetBox v4.5
router.register('object-types', ObjectTypeViewSet)
app_name = 'extras-api' app_name = 'extras-api'
urlpatterns = [ urlpatterns = [

View File

@ -10,10 +10,9 @@ from rest_framework.mixins import ListModelMixin, RetrieveModelMixin
from rest_framework.renderers import JSONRenderer from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.routers import APIRootView from rest_framework.routers import APIRootView
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet from rest_framework.viewsets import ModelViewSet
from rq import Worker from rq import Worker
from core.models import ObjectType
from extras import filtersets from extras import filtersets
from extras.jobs import ScriptJob from extras.jobs import ScriptJob
from extras.models import * from extras.models import *
@ -314,20 +313,6 @@ class ScriptViewSet(ModelViewSet):
return Response(input_serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(input_serializer.errors, status=status.HTTP_400_BAD_REQUEST)
#
# Object types
#
class ObjectTypeViewSet(ReadOnlyModelViewSet):
"""
Read-only list of ObjectTypes.
"""
permission_classes = [IsAuthenticatedOrLoginNotRequired]
queryset = ObjectType.objects.order_by('app_label', 'model')
serializer_class = serializers.ObjectTypeSerializer
filterset_class = filtersets.ObjectTypeFilterSet
# #
# User dashboard # User dashboard
# #

View File

@ -29,7 +29,6 @@ __all__ = (
'JournalEntryFilterSet', 'JournalEntryFilterSet',
'LocalConfigContextFilterSet', 'LocalConfigContextFilterSet',
'NotificationGroupFilterSet', 'NotificationGroupFilterSet',
'ObjectTypeFilterSet',
'SavedFilterFilterSet', 'SavedFilterFilterSet',
'ScriptFilterSet', 'ScriptFilterSet',
'TableConfigFilterSet', 'TableConfigFilterSet',
@ -788,26 +787,3 @@ class LocalConfigContextFilterSet(django_filters.FilterSet):
def _local_context_data(self, queryset, name, value): def _local_context_data(self, queryset, name, value):
return queryset.exclude(local_context_data__isnull=value) return queryset.exclude(local_context_data__isnull=value)
#
# ContentTypes
#
class ObjectTypeFilterSet(django_filters.FilterSet):
q = django_filters.CharFilter(
method='search',
label=_('Search'),
)
class Meta:
model = ObjectType
fields = ('id', 'app_label', 'model')
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(app_label__icontains=value) |
Q(model__icontains=value)
)

View File

@ -3,7 +3,6 @@ import datetime
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.urls import reverse from django.urls import reverse
from django.utils.timezone import make_aware, now from django.utils.timezone import make_aware, now
from rest_framework import status
from core.choices import ManagedFileRootPathChoices from core.choices import ManagedFileRootPathChoices
from core.events import * from core.events import *
@ -921,22 +920,6 @@ class CreatedUpdatedFilterTest(APITestCase):
self.assertEqual(response.data['results'][0]['id'], rack2.pk) self.assertEqual(response.data['results'][0]['id'], rack2.pk)
class ObjectTypeTest(APITestCase):
def test_list_objects(self):
object_type_count = ObjectType.objects.count()
response = self.client.get(reverse('extras-api:objecttype-list'), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], object_type_count)
def test_get_object(self):
object_type = ObjectType.objects.first()
url = reverse('extras-api:objecttype-detail', kwargs={'pk': object_type.pk})
self.assertHttpStatus(self.client.get(url, **self.header), status.HTTP_200_OK)
class SubscriptionTest(APIViewTestCases.APIViewTestCase): class SubscriptionTest(APIViewTestCases.APIViewTestCase):
model = Subscription model = Subscription
brief_fields = ['display', 'id', 'object_id', 'object_type', 'url', 'user'] brief_fields = ['display', 'id', 'object_id', 'object_type', 'url', 'user']

View File

@ -8,11 +8,15 @@ from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException from rq.timeouts import JobTimeoutException
from core.choices import JobStatusChoices from core.choices import JobStatusChoices
from core.events import JOB_COMPLETED, JOB_FAILED
from core.models import Job, ObjectType from core.models import Job, ObjectType
from extras.models import Notification
from netbox.constants import ADVISORY_LOCK_KEYS from netbox.constants import ADVISORY_LOCK_KEYS
from netbox.registry import registry from netbox.registry import registry
from utilities.request import apply_request_processors
__all__ = ( __all__ = (
'AsyncViewJob',
'JobRunner', 'JobRunner',
'system_job', 'system_job',
) )
@ -154,3 +158,35 @@ class JobRunner(ABC):
job.delete() job.delete()
return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs) return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs)
class AsyncViewJob(JobRunner):
"""
Execute a view as a background job.
"""
class Meta:
name = 'Async View'
def run(self, view_cls, request, **kwargs):
view = view_cls.as_view()
# Apply all registered request processors (e.g. event_tracking)
with apply_request_processors(request):
data = view(request)
self.job.data = {
'log': data.log,
'errors': data.errors,
}
# Notify the user
notification = Notification(
user=request.user,
object=self.job,
event_type=JOB_COMPLETED if not data.errors else JOB_FAILED,
)
notification.save()
# TODO: Waiting on fix for bug #19806
# if errors:
# raise JobFailed()

View File

@ -1,8 +1,5 @@
from contextlib import ExitStack
import logging import logging
import uuid import uuid
import warnings
from django.conf import settings from django.conf import settings
from django.contrib import auth, messages from django.contrib import auth, messages
@ -13,10 +10,10 @@ from django.db.utils import InternalError
from django.http import Http404, HttpResponseRedirect from django.http import Http404, HttpResponseRedirect
from netbox.config import clear_config, get_config from netbox.config import clear_config, get_config
from netbox.registry import registry
from netbox.views import handler_500 from netbox.views import handler_500
from utilities.api import is_api_request from utilities.api import is_api_request
from utilities.error_handlers import handle_rest_api_exception from utilities.error_handlers import handle_rest_api_exception
from utilities.request import apply_request_processors
__all__ = ( __all__ = (
'CoreMiddleware', 'CoreMiddleware',
@ -36,12 +33,7 @@ class CoreMiddleware:
request.id = uuid.uuid4() request.id = uuid.uuid4()
# Apply all registered request processors # Apply all registered request processors
with ExitStack() as stack: with apply_request_processors(request):
for request_processor in registry['request_processors']:
try:
stack.enter_context(request_processor(request))
except Exception as e:
warnings.warn(f'Failed to initialize request processor {request_processor}: {e}')
response = self.get_response(request) response = self.get_response(request)
# Check if language cookie should be renewed # Check if language cookie should be renewed

View File

@ -28,6 +28,7 @@ from utilities.export import TableExport
from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields
from utilities.forms.bulk_import import BulkImportForm from utilities.forms.bulk_import import BulkImportForm
from utilities.htmx import htmx_partial from utilities.htmx import htmx_partial
from utilities.jobs import AsyncJobData, is_background_request, process_request_as_job
from utilities.permissions import get_permission_for_model from utilities.permissions import get_permission_for_model
from utilities.query import reapply_model_ordering from utilities.query import reapply_model_ordering
from utilities.request import safe_for_redirect from utilities.request import safe_for_redirect
@ -503,25 +504,32 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
if form.is_valid(): if form.is_valid():
logger.debug("Import form validation was successful") logger.debug("Import form validation was successful")
redirect_url = reverse(get_viewname(model, action='list'))
new_objects = []
# If indicated, defer this request to a background job & redirect the user
if form.cleaned_data['background_job']:
job_name = _('Bulk import {count} {object_type}').format(
count=len(form.cleaned_data['data']),
object_type=model._meta.verbose_name_plural,
)
if job := process_request_as_job(self.__class__, request, name=job_name):
msg = _('Created background job {job.pk}: <a href="{url}">{job.name}</a>').format(
url=job.get_absolute_url(),
job=job
)
messages.info(request, mark_safe(msg))
return redirect(redirect_url)
try: try:
# Iterate through data and bind each record to a new model form instance. # Iterate through data and bind each record to a new model form instance.
with transaction.atomic(using=router.db_for_write(model)): with transaction.atomic(using=router.db_for_write(model)):
new_objs = self.create_and_update_objects(form, request) new_objects = self.create_and_update_objects(form, request)
# Enforce object-level permissions # Enforce object-level permissions
if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs): if self.queryset.filter(pk__in=[obj.pk for obj in new_objects]).count() != len(new_objects):
raise PermissionsViolation raise PermissionsViolation
if new_objs:
msg = f"Imported {len(new_objs)} {model._meta.verbose_name_plural}"
logger.info(msg)
messages.success(request, msg)
view_name = get_viewname(model, action='list')
results_url = f"{reverse(view_name)}?modified_by_request={request.id}"
return redirect(results_url)
except (AbortTransaction, ValidationError): except (AbortTransaction, ValidationError):
clear_events.send(sender=self) clear_events.send(sender=self)
@ -530,6 +538,25 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
form.add_error(None, e.message) form.add_error(None, e.message)
clear_events.send(sender=self) clear_events.send(sender=self)
# If this request was executed via a background job, return the raw data for logging
if is_background_request(request):
return AsyncJobData(
log=[
_('Created {object}').format(object=str(obj))
for obj in new_objects
],
errors=form.errors
)
if new_objects:
msg = _("Imported {count} {object_type}").format(
count=len(new_objects),
object_type=model._meta.verbose_name_plural
)
logger.info(msg)
messages.success(request, msg)
return redirect(f"{redirect_url}?modified_by_request={request.id}")
else: else:
logger.debug("Form validation failed") logger.debug("Form validation failed")

View File

@ -50,6 +50,7 @@ Context:
{% render_field form.data %} {% render_field form.data %}
{% render_field form.format %} {% render_field form.format %}
{% render_field form.csv_delimiter %} {% render_field form.csv_delimiter %}
{% render_field form.background_job %}
<div class="form-group"> <div class="form-group">
<div class="col col-md-12 text-end"> <div class="col col-md-12 text-end">
{% if return_url %} {% if return_url %}
@ -94,6 +95,7 @@ Context:
{% render_field form.data_file %} {% render_field form.data_file %}
{% render_field form.format %} {% render_field form.format %}
{% render_field form.csv_delimiter %} {% render_field form.csv_delimiter %}
{% render_field form.background_job %}
<div class="form-group"> <div class="form-group">
<div class="col col-md-12 text-end"> <div class="col col-md-12 text-end">
{% if return_url %} {% if return_url %}

View File

@ -37,6 +37,11 @@ class BulkImportForm(SyncedDataMixin, forms.Form):
help_text=_("The character which delimits CSV fields. Applies only to CSV format."), help_text=_("The character which delimits CSV fields. Applies only to CSV format."),
required=False required=False
) )
background_job = forms.BooleanField(
label=_('Background job'),
help_text=_("Enqueue a background job to complete the bulk import/update."),
required=False,
)
data_field = 'data' data_field = 'data'

46
netbox/utilities/jobs.py Normal file
View File

@ -0,0 +1,46 @@
from dataclasses import dataclass
from typing import List
from netbox.jobs import AsyncViewJob
from utilities.request import copy_safe_request
__all__ = (
'AsyncJobData',
'is_background_request',
'process_request_as_job',
)
@dataclass
class AsyncJobData:
log: List[str]
errors: List[str]
def is_background_request(request):
"""
Return True if the request is being processed as a background job.
"""
return getattr(request, '_background', False)
def process_request_as_job(view, request, name=None):
"""
Process a request using a view as a background job.
"""
# Check that the request that is not already being processed as a background job (would be a loop)
if is_background_request(request):
return
# Create a serializable copy of the original request
request_copy = copy_safe_request(request)
request_copy._background = True
# Enqueue a job to perform the work in the background
return AsyncViewJob.enqueue(
name=name,
user=request.user,
view_cls=view,
request=request_copy,
)

View File

@ -1,13 +1,17 @@
import warnings
from contextlib import ExitStack, contextmanager
from urllib.parse import urlparse from urllib.parse import urlparse
from django.utils.http import url_has_allowed_host_and_scheme from django.utils.http import url_has_allowed_host_and_scheme
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from netaddr import AddrFormatError, IPAddress from netaddr import AddrFormatError, IPAddress
from netbox.registry import registry
from .constants import HTTP_REQUEST_META_SAFE_COPY from .constants import HTTP_REQUEST_META_SAFE_COPY
__all__ = ( __all__ = (
'NetBoxFakeRequest', 'NetBoxFakeRequest',
'apply_request_processors',
'copy_safe_request', 'copy_safe_request',
'get_client_ip', 'get_client_ip',
'safe_for_redirect', 'safe_for_redirect',
@ -48,6 +52,7 @@ def copy_safe_request(request):
'GET': request.GET, 'GET': request.GET,
'FILES': request.FILES, 'FILES': request.FILES,
'user': request.user, 'user': request.user,
'method': request.method,
'path': request.path, 'path': request.path,
'id': getattr(request, 'id', None), # UUID assigned by middleware 'id': getattr(request, 'id', None), # UUID assigned by middleware
}) })
@ -87,3 +92,17 @@ def safe_for_redirect(url):
Returns True if the given URL is safe to use as an HTTP redirect; otherwise returns False. Returns True if the given URL is safe to use as an HTTP redirect; otherwise returns False.
""" """
return url_has_allowed_host_and_scheme(url, allowed_hosts=None) return url_has_allowed_host_and_scheme(url, allowed_hosts=None)
@contextmanager
def apply_request_processors(request):
"""
A context manager with applies all registered request processors (such as event_tracking).
"""
with ExitStack() as stack:
for request_processor in registry['request_processors']:
try:
stack.enter_context(request_processor(request))
except Exception as e:
warnings.warn(f'Failed to initialize request processor {request_processor.__name__}: {e}')
yield