Use core event types when queuing events

This commit is contained in:
Jeremy Stretch 2024-07-08 13:38:01 -04:00
parent 2bc7b208f1
commit 1dd5d0a0f9
6 changed files with 52 additions and 46 deletions

View File

@ -2,6 +2,16 @@ from django.utils.translation import gettext as _
from netbox.events import * from netbox.events import *
__all__ = (
'JOB_COMPLETED',
'JOB_ERRORED',
'JOB_FAILED',
'JOB_STARTED',
'OBJECT_CREATED',
'OBJECT_DELETED',
'OBJECT_UPDATED',
)
# Object events # Object events
OBJECT_CREATED = 'object_created' OBJECT_CREATED = 'object_created'
OBJECT_UPDATED = 'object_updated' OBJECT_UPDATED = 'object_updated'

View File

@ -13,7 +13,6 @@ from django.utils.translation import gettext as _
from core.choices import JobStatusChoices from core.choices import JobStatusChoices
from core.models import ObjectType from core.models import ObjectType
from core.signals import job_end, job_start from core.signals import job_end, job_start
from extras.constants import EVENT_JOB_END, EVENT_JOB_START
from netbox.config import get_config from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT from netbox.constants import RQ_QUEUE_DEFAULT
from utilities.querysets import RestrictedQuerySet from utilities.querysets import RestrictedQuerySet

View File

@ -1,12 +1,6 @@
from core.events import *
from extras.choices import LogLevelChoices from extras.choices import LogLevelChoices
# Events
EVENT_CREATE = 'create'
EVENT_UPDATE = 'update'
EVENT_DELETE = 'delete'
EVENT_JOB_START = 'job_start'
EVENT_JOB_END = 'job_end'
# Custom fields # Custom fields
CUSTOMFIELD_EMPTY_VALUES = (None, '', []) CUSTOMFIELD_EMPTY_VALUES = (None, '', [])
@ -14,11 +8,13 @@ CUSTOMFIELD_EMPTY_VALUES = (None, '', [])
HTTP_CONTENT_TYPE_JSON = 'application/json' HTTP_CONTENT_TYPE_JSON = 'application/json'
WEBHOOK_EVENT_TYPES = { WEBHOOK_EVENT_TYPES = {
EVENT_CREATE: 'created', OBJECT_CREATED: 'created',
EVENT_UPDATE: 'updated', OBJECT_UPDATED: 'updated',
EVENT_DELETE: 'deleted', OBJECT_DELETED: 'deleted',
EVENT_JOB_START: 'job_started', JOB_STARTED: 'job_started',
EVENT_JOB_END: 'job_ended', JOB_COMPLETED: 'job_ended',
JOB_FAILED: 'job_ended',
JOB_ERRORED: 'job_ended',
} }
# Dashboard # Dashboard

View File

