Compare commits

...

10 Commits

Author SHA1 Message Date
Jeremy Stretch
12f9103158
Merge a3ee46a32a into 875a641687 2025-07-10 10:27:12 -05:00
Jeremy Stretch
875a641687
Closes #19589: Background job for bulk operations (#19804)
Some checks failed
CI / build (20.x, 3.10) (push) Has been cancelled
CI / build (20.x, 3.11) (push) Has been cancelled
CI / build (20.x, 3.12) (push) Has been cancelled
* Initial work on #19589

* Add tooling for handling background requests

* UI notification should link to enqueued job

* Use an informative name for the job

* Disable background jobs for file uploads
2025-07-10 09:32:35 -05:00
Jeremy Stretch
a3ee46a32a Update documentation
Some checks failed
CI / build (20.x, 3.10) (push) Has been cancelled
CI / build (20.x, 3.11) (push) Has been cancelled
CI / build (20.x, 3.12) (push) Has been cancelled
2025-07-09 10:36:18 -04:00
Jeremy Stretch
eb86b9d6b0 Refactor HTML templates 2025-07-09 10:25:44 -04:00
Jeremy Stretch
37fb8ae8ae Test logging 2025-07-09 10:19:44 -04:00
Jeremy Stretch
6b2643c9cd Misc cleanup 2025-07-09 10:09:31 -04:00
Jeremy Stretch
9946ff910c Repurpose RQJobStatusColumn to display job entry level badges 2025-07-09 10:00:49 -04:00
Jeremy Stretch
e95c3825de Deserialize JobLogEntry timestamp 2025-07-09 09:30:38 -04:00
Jeremy Stretch
92a13a4226 Use TZ-aware timestamps 2025-07-09 08:51:03 -04:00
Jeremy Stretch
ae3de95dce Initial work on #19816 2025-07-08 16:56:52 -04:00
25 changed files with 425 additions and 73 deletions

View File

