mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-22 20:12:00 -06:00
Merge pull request #7861 from netbox-community/7657-threadsafe-changelog
Fixes #7657: Make request/webhook caching thread-safe
This commit is contained in:
commit
c9b00891ed
@ -2,8 +2,9 @@ from contextlib import contextmanager
|
|||||||
|
|
||||||
from django.db.models.signals import m2m_changed, pre_delete, post_save
|
from django.db.models.signals import m2m_changed, pre_delete, post_save
|
||||||
|
|
||||||
from extras.signals import clear_webhooks, _clear_webhook_queue, _handle_changed_object, _handle_deleted_object
|
from extras.signals import clear_webhooks, clear_webhook_queue, handle_changed_object, handle_deleted_object
|
||||||
from utilities.utils import curry
|
from netbox import thread_locals
|
||||||
|
from netbox.request_context import set_request
|
||||||
from .webhooks import flush_webhooks
|
from .webhooks import flush_webhooks
|
||||||
|
|
||||||
|
|
||||||
@ -15,12 +16,8 @@ def change_logging(request):
|
|||||||
|
|
||||||
:param request: WSGIRequest object with a unique `id` set
|
:param request: WSGIRequest object with a unique `id` set
|
||||||
"""
|
"""
|
||||||
webhook_queue = []
|
set_request(request)
|
||||||
|
thread_locals.webhook_queue = []
|
||||||
# Curry signals receivers to pass the current request
|
|
||||||
handle_changed_object = curry(_handle_changed_object, request, webhook_queue)
|
|
||||||
handle_deleted_object = curry(_handle_deleted_object, request, webhook_queue)
|
|
||||||
clear_webhook_queue = curry(_clear_webhook_queue, webhook_queue)
|
|
||||||
|
|
||||||
# Connect our receivers to the post_save and post_delete signals.
|
# Connect our receivers to the post_save and post_delete signals.
|
||||||
post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
|
post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
|
||||||
@ -38,5 +35,8 @@ def change_logging(request):
|
|||||||
clear_webhooks.disconnect(clear_webhook_queue, dispatch_uid='clear_webhook_queue')
|
clear_webhooks.disconnect(clear_webhook_queue, dispatch_uid='clear_webhook_queue')
|
||||||
|
|
||||||
# Flush queued webhooks to RQ
|
# Flush queued webhooks to RQ
|
||||||
flush_webhooks(webhook_queue)
|
flush_webhooks(thread_locals.webhook_queue)
|
||||||
del webhook_queue
|
del thread_locals.webhook_queue
|
||||||
|
|
||||||
|
# Clear the request from thread-local storage
|
||||||
|
set_request(None)
|
||||||
|
@ -6,6 +6,8 @@ from django.db.models.signals import m2m_changed, post_save, pre_delete
|
|||||||
from django.dispatch import receiver, Signal
|
from django.dispatch import receiver, Signal
|
||||||
from django_prometheus.models import model_deletes, model_inserts, model_updates
|
from django_prometheus.models import model_deletes, model_inserts, model_updates
|
||||||
|
|
||||||
|
from netbox import thread_locals
|
||||||
|
from netbox.request_context import get_request
|
||||||
from netbox.signals import post_clean
|
from netbox.signals import post_clean
|
||||||
from .choices import ObjectChangeActionChoices
|
from .choices import ObjectChangeActionChoices
|
||||||
from .models import CustomField, ObjectChange
|
from .models import CustomField, ObjectChange
|
||||||
@ -20,10 +22,16 @@ from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook
|
|||||||
clear_webhooks = Signal()
|
clear_webhooks = Signal()
|
||||||
|
|
||||||
|
|
||||||
def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
|
def handle_changed_object(sender, instance, **kwargs):
|
||||||
"""
|
"""
|
||||||
Fires when an object is created or updated.
|
Fires when an object is created or updated.
|
||||||
"""
|
"""
|
||||||
|
if not hasattr(instance, 'to_objectchange'):
|
||||||
|
return
|
||||||
|
|
||||||
|
request = get_request()
|
||||||
|
m2m_changed = False
|
||||||
|
|
||||||
def is_same_object(instance, webhook_data):
|
def is_same_object(instance, webhook_data):
|
||||||
return (
|
return (
|
||||||
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
|
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
|
||||||
@ -31,11 +39,6 @@ def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
|
|||||||
request.id == webhook_data['request_id']
|
request.id == webhook_data['request_id']
|
||||||
)
|
)
|
||||||
|
|
||||||
if not hasattr(instance, 'to_objectchange'):
|
|
||||||
return
|
|
||||||
|
|
||||||
m2m_changed = False
|
|
||||||
|
|
||||||
# Determine the type of change being made
|
# Determine the type of change being made
|
||||||
if kwargs.get('created'):
|
if kwargs.get('created'):
|
||||||
action = ObjectChangeActionChoices.ACTION_CREATE
|
action = ObjectChangeActionChoices.ACTION_CREATE
|
||||||
@ -65,6 +68,7 @@ def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
|
|||||||
objectchange.save()
|
objectchange.save()
|
||||||
|
|
||||||
# If this is an M2M change, update the previously queued webhook (from post_save)
|
# If this is an M2M change, update the previously queued webhook (from post_save)
|
||||||
|
webhook_queue = thread_locals.webhook_queue
|
||||||
if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
|
if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
|
||||||
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
|
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
|
||||||
webhook_queue[-1]['data'] = serialize_for_webhook(instance)
|
webhook_queue[-1]['data'] = serialize_for_webhook(instance)
|
||||||
@ -79,13 +83,15 @@ def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
|
|||||||
model_updates.labels(instance._meta.model_name).inc()
|
model_updates.labels(instance._meta.model_name).inc()
|
||||||
|
|
||||||
|
|
||||||
def _handle_deleted_object(request, webhook_queue, sender, instance, **kwargs):
|
def handle_deleted_object(sender, instance, **kwargs):
|
||||||
"""
|
"""
|
||||||
Fires when an object is deleted.
|
Fires when an object is deleted.
|
||||||
"""
|
"""
|
||||||
if not hasattr(instance, 'to_objectchange'):
|
if not hasattr(instance, 'to_objectchange'):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
request = get_request()
|
||||||
|
|
||||||
# Record an ObjectChange if applicable
|
# Record an ObjectChange if applicable
|
||||||
if hasattr(instance, 'to_objectchange'):
|
if hasattr(instance, 'to_objectchange'):
|
||||||
objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE)
|
objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE)
|
||||||
@ -94,19 +100,21 @@ def _handle_deleted_object(request, webhook_queue, sender, instance, **kwargs):
|
|||||||
objectchange.save()
|
objectchange.save()
|
||||||
|
|
||||||
# Enqueue webhooks
|
# Enqueue webhooks
|
||||||
|
webhook_queue = thread_locals.webhook_queue
|
||||||
enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
|
enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
|
||||||
|
|
||||||
# Increment metric counters
|
# Increment metric counters
|
||||||
model_deletes.labels(instance._meta.model_name).inc()
|
model_deletes.labels(instance._meta.model_name).inc()
|
||||||
|
|
||||||
|
|
||||||
def _clear_webhook_queue(webhook_queue, sender, **kwargs):
|
def clear_webhook_queue(sender, **kwargs):
|
||||||
"""
|
"""
|
||||||
Delete any queued webhooks (e.g. because of an aborted bulk transaction)
|
Delete any queued webhooks (e.g. because of an aborted bulk transaction)
|
||||||
"""
|
"""
|
||||||
logger = logging.getLogger('webhooks')
|
logger = logging.getLogger('webhooks')
|
||||||
logger.info(f"Clearing {len(webhook_queue)} queued webhooks ({sender})")
|
webhook_queue = thread_locals.webhook_queue
|
||||||
|
|
||||||
|
logger.info(f"Clearing {len(webhook_queue)} queued webhooks ({sender})")
|
||||||
webhook_queue.clear()
|
webhook_queue.clear()
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,3 @@
|
|||||||
|
import threading
|
||||||
|
|
||||||
|
thread_locals = threading.local()
|
@ -1,10 +1,10 @@
|
|||||||
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
from urllib import parse
|
from urllib import parse
|
||||||
import logging
|
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.auth.middleware import RemoteUserMiddleware as RemoteUserMiddleware_
|
|
||||||
from django.contrib import auth
|
from django.contrib import auth
|
||||||
|
from django.contrib.auth.middleware import RemoteUserMiddleware as RemoteUserMiddleware_
|
||||||
from django.core.exceptions import ImproperlyConfigured
|
from django.core.exceptions import ImproperlyConfigured
|
||||||
from django.db import ProgrammingError
|
from django.db import ProgrammingError
|
||||||
from django.http import Http404, HttpResponseRedirect
|
from django.http import Http404, HttpResponseRedirect
|
||||||
@ -114,7 +114,7 @@ class RemoteUserMiddleware(RemoteUserMiddleware_):
|
|||||||
return groups
|
return groups
|
||||||
|
|
||||||
|
|
||||||
class ObjectChangeMiddleware(object):
|
class ObjectChangeMiddleware:
|
||||||
"""
|
"""
|
||||||
This middleware performs three functions in response to an object being created, updated, or deleted:
|
This middleware performs three functions in response to an object being created, updated, or deleted:
|
||||||
|
|
||||||
|
9
netbox/netbox/request_context.py
Normal file
9
netbox/netbox/request_context.py
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
from netbox import thread_locals
|
||||||
|
|
||||||
|
|
||||||
|
def set_request(request):
|
||||||
|
thread_locals.request = request
|
||||||
|
|
||||||
|
|
||||||
|
def get_request():
|
||||||
|
return getattr(thread_locals, 'request', None)
|
@ -327,13 +327,6 @@ def decode_dict(encoded_dict: Dict, *, decode_keys: bool = True) -> Dict:
|
|||||||
return {urllib.parse.unquote(k): decode_value(v, decode_keys) for k, v in encoded_dict.items()}
|
return {urllib.parse.unquote(k): decode_value(v, decode_keys) for k, v in encoded_dict.items()}
|
||||||
|
|
||||||
|
|
||||||
# Taken from django.utils.functional (<3.0)
|
|
||||||
def curry(_curried_func, *args, **kwargs):
|
|
||||||
def _curried(*moreargs, **morekwargs):
|
|
||||||
return _curried_func(*args, *moreargs, **{**kwargs, **morekwargs})
|
|
||||||
return _curried
|
|
||||||
|
|
||||||
|
|
||||||
def array_to_string(array):
|
def array_to_string(array):
|
||||||
"""
|
"""
|
||||||
Generate an efficient, human-friendly string from a set of integers. Intended for use with ArrayField.
|
Generate an efficient, human-friendly string from a set of integers. Intended for use with ArrayField.
|
||||||
|
Loading…
Reference in New Issue
Block a user