14132 fix tests

This commit is contained in:
Arthur 2023-11-13 12:15:10 -08:00
parent a43cbcb1c1
commit 4956203a29
3 changed files with 317 additions and 5 deletions

View File

@ -47,3 +47,36 @@ class EventRuleTest(APITestCase):
Tag(name='Bar', slug='bar'), Tag(name='Bar', slug='bar'),
Tag(name='Baz', slug='baz'), Tag(name='Baz', slug='baz'),
)) ))
'''
def test_webhook_conditions(self):
# Create a conditional Webhook
webhook = Webhook(
name='Conditional Webhook',
type_create=True,
type_update=True,
payload_url='http://localhost:9000/',
conditions={
'and': [
{
'attr': 'status.value',
'value': 'active',
}
]
}
)
# Create a Site to evaluate
site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING)
data = serialize_for_event(site)
# Evaluate the conditions (status='staging')
self.assertFalse(eval_conditions(webhook, data))
# Change the site's status
site.status = SiteStatusChoices.STATUS_ACTIVE
data = serialize_for_event(site)
# Evaluate the conditions (status='active')
self.assertTrue(eval_conditions(webhook, data))
'''

View File

@ -11,10 +11,9 @@ from rest_framework import status
from dcim.choices import SiteStatusChoices from dcim.choices import SiteStatusChoices
from dcim.models import Site from dcim.models import Site
from extras.choices import ObjectChangeActionChoices from extras.choices import ObjectChangeActionChoices, EventRuleActionChoices
from extras.models import Tag, Webhook from extras.models import Tag, Webhook, EventRule
from extras.events import enqueue_object, flush_events, serialize_for_event from extras.events import enqueue_object, flush_events
from extras.events_worker import eval_conditions
from extras.webhooks import generate_signature from extras.webhooks import generate_signature
from extras.webhooks_worker import process_webhook from extras.webhooks_worker import process_webhook
from utilities.testing import APITestCase from utilities.testing import APITestCase
@ -42,8 +41,288 @@ class WebhookTest(APITestCase):
Webhook(name='Webhook 3', payload_url=DUMMY_URL, secret=DUMMY_SECRET), Webhook(name='Webhook 3', payload_url=DUMMY_URL, secret=DUMMY_SECRET),
)) ))
ct = ContentType.objects.get(app_label='extras', model='webhook')
event_rules = EventRule.objects.bulk_create((
EventRule(
name='Webhook Event 1',
type_create=True,
action_type=EventRuleActionChoices.WEBHOOK,
action_object_type=ct,
action_object_id=webhooks[0].id
),
EventRule(
name='Webhook Event 2',
type_update=True,
action_type=EventRuleActionChoices.WEBHOOK,
action_object_type=ct,
action_object_id=webhooks[0].id
),
EventRule(
name='Webhook Event 3',
type_delete=True,
action_type=EventRuleActionChoices.WEBHOOK,
action_object_type=ct,
action_object_id=webhooks[0].id
),
))
for event_rule in event_rules:
event_rule.content_types.set([site_ct])
Tag.objects.bulk_create(( Tag.objects.bulk_create((
Tag(name='Foo', slug='foo'), Tag(name='Foo', slug='foo'),
Tag(name='Bar', slug='bar'), Tag(name='Bar', slug='bar'),
Tag(name='Baz', slug='baz'), Tag(name='Baz', slug='baz'),
)) ))
def test_enqueue_webhook_create(self):
# Create an object via the REST API
data = {
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
}
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 1)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a job was queued for the object creation webhook
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_bulk_create(self):
# Create multiple objects via the REST API
data = [
{
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 2',
'slug': 'site-2',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 3',
'slug': 'site-3',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 3)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a webhook was queued for each object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_update(self):
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
# Update an object via the REST API
data = {
'name': 'Site X',
'comments': 'Updated the site',
'tags': [
{'name': 'Baz'}
]
}
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.change_site')
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_enqueue_webhook_bulk_update(self):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
# Update three objects via the REST API
data = [
{
'id': sites[0].pk,
'name': 'Site X',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[1].pk,
'name': 'Site Y',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[2].pk,
'name': 'Site Z',
'tags': [
{'name': 'Baz'}
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.change_site')
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_enqueue_webhook_delete(self):
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
# Delete an object via the REST API
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_bulk_delete(self):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
# Delete three objects via the REST API
data = [
{'id': site.pk} for site in sites
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_webhooks_worker(self):
request_id = uuid.uuid4()
def dummy_send(_, request, **kwargs):
"""
A dummy implementation of Session.send() to be used for testing.
Always returns a 200 HTTP response.
"""
event = EventRule.objects.get(type_create=True)
webhook = event.action_object
signature = generate_signature(request.body, webhook.secret)
# Validate the outgoing request headers
self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
self.assertEqual(request.headers['X-Hook-Signature'], signature)
self.assertEqual(request.headers['X-Foo'], 'Bar')
# Validate the outgoing request body
body = json.loads(request.body)
self.assertEqual(body['event'], 'created')
self.assertEqual(body['timestamp'], job.kwargs['timestamp'])
self.assertEqual(body['model'], 'site')
self.assertEqual(body['username'], 'testuser')
self.assertEqual(body['request_id'], str(request_id))
self.assertEqual(body['data']['name'], 'Site 1')
return HttpResponse()
# Enqueue a webhook for processing
webhooks_queue = []
site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_object(
webhooks_queue,
instance=site,
user=self.user,
request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE
)
flush_events(webhooks_queue)
# Retrieve the job from queue
job = self.queue.jobs[0]
# Patch the Session object with our dummy_send() method, then process the webhook for sending
with patch.object(Session, 'send', dummy_send) as mock_send:
process_webhook(**job.kwargs)

View File

@ -17,7 +17,7 @@ def process_webhook(event_rule, model_name, event, data, timestamp, username, re
Make a POST request to the defined Webhook Make a POST request to the defined Webhook
""" """
webhook = event_rule.object webhook = event_rule.action_object
# Prepare context data for headers & body templates # Prepare context data for headers & body templates
context = { context = {