@ -158,6 +158,7 @@ LOGGING = {
* `netbox.<app>.<model>` - Generic form for model-specific log messages * `netbox.<app>.<model>` - Generic form for model-specific log messages
* `netbox.auth.*` - Authentication events * `netbox.auth.*` - Authentication events
* `netbox.api.views.*` - Views which handle business logic for the REST API * `netbox.api.views.*` - Views which handle business logic for the REST API
* `netbox.jobs.*` - Background jobs
* `netbox.reports.*` - Report execution (`module.name`) * `netbox.reports.*` - Report execution (`module.name`)
* `netbox.scripts.*` - Custom script execution (`module.name`) * `netbox.scripts.*` - Custom script execution (`module.name`)
* `netbox.views.*` - Views which handle business logic for the web UI * `netbox.views.*` - Views which handle business logic for the web UI

View File

@ -38,6 +38,27 @@ You can schedule the background job from within your code (e.g. from a model's `
This is the human-friendly names of your background job. If omitted, the class name will be used. This is the human-friendly names of your background job. If omitted, the class name will be used.
### Logging
!!! info "This feature was introduced in NetBox v4.4."
A Python logger is instantiated by the runner for each job. It can be utilized within a job's `run()` method as needed:
```python
def run(self, *args, **kwargs):
obj = MyModel.objects.get(pk=kwargs.get('pk'))
self.logger.info("Retrieved object {obj}")
```
Four of the standard Python logging levels are supported:
* `debug()`
* `info()`
* `warning()`
* `error()`
Log entries recorded using the runner's logger will be saved in the job's log in the database in addition to being processed by other [system logging handlers](../../configuration/system.md#logging).
### Scheduled Jobs ### Scheduled Jobs
As described above, jobs can be scheduled for immediate execution or at any later time using the `enqueue()` method. However, for management purposes, the `enqueue_once()` method allows a job to be scheduled exactly once avoiding duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, respecting thread safety. An example use case would be to schedule a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `enqueue_once()` method are identical to those of `enqueue()`. As described above, jobs can be scheduled for immediate execution or at any later time using the `enqueue()` method. However, for management purposes, the `enqueue_once()` method allows a job to be scheduled exactly once avoiding duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, respecting thread safety. An example use case would be to schedule a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `enqueue_once()` method are identical to those of `enqueue()`.

View File

@ -23,6 +23,6 @@ class JobSerializer(BaseModelSerializer):
model = Job model = Job
fields = [ fields = [
'id', 'url', 'display_url', 'display', 'object_type', 'object_id', 'name', 'status', 'created', 'scheduled', 'id', 'url', 'display_url', 'display', 'object_type', 'object_id', 'name', 'status', 'created', 'scheduled',
'interval', 'started', 'completed', 'user', 'data', 'error', 'job_id', 'interval', 'started', 'completed', 'user', 'data', 'error', 'job_id', 'log_entries',
] ]
brief_fields = ('url', 'created', 'completed', 'user', 'status') brief_fields = ('url', 'created', 'completed', 'user', 'status')

View File

@ -4,23 +4,31 @@ from django.utils.translation import gettext_lazy as _
from rq.job import JobStatus from rq.job import JobStatus
__all__ = ( __all__ = (
'JOB_LOG_ENTRY_LEVELS',
'RQ_TASK_STATUSES', 'RQ_TASK_STATUSES',
) )
@dataclass @dataclass
class Status: class Badge:
label: str label: str
color: str color: str
RQ_TASK_STATUSES = { RQ_TASK_STATUSES = {
JobStatus.QUEUED: Status(_('Queued'), 'cyan'), JobStatus.QUEUED: Badge(_('Queued'), 'cyan'),
JobStatus.FINISHED: Status(_('Finished'), 'green'), JobStatus.FINISHED: Badge(_('Finished'), 'green'),
JobStatus.FAILED: Status(_('Failed'), 'red'), JobStatus.FAILED: Badge(_('Failed'), 'red'),
JobStatus.STARTED: Status(_('Started'), 'blue'), JobStatus.STARTED: Badge(_('Started'), 'blue'),
JobStatus.DEFERRED: Status(_('Deferred'), 'gray'), JobStatus.DEFERRED: Badge(_('Deferred'), 'gray'),
JobStatus.SCHEDULED: Status(_('Scheduled'), 'purple'), JobStatus.SCHEDULED: Badge(_('Scheduled'), 'purple'),
JobStatus.STOPPED: Status(_('Stopped'), 'orange'), JobStatus.STOPPED: Badge(_('Stopped'), 'orange'),
JobStatus.CANCELED: Status(_('Cancelled'), 'yellow'), JobStatus.CANCELED: Badge(_('Cancelled'), 'yellow'),
}
JOB_LOG_ENTRY_LEVELS = {
'debug': Badge(_('Debug'), 'gray'),
'info': Badge(_('Info'), 'blue'),
'warning': Badge(_('Warning'), 'orange'),
'error': Badge(_('Error'), 'red'),
} }

View File

@ -0,0 +1,21 @@
import logging
from dataclasses import dataclass, field
from datetime import datetime
from django.utils import timezone
__all__ = (
'JobLogEntry',
)
@dataclass
class JobLogEntry:
level: str
message: str
timestamp: datetime = field(default_factory=timezone.now)
@classmethod
def from_logrecord(cls, record: logging.LogRecord):
return cls(record.levelname.lower(), record.msg)

View File

@ -7,7 +7,6 @@ from netbox.jobs import JobRunner, system_job
from netbox.search.backends import search_backend from netbox.search.backends import search_backend
from utilities.proxy import resolve_proxies from utilities.proxy import resolve_proxies
from .choices import DataSourceStatusChoices, JobIntervalChoices from .choices import DataSourceStatusChoices, JobIntervalChoices
from .exceptions import SyncError
from .models import DataSource from .models import DataSource
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -23,19 +22,23 @@ class SyncDataSourceJob(JobRunner):
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
datasource = DataSource.objects.get(pk=self.job.object_id) datasource = DataSource.objects.get(pk=self.job.object_id)
self.logger.debug(f"Found DataSource ID {datasource.pk}")
try: try:
self.logger.info(f"Syncing data source {datasource}")
datasource.sync() datasource.sync()
# Update the search cache for DataFiles belonging to this source # Update the search cache for DataFiles belonging to this source
self.logger.debug("Updating search cache for data files")
search_backend.cache(datasource.datafiles.iterator()) search_backend.cache(datasource.datafiles.iterator())
except Exception as e: except Exception as e:
self.logger.error(f"Error syncing data source: {e}")
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED) DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) is SyncError:
logging.error(e)
raise e raise e
self.logger.info("Syncing completed successfully")
@system_job(interval=JobIntervalChoices.INTERVAL_DAILY) @system_job(interval=JobIntervalChoices.INTERVAL_DAILY)
class SystemHousekeepingJob(JobRunner): class SystemHousekeepingJob(JobRunner):

View File

@ -0,0 +1,28 @@
import django.contrib.postgres.fields
import django.core.serializers.json
from django.db import migrations, models
import utilities.json
class Migration(migrations.Migration):
dependencies = [
('core', '0015_remove_redundant_indexes'),
]
operations = [
migrations.AddField(
model_name='job',
name='log_entries',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.JSONField(
decoder=utilities.json.JobLogDecoder,
encoder=django.core.serializers.json.DjangoJSONEncoder
),
blank=True,
default=list,
size=None
),
),
]

