From e74fb89a30dcc3803307a8dcac35cda9685be29f Mon Sep 17 00:00:00 2001 From: Arthur Date: Tue, 31 Oct 2023 15:59:27 -0700 Subject: [PATCH] 14132 base process_event --- netbox/core/models/jobs.py | 20 +++++++------- netbox/extras/context_managers.py | 2 +- netbox/extras/{webhooks.py => events.py} | 30 ++++++-------------- netbox/extras/signals.py | 4 +-- netbox/extras/tests/test_event_rules.py | 7 +++-- netbox/extras/tests/test_webhooks.py | 7 +++-- netbox/extras/webhooks_worker.py | 35 +----------------------- netbox/netbox/settings.py | 4 +++ 8 files changed, 35 insertions(+), 74 deletions(-) rename netbox/extras/{webhooks.py => events.py} (81%) diff --git a/netbox/core/models/jobs.py b/netbox/core/models/jobs.py index 61b0e64fa..08e894f59 100644 --- a/netbox/core/models/jobs.py +++ b/netbox/core/models/jobs.py @@ -155,8 +155,8 @@ class Job(models.Model): self.status = JobStatusChoices.STATUS_RUNNING self.save() - # Handle webhooks - self.trigger_webhooks(event=EVENT_JOB_START) + # Handle events + self.trigger_events(event=EVENT_JOB_START) def terminate(self, status=JobStatusChoices.STATUS_COMPLETED): """ @@ -171,8 +171,8 @@ class Job(models.Model): self.completed = timezone.now() self.save() - # Handle webhooks - self.trigger_webhooks(event=EVENT_JOB_END) + # Handle events + self.trigger_events(event=EVENT_JOB_END) @classmethod def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs): @@ -209,23 +209,23 @@ class Job(models.Model): return job - def trigger_webhooks(self, event): - from extras.models import Webhook + def trigger_events(self, event): + from extras.models import EventRule rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT) rq_queue = django_rq.get_queue(rq_queue_name, is_async=False) # Fetch any webhooks matching this object type and action - webhooks = Webhook.objects.filter( + event_rules = EventRule.objects.filter( **{f'type_{event}': True}, content_types=self.object_type, enabled=True ) - for webhook in webhooks: + for event_rule in event_rules: rq_queue.enqueue( - "extras.webhooks_worker.process_webhook", - webhook=webhook, + "extras.events_worker.process_event", + event_rule=event_rule, model_name=self.object_type.model, event=event, data=self.data, diff --git a/netbox/extras/context_managers.py b/netbox/extras/context_managers.py index c3c78f98b..dc4d3dd17 100644 --- a/netbox/extras/context_managers.py +++ b/netbox/extras/context_managers.py @@ -1,7 +1,7 @@ from contextlib import contextmanager from netbox.context import current_request, events_queue -from .webhooks import flush_events +from .events import flush_events @contextmanager diff --git a/netbox/extras/webhooks.py b/netbox/extras/events.py similarity index 81% rename from netbox/extras/webhooks.py rename to netbox/extras/events.py index 2dc474e8a..98a81e5cb 100644 --- a/netbox/extras/webhooks.py +++ b/netbox/extras/events.py @@ -15,7 +15,7 @@ from .choices import * from .models import Webhook -def serialize_for_webhook(instance): +def serialize_for_event(instance): """ Return a serialized representation of the given instance suitable for use in a webhook. """ @@ -43,18 +43,6 @@ def get_snapshots(instance, action): return snapshots -def generate_signature(request_body, secret): - """ - Return a cryptographic signature that can be used to verify the authenticity of webhook data. - """ - hmac_prep = hmac.new( - key=secret.encode('utf8'), - msg=request_body, - digestmod=hashlib.sha512 - ) - return hmac_prep.hexdigest() - - def enqueue_object(queue, instance, user, request_id, action): """ Enqueue a serialized representation of a created/updated/deleted object for the processing of @@ -70,7 +58,7 @@ def enqueue_object(queue, instance, user, request_id, action): 'content_type': ContentType.objects.get_for_model(instance), 'object_id': instance.pk, 'event': action, - 'data': serialize_for_webhook(instance), + 'data': serialize_for_event(instance), 'snapshots': get_snapshots(instance, action), 'username': user.username, 'request_id': request_id @@ -83,7 +71,7 @@ def flush_events(queue): """ rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT) rq_queue = get_queue(rq_queue_name) - webhooks_cache = { + events_cache = { 'type_create': {}, 'type_update': {}, 'type_delete': {}, @@ -99,18 +87,18 @@ def flush_events(queue): content_type = data['content_type'] # Cache applicable Webhooks - if content_type not in webhooks_cache[action_flag]: - webhooks_cache[action_flag][content_type] = Webhook.objects.filter( + if content_type not in events_cache[action_flag]: + events_cache[action_flag][content_type] = Webhook.objects.filter( **{action_flag: True}, content_types=content_type, enabled=True ) - webhooks = webhooks_cache[action_flag][content_type] + event_rules = events_cache[action_flag][content_type] - for webhook in webhooks: + for event_rule in event_rules: rq_queue.enqueue( - "extras.webhooks_worker.process_webhook", - webhook=webhook, + "extras.events_worker.process_event", + event_rule=event_rule, model_name=content_type.model, event=data['event'], data=data['data'], diff --git a/netbox/extras/signals.py b/netbox/extras/signals.py index 86d88b6c3..a0ea07456 100644 --- a/netbox/extras/signals.py +++ b/netbox/extras/signals.py @@ -15,7 +15,7 @@ from netbox.signals import post_clean from utilities.exceptions import AbortRequest from .choices import ObjectChangeActionChoices from .models import ConfigRevision, CustomField, ObjectChange, TaggedItem -from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook +from .events import enqueue_object, get_snapshots, serialize_for_event # # Change logging/webhooks @@ -84,7 +84,7 @@ def handle_changed_object(sender, instance, **kwargs): queue = events_queue.get() if m2m_changed and queue and is_same_object(instance, queue[-1], request.id): instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments - queue[-1]['data'] = serialize_for_webhook(instance) + queue[-1]['data'] = serialize_for_event(instance) queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] else: enqueue_object(queue, instance, request.user, request.id, action) diff --git a/netbox/extras/tests/test_event_rules.py b/netbox/extras/tests/test_event_rules.py index d8bfe4229..53f8afb7d 100644 --- a/netbox/extras/tests/test_event_rules.py +++ b/netbox/extras/tests/test_event_rules.py @@ -13,7 +13,8 @@ from dcim.choices import SiteStatusChoices from dcim.models import Site from extras.choices import ObjectChangeActionChoices from extras.models import Tag, EventRule -from extras.webhooks import enqueue_object, flush_events, generate_signature, serialize_for_webhook +from extras.events import enqueue_object, flush_events, serialize_for_event +from extras.webhooks import generate_signature from extras.webhooks_worker import eval_conditions, process_webhook from utilities.testing import APITestCase @@ -272,14 +273,14 @@ class EventRuleTest(APITestCase): # Create a Site to evaluate site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING) - data = serialize_for_webhook(site) + data = serialize_for_event(site) # Evaluate the conditions (status='staging') self.assertFalse(eval_conditions(webhook, data)) # Change the site's status site.status = SiteStatusChoices.STATUS_ACTIVE - data = serialize_for_webhook(site) + data = serialize_for_event(site) # Evaluate the conditions (status='active') self.assertTrue(eval_conditions(webhook, data)) diff --git a/netbox/extras/tests/test_webhooks.py b/netbox/extras/tests/test_webhooks.py index c30925081..e86814e4c 100644 --- a/netbox/extras/tests/test_webhooks.py +++ b/netbox/extras/tests/test_webhooks.py @@ -13,7 +13,8 @@ from dcim.choices import SiteStatusChoices from dcim.models import Site from extras.choices import ObjectChangeActionChoices from extras.models import Tag, Webhook -from extras.webhooks import enqueue_object, flush_events, generate_signature, serialize_for_webhook +from extras.events import enqueue_object, flush_events, serialize_for_event +from extras.webhooks import generate_signature from extras.webhooks_worker import eval_conditions, process_webhook from utilities.testing import APITestCase @@ -272,14 +273,14 @@ class WebhookTest(APITestCase): # Create a Site to evaluate site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING) - data = serialize_for_webhook(site) + data = serialize_for_event(site) # Evaluate the conditions (status='staging') self.assertFalse(eval_conditions(webhook, data)) # Change the site's status site.status = SiteStatusChoices.STATUS_ACTIVE - data = serialize_for_webhook(site) + data = serialize_for_event(site) # Evaluate the conditions (status='active') self.assertTrue(eval_conditions(webhook, data)) diff --git a/netbox/extras/webhooks_worker.py b/netbox/extras/webhooks_worker.py index 438231b7e..9f64fc405 100644 --- a/netbox/extras/webhooks_worker.py +++ b/netbox/extras/webhooks_worker.py @@ -12,43 +12,10 @@ from .webhooks import generate_signature logger = logging.getLogger('netbox.webhooks_worker') -def eval_conditions(webhook, data): - """ - Test whether the given data meets the conditions of the webhook (if any). Return True - if met or no conditions are specified. - """ - if not webhook.conditions: - return True - - logger.debug(f'Evaluating webhook conditions: {webhook.conditions}') - if ConditionSet(webhook.conditions).eval(data): - return True - - return False - - -@job('default') -def process_webhook(webhook, model_name, event, data, timestamp, username, request_id=None, snapshots=None): +def process_webhook(webhook, model_name, event, data, timestamp, username, request_id=None): """ Make a POST request to the defined Webhook """ - # Evaluate webhook conditions (if any) - if not eval_conditions(webhook, data): - return - - # Prepare context data for headers & body templates - context = { - 'event': WEBHOOK_EVENT_TYPES[event], - 'timestamp': timestamp, - 'model': model_name, - 'username': username, - 'request_id': request_id, - 'data': data, - } - if snapshots: - context.update({ - 'snapshots': snapshots - }) # Build the headers for the HTTP request headers = { diff --git a/netbox/netbox/settings.py b/netbox/netbox/settings.py index 36f56b0f6..72d22d84d 100644 --- a/netbox/netbox/settings.py +++ b/netbox/netbox/settings.py @@ -174,6 +174,10 @@ STORAGE_CONFIG = getattr(configuration, 'STORAGE_CONFIG', {}) TIME_FORMAT = getattr(configuration, 'TIME_FORMAT', 'g:i a') TIME_ZONE = getattr(configuration, 'TIME_ZONE', 'UTC') ENABLE_LOCALIZATION = getattr(configuration, 'ENABLE_LOCALIZATION', False) +NETBOX_EVENTS_PIPELINE = getattr(configuration, 'NETBOX_EVENTS_PIPELINE', ( + 'extras.events_worker.process_event_rules', +)) + # Check for hard-coded dynamic config parameters for param in PARAMS: