14132 webhooks_queue -> events_queue

This commit is contained in:
Arthur 2023-10-27 14:55:16 -07:00
parent 3b5c682511
commit 4b3b88f23d
4 changed files with 16 additions and 16 deletions

View File

@ -1,6 +1,6 @@
from contextlib import contextmanager from contextlib import contextmanager
from netbox.context import current_request, webhooks_queue from netbox.context import current_request, events_queue
from .webhooks import flush_webhooks from .webhooks import flush_webhooks
@ -13,13 +13,13 @@ def event_wrapper(request):
:param request: WSGIRequest object with a unique `id` set :param request: WSGIRequest object with a unique `id` set
""" """
current_request.set(request) current_request.set(request)
webhooks_queue.set([]) events_queue.set([])
yield yield
# Flush queued webhooks to RQ # Flush queued webhooks to RQ
flush_webhooks(webhooks_queue.get()) flush_webhooks(events_queue.get())
# Clear context vars # Clear context vars
current_request.set(None) current_request.set(None)
webhooks_queue.set([]) events_queue.set([])

View File

@ -10,7 +10,7 @@ from django_prometheus.models import model_deletes, model_inserts, model_updates
from extras.validators import CustomValidator from extras.validators import CustomValidator
from netbox.config import get_config from netbox.config import get_config
from netbox.context import current_request, webhooks_queue from netbox.context import current_request, events_queue
from netbox.signals import post_clean from netbox.signals import post_clean
from utilities.exceptions import AbortRequest from utilities.exceptions import AbortRequest
from .choices import ObjectChangeActionChoices from .choices import ObjectChangeActionChoices
@ -81,14 +81,14 @@ def handle_changed_object(sender, instance, **kwargs):
objectchange.save() objectchange.save()
# If this is an M2M change, update the previously queued webhook (from post_save) # If this is an M2M change, update the previously queued webhook (from post_save)
queue = webhooks_queue.get() queue = events_queue.get()
if m2m_changed and queue and is_same_object(instance, queue[-1], request.id): if m2m_changed and queue and is_same_object(instance, queue[-1], request.id):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
queue[-1]['data'] = serialize_for_webhook(instance) queue[-1]['data'] = serialize_for_webhook(instance)
queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else: else:
enqueue_object(queue, instance, request.user, request.id, action) enqueue_object(queue, instance, request.user, request.id, action)
webhooks_queue.set(queue) events_queue.set(queue)
# Increment metric counters # Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE: if action == ObjectChangeActionChoices.ACTION_CREATE:
@ -117,9 +117,9 @@ def handle_deleted_object(sender, instance, **kwargs):
objectchange.save() objectchange.save()
# Enqueue webhooks # Enqueue webhooks
queue = webhooks_queue.get() queue = events_queue.get()
enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
webhooks_queue.set(queue) events_queue.set(queue)
# Increment metric counters # Increment metric counters
model_deletes.labels(instance._meta.model_name).inc() model_deletes.labels(instance._meta.model_name).inc()
@ -131,8 +131,8 @@ def clear_webhook_queue(sender, **kwargs):
Delete any queued webhooks (e.g. because of an aborted bulk transaction) Delete any queued webhooks (e.g. because of an aborted bulk transaction)
""" """
logger = logging.getLogger('webhooks') logger = logging.getLogger('webhooks')
logger.info(f"Clearing {len(webhooks_queue.get())} queued webhooks ({sender})") logger.info(f"Clearing {len(events_queue.get())} queued webhooks ({sender})")
webhooks_queue.set([]) events_queue.set([])
# #

View File

@ -313,16 +313,16 @@ class WebhookTest(APITestCase):
return HttpResponse() return HttpResponse()
# Enqueue a webhook for processing # Enqueue a webhook for processing
webhooks_queue = [] events_queue = []
site = Site.objects.create(name='Site 1', slug='site-1') site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_object( enqueue_object(
webhooks_queue, events_queue,
instance=site, instance=site,
user=self.user, user=self.user,
request_id=request_id, request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE action=ObjectChangeActionChoices.ACTION_CREATE
) )
flush_webhooks(webhooks_queue) flush_webhooks(events_queue)
# Retrieve the job from queue # Retrieve the job from queue
job = self.queue.jobs[0] job = self.queue.jobs[0]

View File

@ -2,9 +2,9 @@ from contextvars import ContextVar
__all__ = ( __all__ = (
'current_request', 'current_request',
'webhooks_queue', 'events_queue',
) )
current_request = ContextVar('current_request', default=None) current_request = ContextVar('current_request', default=None)
webhooks_queue = ContextVar('webhooks_queue', default=[]) events_queue = ContextVar('events_queue', default=[])