14132 refactor pipeline code

This commit is contained in:
Arthur 2023-11-14 15:28:37 -08:00
parent 179564d6ff
commit 90cf3c8b9d
6 changed files with 66 additions and 67 deletions

View File

@ -1,6 +1,9 @@
import hashlib import hashlib
import hmac import hmac
import logging
import sys
from django.conf import settings
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.utils import timezone from django.utils import timezone
from django_rq import get_queue from django_rq import get_queue
@ -14,6 +17,8 @@ from utilities.utils import serialize_object
from .choices import * from .choices import *
from .models import EventRule, Webhook from .models import EventRule, Webhook
logger = logging.getLogger('netbox.events_processor')
def serialize_for_event(instance): def serialize_for_event(instance):
""" """
@ -65,7 +70,7 @@ def enqueue_object(queue, instance, user, request_id, action):
}) })
def flush_events(queue): def process_event_rules(queue):
""" """
Flush a list of object representation to RQ for webhook processing. Flush a list of object representation to RQ for webhook processing.
""" """
@ -96,8 +101,15 @@ def flush_events(queue):
event_rules = events_cache[action_flag][content_type] event_rules = events_cache[action_flag][content_type]
for event_rule in event_rules: for event_rule in event_rules:
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
processor = "extras.webhooks_worker.process_webhook"
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
processor = "extras.scripts_worker.process_script"
else:
return
rq_queue.enqueue( rq_queue.enqueue(
"extras.events_worker.process_event", processor,
event_rule=event_rule, event_rule=event_rule,
model_name=content_type.model, model_name=content_type.model,
event=data['event'], event=data['event'],
@ -108,3 +120,26 @@ def flush_events(queue):
request_id=data['request_id'], request_id=data['request_id'],
retry=get_rq_retry() retry=get_rq_retry()
) )
def import_module(name):
__import__(name)
return sys.modules[name]
def module_member(name):
mod, member = name.rsplit(".", 1)
module = import_module(mod)
return getattr(module, member)
def flush_events(queue):
"""
Flush a list of object representation to RQ for webhook processing.
"""
for name in settings.NETBOX_EVENTS_PIPELINE:
try:
func = module_member(name)
func(queue)
except Exception as e:
logger.error(f"Cannot import events pipeline {name} error: {e}")

View File

@ -1,64 +0,0 @@
import logging
import requests
import sys
from django.conf import settings
from django_rq import job
from jinja2.exceptions import TemplateError
from .conditions import ConditionSet
from .choices import EventRuleActionChoices
from .constants import WEBHOOK_EVENT_TYPES
from .scripts_worker import process_script
from .webhooks import generate_signature
from .webhooks_worker import process_webhook
logger = logging.getLogger('netbox.events_worker')
def eval_conditions(event_rule, data):
"""
Test whether the given data meets the conditions of the event rule (if any). Return True
if met or no conditions are specified.
"""
if not event_rule.conditions:
return True
logger.debug(f'Evaluating event rule conditions: {event_rule.conditions}')
if ConditionSet(event_rule.conditions).eval(data):
return True
return False
def import_module(name):
__import__(name)
return sys.modules[name]
def module_member(name):
mod, member = name.rsplit(".", 1)
module = import_module(mod)
return getattr(module, member)
def process_event_rules(event_rule, model_name, event, data, timestamp, username, request_id, snapshots):
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
process_webhook(event_rule, model_name, event, data, timestamp, username, request_id, snapshots)
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
process_script(event_rule, model_name, event, data, timestamp, username, request_id, snapshots)
@job('default')
def process_event(event_rule, model_name, event, data, timestamp, username, request_id=None, snapshots=None):
"""
Make a POST request to the defined Webhook
"""
# Evaluate event rule conditions (if any)
if not eval_conditions(event_rule, data):
return
# process the events pipeline
for name in settings.NETBOX_EVENTS_PIPELINE:
func = module_member(name)
func(event_rule, model_name, event, data, timestamp, username, request_id, snapshots)

