Select RQ queue based on action object type

This commit is contained in:
Jeremy Stretch 2023-11-27 09:40:17 -05:00
parent d14e868e46
commit e34eff2e87

View File

@ -69,17 +69,22 @@ def enqueue_object(queue, instance, user, request_id, action):
def process_event_rules(event_rules, model_name, event, data, username, snapshots=None, request_id=None): def process_event_rules(event_rules, model_name, event, data, username, snapshots=None, request_id=None):
rq_queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = get_queue(rq_queue_name)
for event_rule in event_rules: for event_rule in event_rules:
if event_rule.action_type == EventRuleActionChoices.WEBHOOK: if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
processor = "extras.webhooks_worker.process_webhook" processor = "extras.webhooks_worker.process_webhook"
queue_class = 'webhook'
elif event_rule.action_type == EventRuleActionChoices.SCRIPT: elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
processor = "extras.scripts_worker.process_script" processor = "extras.scripts_worker.process_script"
queue_class = 'script'
else: else:
raise ValueError(f"Unknown action type for an event rule: {event_rule.action_type}") raise ValueError(f"Unknown action type for an event rule: {event_rule.action_type}")
# Select the appropriate RQ queue based on the action object type
queue_name = get_config().QUEUE_MAPPINGS.get(queue_class, RQ_QUEUE_DEFAULT)
rq_queue = get_queue(queue_name)
# Compile the task parameters
params = { params = {
"event_rule": event_rule, "event_rule": event_rule,
"model_name": model_name, "model_name": model_name,
@ -90,12 +95,12 @@ def process_event_rules(event_rules, model_name, event, data, username, snapshot
"username": username, "username": username,
"retry": get_rq_retry() "retry": get_rq_retry()
} }
if snapshots: if snapshots:
params["snapshots"] = snapshots params["snapshots"] = snapshots
if request_id: if request_id:
params["request_id"] = request_id params["request_id"] = request_id
# Enqueue the task
rq_queue.enqueue( rq_queue.enqueue(
processor, processor,
**params **params