diff --git a/docs/release-notes/version-2.11.md b/docs/release-notes/version-2.11.md index 2d5ff3b35..fa472cf8f 100644 --- a/docs/release-notes/version-2.11.md +++ b/docs/release-notes/version-2.11.md @@ -11,6 +11,7 @@ ### Bug Fixes * [#6064](https://github.com/netbox-community/netbox/issues/6064) - Fix object permission assignments for user and group models +* [#6284](https://github.com/netbox-community/netbox/issues/6284) - Avoid sending redundant webhooks when adding/removing tags * [#6496](https://github.com/netbox-community/netbox/issues/6496) - Fix upgrade script when Python installed in nonstandard path * [#6502](https://github.com/netbox-community/netbox/issues/6502) - Correct permissions evaluation for running a report via the REST API diff --git a/netbox/extras/context_managers.py b/netbox/extras/context_managers.py index 4a33f28ef..25a49b325 100644 --- a/netbox/extras/context_managers.py +++ b/netbox/extras/context_managers.py @@ -4,6 +4,7 @@ from django.db.models.signals import m2m_changed, pre_delete, post_save from extras.signals import _handle_changed_object, _handle_deleted_object from utilities.utils import curry +from .webhooks import flush_webhooks @contextmanager @@ -14,9 +15,11 @@ def change_logging(request): :param request: WSGIRequest object with a unique `id` set """ + webhook_queue = [] + # Curry signals receivers to pass the current request - handle_changed_object = curry(_handle_changed_object, request) - handle_deleted_object = curry(_handle_deleted_object, request) + handle_changed_object = curry(_handle_changed_object, request, webhook_queue) + handle_deleted_object = curry(_handle_deleted_object, request, webhook_queue) # Connect our receivers to the post_save and post_delete signals. post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object') @@ -30,3 +33,7 @@ def change_logging(request): post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object') m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object') pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object') + + # Flush queued webhooks to RQ + flush_webhooks(webhook_queue) + del webhook_queue diff --git a/netbox/extras/signals.py b/netbox/extras/signals.py index 2191c4397..2fc292294 100644 --- a/netbox/extras/signals.py +++ b/netbox/extras/signals.py @@ -12,17 +12,27 @@ from prometheus_client import Counter from .choices import ObjectChangeActionChoices from .models import CustomField, ObjectChange -from .webhooks import enqueue_webhooks +from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook # # Change logging/webhooks # -def _handle_changed_object(request, sender, instance, **kwargs): +def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs): """ Fires when an object is created or updated. """ + def is_same_object(instance, webhook_data): + return ( + ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and + instance.pk == webhook_data['object_id'] and + request.id == webhook_data['request_id'] + ) + + if not hasattr(instance, 'to_objectchange'): + return + m2m_changed = False # Determine the type of change being made @@ -53,8 +63,13 @@ def _handle_changed_object(request, sender, instance, **kwargs): objectchange.request_id = request.id objectchange.save() - # Enqueue webhooks - enqueue_webhooks(instance, request.user, request.id, action) + # If this is an M2M change, update the previously queued webhook (from post_save) + if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]): + instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments + webhook_queue[-1]['data'] = serialize_for_webhook(instance) + webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] + else: + enqueue_object(webhook_queue, instance, request.user, request.id, action) # Increment metric counters if action == ObjectChangeActionChoices.ACTION_CREATE: @@ -68,10 +83,13 @@ def _handle_changed_object(request, sender, instance, **kwargs): ObjectChange.objects.filter(time__lt=cutoff)._raw_delete(using=DEFAULT_DB_ALIAS) -def _handle_deleted_object(request, sender, instance, **kwargs): +def _handle_deleted_object(request, webhook_queue, sender, instance, **kwargs): """ Fires when an object is deleted. """ + if not hasattr(instance, 'to_objectchange'): + return + # Record an ObjectChange if applicable if hasattr(instance, 'to_objectchange'): objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE) @@ -80,7 +98,7 @@ def _handle_deleted_object(request, sender, instance, **kwargs): objectchange.save() # Enqueue webhooks - enqueue_webhooks(instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) + enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) # Increment metric counters model_deletes.labels(instance._meta.model_name).inc() diff --git a/netbox/extras/tests/test_webhooks.py b/netbox/extras/tests/test_webhooks.py index eff9cdb97..57db6dd02 100644 --- a/netbox/extras/tests/test_webhooks.py +++ b/netbox/extras/tests/test_webhooks.py @@ -11,8 +11,8 @@ from rest_framework import status from dcim.models import Site from extras.choices import ObjectChangeActionChoices -from extras.models import Webhook -from extras.webhooks import enqueue_webhooks, generate_signature +from extras.models import Tag, Webhook +from extras.webhooks import enqueue_object, flush_webhooks, generate_signature from extras.webhooks_worker import process_webhook from utilities.testing import APITestCase @@ -20,11 +20,10 @@ from utilities.testing import APITestCase class WebhookTest(APITestCase): def setUp(self): - super().setUp() self.queue = django_rq.get_queue('default') - self.queue.empty() # Begin each test with an empty queue + self.queue.empty() @classmethod def setUpTestData(cls): @@ -34,38 +33,104 @@ class WebhookTest(APITestCase): DUMMY_SECRET = "LOOKATMEIMASECRETSTRING" webhooks = Webhook.objects.bulk_create(( - Webhook(name='Site Create Webhook', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'), - Webhook(name='Site Update Webhook', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET), - Webhook(name='Site Delete Webhook', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET), + Webhook(name='Webhook 1', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'), + Webhook(name='Webhook 2', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET), + Webhook(name='Webhook 3', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET), )) for webhook in webhooks: webhook.content_types.set([site_ct]) + Tag.objects.bulk_create(( + Tag(name='Foo', slug='foo'), + Tag(name='Bar', slug='bar'), + Tag(name='Baz', slug='baz'), + )) + def test_enqueue_webhook_create(self): # Create an object via the REST API data = { - 'name': 'Test Site', - 'slug': 'test-site', + 'name': 'Site 1', + 'slug': 'site-1', + 'tags': [ + {'name': 'Foo'}, + {'name': 'Bar'}, + ] } url = reverse('dcim-api:site-list') self.add_permissions('dcim.add_site') response = self.client.post(url, data, format='json', **self.header) self.assertHttpStatus(response, status.HTTP_201_CREATED) self.assertEqual(Site.objects.count(), 1) + self.assertEqual(Site.objects.first().tags.count(), 2) # Verify that a job was queued for the object creation webhook self.assertEqual(self.queue.count, 1) job = self.queue.jobs[0] self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True)) - self.assertEqual(job.kwargs['data']['id'], response.data['id']) - self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE) + self.assertEqual(job.kwargs['model_name'], 'site') + self.assertEqual(job.kwargs['data']['id'], response.data['id']) + self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags'])) + self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1') + self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo']) + + def test_enqueue_webhook_bulk_create(self): + # Create multiple objects via the REST API + data = [ + { + 'name': 'Site 1', + 'slug': 'site-1', + 'tags': [ + {'name': 'Foo'}, + {'name': 'Bar'}, + ] + }, + { + 'name': 'Site 2', + 'slug': 'site-2', + 'tags': [ + {'name': 'Foo'}, + {'name': 'Bar'}, + ] + }, + { + 'name': 'Site 3', + 'slug': 'site-3', + 'tags': [ + {'name': 'Foo'}, + {'name': 'Bar'}, + ] + }, + ] + url = reverse('dcim-api:site-list') + self.add_permissions('dcim.add_site') + response = self.client.post(url, data, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_201_CREATED) + self.assertEqual(Site.objects.count(), 3) + self.assertEqual(Site.objects.first().tags.count(), 2) + + # Verify that a webhook was queued for each object + self.assertEqual(self.queue.count, 3) + for i, job in enumerate(self.queue.jobs): + self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True)) + self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE) + self.assertEqual(job.kwargs['model_name'], 'site') + self.assertEqual(job.kwargs['data']['id'], response.data[i]['id']) + self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags'])) + self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name']) + self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo']) def test_enqueue_webhook_update(self): - # Update an object via the REST API site = Site.objects.create(name='Site 1', slug='site-1') + site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar'])) + + # Update an object via the REST API data = { + 'name': 'Site X', 'comments': 'Updated the site', + 'tags': [ + {'name': 'Baz'} + ] } url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk}) self.add_permissions('dcim.change_site') @@ -76,13 +141,72 @@ class WebhookTest(APITestCase): self.assertEqual(self.queue.count, 1) job = self.queue.jobs[0] self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True)) - self.assertEqual(job.kwargs['data']['id'], site.pk) - self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE) + self.assertEqual(job.kwargs['model_name'], 'site') + self.assertEqual(job.kwargs['data']['id'], site.pk) + self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags'])) + self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1') + self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo']) + self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X') + self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz']) + + def test_enqueue_webhook_bulk_update(self): + sites = ( + Site(name='Site 1', slug='site-1'), + Site(name='Site 2', slug='site-2'), + Site(name='Site 3', slug='site-3'), + ) + Site.objects.bulk_create(sites) + for site in sites: + site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar'])) + + # Update three objects via the REST API + data = [ + { + 'id': sites[0].pk, + 'name': 'Site X', + 'tags': [ + {'name': 'Baz'} + ] + }, + { + 'id': sites[1].pk, + 'name': 'Site Y', + 'tags': [ + {'name': 'Baz'} + ] + }, + { + 'id': sites[2].pk, + 'name': 'Site Z', + 'tags': [ + {'name': 'Baz'} + ] + }, + ] + url = reverse('dcim-api:site-list') + self.add_permissions('dcim.change_site') + response = self.client.patch(url, data, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + + # Verify that a job was queued for the object update webhook + self.assertEqual(self.queue.count, 3) + for i, job in enumerate(self.queue.jobs): + self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True)) + self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE) + self.assertEqual(job.kwargs['model_name'], 'site') + self.assertEqual(job.kwargs['data']['id'], data[i]['id']) + self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags'])) + self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name) + self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo']) + self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name']) + self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz']) def test_enqueue_webhook_delete(self): - # Delete an object via the REST API site = Site.objects.create(name='Site 1', slug='site-1') + site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar'])) + + # Delete an object via the REST API url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk}) self.add_permissions('dcim.delete_site') response = self.client.delete(url, **self.header) @@ -92,9 +216,40 @@ class WebhookTest(APITestCase): self.assertEqual(self.queue.count, 1) job = self.queue.jobs[0] self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True)) - self.assertEqual(job.kwargs['data']['id'], site.pk) - self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE) + self.assertEqual(job.kwargs['model_name'], 'site') + self.assertEqual(job.kwargs['data']['id'], site.pk) + self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1') + self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo']) + + def test_enqueue_webhook_bulk_delete(self): + sites = ( + Site(name='Site 1', slug='site-1'), + Site(name='Site 2', slug='site-2'), + Site(name='Site 3', slug='site-3'), + ) + Site.objects.bulk_create(sites) + for site in sites: + site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar'])) + + # Delete three objects via the REST API + data = [ + {'id': site.pk} for site in sites + ] + url = reverse('dcim-api:site-list') + self.add_permissions('dcim.delete_site') + response = self.client.delete(url, data, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT) + + # Verify that a job was queued for the object update webhook + self.assertEqual(self.queue.count, 3) + for i, job in enumerate(self.queue.jobs): + self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True)) + self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE) + self.assertEqual(job.kwargs['model_name'], 'site') + self.assertEqual(job.kwargs['data']['id'], sites[i].pk) + self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name) + self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo']) def test_webhooks_worker(self): @@ -125,13 +280,16 @@ class WebhookTest(APITestCase): return HttpResponse() # Enqueue a webhook for processing + webhooks_queue = [] site = Site.objects.create(name='Site 1', slug='site-1') - enqueue_webhooks( + enqueue_object( + webhooks_queue, instance=site, user=self.user, request_id=request_id, action=ObjectChangeActionChoices.ACTION_CREATE ) + flush_webhooks(webhooks_queue) # Retrieve the job from queue job = self.queue.jobs[0] diff --git a/netbox/extras/webhooks.py b/netbox/extras/webhooks.py index bd645aca9..6c4598247 100644 --- a/netbox/extras/webhooks.py +++ b/netbox/extras/webhooks.py @@ -12,6 +12,26 @@ from .models import Webhook from .registry import registry +def serialize_for_webhook(instance): + """ + Return a serialized representation of the given instance suitable for use in a webhook. + """ + serializer_class = get_serializer_for_model(instance.__class__) + serializer_context = { + 'request': None, + } + serializer = serializer_class(instance, context=serializer_context) + + return serializer.data + + +def get_snapshots(instance, action): + return { + 'prechange': getattr(instance, '_prechange_snapshot', None), + 'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None, + } + + def generate_signature(request_body, secret): """ Return a cryptographic signature that can be used to verify the authenticity of webhook data. @@ -24,10 +44,10 @@ def generate_signature(request_body, secret): return hmac_prep.hexdigest() -def enqueue_webhooks(instance, user, request_id, action): +def enqueue_object(queue, instance, user, request_id, action): """ - Find Webhook(s) assigned to this instance + action and enqueue them - to be processed + Enqueue a serialized representation of a created/updated/deleted object for the processing of + webhooks once the request has completed. """ # Determine whether this type of object supports webhooks app_label = instance._meta.app_label @@ -35,41 +55,44 @@ def enqueue_webhooks(instance, user, request_id, action): if model_name not in registry['model_features']['webhooks'].get(app_label, []): return - # Retrieve any applicable Webhooks - content_type = ContentType.objects.get_for_model(instance) - action_flag = { - ObjectChangeActionChoices.ACTION_CREATE: 'type_create', - ObjectChangeActionChoices.ACTION_UPDATE: 'type_update', - ObjectChangeActionChoices.ACTION_DELETE: 'type_delete', - }[action] - webhooks = Webhook.objects.filter(content_types=content_type, enabled=True, **{action_flag: True}) + queue.append({ + 'content_type': ContentType.objects.get_for_model(instance), + 'object_id': instance.pk, + 'event': action, + 'data': serialize_for_webhook(instance), + 'snapshots': get_snapshots(instance, action), + 'username': user.username, + 'request_id': request_id + }) - if webhooks.exists(): - # Get the Model's API serializer class and serialize the object - serializer_class = get_serializer_for_model(instance.__class__) - serializer_context = { - 'request': None, - } - serializer = serializer_class(instance, context=serializer_context) +def flush_webhooks(queue): + """ + Flush a list of object representation to RQ for webhook processing. + """ + rq_queue = get_queue('default') - # Gather pre- and post-change snapshots - snapshots = { - 'prechange': getattr(instance, '_prechange_snapshot', None), - 'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None, - } + for data in queue: + + # Collect Webhooks that apply for this object and action + content_type = data['content_type'] + action_flag = { + ObjectChangeActionChoices.ACTION_CREATE: 'type_create', + ObjectChangeActionChoices.ACTION_UPDATE: 'type_update', + ObjectChangeActionChoices.ACTION_DELETE: 'type_delete', + }[data['event']] + # TODO: Cache these so we're not calling multiple times for bulk operations + webhooks = Webhook.objects.filter(content_types=content_type, enabled=True, **{action_flag: True}) - # Enqueue the webhooks - webhook_queue = get_queue('default') for webhook in webhooks: - webhook_queue.enqueue( + rq_queue.enqueue( "extras.webhooks_worker.process_webhook", webhook=webhook, - model_name=instance._meta.model_name, - event=action, - data=serializer.data, - snapshots=snapshots, + model_name=content_type.model, + event=data['event'], + data=data['data'], + snapshots=data['snapshots'], timestamp=str(timezone.now()), - username=user.username, - request_id=request_id + username=data['username'], + request_id=data['request_id'] )