Standardize queued event attribute to event_type; change content_type to object_type

This commit is contained in:
Jeremy Stretch 2024-07-08 14:26:44 -04:00
parent 1dd5d0a0f9
commit 744d3d8197
5 changed files with 74 additions and 59 deletions

View File

@ -8,6 +8,7 @@ CUSTOMFIELD_EMPTY_VALUES = (None, '', [])
HTTP_CONTENT_TYPE_JSON = 'application/json'
WEBHOOK_EVENT_TYPES = {
# Map registered event types to public webhook "event" equivalents
OBJECT_CREATED: 'created',
OBJECT_UPDATED: 'updated',
OBJECT_DELETED: 'deleted',

View File

@ -8,7 +8,6 @@ from django.utils.module_loading import import_string
from django.utils.translation import gettext as _
from django_rq import get_queue
from core.choices import ObjectChangeActionChoices
from core.events import *
from core.models import Job
from netbox.config import get_config
@ -36,12 +35,12 @@ def serialize_for_event(instance):
return serializer.data
def get_snapshots(instance, action):
def get_snapshots(instance, event_type):
snapshots = {
'prechange': getattr(instance, '_prechange_snapshot', None),
'postchange': None,
}
if action != ObjectChangeActionChoices.ACTION_DELETE:
if event_type != OBJECT_DELETED:
# Use model's serialize_object() method if defined; fall back to serialize_object() utility function
if hasattr(instance, 'serialize_object'):
snapshots['postchange'] = instance.serialize_object()
@ -51,7 +50,7 @@ def get_snapshots(instance, action):
return snapshots
def enqueue_object(queue, instance, user, request_id, action):
def enqueue_event(queue, instance, user, request_id, event_type):
"""
Enqueue a serialized representation of a created/updated/deleted object for the processing of
events once the request has completed.
@ -66,24 +65,21 @@ def enqueue_object(queue, instance, user, request_id, action):
key = f'{app_label}.{model_name}:{instance.pk}'
if key in queue:
queue[key]['data'] = serialize_for_event(instance)
queue[key]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
else:
queue[key] = {
'content_type': ContentType.objects.get_for_model(instance),
'object_type': ContentType.objects.get_for_model(instance),
'object_id': instance.pk,
'event': action,
'event_type': event_type,
'data': serialize_for_event(instance),
'snapshots': get_snapshots(instance, action),
'snapshots': get_snapshots(instance, event_type),
'username': user.username,
'request_id': request_id
}
def process_event_rules(event_rules, object_type, event, data, username=None, snapshots=None, request_id=None):
if username:
user = get_user_model().objects.get(username=username)
else:
user = None
def process_event_rules(event_rules, object_type, event_type, data, username=None, snapshots=None, request_id=None):
user = get_user_model().objects.get(username=username) if username else None
for event_rule in event_rules:
@ -102,7 +98,7 @@ def process_event_rules(event_rules, object_type, event, data, username=None, sn
params = {
"event_rule": event_rule,
"model_name": object_type.model,
"event": event,
"event_type": event_type,
"data": data,
"snapshots": snapshots,
"timestamp": timezone.now().isoformat(),
@ -140,7 +136,7 @@ def process_event_rules(event_rules, object_type, event, data, username=None, sn
event_rule.action_object.notify(
object_type=object_type,
object_id=data['id'],
event_name=event
event_name=event_type
)
else:
@ -158,33 +154,39 @@ def process_event_queue(events):
'type_update': {},
'type_delete': {},
}
event_actions = {
# TODO: Add EventRule support for dynamically registered event types
OBJECT_CREATED: 'type_create',
OBJECT_UPDATED: 'type_update',
OBJECT_DELETED: 'type_delete',
JOB_STARTED: 'type_job_start',
JOB_COMPLETED: 'type_job_end',
# Map failed & errored jobs to type_job_end
JOB_FAILED: 'type_job_end',
JOB_ERRORED: 'type_job_end',
}
for data in events:
action_flag = {
# TODO: Add EventRule support for dynamically registered event types
OBJECT_CREATED: 'type_create',
OBJECT_UPDATED: 'type_update',
OBJECT_DELETED: 'type_delete',
JOB_STARTED: 'type_job_start',
JOB_COMPLETED: 'type_job_end',
# Map failed & errored jobs to type_job_end
JOB_FAILED: 'type_job_end',
JOB_ERRORED: 'type_job_end',
}[data['event']]
content_type = data['content_type']
for event in events:
action_flag = event_actions[event['event_type']]
object_type = event['object_type']
# Cache applicable Event Rules
if content_type not in events_cache[action_flag]:
events_cache[action_flag][content_type] = EventRule.objects.filter(
if object_type not in events_cache[action_flag]:
events_cache[action_flag][object_type] = EventRule.objects.filter(
**{action_flag: True},
object_types=content_type,
object_types=object_type,
enabled=True
)
event_rules = events_cache[action_flag][content_type]
event_rules = events_cache[action_flag][object_type]
process_event_rules(
event_rules, content_type, data['event'], data['data'], data['username'],
snapshots=data['snapshots'], request_id=data['request_id']
event_rules=event_rules,
object_type=object_type,
event_type=event['event_type'],
data=event['data'],
username=event['username'],
snapshots=event['snapshots'],
request_id=event['request_id']
)

View File

@ -21,7 +21,7 @@ from netbox.models.features import ChangeLoggingMixin
from netbox.registry import registry
from netbox.signals import post_clean
from utilities.exceptions import AbortRequest
from .events import enqueue_object
from .events import enqueue_event
from .models import CustomField, TaggedItem
from .validators import CustomValidator
@ -73,23 +73,23 @@ def handle_changed_object(sender, instance, **kwargs):
# Determine the type of change being made
if kwargs.get('created'):
action = OBJECT_CREATED
event_type = OBJECT_CREATED
elif 'created' in kwargs:
action = OBJECT_UPDATED
event_type = OBJECT_UPDATED
elif kwargs.get('action') in ['post_add', 'post_remove'] and kwargs['pk_set']:
# m2m_changed with objects added or removed
m2m_changed = True
action = OBJECT_UPDATED
event_type = OBJECT_UPDATED
else:
return
# Create/update an ObjectChange record for this change
change_event = {
action = {
OBJECT_CREATED: ObjectChangeActionChoices.ACTION_CREATE,
OBJECT_UPDATED: ObjectChangeActionChoices.ACTION_UPDATE,
OBJECT_DELETED: ObjectChangeActionChoices.ACTION_DELETE,
}[action]
objectchange = instance.to_objectchange(change_event)
}[event_type]
objectchange = instance.to_objectchange(action)
# If this is a many-to-many field change, check for a previous ObjectChange instance recorded
# for this object by this request and update it
if m2m_changed and (
@ -112,13 +112,13 @@ def handle_changed_object(sender, instance, **kwargs):
# Enqueue the object for event processing
queue = events_queue.get()
enqueue_object(queue, instance, request.user, request.id, action)
enqueue_event(queue, instance, request.user, request.id, event_type)
events_queue.set(queue)
# Increment metric counters
if action == OBJECT_CREATED:
if event_type == OBJECT_CREATED:
model_inserts.labels(instance._meta.model_name).inc()
elif action == OBJECT_UPDATED:
elif event_type == OBJECT_UPDATED:
model_updates.labels(instance._meta.model_name).inc()
@ -174,7 +174,7 @@ def handle_deleted_object(sender, instance, **kwargs):
# Enqueue the object for event processing
queue = events_queue.get()
enqueue_object(queue, instance, request.user, request.id, OBJECT_DELETED)
enqueue_event(queue, instance, request.user, request.id, OBJECT_DELETED)
events_queue.set(queue)
# Increment metric counters
@ -276,7 +276,13 @@ def process_job_start_event_rules(sender, **kwargs):
"""
event_rules = EventRule.objects.filter(type_job_start=True, enabled=True, object_types=sender.object_type)
username = sender.user.username if sender.user else None
process_event_rules(event_rules, sender.object_type, JOB_STARTED, sender.data, username)
process_event_rules(
event_rules=event_rules,
object_type=sender.object_type,
event_type=JOB_STARTED,
data=sender.data,
username=username
)
@receiver(job_end)
@ -286,7 +292,13 @@ def process_job_end_event_rules(sender, **kwargs):
"""
event_rules = EventRule.objects.filter(type_job_end=True, enabled=True, object_types=sender.object_type)
username = sender.user.username if sender.user else None
process_event_rules(event_rules, sender.object_type, JOB_COMPLETED, sender.data, username)
process_event_rules(
event_rules=event_rules,
object_type=sender.object_type,
event_type=JOB_COMPLETED,
data=sender.data,
username=username
)
#

View File

@ -14,7 +14,7 @@ from core.models import ObjectType
from dcim.choices import SiteStatusChoices
from dcim.models import Site
from extras.choices import EventRuleActionChoices
from extras.events import enqueue_object, flush_events, serialize_for_event
from extras.events import enqueue_event, flush_events, serialize_for_event
from extras.models import EventRule, Tag, Webhook
from extras.webhooks import generate_signature, send_webhook
from netbox.context_managers import event_tracking
@ -132,7 +132,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], OBJECT_CREATED)
self.assertEqual(job.kwargs['event_type'], OBJECT_CREATED)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
@ -182,7 +182,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], OBJECT_CREATED)
self.assertEqual(job.kwargs['event_type'], OBJECT_CREATED)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
@ -213,7 +213,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], OBJECT_UPDATED)
self.assertEqual(job.kwargs['event_type'], OBJECT_UPDATED)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
@ -269,7 +269,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], OBJECT_UPDATED)
self.assertEqual(job.kwargs['event_type'], OBJECT_UPDATED)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
@ -295,7 +295,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], OBJECT_DELETED)
self.assertEqual(job.kwargs['event_type'], OBJECT_DELETED)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
@ -328,7 +328,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], OBJECT_DELETED)
self.assertEqual(job.kwargs['event_type'], OBJECT_DELETED)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
@ -365,12 +365,12 @@ class EventRuleTest(APITestCase):
# Enqueue a webhook for processing
webhooks_queue = {}
site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_object(
enqueue_event(
webhooks_queue,
instance=site,
user=self.user,
request_id=request_id,
action=OBJECT_CREATED
event_type=OBJECT_CREATED
)
flush_events(list(webhooks_queue.values()))
@ -378,7 +378,7 @@ class EventRuleTest(APITestCase):
job = self.queue.jobs[0]
# Patch the Session object with our dummy_send() method, then process the webhook for sending
with patch.object(Session, 'send', dummy_send) as mock_send:
with patch.object(Session, 'send', dummy_send):
send_webhook(**job.kwargs)
def test_duplicate_triggers(self):

View File

@ -25,7 +25,7 @@ def generate_signature(request_body, secret):
@job('default')
def send_webhook(event_rule, model_name, event, data, timestamp, username, request_id=None, snapshots=None):
def send_webhook(event_rule, model_name, event_type, data, timestamp, username, request_id=None, snapshots=None):
"""
Make a POST request to the defined Webhook
"""
@ -33,7 +33,7 @@ def send_webhook(event_rule, model_name, event, data, timestamp, username, reque
# Prepare context data for headers & body templates
context = {
'event': WEBHOOK_EVENT_TYPES[event],
'event': WEBHOOK_EVENT_TYPES.get(event_type, event_type),
'timestamp': timestamp,
'model': model_name,
'username': username,