View File

@ -1,9 +1,12 @@
import logging
import uuid import uuid
from dataclasses import asdict
from functools import partial from functools import partial
import django_rq import django_rq
from django.conf import settings from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.core.serializers.json import DjangoJSONEncoder from django.core.serializers.json import DjangoJSONEncoder
from django.core.validators import MinValueValidator from django.core.validators import MinValueValidator
@ -14,8 +17,10 @@ from django.utils.translation import gettext as _
from rq.exceptions import InvalidJobOperation from rq.exceptions import InvalidJobOperation
from core.choices import JobStatusChoices from core.choices import JobStatusChoices
from core.dataclasses import JobLogEntry
from core.models import ObjectType from core.models import ObjectType
from core.signals import job_end, job_start from core.signals import job_end, job_start
from utilities.json import JobLogDecoder
from utilities.querysets import RestrictedQuerySet from utilities.querysets import RestrictedQuerySet
from utilities.rqworker import get_queue_for_model from utilities.rqworker import get_queue_for_model
@ -104,6 +109,15 @@ class Job(models.Model):
verbose_name=_('job ID'), verbose_name=_('job ID'),
unique=True unique=True
) )
log_entries = ArrayField(
verbose_name=_('log entries'),
base_field=models.JSONField(
encoder=DjangoJSONEncoder,
decoder=JobLogDecoder,
),
blank=True,
default=list,
)
objects = RestrictedQuerySet.as_manager() objects = RestrictedQuerySet.as_manager()
@ -116,7 +130,7 @@ class Job(models.Model):
verbose_name_plural = _('jobs') verbose_name_plural = _('jobs')
def __str__(self): def __str__(self):
return str(self.job_id) return self.name
def get_absolute_url(self): def get_absolute_url(self):
# TODO: Employ dynamic registration # TODO: Employ dynamic registration
@ -205,6 +219,13 @@ class Job(models.Model):
# Send signal # Send signal
job_end.send(self) job_end.send(self)
def log(self, record: logging.LogRecord):
"""
Record a LogRecord from Python's native logging in the job's log.
"""
entry = JobLogEntry.from_logrecord(record)
self.log_entries.append(asdict(entry))
@classmethod @classmethod
def enqueue( def enqueue(
cls, cls,

View File

@ -1,12 +1,11 @@
import django_tables2 as tables import django_tables2 as tables
from django.utils.safestring import mark_safe from django.utils.safestring import mark_safe
from core.constants import RQ_TASK_STATUSES
from netbox.registry import registry from netbox.registry import registry
__all__ = ( __all__ = (
'BackendTypeColumn', 'BackendTypeColumn',
'RQJobStatusColumn', 'BadgeColumn',
) )
@ -23,14 +22,21 @@ class BackendTypeColumn(tables.Column):
return value return value
class RQJobStatusColumn(tables.Column): class BadgeColumn(tables.Column):
""" """
Render a colored label for the status of an RQ job. Render a colored badge for a value.
Args:
badges: A dictionary mapping of values to core.constants.Badge instances.
""" """
def __init__(self, badges, *args, **kwargs):
super().__init__(*args, **kwargs)
self.badges = badges
def render(self, value): def render(self, value):
status = RQ_TASK_STATUSES.get(value) badge = self.badges.get(value)
return mark_safe(f'<span class="badge text-bg-{status.color}">{status.label}</span>') return mark_safe(f'<span class="badge text-bg-{badge.color}">{badge.label}</span>')
def value(self, value): def value(self, value):
status = RQ_TASK_STATUSES.get(value) badge = self.badges.get(value)
return status.label return badge.label

View File

@ -1,8 +1,10 @@
import django_tables2 as tables import django_tables2 as tables
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from netbox.tables import NetBoxTable, columns from netbox.tables import BaseTable, NetBoxTable, columns
from ..models import Job from core.constants import JOB_LOG_ENTRY_LEVELS
from core.models import Job
from core.tables.columns import BadgeColumn
class JobTable(NetBoxTable): class JobTable(NetBoxTable):
@ -40,6 +42,9 @@ class JobTable(NetBoxTable):
completed = columns.DateTimeColumn( completed = columns.DateTimeColumn(
verbose_name=_('Completed'), verbose_name=_('Completed'),
) )
log_entries = tables.Column(
verbose_name=_('Log Entries'),
)
actions = columns.ActionsColumn( actions = columns.ActionsColumn(
actions=('delete',) actions=('delete',)
) )
@ -53,3 +58,24 @@ class JobTable(NetBoxTable):
default_columns = ( default_columns = (
'pk', 'id', 'object_type', 'object', 'name', 'status', 'created', 'started', 'completed', 'user', 'pk', 'id', 'object_type', 'object', 'name', 'status', 'created', 'started', 'completed', 'user',
) )
def render_log_entries(self, value):
return len(value)
class JobLogEntryTable(BaseTable):
timestamp = columns.DateTimeColumn(
timespec='milliseconds',
verbose_name=_('Time'),
)
level = BadgeColumn(
badges=JOB_LOG_ENTRY_LEVELS,
verbose_name=_('Level'),
)
message = tables.Column(
verbose_name=_('Message'),
)
class Meta(BaseTable.Meta):
empty_text = _('No log entries')
fields = ('timestamp', 'level', 'message')

View File

@ -2,7 +2,8 @@ import django_tables2 as tables
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from django_tables2.utils import A from django_tables2.utils import A
from core.tables.columns import RQJobStatusColumn from core.constants import RQ_TASK_STATUSES
from core.tables.columns import BadgeColumn
from netbox.tables import BaseTable, columns from netbox.tables import BaseTable, columns
@ -84,7 +85,8 @@ class BackgroundTaskTable(BaseTable):
ended_at = columns.DateTimeColumn( ended_at = columns.DateTimeColumn(
verbose_name=_("Ended") verbose_name=_("Ended")
) )
status = RQJobStatusColumn( status = BadgeColumn(
badges=RQ_TASK_STATUSES,
verbose_name=_("Status"), verbose_name=_("Status"),
accessor='get_status' accessor='get_status'
) )

View File

@ -32,13 +32,13 @@ from utilities.forms import ConfirmationForm
from utilities.htmx import htmx_partial from utilities.htmx import htmx_partial
from utilities.json import ConfigJSONEncoder from utilities.json import ConfigJSONEncoder
from utilities.query import count_related from utilities.query import count_related
from utilities.views import ContentTypePermissionRequiredMixin, GetRelatedModelsMixin, register_model_view from utilities.views import ContentTypePermissionRequiredMixin, GetRelatedModelsMixin, ViewTab, register_model_view
from . import filtersets, forms, tables from . import filtersets, forms, tables
from .choices import DataSourceStatusChoices from .choices import DataSourceStatusChoices
from .jobs import SyncDataSourceJob from .jobs import SyncDataSourceJob
from .models import * from .models import *
from .plugins import get_catalog_plugins, get_local_plugins from .plugins import get_catalog_plugins, get_local_plugins
from .tables import CatalogPluginTable, PluginVersionTable from .tables import CatalogPluginTable, JobLogEntryTable, PluginVersionTable
# #
@ -184,6 +184,25 @@ class JobView(generic.ObjectView):
actions = (DeleteObject,) actions = (DeleteObject,)
@register_model_view(Job, 'log')
class JobLogView(generic.ObjectView):
queryset = Job.objects.all()
actions = (DeleteObject,)
template_name = 'core/job/log.html'
tab = ViewTab(
label=_('Log'),
badge=lambda obj: len(obj.log_entries),
weight=500,
)
def get_extra_context(self, request, instance):
table = JobLogEntryTable(instance.log_entries)
table.configure(request)
return {
'table': table,
}
@register_model_view(Job, 'delete') @register_model_view(Job, 'delete')
class JobDeleteView(generic.ObjectDeleteView): class JobDeleteView(generic.ObjectDeleteView):
queryset = Job.objects.defer('data') queryset = Job.objects.defer('data')

View File

@ -90,7 +90,10 @@ class ScriptJob(JobRunner):
request: The WSGI request associated with this execution (if any) request: The WSGI request associated with this execution (if any)
commit: Passed through to Script.run() commit: Passed through to Script.run()
""" """
script = ScriptModel.objects.get(pk=self.job.object_id).python_class() script_model = ScriptModel.objects.get(pk=self.job.object_id)
self.logger.debug(f"Found ScriptModel ID {script_model.pk}")
script = script_model.python_class()
self.logger.debug(f"Loaded script {script.full_name}")
# Add files to form data # Add files to form data
if request: if request:
@ -100,6 +103,7 @@ class ScriptJob(JobRunner):
# Add the current request as a property of the script # Add the current request as a property of the script
script.request = request script.request = request
self.logger.debug(f"Request ID: {request.id}")
# Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process # Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
# change logging, event rules, etc. # change logging, event rules, etc.

View File

@ -8,11 +8,15 @@ from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException from rq.timeouts import JobTimeoutException
from core.choices import JobStatusChoices from core.choices import JobStatusChoices
from core.events import JOB_COMPLETED, JOB_FAILED
from core.models import Job, ObjectType from core.models import Job, ObjectType
from extras.models import Notification
from netbox.constants import ADVISORY_LOCK_KEYS from netbox.constants import ADVISORY_LOCK_KEYS
from netbox.registry import registry from netbox.registry import registry
from utilities.request import apply_request_processors
__all__ = ( __all__ = (
'AsyncViewJob',
'JobRunner', 'JobRunner',
'system_job', 'system_job',
) )
@ -34,6 +38,19 @@ def system_job(interval):
return _wrapper return _wrapper
class JobLogHandler(logging.Handler):
"""
A logging handler which records entries on a Job.
"""
def __init__(self, job, *args, **kwargs):
super().__init__(*args, **kwargs)
self.job = job
def emit(self, record):
# Enter the record in the log of the associated Job
self.job.log(record)
class JobRunner(ABC): class JobRunner(ABC):
""" """
Background Job helper class. Background Job helper class.
@ -52,6 +69,11 @@ class JobRunner(ABC):
""" """
self.job = job self.job = job
# Initiate the system logger
self.logger = logging.getLogger(f"netbox.jobs.{self.__class__.__name__}")
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(JobLogHandler(job))
@classproperty @classproperty
def name(cls): def name(cls):
return getattr(cls.Meta, 'name', cls.__name__) return getattr(cls.Meta, 'name', cls.__name__)
@ -154,3 +176,35 @@ class JobRunner(ABC):
job.delete() job.delete()
return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs) return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs)
class AsyncViewJob(JobRunner):
"""
Execute a view as a background job.
"""
class Meta:
name = 'Async View'
def run(self, view_cls, request, **kwargs):
view = view_cls.as_view()
# Apply all registered request processors (e.g. event_tracking)
with apply_request_processors(request):
data = view(request)
self.job.data = {
'log': data.log,
'errors': data.errors,
}
# Notify the user
notification = Notification(
user=request.user,
object=self.job,
event_type=JOB_COMPLETED if not data.errors else JOB_FAILED,
)
notification.save()
# TODO: Waiting on fix for bug #19806
# if errors:
# raise JobFailed()

View File

@ -1,8 +1,5 @@
from contextlib import ExitStack
import logging import logging
import uuid import uuid
import warnings
from django.conf import settings from django.conf import settings
from django.contrib import auth, messages from django.contrib import auth, messages
@ -13,10 +10,10 @@ from django.db.utils import InternalError
from django.http import Http404, HttpResponseRedirect from django.http import Http404, HttpResponseRedirect
from netbox.config import clear_config, get_config from netbox.config import clear_config, get_config
from netbox.registry import registry
from netbox.views import handler_500 from netbox.views import handler_500
from utilities.api import is_api_request from utilities.api import is_api_request
from utilities.error_handlers import handle_rest_api_exception from utilities.error_handlers import handle_rest_api_exception
from utilities.request import apply_request_processors
__all__ = ( __all__ = (
'CoreMiddleware', 'CoreMiddleware',
@ -36,12 +33,7 @@ class CoreMiddleware:
request.id = uuid.uuid4() request.id = uuid.uuid4()
# Apply all registered request processors # Apply all registered request processors
with ExitStack() as stack: with apply_request_processors(request):
for request_processor in registry['request_processors']:
try:
stack.enter_context(request_processor(request))
except Exception as e:
warnings.warn(f'Failed to initialize request processor {request_processor}: {e}')
response = self.get_response(request) response = self.get_response(request)
# Check if language cookie should be renewed # Check if language cookie should be renewed

View File

@ -11,7 +11,10 @@ from core.choices import JobStatusChoices
class TestJobRunner(JobRunner): class TestJobRunner(JobRunner):
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
pass self.logger.debug("Debug message")
self.logger.info("Info message")
self.logger.warning("Warning message")
self.logger.error("Error message")
class JobRunnerTestCase(TestCase): class JobRunnerTestCase(TestCase):
@ -47,8 +50,16 @@ class JobRunnerTest(JobRunnerTestCase):
def test_handle(self): def test_handle(self):
job = TestJobRunner.enqueue(immediate=True) job = TestJobRunner.enqueue(immediate=True)
# Check job status
self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED) self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)
# Check logging
self.assertEqual(len(job.log_entries), 4)
self.assertEqual(job.log_entries[0]['message'], "Debug message")
self.assertEqual(job.log_entries[1]['message'], "Info message")
self.assertEqual(job.log_entries[2]['message'], "Warning message")
self.assertEqual(job.log_entries[3]['message'], "Error message")
def test_handle_errored(self): def test_handle_errored(self):
class ErroredJobRunner(TestJobRunner): class ErroredJobRunner(TestJobRunner):
EXP = Exception('Test error') EXP = Exception('Test error')

