diff --git a/netbox/extras/context_managers.py b/netbox/extras/context_managers.py index 9f73fe9c3..d4aeb8364 100644 --- a/netbox/extras/context_managers.py +++ b/netbox/extras/context_managers.py @@ -3,8 +3,7 @@ from contextlib import contextmanager from django.db.models.signals import m2m_changed, pre_delete, post_save from extras.signals import clear_webhooks, clear_webhook_queue, handle_changed_object, handle_deleted_object -from netbox import thread_locals -from netbox.request_context import set_request +from netbox.context import current_request, webhooks_queue from .webhooks import flush_webhooks @@ -16,8 +15,8 @@ def change_logging(request): :param request: WSGIRequest object with a unique `id` set """ - set_request(request) - thread_locals.webhook_queue = [] + current_request.set(request) + webhooks_queue.set([]) # Connect our receivers to the post_save and post_delete signals. post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object') @@ -35,8 +34,8 @@ def change_logging(request): clear_webhooks.disconnect(clear_webhook_queue, dispatch_uid='clear_webhook_queue') # Flush queued webhooks to RQ - flush_webhooks(thread_locals.webhook_queue) - del thread_locals.webhook_queue + flush_webhooks(webhooks_queue.get()) - # Clear the request from thread-local storage - set_request(None) + # Clear context vars + current_request.set(None) + webhooks_queue.set([]) diff --git a/netbox/extras/signals.py b/netbox/extras/signals.py index aff350cc4..8854d6314 100644 --- a/netbox/extras/signals.py +++ b/netbox/extras/signals.py @@ -7,9 +7,8 @@ from django.dispatch import receiver, Signal from django_prometheus.models import model_deletes, model_inserts, model_updates from extras.validators import CustomValidator -from netbox import thread_locals from netbox.config import get_config -from netbox.request_context import get_request +from netbox.context import current_request, webhooks_queue from netbox.signals import post_clean from .choices import ObjectChangeActionChoices from .models import ConfigRevision, CustomField, ObjectChange @@ -30,7 +29,7 @@ def handle_changed_object(sender, instance, **kwargs): if not hasattr(instance, 'to_objectchange'): return - request = get_request() + request = current_request.get() m2m_changed = False def is_same_object(instance, webhook_data): @@ -69,13 +68,14 @@ def handle_changed_object(sender, instance, **kwargs): objectchange.save() # If this is an M2M change, update the previously queued webhook (from post_save) - webhook_queue = thread_locals.webhook_queue - if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]): + queue = webhooks_queue.get() + if m2m_changed and queue and is_same_object(instance, queue[-1]): instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments - webhook_queue[-1]['data'] = serialize_for_webhook(instance) - webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] + queue[-1]['data'] = serialize_for_webhook(instance) + queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] else: - enqueue_object(webhook_queue, instance, request.user, request.id, action) + enqueue_object(queue, instance, request.user, request.id, action) + webhooks_queue.set(queue) # Increment metric counters if action == ObjectChangeActionChoices.ACTION_CREATE: @@ -91,7 +91,7 @@ def handle_deleted_object(sender, instance, **kwargs): if not hasattr(instance, 'to_objectchange'): return - request = get_request() + request = current_request.get() # Record an ObjectChange if applicable if hasattr(instance, 'to_objectchange'): @@ -101,8 +101,9 @@ def handle_deleted_object(sender, instance, **kwargs): objectchange.save() # Enqueue webhooks - webhook_queue = thread_locals.webhook_queue - enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) + queue = webhooks_queue.get() + enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) + webhooks_queue.set(queue) # Increment metric counters model_deletes.labels(instance._meta.model_name).inc() @@ -113,10 +114,8 @@ def clear_webhook_queue(sender, **kwargs): Delete any queued webhooks (e.g. because of an aborted bulk transaction) """ logger = logging.getLogger('webhooks') - webhook_queue = thread_locals.webhook_queue - - logger.info(f"Clearing {len(webhook_queue)} queued webhooks ({sender})") - webhook_queue.clear() + logger.info(f"Clearing {len(webhooks_queue.get())} queued webhooks ({sender})") + webhooks_queue.set([]) # diff --git a/netbox/netbox/__init__.py b/netbox/netbox/__init__.py index 5cf431025..e69de29bb 100644 --- a/netbox/netbox/__init__.py +++ b/netbox/netbox/__init__.py @@ -1,3 +0,0 @@ -import threading - -thread_locals = threading.local() diff --git a/netbox/netbox/context.py b/netbox/netbox/context.py new file mode 100644 index 000000000..02c6fccae --- /dev/null +++ b/netbox/netbox/context.py @@ -0,0 +1,10 @@ +from contextvars import ContextVar + +__all__ = ( + 'current_request', + 'webhooks_queue', +) + + +current_request = ContextVar('current_request') +webhooks_queue = ContextVar('webhooks_queue') diff --git a/netbox/netbox/request_context.py b/netbox/netbox/request_context.py deleted file mode 100644 index 41e8283e8..000000000 --- a/netbox/netbox/request_context.py +++ /dev/null @@ -1,9 +0,0 @@ -from netbox import thread_locals - - -def set_request(request): - thread_locals.request = request - - -def get_request(): - return getattr(thread_locals, 'request', None)