From dcb64832227e21ad5c6f32bbdd40dcea48a1c94b Mon Sep 17 00:00:00 2001 From: Arthur Date: Thu, 16 Nov 2023 07:59:10 -0800 Subject: [PATCH] 14132 review changes --- netbox/core/models/jobs.py | 10 ++++++---- netbox/extras/events.py | 27 +++------------------------ netbox/extras/utils.py | 37 +++++++++++++++++++++++++++++++++++++ netbox/netbox/settings.py | 2 +- 4 files changed, 47 insertions(+), 29 deletions(-) diff --git a/netbox/core/models/jobs.py b/netbox/core/models/jobs.py index 497c6d700..0b43de7f0 100644 --- a/netbox/core/models/jobs.py +++ b/netbox/core/models/jobs.py @@ -12,7 +12,7 @@ from django.utils.translation import gettext as _ from core.choices import JobStatusChoices from extras.constants import EVENT_JOB_END, EVENT_JOB_START -from extras.utils import FeatureQuery +from extras.utils import FeatureQuery, process_event_rules from netbox.config import get_config from netbox.constants import RQ_QUEUE_DEFAULT from utilities.querysets import RestrictedQuerySet @@ -219,9 +219,6 @@ class Job(models.Model): def trigger_events(self, event): from extras.models import EventRule - rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT) - rq_queue = django_rq.get_queue(rq_queue_name, is_async=False) - # Fetch any webhooks matching this object type and action event_rules = EventRule.objects.filter( **{f'type_{event}': True}, @@ -229,6 +226,11 @@ class Job(models.Model): enabled=True ) + process_event_rules(event_rules, event, self.data, self.user.username) + + rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT) + rq_queue = django_rq.get_queue(rq_queue_name, is_async=False) + for event_rule in event_rules: rq_queue.enqueue( "extras.events_worker.process_event", diff --git a/netbox/extras/events.py b/netbox/extras/events.py index 52b452ae2..15f6be7b8 100644 --- a/netbox/extras/events.py +++ b/netbox/extras/events.py @@ -14,6 +14,7 @@ from utilities.rqworker import get_rq_retry from utilities.utils import serialize_object from .choices import * from .models import EventRule +from .utils import process_event_rules logger = logging.getLogger('netbox.events_processor') @@ -68,12 +69,10 @@ def enqueue_object(queue, instance, user, request_id, action): }) -def process_event_rules(queue): +def process_event_queue(queue): """ Flush a list of object representation to RQ for EventRule processing. """ - rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT) - rq_queue = get_queue(rq_queue_name) events_cache = { 'type_create': {}, 'type_update': {}, @@ -81,7 +80,6 @@ def process_event_rules(queue): } for data in queue: - action_flag = { ObjectChangeActionChoices.ACTION_CREATE: 'type_create', ObjectChangeActionChoices.ACTION_UPDATE: 'type_update', @@ -98,26 +96,7 @@ def process_event_rules(queue): ) event_rules = events_cache[action_flag][content_type] - for event_rule in event_rules: - if event_rule.action_type == EventRuleActionChoices.WEBHOOK: - processor = "extras.webhooks_worker.process_webhook" - elif event_rule.action_type == EventRuleActionChoices.SCRIPT: - processor = "extras.scripts_worker.process_script" - else: - return - - rq_queue.enqueue( - processor, - event_rule=event_rule, - model_name=content_type.model, - event=data['event'], - data=data['data'], - snapshots=data['snapshots'], - timestamp=str(timezone.now()), - username=data['username'], - request_id=data['request_id'], - retry=get_rq_retry() - ) + process_event_rules(event_rules, data['event'], data['data'], data['username'], data['snapshots'], data['request_id']) def import_module(name): diff --git a/netbox/extras/utils.py b/netbox/extras/utils.py index e1be391e4..8c8d7d246 100644 --- a/netbox/extras/utils.py +++ b/netbox/extras/utils.py @@ -1,9 +1,12 @@ import logging from django.db.models import Q from django.utils.deconstruct import deconstructible +from django_rq import get_queue from taggit.managers import _TaggableManager from extras.conditions import ConditionSet +from extras.choices import EventRuleActionChoices +from netbox.config import get_config from netbox.registry import registry logger = logging.getLogger('netbox.extras.utils') @@ -111,3 +114,37 @@ def eval_conditions(event_rule, data): return True return False + + +def process_event_rules(event_rules, event, data, username, snapshots=None, request_id=None): + rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT) + rq_queue = get_queue(rq_queue_name) + + for event_rule in event_rules: + if event_rule.action_type == EventRuleActionChoices.WEBHOOK: + processor = "extras.webhooks_worker.process_webhook" + elif event_rule.action_type == EventRuleActionChoices.SCRIPT: + processor = "extras.scripts_worker.process_script" + else: + raise ValueError(f"Unknown Event Rule action type: {event_rule.action_type}") + + params = { + "event_rule": event_rule, + "model_name": content_type.model, + "event": event, + "data": data, + "snapshots": snapshots, + "timestamp": str(timezone.now()), + "username": username, + "retry": get_rq_retry() + } + + if snapshots: + params["snapshots"] = snapshots + if request_id: + params["request_id"] = request_id + + rq_queue.enqueue( + processor, + **params + ) diff --git a/netbox/netbox/settings.py b/netbox/netbox/settings.py index 8dc7a6d46..d85d2af5c 100644 --- a/netbox/netbox/settings.py +++ b/netbox/netbox/settings.py @@ -116,7 +116,7 @@ DEVELOPER = getattr(configuration, 'DEVELOPER', False) DOCS_ROOT = getattr(configuration, 'DOCS_ROOT', os.path.join(os.path.dirname(BASE_DIR), 'docs')) EMAIL = getattr(configuration, 'EMAIL', {}) EVENTS_PIPELINE = getattr(configuration, 'EVENTS_PIPELINE', ( - 'extras.events.process_event_rules', + 'extras.events.process_event_queue', )) EXEMPT_VIEW_PERMISSIONS = getattr(configuration, 'EXEMPT_VIEW_PERMISSIONS', []) FIELD_CHOICES = getattr(configuration, 'FIELD_CHOICES', {})