View File

@ -28,6 +28,7 @@ from utilities.export import TableExport
from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields
from utilities.forms.bulk_import import BulkImportForm from utilities.forms.bulk_import import BulkImportForm
from utilities.htmx import htmx_partial from utilities.htmx import htmx_partial
from utilities.jobs import AsyncJobData, is_background_request, process_request_as_job
from utilities.permissions import get_permission_for_model from utilities.permissions import get_permission_for_model
from utilities.query import reapply_model_ordering from utilities.query import reapply_model_ordering
from utilities.request import safe_for_redirect from utilities.request import safe_for_redirect
@ -503,25 +504,32 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
if form.is_valid(): if form.is_valid():
logger.debug("Import form validation was successful") logger.debug("Import form validation was successful")
redirect_url = reverse(get_viewname(model, action='list'))
new_objects = []
# If indicated, defer this request to a background job & redirect the user
if form.cleaned_data['background_job']:
job_name = _('Bulk import {count} {object_type}').format(
count=len(form.cleaned_data['data']),
object_type=model._meta.verbose_name_plural,
)
if job := process_request_as_job(self.__class__, request, name=job_name):
msg = _('Created background job {job.pk}: <a href="{url}">{job.name}</a>').format(
url=job.get_absolute_url(),
job=job
)
messages.info(request, mark_safe(msg))
return redirect(redirect_url)
try: try:
# Iterate through data and bind each record to a new model form instance. # Iterate through data and bind each record to a new model form instance.
with transaction.atomic(using=router.db_for_write(model)): with transaction.atomic(using=router.db_for_write(model)):
new_objs = self.create_and_update_objects(form, request) new_objects = self.create_and_update_objects(form, request)
# Enforce object-level permissions # Enforce object-level permissions
if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs): if self.queryset.filter(pk__in=[obj.pk for obj in new_objects]).count() != len(new_objects):
raise PermissionsViolation raise PermissionsViolation
if new_objs:
msg = f"Imported {len(new_objs)} {model._meta.verbose_name_plural}"
logger.info(msg)
messages.success(request, msg)
view_name = get_viewname(model, action='list')
results_url = f"{reverse(view_name)}?modified_by_request={request.id}"
return redirect(results_url)
except (AbortTransaction, ValidationError): except (AbortTransaction, ValidationError):
clear_events.send(sender=self) clear_events.send(sender=self)
@ -530,6 +538,25 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
form.add_error(None, e.message) form.add_error(None, e.message)
clear_events.send(sender=self) clear_events.send(sender=self)
# If this request was executed via a background job, return the raw data for logging
if is_background_request(request):
return AsyncJobData(
log=[
_('Created {object}').format(object=str(obj))
for obj in new_objects
],
errors=form.errors
)
if new_objects:
msg = _("Imported {count} {object_type}").format(
count=len(new_objects),
object_type=model._meta.verbose_name_plural
)
logger.info(msg)
messages.success(request, msg)
return redirect(f"{redirect_url}?modified_by_request={request.id}")
else: else:
logger.debug("Form validation failed") logger.debug("Form validation failed")

View File

@ -1,27 +1,6 @@
{% extends 'generic/object.html' %} {% extends 'core/job/base.html' %}
{% load buttons %}
{% load helpers %}
{% load perms %}
{% load i18n %} {% load i18n %}
{% block breadcrumbs %}
{{ block.super }}
{% if object.object %}
<li class="breadcrumb-item">
<a href="{% url 'core:job_list' %}?object_type={{ object.object_type_id }}">{{ object.object|meta:"verbose_name_plural"|bettertitle }}</a>
</li>
{% with parent_jobs_viewname=object.object|viewname:"jobs" %}
<li class="breadcrumb-item">
<a href="{% url parent_jobs_viewname pk=object.object.pk %}">{{ object.object }}</a>
</li>
{% endwith %}
{% else %}
<li class="breadcrumb-item">
<a href="{% url 'core:job_list' %}?name={{ object.name|urlencode }}">{{ object.name }}</a>
</li>
{% endif %}
{% endblock breadcrumbs %}
{% block content %} {% block content %}
<div class="row mb-3"> <div class="row mb-3">
<div class="col col-12 col-md-6"> <div class="col col-12 col-md-6">

View File

@ -0,0 +1,23 @@
{% extends 'generic/object.html' %}
{% load buttons %}
{% load helpers %}
{% load perms %}
{% load i18n %}
{% block breadcrumbs %}
{{ block.super }}
{% if object.object %}
<li class="breadcrumb-item">
<a href="{% url 'core:job_list' %}?object_type={{ object.object_type_id }}">{{ object.object|meta:"verbose_name_plural"|bettertitle }}</a>
</li>
{% with parent_jobs_viewname=object.object|viewname:"jobs" %}
<li class="breadcrumb-item">
<a href="{% url parent_jobs_viewname pk=object.object.pk %}">{{ object.object }}</a>
</li>
{% endwith %}
{% else %}
<li class="breadcrumb-item">
<a href="{% url 'core:job_list' %}?name={{ object.name|urlencode }}">{{ object.name }}</a>
</li>
{% endif %}
{% endblock breadcrumbs %}

