14132 base process_event

This commit is contained in:
Arthur 2023-10-31 15:59:27 -07:00
parent 4ef7ab83d0
commit e74fb89a30
8 changed files with 35 additions and 74 deletions

View File

@ -155,8 +155,8 @@ class Job(models.Model):
self.status = JobStatusChoices.STATUS_RUNNING self.status = JobStatusChoices.STATUS_RUNNING
self.save() self.save()
# Handle webhooks # Handle events
self.trigger_webhooks(event=EVENT_JOB_START) self.trigger_events(event=EVENT_JOB_START)
def terminate(self, status=JobStatusChoices.STATUS_COMPLETED): def terminate(self, status=JobStatusChoices.STATUS_COMPLETED):
""" """
@ -171,8 +171,8 @@ class Job(models.Model):
self.completed = timezone.now() self.completed = timezone.now()
self.save() self.save()
# Handle webhooks # Handle events
self.trigger_webhooks(event=EVENT_JOB_END) self.trigger_events(event=EVENT_JOB_END)
@classmethod @classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs): def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
@ -209,23 +209,23 @@ class Job(models.Model):
return job return job
def trigger_webhooks(self, event): def trigger_events(self, event):
from extras.models import Webhook from extras.models import EventRule
rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT) rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = django_rq.get_queue(rq_queue_name, is_async=False) rq_queue = django_rq.get_queue(rq_queue_name, is_async=False)
# Fetch any webhooks matching this object type and action # Fetch any webhooks matching this object type and action
webhooks = Webhook.objects.filter( event_rules = EventRule.objects.filter(
**{f'type_{event}': True}, **{f'type_{event}': True},
content_types=self.object_type, content_types=self.object_type,
enabled=True enabled=True
) )
for webhook in webhooks: for event_rule in event_rules:
rq_queue.enqueue( rq_queue.enqueue(
"extras.webhooks_worker.process_webhook", "extras.events_worker.process_event",
webhook=webhook, event_rule=event_rule,
model_name=self.object_type.model, model_name=self.object_type.model,
event=event, event=event,
data=self.data, data=self.data,

View File

@ -1,7 +1,7 @@
from contextlib import contextmanager from contextlib import contextmanager
from netbox.context import current_request, events_queue from netbox.context import current_request, events_queue
from .webhooks import flush_events from .events import flush_events
@contextmanager @contextmanager

View File

@ -15,7 +15,7 @@ from .choices import *
from .models import Webhook from .models import Webhook
def serialize_for_webhook(instance): def serialize_for_event(instance):
""" """
Return a serialized representation of the given instance suitable for use in a webhook. Return a serialized representation of the given instance suitable for use in a webhook.
""" """
@ -43,18 +43,6 @@ def get_snapshots(instance, action):
return snapshots return snapshots
def generate_signature(request_body, secret):
"""
Return a cryptographic signature that can be used to verify the authenticity of webhook data.
"""
hmac_prep = hmac.new(
key=secret.encode('utf8'),
msg=request_body,
digestmod=hashlib.sha512
)
return hmac_prep.hexdigest()
def enqueue_object(queue, instance, user, request_id, action): def enqueue_object(queue, instance, user, request_id, action):
""" """
Enqueue a serialized representation of a created/updated/deleted object for the processing of Enqueue a serialized representation of a created/updated/deleted object for the processing of
@ -70,7 +58,7 @@ def enqueue_object(queue, instance, user, request_id, action):
'content_type': ContentType.objects.get_for_model(instance), 'content_type': ContentType.objects.get_for_model(instance),
'object_id': instance.pk, 'object_id': instance.pk,
'event': action, 'event': action,
'data': serialize_for_webhook(instance), 'data': serialize_for_event(instance),
'snapshots': get_snapshots(instance, action), 'snapshots': get_snapshots(instance, action),
'username': user.username, 'username': user.username,
'request_id': request_id 'request_id': request_id
@ -83,7 +71,7 @@ def flush_events(queue):
""" """
rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT) rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = get_queue(rq_queue_name) rq_queue = get_queue(rq_queue_name)
webhooks_cache = { events_cache = {
'type_create': {}, 'type_create': {},
'type_update': {}, 'type_update': {},
'type_delete': {}, 'type_delete': {},
@ -99,18 +87,18 @@ def flush_events(queue):
content_type = data['content_type'] content_type = data['content_type']
# Cache applicable Webhooks # Cache applicable Webhooks
if content_type not in webhooks_cache[action_flag]: if content_type not in events_cache[action_flag]:
webhooks_cache[action_flag][content_type] = Webhook.objects.filter( events_cache[action_flag][content_type] = Webhook.objects.filter(
**{action_flag: True}, **{action_flag: True},
content_types=content_type, content_types=content_type,
enabled=True enabled=True
) )
webhooks = webhooks_cache[action_flag][content_type] event_rules = events_cache[action_flag][content_type]
for webhook in webhooks: for event_rule in event_rules:
rq_queue.enqueue( rq_queue.enqueue(
"extras.webhooks_worker.process_webhook", "extras.events_worker.process_event",
webhook=webhook, event_rule=event_rule,
model_name=content_type.model, model_name=content_type.model,
event=data['event'], event=data['event'],
data=data['data'], data=data['data'],

View File

@ -15,7 +15,7 @@ from netbox.signals import post_clean
from utilities.exceptions import AbortRequest from utilities.exceptions import AbortRequest
from .choices import ObjectChangeActionChoices from .choices import ObjectChangeActionChoices
from .models import ConfigRevision, CustomField, ObjectChange, TaggedItem from .models import ConfigRevision, CustomField, ObjectChange, TaggedItem
from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook from .events import enqueue_object, get_snapshots, serialize_for_event
# #
# Change logging/webhooks # Change logging/webhooks
@ -84,7 +84,7 @@ def handle_changed_object(sender, instance, **kwargs):
queue = events_queue.get() queue = events_queue.get()
if m2m_changed and queue and is_same_object(instance, queue[-1], request.id): if m2m_changed and queue and is_same_object(instance, queue[-1], request.id):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
queue[-1]['data'] = serialize_for_webhook(instance) queue[-1]['data'] = serialize_for_event(instance)
queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange'] queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else: else:
enqueue_object(queue, instance, request.user, request.id, action) enqueue_object(queue, instance, request.user, request.id, action)

View File

@ -13,7 +13,8 @@ from dcim.choices import SiteStatusChoices
from dcim.models import Site from dcim.models import Site
from extras.choices import ObjectChangeActionChoices from extras.choices import ObjectChangeActionChoices
from extras.models import Tag, EventRule from extras.models import Tag, EventRule
from extras.webhooks import enqueue_object, flush_events, generate_signature, serialize_for_webhook from extras.events import enqueue_object, flush_events, serialize_for_event
from extras.webhooks import generate_signature
from extras.webhooks_worker import eval_conditions, process_webhook from extras.webhooks_worker import eval_conditions, process_webhook
from utilities.testing import APITestCase from utilities.testing import APITestCase
@ -272,14 +273,14 @@ class EventRuleTest(APITestCase):
# Create a Site to evaluate # Create a Site to evaluate
site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING) site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING)
data = serialize_for_webhook(site) data = serialize_for_event(site)
# Evaluate the conditions (status='staging') # Evaluate the conditions (status='staging')
self.assertFalse(eval_conditions(webhook, data)) self.assertFalse(eval_conditions(webhook, data))
# Change the site's status # Change the site's status
site.status = SiteStatusChoices.STATUS_ACTIVE site.status = SiteStatusChoices.STATUS_ACTIVE
data = serialize_for_webhook(site) data = serialize_for_event(site)
# Evaluate the conditions (status='active') # Evaluate the conditions (status='active')
self.assertTrue(eval_conditions(webhook, data)) self.assertTrue(eval_conditions(webhook, data))

View File

@ -13,7 +13,8 @@ from dcim.choices import SiteStatusChoices
from dcim.models import Site from dcim.models import Site
from extras.choices import ObjectChangeActionChoices from extras.choices import ObjectChangeActionChoices
from extras.models import Tag, Webhook from extras.models import Tag, Webhook
from extras.webhooks import enqueue_object, flush_events, generate_signature, serialize_for_webhook from extras.events import enqueue_object, flush_events, serialize_for_event
from extras.webhooks import generate_signature
from extras.webhooks_worker import eval_conditions, process_webhook from extras.webhooks_worker import eval_conditions, process_webhook
from utilities.testing import APITestCase from utilities.testing import APITestCase
@ -272,14 +273,14 @@ class WebhookTest(APITestCase):
# Create a Site to evaluate # Create a Site to evaluate
site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING) site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING)
data = serialize_for_webhook(site) data = serialize_for_event(site)
# Evaluate the conditions (status='staging') # Evaluate the conditions (status='staging')
self.assertFalse(eval_conditions(webhook, data)) self.assertFalse(eval_conditions(webhook, data))
# Change the site's status # Change the site's status
site.status = SiteStatusChoices.STATUS_ACTIVE site.status = SiteStatusChoices.STATUS_ACTIVE
data = serialize_for_webhook(site) data = serialize_for_event(site)
# Evaluate the conditions (status='active') # Evaluate the conditions (status='active')
self.assertTrue(eval_conditions(webhook, data)) self.assertTrue(eval_conditions(webhook, data))

View File

@ -12,43 +12,10 @@ from .webhooks import generate_signature
logger = logging.getLogger('netbox.webhooks_worker') logger = logging.getLogger('netbox.webhooks_worker')
def eval_conditions(webhook, data): def process_webhook(webhook, model_name, event, data, timestamp, username, request_id=None):
"""
Test whether the given data meets the conditions of the webhook (if any). Return True
if met or no conditions are specified.
"""
if not webhook.conditions:
return True
logger.debug(f'Evaluating webhook conditions: {webhook.conditions}')
if ConditionSet(webhook.conditions).eval(data):
return True
return False
@job('default')
def process_webhook(webhook, model_name, event, data, timestamp, username, request_id=None, snapshots=None):
""" """
Make a POST request to the defined Webhook Make a POST request to the defined Webhook
""" """
# Evaluate webhook conditions (if any)
if not eval_conditions(webhook, data):
return
# Prepare context data for headers & body templates
context = {
'event': WEBHOOK_EVENT_TYPES[event],
'timestamp': timestamp,
'model': model_name,
'username': username,
'request_id': request_id,
'data': data,
}
if snapshots:
context.update({
'snapshots': snapshots
})
# Build the headers for the HTTP request # Build the headers for the HTTP request
headers = { headers = {

View File

@ -174,6 +174,10 @@ STORAGE_CONFIG = getattr(configuration, 'STORAGE_CONFIG', {})
TIME_FORMAT = getattr(configuration, 'TIME_FORMAT', 'g:i a') TIME_FORMAT = getattr(configuration, 'TIME_FORMAT', 'g:i a')
TIME_ZONE = getattr(configuration, 'TIME_ZONE', 'UTC') TIME_ZONE = getattr(configuration, 'TIME_ZONE', 'UTC')
ENABLE_LOCALIZATION = getattr(configuration, 'ENABLE_LOCALIZATION', False) ENABLE_LOCALIZATION = getattr(configuration, 'ENABLE_LOCALIZATION', False)
NETBOX_EVENTS_PIPELINE = getattr(configuration, 'NETBOX_EVENTS_PIPELINE', (
'extras.events_worker.process_event_rules',
))
# Check for hard-coded dynamic config parameters # Check for hard-coded dynamic config parameters
for param in PARAMS: for param in PARAMS: