Optimize script jobs by skipping the intermediate event task

This commit is contained in:
Jeremy Stretch 2023-11-30 16:02:56 -05:00
parent c45bdf4884
commit a9b4a81421
3 changed files with 56 additions and 89 deletions

View File

@ -1,11 +1,14 @@
import logging import logging
from django.conf import settings from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.utils import timezone from django.utils import timezone
from django.utils.module_loading import import_string from django.utils.module_loading import import_string
from django_rq import get_queue from django_rq import get_queue
from core.models import Job
from netbox.config import get_config from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT from netbox.constants import RQ_QUEUE_DEFAULT
from netbox.registry import registry from netbox.registry import registry
@ -13,7 +16,7 @@ from utilities.api import get_serializer_for_model
from utilities.rqworker import get_rq_retry from utilities.rqworker import get_rq_retry
from utilities.utils import serialize_object from utilities.utils import serialize_object
from .choices import * from .choices import *
from .models import EventRule from .models import EventRule, ScriptModule
logger = logging.getLogger('netbox.events_processor') logger = logging.getLogger('netbox.events_processor')
@ -69,43 +72,65 @@ def enqueue_object(queue, instance, user, request_id, action):
def process_event_rules(event_rules, model_name, event, data, username, snapshots=None, request_id=None): def process_event_rules(event_rules, model_name, event, data, username, snapshots=None, request_id=None):
try:
user = get_user_model().objects.get(username=username)
except ObjectDoesNotExist:
user = None
for event_rule in event_rules: for event_rule in event_rules:
# Evaluate event rule conditions (if any)
if not event_rule.eval_conditions(data):
return
# Webhooks
if event_rule.action_type == EventRuleActionChoices.WEBHOOK: if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
processor = "extras.webhooks_worker.process_webhook"
queue_class = 'webhook' # Select the appropriate RQ queue
queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = get_queue(queue_name)
# Compile the task parameters
params = {
"event_rule": event_rule,
"model_name": model_name,
"event": event,
"data": data,
"snapshots": snapshots,
"timestamp": timezone.now().isoformat(),
"username": username,
"retry": get_rq_retry()
}
if snapshots:
params["snapshots"] = snapshots
if request_id:
params["request_id"] = request_id
# Enqueue the task
rq_queue.enqueue(
"extras.webhooks_worker.process_webhook",
**params
)
# Scripts
elif event_rule.action_type == EventRuleActionChoices.SCRIPT: elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
processor = "extras.scripts_worker.process_script" # Resolve the script from action parameters
queue_class = 'script' script_module = event_rule.action_object
_, script_name = event_rule.action_parameters['script_choice'].split(":", maxsplit=1)
script = script_module.scripts[script_name]()
# Enqueue a Job to record the script's execution
Job.enqueue(
"extras.scripts.run_script",
instance=script_module,
name=script.class_name,
user=user,
data=data
)
else: else:
raise ValueError(f"Unknown action type for an event rule: {event_rule.action_type}") raise ValueError(f"Unknown action type for an event rule: {event_rule.action_type}")
# Select the appropriate RQ queue based on the action object type
queue_name = get_config().QUEUE_MAPPINGS.get(queue_class, RQ_QUEUE_DEFAULT)
rq_queue = get_queue(queue_name)
# Compile the task parameters
params = {
"event_rule": event_rule,
"model_name": model_name,
"event": event,
"data": data,
"snapshots": snapshots,
"timestamp": timezone.now().isoformat(),
"username": username,
"retry": get_rq_retry()
}
if snapshots:
params["snapshots"] = snapshots
if request_id:
params["request_id"] = request_id
# Enqueue the task
rq_queue.enqueue(
processor,
**params
)
def process_event_queue(events): def process_event_queue(events):
""" """

View File

@ -1,54 +0,0 @@
import logging
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from django_rq import job
from core.models import Job
from extras.models import ScriptModule
from extras.scripts import run_script
logger = logging.getLogger('netbox.scripts_worker')
@job('default')
def process_script(event_rule, data, username, **kwargs):
"""
Run the requested script
"""
if not event_rule.eval_conditions(data):
return
script_choice = None
if event_rule.action_parameters and 'script_choice' in event_rule.action_parameters:
script_choice = event_rule.action_parameters['script_choice']
if script_choice:
module_id, script_name = script_choice.split(":", maxsplit=1)
else:
logger.warning(f"event run script - event_rule: {event_rule.id} no script_choice selected")
return
try:
module = ScriptModule.objects.get(pk=module_id)
except ScriptModule.DoesNotExist:
logger.warning(f"event run script - script module_id: {module_id} script_name: {script_name}")
return
try:
user = get_user_model().objects.get(username=username)
except ObjectDoesNotExist:
logger.warning(f"event run script - user does not exist username: {username} script_name: {script_name}")
return
script = module.scripts[script_name]()
Job.enqueue(
run_script,
instance=module,
name=script.class_name,
user=user,
schedule_at=None,
interval=None,
data=event_rule.action_data,
)

View File

@ -16,10 +16,6 @@ def process_webhook(event_rule, model_name, event, data, timestamp, username, re
""" """
Make a POST request to the defined Webhook Make a POST request to the defined Webhook
""" """
if not event_rule.eval_conditions(data):
return
webhook = event_rule.action_object webhook = event_rule.action_object
# Prepare context data for headers & body templates # Prepare context data for headers & body templates