View File

@ -0,0 +1,12 @@
{% extends 'core/job/base.html' %}
{% load render_table from django_tables2 %}
{% block content %}
<div class="row mb-3">
<div class="col">
<div class="card">
{% render_table table %}
</div>
</div>
</div>
{% endblock %}

View File

@ -50,6 +50,7 @@ Context:
{% render_field form.data %} {% render_field form.data %}
{% render_field form.format %} {% render_field form.format %}
{% render_field form.csv_delimiter %} {% render_field form.csv_delimiter %}
{% render_field form.background_job %}
<div class="form-group"> <div class="form-group">
<div class="col col-md-12 text-end"> <div class="col col-md-12 text-end">
{% if return_url %} {% if return_url %}
@ -94,6 +95,7 @@ Context:
{% render_field form.data_file %} {% render_field form.data_file %}
{% render_field form.format %} {% render_field form.format %}
{% render_field form.csv_delimiter %} {% render_field form.csv_delimiter %}
{% render_field form.background_job %}
<div class="form-group"> <div class="form-group">
<div class="col col-md-12 text-end"> <div class="col col-md-12 text-end">
{% if return_url %} {% if return_url %}

View File

@ -37,6 +37,11 @@ class BulkImportForm(SyncedDataMixin, forms.Form):
help_text=_("The character which delimits CSV fields. Applies only to CSV format."), help_text=_("The character which delimits CSV fields. Applies only to CSV format."),
required=False required=False
) )
background_job = forms.BooleanField(
label=_('Background job'),
help_text=_("Enqueue a background job to complete the bulk import/update."),
required=False,
)
data_field = 'data' data_field = 'data'