@ -9,9 +9,8 @@ from django.utils.translation import gettext as _
from django_rq import get_queue from django_rq import get_queue
from core.choices import ObjectChangeActionChoices from core.choices import ObjectChangeActionChoices
from core.events import JOB_COMPLETED, JOB_STARTED, OBJECT_CREATED, OBJECT_DELETED, OBJECT_UPDATED from core.events import *
from core.models import Job from core.models import Job
from extras.constants import EVENT_CREATE, EVENT_DELETE, EVENT_JOB_END, EVENT_JOB_START, EVENT_UPDATE
from netbox.config import get_config from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT from netbox.constants import RQ_QUEUE_DEFAULT
from netbox.registry import registry from netbox.registry import registry
@ -138,18 +137,10 @@ def process_event_rules(event_rules, object_type, event, data, username=None, sn
# Notification groups # Notification groups
elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION: elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
# Bulk-create notifications for all members of the notification group # Bulk-create notifications for all members of the notification group
# TODO: Support dynamic events upstream
event_name = {
EVENT_CREATE: OBJECT_CREATED,
EVENT_UPDATE: OBJECT_UPDATED,
EVENT_DELETE: OBJECT_DELETED,
EVENT_JOB_START: JOB_STARTED,
EVENT_JOB_END: JOB_COMPLETED,
}[event]
event_rule.action_object.notify( event_rule.action_object.notify(
object_type=object_type, object_type=object_type,
object_id=data['id'], object_id=data['id'],
event_name=event_name event_name=event
) )
else: else:
@ -170,9 +161,15 @@ def process_event_queue(events):
for data in events: for data in events:
action_flag = { action_flag = {
ObjectChangeActionChoices.ACTION_CREATE: 'type_create', # TODO: Add EventRule support for dynamically registered event types
ObjectChangeActionChoices.ACTION_UPDATE: 'type_update', OBJECT_CREATED: 'type_create',
ObjectChangeActionChoices.ACTION_DELETE: 'type_delete', OBJECT_UPDATED: 'type_update',
OBJECT_DELETED: 'type_delete',
JOB_STARTED: 'type_job_start',
JOB_COMPLETED: 'type_job_end',
# Map failed & errored jobs to type_job_end
JOB_FAILED: 'type_job_end',
JOB_ERRORED: 'type_job_end',
}[data['event']] }[data['event']]
content_type = data['content_type'] content_type = data['content_type']

View File

@ -10,10 +10,9 @@ from django.utils.translation import gettext_lazy as _
from django_prometheus.models import model_deletes, model_inserts, model_updates from django_prometheus.models import model_deletes, model_inserts, model_updates
from core.choices import ObjectChangeActionChoices from core.choices import ObjectChangeActionChoices
from core.events import OBJECT_UPDATED from core.events import *
from core.models import ObjectChange, ObjectType from core.models import ObjectChange, ObjectType
from core.signals import job_end, job_start from core.signals import job_end, job_start
from extras.constants import EVENT_JOB_END, EVENT_JOB_START
from extras.events import process_event_rules from extras.events import process_event_rules
from extras.models import EventRule, Notification, Subscription from extras.models import EventRule, Notification, Subscription
from netbox.config import get_config from netbox.config import get_config
@ -74,18 +73,23 @@ def handle_changed_object(sender, instance, **kwargs):
# Determine the type of change being made # Determine the type of change being made
if kwargs.get('created'): if kwargs.get('created'):
action = ObjectChangeActionChoices.ACTION_CREATE action = OBJECT_CREATED
elif 'created' in kwargs: elif 'created' in kwargs:
action = ObjectChangeActionChoices.ACTION_UPDATE action = OBJECT_UPDATED
elif kwargs.get('action') in ['post_add', 'post_remove'] and kwargs['pk_set']: elif kwargs.get('action') in ['post_add', 'post_remove'] and kwargs['pk_set']:
# m2m_changed with objects added or removed # m2m_changed with objects added or removed
m2m_changed = True m2m_changed = True
action = ObjectChangeActionChoices.ACTION_UPDATE action = OBJECT_UPDATED
else: else:
return return
# Create/update an ObjectChange record for this change # Create/update an ObjectChange record for this change
objectchange = instance.to_objectchange(action) change_event = {
OBJECT_CREATED: ObjectChangeActionChoices.ACTION_CREATE,
OBJECT_UPDATED: ObjectChangeActionChoices.ACTION_UPDATE,
OBJECT_DELETED: ObjectChangeActionChoices.ACTION_DELETE,
}[action]
objectchange = instance.to_objectchange(change_event)
# If this is a many-to-many field change, check for a previous ObjectChange instance recorded # If this is a many-to-many field change, check for a previous ObjectChange instance recorded
# for this object by this request and update it # for this object by this request and update it
if m2m_changed and ( if m2m_changed and (
@ -112,9 +116,9 @@ def handle_changed_object(sender, instance, **kwargs):
events_queue.set(queue) events_queue.set(queue)
# Increment metric counters # Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE: if action == OBJECT_CREATED:
model_inserts.labels(instance._meta.model_name).inc() model_inserts.labels(instance._meta.model_name).inc()
elif action == ObjectChangeActionChoices.ACTION_UPDATE: elif action == OBJECT_UPDATED:
model_updates.labels(instance._meta.model_name).inc() model_updates.labels(instance._meta.model_name).inc()
@ -170,7 +174,7 @@ def handle_deleted_object(sender, instance, **kwargs):
# Enqueue the object for event processing # Enqueue the object for event processing
queue = events_queue.get() queue = events_queue.get()
enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE) enqueue_object(queue, instance, request.user, request.id, OBJECT_DELETED)
events_queue.set(queue) events_queue.set(queue)
# Increment metric counters # Increment metric counters
@ -272,7 +276,7 @@ def process_job_start_event_rules(sender, **kwargs):
""" """
event_rules = EventRule.objects.filter(type_job_start=True, enabled=True, object_types=sender.object_type) event_rules = EventRule.objects.filter(type_job_start=True, enabled=True, object_types=sender.object_type)
username = sender.user.username if sender.user else None username = sender.user.username if sender.user else None
process_event_rules(event_rules, sender.object_type, EVENT_JOB_START, sender.data, username) process_event_rules(event_rules, sender.object_type, JOB_STARTED, sender.data, username)
@receiver(job_end) @receiver(job_end)
@ -282,7 +286,7 @@ def process_job_end_event_rules(sender, **kwargs):
""" """
event_rules = EventRule.objects.filter(type_job_end=True, enabled=True, object_types=sender.object_type) event_rules = EventRule.objects.filter(type_job_end=True, enabled=True, object_types=sender.object_type)
username = sender.user.username if sender.user else None username = sender.user.username if sender.user else None
process_event_rules(event_rules, sender.object_type, EVENT_JOB_END, sender.data, username) process_event_rules(event_rules, sender.object_type, JOB_COMPLETED, sender.data, username)
# #

View File

@ -9,7 +9,7 @@ from django.urls import reverse
from requests import Session from requests import Session
from rest_framework import status from rest_framework import status
from core.choices import ObjectChangeActionChoices from core.events import *
from core.models import ObjectType from core.models import ObjectType
from dcim.choices import SiteStatusChoices from dcim.choices import SiteStatusChoices
from dcim.models import Site from dcim.models import Site
@ -132,7 +132,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 1) self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0] job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True)) self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE) self.assertEqual(job.kwargs['event'], OBJECT_CREATED)
self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data['id']) self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags'])) self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
@ -182,7 +182,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 3) self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs): for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True)) self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE) self.assertEqual(job.kwargs['event'], OBJECT_CREATED)
self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data[i]['id']) self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags'])) self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
@ -213,7 +213,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 1) self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0] job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True)) self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE) self.assertEqual(job.kwargs['event'], OBJECT_UPDATED)
self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk) self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags'])) self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
@ -269,7 +269,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 3) self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs): for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True)) self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE) self.assertEqual(job.kwargs['event'], OBJECT_UPDATED)
self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], data[i]['id']) self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags'])) self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
@ -295,7 +295,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 1) self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0] job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True)) self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE) self.assertEqual(job.kwargs['event'], OBJECT_DELETED)
self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk) self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1') self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
@ -328,7 +328,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(self.queue.count, 3) self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs): for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True)) self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE) self.assertEqual(job.kwargs['event'], OBJECT_DELETED)
self.assertEqual(job.kwargs['model_name'], 'site') self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], sites[i].pk) self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name) self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
@ -370,7 +370,7 @@ class EventRuleTest(APITestCase):
instance=site, instance=site,
user=self.user, user=self.user,
request_id=request_id, request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE action=OBJECT_CREATED
) )
flush_events(list(webhooks_queue.values())) flush_events(list(webhooks_queue.values()))