Fixes #15194: Prevent enqueuing duplicate events for an object

This commit is contained in:
Jeremy Stretch 2024-05-31 09:35:18 -04:00
parent 602754439a
commit 24d02cb381
5 changed files with 57 additions and 40 deletions

View File

@@ -13,13 +13,14 @@ def event_tracking(request):
:param request: WSGIRequest object with a unique `id` set :param request: WSGIRequest object with a unique `id` set
""" """
current_request.set(request) current_request.set(request)
events_queue.set([]) events_queue.set({})
yield yield
# Flush queued webhooks to RQ # Flush queued webhooks to RQ
flush_events(events_queue.get()) if events := list(events_queue.get().values()):
flush_events(events)
# Clear context vars # Clear context vars
current_request.set(None) current_request.set(None)
events_queue.set([]) events_queue.set({})

View File

@@ -58,7 +58,13 @@ def enqueue_object(queue, instance, user, request_id, action):
if model_name not in registry['model_features']['event_rules'].get(app_label, []): if model_name not in registry['model_features']['event_rules'].get(app_label, []):
return return
queue.append({ assert instance.pk is not None
key = f'{app_label}.{model_name}:{instance.pk}'
if key in queue:
queue[key]['data'] = serialize_for_event(instance)
queue[key]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
queue[key] = {
'content_type': ContentType.objects.get_for_model(instance), 'content_type': ContentType.objects.get_for_model(instance),
'object_id': instance.pk, 'object_id': instance.pk,
'event': action, 'event': action,
@@ -66,7 +72,7 @@ def enqueue_object(queue, instance, user, request_id, action):
'snapshots': get_snapshots(instance, action), 'snapshots': get_snapshots(instance, action),
'username': user.username, 'username': user.username,
'request_id': request_id 'request_id': request_id
}) }
def process_event_rules(event_rules, model_name, event, data, username=None, snapshots=None, request_id=None): def process_event_rules(event_rules, model_name, event, data, username=None, snapshots=None, request_id=None):
@@ -163,14 +169,14 @@ def process_event_queue(events):
) )
def flush_events(queue): def flush_events(events):
""" """
Flush a list of object representation to RQ for webhook processing. Flush a list of object representations to RQ for event processing.
""" """
if queue: if events:
for name in settings.EVENTS_PIPELINE: for name in settings.EVENTS_PIPELINE:
try: try:
func = import_string(name) func = import_string(name)
func(queue) func(events)
except Exception as e: except Exception as e:
logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e)) logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))

View File

@@ -55,18 +55,6 @@ def run_validators(instance, validators):
clear_events = Signal() clear_events = Signal()
def is_same_object(instance, webhook_data, request_id):
"""
Compare the given instance to the most recent queued webhook object, returning True
if they match. This check is used to avoid creating duplicate webhook entries.
"""
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request_id == webhook_data['request_id']
)
@receiver((post_save, m2m_changed)) @receiver((post_save, m2m_changed))
def handle_changed_object(sender, instance, **kwargs): def handle_changed_object(sender, instance, **kwargs):
""" """
@@ -112,13 +100,12 @@ def handle_changed_object(sender, instance, **kwargs):
objectchange.request_id = request.id objectchange.request_id = request.id
objectchange.save() objectchange.save()
# If this is an M2M change, update the previously queued webhook (from post_save) # Ensure that we're working with fresh M2M assignments
if m2m_changed:
instance.refresh_from_db()
# Enqueue the object for event processing
queue = events_queue.get() queue = events_queue.get()
if m2m_changed and queue and is_same_object(instance, queue[-1], request.id):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
queue[-1]['data'] = serialize_for_event(instance)
queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
enqueue_object(queue, instance, request.user, request.id, action) enqueue_object(queue, instance, request.user, request.id, action)
events_queue.set(queue) events_queue.set(queue)
@@ -179,7 +166,7 @@ def handle_deleted_object(sender, instance, **kwargs):
obj.snapshot() # Ensure the change record includes the "before" state obj.snapshot() # Ensure the change record includes the "before" state
getattr(obj, related_field_name).remove(instance) getattr(obj, related_field_name).remove(instance)
# Enqueue webhooks # Enqueue the object for event processing
queue = events_queue.get() queue = events_queue.get()
enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
events_queue.set(queue) events_queue.set(queue)
@@ -195,7 +182,7 @@ def clear_events_queue(sender, **kwargs):
""" """
logger = logging.getLogger('events') logger = logging.getLogger('events')
logger.info(f"Clearing {len(events_queue.get())} queued events ({sender})") logger.info(f"Clearing {len(events_queue.get())} queued events ({sender})")
events_queue.set([]) events_queue.set({})
# #