View File

@ -11,15 +11,20 @@ from extras.conditions import ConditionSet
from extras.constants import WEBHOOK_EVENT_TYPES from extras.constants import WEBHOOK_EVENT_TYPES
from extras.models import ScriptModule from extras.models import ScriptModule
from extras.scripts import run_script from extras.scripts import run_script
from extras.utils import eval_conditions
from extras.webhooks import generate_signature from extras.webhooks import generate_signature
logger = logging.getLogger('netbox.webhooks_worker') logger = logging.getLogger('netbox.webhooks_worker')
@job('default')
def process_script(event_rule, model_name, event, data, timestamp, username, request_id=None, snapshots=None): def process_script(event_rule, model_name, event, data, timestamp, username, request_id=None, snapshots=None):
""" """
Run the requested script Run the requested script
""" """
if not eval_conditions(event_rule, data):
return
module_id = event_rule.action_object_identifier.split(":")[0] module_id = event_rule.action_object_identifier.split(":")[0]
script_name = event_rule.action_object_identifier.split(":")[1] script_name = event_rule.action_object_identifier.split(":")[1]

View File

@ -1,9 +1,12 @@
import logging
from django.db.models import Q from django.db.models import Q
from django.utils.deconstruct import deconstructible from django.utils.deconstruct import deconstructible
from taggit.managers import _TaggableManager from taggit.managers import _TaggableManager
from netbox.registry import registry from netbox.registry import registry
logger = logging.getLogger('netbox.extras.utils')
def is_taggable(obj): def is_taggable(obj):
""" """
@ -92,3 +95,18 @@ def is_report(obj):
return issubclass(obj, Report) and obj != Report return issubclass(obj, Report) and obj != Report
except TypeError: except TypeError:
return False return False
def eval_conditions(event_rule, data):
"""
Test whether the given data meets the conditions of the event rule (if any). Return True
if met or no conditions are specified.
"""
if not event_rule.conditions:
return True
logger.debug(f'Evaluating event rule conditions: {event_rule.conditions}')
if ConditionSet(event_rule.conditions).eval(data):
return True
return False

View File

@ -7,16 +7,21 @@ from jinja2.exceptions import TemplateError
from .conditions import ConditionSet from .conditions import ConditionSet
from .constants import WEBHOOK_EVENT_TYPES from .constants import WEBHOOK_EVENT_TYPES
from .utils import eval_conditions
from .webhooks import generate_signature from .webhooks import generate_signature
logger = logging.getLogger('netbox.webhooks_worker') logger = logging.getLogger('netbox.webhooks_worker')
@job('default')
def process_webhook(event_rule, model_name, event, data, timestamp, username, request_id=None, snapshots=None): def process_webhook(event_rule, model_name, event, data, timestamp, username, request_id=None, snapshots=None):
""" """
Make a POST request to the defined Webhook Make a POST request to the defined Webhook
""" """
if not eval_conditions(event_rule, data):
return
webhook = event_rule.action_object webhook = event_rule.action_object
# Prepare context data for headers & body templates # Prepare context data for headers & body templates

View File

@ -175,7 +175,7 @@ TIME_FORMAT = getattr(configuration, 'TIME_FORMAT', 'g:i a')
TIME_ZONE = getattr(configuration, 'TIME_ZONE', 'UTC') TIME_ZONE = getattr(configuration, 'TIME_ZONE', 'UTC')
ENABLE_LOCALIZATION = getattr(configuration, 'ENABLE_LOCALIZATION', False) ENABLE_LOCALIZATION = getattr(configuration, 'ENABLE_LOCALIZATION', False)
NETBOX_EVENTS_PIPELINE = getattr(configuration, 'NETBOX_EVENTS_PIPELINE', ( NETBOX_EVENTS_PIPELINE = getattr(configuration, 'NETBOX_EVENTS_PIPELINE', (
'extras.events_worker.process_event_rules', 'extras.events.process_event_rules',
)) ))