Merge pull request #6516 from netbox-community/6284-m2m-webhooks

Closes #6284: Fix redundant webhooks
This commit is contained in:
Jeremy Stretch 2021-06-01 13:09:21 -04:00 committed by GitHub
commit f561b2d955
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 265 additions and 58 deletions

View File

@ -11,6 +11,7 @@
### Bug Fixes ### Bug Fixes
* [#6064](https://github.com/netbox-community/netbox/issues/6064) - Fix object permission assignments for user and group models * [#6064](https://github.com/netbox-community/netbox/issues/6064) - Fix object permission assignments for user and group models
* [#6284](https://github.com/netbox-community/netbox/issues/6284) - Avoid sending redundant webhooks when adding/removing tags
* [#6496](https://github.com/netbox-community/netbox/issues/6496) - Fix upgrade script when Python installed in nonstandard path * [#6496](https://github.com/netbox-community/netbox/issues/6496) - Fix upgrade script when Python installed in nonstandard path
* [#6502](https://github.com/netbox-community/netbox/issues/6502) - Correct permissions evaluation for running a report via the REST API * [#6502](https://github.com/netbox-community/netbox/issues/6502) - Correct permissions evaluation for running a report via the REST API

View File

@ -4,6 +4,7 @@ from django.db.models.signals import m2m_changed, pre_delete, post_save
from extras.signals import _handle_changed_object, _handle_deleted_object from extras.signals import _handle_changed_object, _handle_deleted_object
from utilities.utils import curry from utilities.utils import curry
from .webhooks import flush_webhooks
@contextmanager @contextmanager
@ -14,9 +15,11 @@ def change_logging(request):
:param request: WSGIRequest object with a unique `id` set :param request: WSGIRequest object with a unique `id` set
""" """
webhook_queue = []
# Curry signals receivers to pass the current request # Curry signals receivers to pass the current request
handle_changed_object = curry(_handle_changed_object, request) handle_changed_object = curry(_handle_changed_object, request, webhook_queue)
handle_deleted_object = curry(_handle_deleted_object, request) handle_deleted_object = curry(_handle_deleted_object, request, webhook_queue)
# Connect our receivers to the post_save and post_delete signals. # Connect our receivers to the post_save and post_delete signals.
post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object') post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
@ -30,3 +33,7 @@ def change_logging(request):
post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object') post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object') m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object') pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object')
# Flush queued webhooks to RQ
flush_webhooks(webhook_queue)
del webhook_queue

View File

@ -12,17 +12,27 @@ from prometheus_client import Counter
from .choices import ObjectChangeActionChoices from .choices import ObjectChangeActionChoices
from .models import CustomField, ObjectChange from .models import CustomField, ObjectChange
from .webhooks import enqueue_webhooks from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook
# #
# Change logging/webhooks # Change logging/webhooks
# #
def _handle_changed_object(request, sender, instance, **kwargs): def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
""" """
Fires when an object is created or updated. Fires when an object is created or updated.
""" """
def is_same_object(instance, webhook_data):
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request.id == webhook_data['request_id']
)
if not hasattr(instance, 'to_objectchange'):
return
m2m_changed = False m2m_changed = False
# Determine the type of change being made # Determine the type of change being made
@ -53,8 +63,13 @@ def _handle_changed_object(request, sender, instance, **kwargs):
objectchange.request_id = request.id objectchange.request_id = request.id
objectchange.save() objectchange.save()
# Enqueue webhooks # If this is an M2M change, update the previously queued webhook (from post_save)
enqueue_webhooks(instance, request.user, request.id, action) if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
webhook_queue[-1]['data'] = serialize_for_webhook(instance)
webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
enqueue_object(webhook_queue, instance, request.user, request.id, action)
# Increment metric counters # Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE: if action == ObjectChangeActionChoices.ACTION_CREATE:
@ -68,10 +83,13 @@ def _handle_changed_object(request, sender, instance, **kwargs):
ObjectChange.objects.filter(time__lt=cutoff)._raw_delete(using=DEFAULT_DB_ALIAS) ObjectChange.objects.filter(time__lt=cutoff)._raw_delete(using=DEFAULT_DB_ALIAS)
def _handle_deleted_object(request, sender, instance, **kwargs): def _handle_deleted_object(request, webhook_queue, sender, instance, **kwargs):
""" """
Fires when an object is deleted. Fires when an object is deleted.
""" """
if not hasattr(instance, 'to_objectchange'):
return
# Record an ObjectChange if applicable # Record an ObjectChange if applicable
if hasattr(instance, 'to_objectchange'): if hasattr(instance, 'to_objectchange'):
objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE) objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE)
@ -80,7 +98,7 @@ def _handle_deleted_object(request, sender, instance, **kwargs):
objectchange.save() objectchange.save()
# Enqueue webhooks # Enqueue webhooks
enqueue_webhooks(instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
# Increment metric counters # Increment metric counters
model_deletes.labels(instance._meta.model_name).inc() model_deletes.labels(instance._meta.model_name).inc()

View File

@ -11,8 +11,8 @@ from rest_framework import status
from dcim.models import Site from dcim.models import Site
from extras.choices import ObjectChangeActionChoices from extras.choices import ObjectChangeActionChoices
from extras.models import Webhook from extras.models import Tag, Webhook
from extras.webhooks import enqueue_webhooks, generate_signature from extras.webhooks import enqueue_object, flush_webhooks, generate_signature
from extras.webhooks_worker import process_webhook from extras.webhooks_worker import process_webhook
from utilities.testing import APITestCase from utilities.testing import APITestCase
@ -20,11 +20,10 @@ from utilities.testing import APITestCase
class WebhookTest(APITestCase): class WebhookTest(APITestCase):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.queue = django_rq.get_queue('default') self.queue = django_rq.get_queue('default')
self.queue.empty() # Begin each test with an empty queue self.queue.empty()
@classmethod @classmethod
def setUpTestData(cls): def setUpTestData(cls):
@ -34,38 +33,104 @@ class WebhookTest(APITestCase):
DUMMY_SECRET = "LOOKATMEIMASECRETSTRING" DUMMY_SECRET = "LOOKATMEIMASECRETSTRING"
webhooks = Webhook.objects.bulk_create(( webhooks = Webhook.objects.bulk_create((
Webhook(name='Site Create Webhook', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'), Webhook(name='Webhook 1', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'),
Webhook(name='Site Update Webhook', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET), Webhook(name='Webhook 2', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Site Delete Webhook', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET), Webhook(name='Webhook 3', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
)) ))
for webhook in webhooks: for webhook in webhooks:
webhook.content_types.set([site_ct]) webhook.content_types.set([site_ct])
Tag.objects.bulk_create((
Tag(name='Foo', slug='foo'),
Tag(name='Bar', slug='bar'),
Tag(name='Baz', slug='baz'),
))
def test_enqueue_webhook_create(self): def test_enqueue_webhook_create(self):
# Create an object via the REST API # Create an object via the REST API
data = { data = {
'name': 'Test Site', 'name': 'Site 1',
'slug': 'test-site', 'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
} }
url = reverse('dcim-api:site-list') url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site') self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header) response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED) self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 1) self.assertEqual(Site.objects.count(), 1)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a job was queued for the object creation webhook # Verify that a job was queued for the object creation webhook
self.assertEqual(self.queue.count, 1) self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0] job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True)) self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE) self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_bulk_create(self):
# Create multiple objects via the REST API
data = [
{
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 2',
'slug': 'site-2',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 3',
'slug': 'site-3',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 3)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a webhook was queued for each object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_update(self): def test_enqueue_webhook_update(self):
# Update an object via the REST API
site = Site.objects.create(name='Site 1', slug='site-1') site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))
# Update an object via the REST API
data = { data = {
'name': 'Site X',
'comments': 'Updated the site', 'comments': 'Updated the site',
'tags': [
{'name': 'Baz'}
]
} }
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk}) url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.change_site') self.add_permissions('dcim.change_site')
@ -76,13 +141,72 @@ class WebhookTest(APITestCase):
self.assertEqual(self.queue.count, 1) self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0] job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True)) self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE) self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_enqueue_webhook_bulk_update(self):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))
# Update three objects via the REST API
data = [
{
'id': sites[0].pk,
'name': 'Site X',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[1].pk,
'name': 'Site Y',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[2].pk,
'name': 'Site Z',
'tags': [
{'name': 'Baz'}
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.change_site')
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_enqueue_webhook_delete(self): def test_enqueue_webhook_delete(self):
# Delete an object via the REST API
site = Site.objects.create(name='Site 1', slug='site-1') site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))
# Delete an object via the REST API
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk}) url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.delete_site') self.add_permissions('dcim.delete_site')
response = self.client.delete(url, **self.header) response = self.client.delete(url, **self.header)
@ -92,9 +216,40 @@ class WebhookTest(APITestCase):
self.assertEqual(self.queue.count, 1) self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0] job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True)) self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE) self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_bulk_delete(self):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))
# Delete three objects via the REST API
data = [
{'id': site.pk} for site in sites
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_webhooks_worker(self): def test_webhooks_worker(self):
@ -125,13 +280,16 @@ class WebhookTest(APITestCase):
return HttpResponse() return HttpResponse()
# Enqueue a webhook for processing # Enqueue a webhook for processing
webhooks_queue = []
site = Site.objects.create(name='Site 1', slug='site-1') site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_webhooks( enqueue_object(
webhooks_queue,
instance=site, instance=site,
user=self.user, user=self.user,
request_id=request_id, request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE action=ObjectChangeActionChoices.ACTION_CREATE
) )
flush_webhooks(webhooks_queue)
# Retrieve the job from queue # Retrieve the job from queue
job = self.queue.jobs[0] job = self.queue.jobs[0]

View File

@ -12,6 +12,26 @@ from .models import Webhook
from .registry import registry from .registry import registry
def serialize_for_webhook(instance):
"""
Return a serialized representation of the given instance suitable for use in a webhook.
"""
serializer_class = get_serializer_for_model(instance.__class__)
serializer_context = {
'request': None,
}
serializer = serializer_class(instance, context=serializer_context)
return serializer.data
def get_snapshots(instance, action):
return {
'prechange': getattr(instance, '_prechange_snapshot', None),
'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None,
}
def generate_signature(request_body, secret): def generate_signature(request_body, secret):
""" """
Return a cryptographic signature that can be used to verify the authenticity of webhook data. Return a cryptographic signature that can be used to verify the authenticity of webhook data.
@ -24,10 +44,10 @@ def generate_signature(request_body, secret):
return hmac_prep.hexdigest() return hmac_prep.hexdigest()
def enqueue_webhooks(instance, user, request_id, action): def enqueue_object(queue, instance, user, request_id, action):
""" """
Find Webhook(s) assigned to this instance + action and enqueue them Enqueue a serialized representation of a created/updated/deleted object for the processing of
to be processed webhooks once the request has completed.
""" """
# Determine whether this type of object supports webhooks # Determine whether this type of object supports webhooks
app_label = instance._meta.app_label app_label = instance._meta.app_label
@ -35,41 +55,44 @@ def enqueue_webhooks(instance, user, request_id, action):
if model_name not in registry['model_features']['webhooks'].get(app_label, []): if model_name not in registry['model_features']['webhooks'].get(app_label, []):
return return
# Retrieve any applicable Webhooks queue.append({
content_type = ContentType.objects.get_for_model(instance) 'content_type': ContentType.objects.get_for_model(instance),
action_flag = { 'object_id': instance.pk,
ObjectChangeActionChoices.ACTION_CREATE: 'type_create', 'event': action,
ObjectChangeActionChoices.ACTION_UPDATE: 'type_update', 'data': serialize_for_webhook(instance),
ObjectChangeActionChoices.ACTION_DELETE: 'type_delete', 'snapshots': get_snapshots(instance, action),
}[action] 'username': user.username,
webhooks = Webhook.objects.filter(content_types=content_type, enabled=True, **{action_flag: True}) 'request_id': request_id
})
if webhooks.exists():
# Get the Model's API serializer class and serialize the object def flush_webhooks(queue):
serializer_class = get_serializer_for_model(instance.__class__) """
serializer_context = { Flush a list of object representation to RQ for webhook processing.
'request': None, """
} rq_queue = get_queue('default')
serializer = serializer_class(instance, context=serializer_context)
# Gather pre- and post-change snapshots for data in queue:
snapshots = {
'prechange': getattr(instance, '_prechange_snapshot', None), # Collect Webhooks that apply for this object and action
'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None, content_type = data['content_type']
} action_flag = {
ObjectChangeActionChoices.ACTION_CREATE: 'type_create',
ObjectChangeActionChoices.ACTION_UPDATE: 'type_update',
ObjectChangeActionChoices.ACTION_DELETE: 'type_delete',
}[data['event']]
# TODO: Cache these so we're not calling multiple times for bulk operations
webhooks = Webhook.objects.filter(content_types=content_type, enabled=True, **{action_flag: True})
# Enqueue the webhooks
webhook_queue = get_queue('default')
for webhook in webhooks: for webhook in webhooks:
webhook_queue.enqueue( rq_queue.enqueue(
"extras.webhooks_worker.process_webhook", "extras.webhooks_worker.process_webhook",
webhook=webhook, webhook=webhook,
model_name=instance._meta.model_name, model_name=content_type.model,
event=action, event=data['event'],
data=serializer.data, data=data['data'],
snapshots=snapshots, snapshots=data['snapshots'],
timestamp=str(timezone.now()), timestamp=str(timezone.now()),
username=user.username, username=data['username'],
request_id=request_id request_id=data['request_id']
) )