View File

@@ -4,6 +4,7 @@ from unittest.mock import patch
import django_rq import django_rq
from django.http import HttpResponse from django.http import HttpResponse
from django.test import RequestFactory
from django.urls import reverse from django.urls import reverse
from requests import Session from requests import Session
from rest_framework import status from rest_framework import status
@@ -12,6 +13,7 @@ from core.models import ObjectType
from dcim.choices import SiteStatusChoices from dcim.choices import SiteStatusChoices
from dcim.models import Site from dcim.models import Site
from extras.choices import EventRuleActionChoices, ObjectChangeActionChoices from extras.choices import EventRuleActionChoices, ObjectChangeActionChoices
from extras.context_managers import event_tracking
from extras.events import enqueue_object, flush_events, serialize_for_event from extras.events import enqueue_object, flush_events, serialize_for_event
from extras.models import EventRule, Tag, Webhook from extras.models import EventRule, Tag, Webhook
from extras.webhooks import generate_signature, send_webhook from extras.webhooks import generate_signature, send_webhook
@@ -360,7 +362,7 @@ class EventRuleTest(APITestCase):
return HttpResponse() return HttpResponse()
# Enqueue a webhook for processing # Enqueue a webhook for processing
webhooks_queue = [] webhooks_queue = {}
site = Site.objects.create(name='Site 1', slug='site-1') site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_object( enqueue_object(
webhooks_queue, webhooks_queue,
@@ -369,7 +371,7 @@ class EventRuleTest(APITestCase):
request_id=request_id, request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE action=ObjectChangeActionChoices.ACTION_CREATE
) )
flush_events(webhooks_queue) flush_events(list(webhooks_queue.values()))
# Retrieve the job from queue # Retrieve the job from queue
job = self.queue.jobs[0] job = self.queue.jobs[0]
@@ -377,3 +379,24 @@ class EventRuleTest(APITestCase):
# Patch the Session object with our dummy_send() method, then process the webhook for sending # Patch the Session object with our dummy_send() method, then process the webhook for sending
with patch.object(Session, 'send', dummy_send) as mock_send: with patch.object(Session, 'send', dummy_send) as mock_send:
send_webhook(**job.kwargs) send_webhook(**job.kwargs)
def test_duplicate_triggers(self):
"""
Test for erroneous duplicate event triggers resulting from saving an object multiple times
within the span of a single request.
"""
url = reverse('dcim:site_add')
request = RequestFactory().get(url)
request.id = uuid.uuid4()
request.user = self.user
self.assertEqual(self.queue.count, 0, msg="Unexpected jobs found in queue")
with event_tracking(request):
site = Site(name='Site 1', slug='site-1')
site.save()
# Save the site a second time
site.save()
self.assertEqual(self.queue.count, 1, msg="Duplicate jobs found in queue")

View File

@@ -7,4 +7,4 @@ __all__ = (
current_request = ContextVar('current_request', default=None) current_request = ContextVar('current_request', default=None)
events_queue = ContextVar('events_queue', default=[]) events_queue = ContextVar('events_queue', default=dict())