14132 review changes

This commit is contained in:
Arthur 2023-11-16 07:59:10 -08:00
parent 7a50706605
commit dcb6483222
4 changed files with 47 additions and 29 deletions

View File

@ -12,7 +12,7 @@ from django.utils.translation import gettext as _
from core.choices import JobStatusChoices
from extras.constants import EVENT_JOB_END, EVENT_JOB_START
from extras.utils import FeatureQuery
from extras.utils import FeatureQuery, process_event_rules
from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT
from utilities.querysets import RestrictedQuerySet
@ -219,9 +219,6 @@ class Job(models.Model):
def trigger_events(self, event):
from extras.models import EventRule
rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = django_rq.get_queue(rq_queue_name, is_async=False)
# Fetch any webhooks matching this object type and action
event_rules = EventRule.objects.filter(
**{f'type_{event}': True},
@ -229,6 +226,11 @@ class Job(models.Model):
enabled=True
)
process_event_rules(event_rules, event, self.data, self.user.username)
rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = django_rq.get_queue(rq_queue_name, is_async=False)
for event_rule in event_rules:
rq_queue.enqueue(
"extras.events_worker.process_event",

View File

@ -14,6 +14,7 @@ from utilities.rqworker import get_rq_retry
from utilities.utils import serialize_object
from .choices import *
from .models import EventRule
from .utils import process_event_rules
logger = logging.getLogger('netbox.events_processor')
@ -68,12 +69,10 @@ def enqueue_object(queue, instance, user, request_id, action):
})
def process_event_rules(queue):
def process_event_queue(queue):
"""
Flush a list of object representation to RQ for EventRule processing.
"""
rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = get_queue(rq_queue_name)
events_cache = {
'type_create': {},
'type_update': {},
@ -81,7 +80,6 @@ def process_event_rules(queue):
}
for data in queue:
action_flag = {
ObjectChangeActionChoices.ACTION_CREATE: 'type_create',
ObjectChangeActionChoices.ACTION_UPDATE: 'type_update',
@ -98,26 +96,7 @@ def process_event_rules(queue):
)
event_rules = events_cache[action_flag][content_type]
for event_rule in event_rules:
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
processor = "extras.webhooks_worker.process_webhook"
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
processor = "extras.scripts_worker.process_script"
else:
return
rq_queue.enqueue(
processor,
event_rule=event_rule,
model_name=content_type.model,
event=data['event'],
data=data['data'],
snapshots=data['snapshots'],
timestamp=str(timezone.now()),
username=data['username'],
request_id=data['request_id'],
retry=get_rq_retry()
)
process_event_rules(event_rules, data['event'], data['data'], data['username'], data['snapshots'], data['request_id'])
def import_module(name):

View File

@ -1,9 +1,12 @@
import logging
from django.db.models import Q
from django.utils.deconstruct import deconstructible
from django_rq import get_queue
from taggit.managers import _TaggableManager
from extras.conditions import ConditionSet
from extras.choices import EventRuleActionChoices
from netbox.config import get_config
from netbox.registry import registry
logger = logging.getLogger('netbox.extras.utils')
@ -111,3 +114,37 @@ def eval_conditions(event_rule, data):
return True
return False
def process_event_rules(event_rules, event, data, username, snapshots=None, request_id=None):
rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = get_queue(rq_queue_name)
for event_rule in event_rules:
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
processor = "extras.webhooks_worker.process_webhook"
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
processor = "extras.scripts_worker.process_script"
else:
raise ValueError(f"Unknown Event Rule action type: {event_rule.action_type}")
params = {
"event_rule": event_rule,
"model_name": content_type.model,
"event": event,
"data": data,
"snapshots": snapshots,
"timestamp": str(timezone.now()),
"username": username,
"retry": get_rq_retry()
}
if snapshots:
params["snapshots"] = snapshots
if request_id:
params["request_id"] = request_id
rq_queue.enqueue(
processor,
**params
)

View File

@ -116,7 +116,7 @@ DEVELOPER = getattr(configuration, 'DEVELOPER', False)
DOCS_ROOT = getattr(configuration, 'DOCS_ROOT', os.path.join(os.path.dirname(BASE_DIR), 'docs'))
EMAIL = getattr(configuration, 'EMAIL', {})
EVENTS_PIPELINE = getattr(configuration, 'EVENTS_PIPELINE', (
'extras.events.process_event_rules',
'extras.events.process_event_queue',
))
EXEMPT_VIEW_PERMISSIONS = getattr(configuration, 'EXEMPT_VIEW_PERMISSIONS', [])
FIELD_CHOICES = getattr(configuration, 'FIELD_CHOICES', {})