46
netbox/utilities/jobs.py Normal file
View File

@ -0,0 +1,46 @@
from dataclasses import dataclass
from typing import List
from netbox.jobs import AsyncViewJob
from utilities.request import copy_safe_request
__all__ = (
'AsyncJobData',
'is_background_request',
'process_request_as_job',
)
@dataclass
class AsyncJobData:
log: List[str]
errors: List[str]
def is_background_request(request):
"""
Return True if the request is being processed as a background job.
"""
return getattr(request, '_background', False)
def process_request_as_job(view, request, name=None):
"""
Process a request using a view as a background job.
"""
# Check that the request that is not already being processed as a background job (would be a loop)
if is_background_request(request):
return
# Create a serializable copy of the original request
request_copy = copy_safe_request(request)
request_copy._background = True
# Enqueue a job to perform the work in the background
return AsyncViewJob.enqueue(
name=name,
user=request.user,
view_cls=view,
request=request_copy,
)

View File

@ -1,10 +1,14 @@
import decimal import decimal
import json
from django.core.serializers.json import DjangoJSONEncoder from django.core.serializers.json import DjangoJSONEncoder
from utilities.datetime import datetime_from_timestamp
__all__ = ( __all__ = (
'ConfigJSONEncoder', 'ConfigJSONEncoder',
'CustomFieldJSONEncoder', 'CustomFieldJSONEncoder',
'JobLogDecoder',
) )
@ -29,3 +33,21 @@ class ConfigJSONEncoder(DjangoJSONEncoder):
return type(o).__name__ return type(o).__name__
return super().default(o) return super().default(o)
class JobLogDecoder(json.JSONDecoder):
"""
Deserialize JobLogEntry timestamps.
"""
def __init__(self, *args, **kwargs):
kwargs['object_hook'] = self._deserialize_entry
super().__init__(*args, **kwargs)
def _deserialize_entry(self, obj: dict) -> dict:
if obj.get('timestamp'):
# Deserialize a timestamp string to a native datetime object
try:
obj['timestamp'] = datetime_from_timestamp(obj['timestamp'])
except ValueError:
pass
return obj

View File

@ -1,13 +1,17 @@
import warnings
from contextlib import ExitStack, contextmanager
from urllib.parse import urlparse from urllib.parse import urlparse
from django.utils.http import url_has_allowed_host_and_scheme from django.utils.http import url_has_allowed_host_and_scheme
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from netaddr import AddrFormatError, IPAddress from netaddr import AddrFormatError, IPAddress
from netbox.registry import registry
from .constants import HTTP_REQUEST_META_SAFE_COPY from .constants import HTTP_REQUEST_META_SAFE_COPY
__all__ = ( __all__ = (
'NetBoxFakeRequest', 'NetBoxFakeRequest',
'apply_request_processors',
'copy_safe_request', 'copy_safe_request',
'get_client_ip', 'get_client_ip',
'safe_for_redirect', 'safe_for_redirect',
@ -48,6 +52,7 @@ def copy_safe_request(request):
'GET': request.GET, 'GET': request.GET,
'FILES': request.FILES, 'FILES': request.FILES,
'user': request.user, 'user': request.user,
'method': request.method,
'path': request.path, 'path': request.path,
'id': getattr(request, 'id', None), # UUID assigned by middleware 'id': getattr(request, 'id', None), # UUID assigned by middleware
}) })
@ -87,3 +92,17 @@ def safe_for_redirect(url):
Returns True if the given URL is safe to use as an HTTP redirect; otherwise returns False. Returns True if the given URL is safe to use as an HTTP redirect; otherwise returns False.
""" """
return url_has_allowed_host_and_scheme(url, allowed_hosts=None) return url_has_allowed_host_and_scheme(url, allowed_hosts=None)
@contextmanager
def apply_request_processors(request):
"""
A context manager with applies all registered request processors (such as event_tracking).
"""
with ExitStack() as stack:
for request_processor in registry['request_processors']:
try:
stack.enter_context(request_processor(request))
except Exception as e:
warnings.warn(f'Failed to initialize request processor {request_processor.__name__}: {e}')
yield