Use Django's import_string() to reference pipeline functions

This commit is contained in:
Jeremy Stretch 2023-11-27 09:43:10 -05:00
parent e34eff2e87
commit 0066eed445

View File

@ -5,6 +5,7 @@ from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django_rq import get_queue
from django.utils import timezone
from django.utils.module_loading import import_string
from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT
@ -140,17 +141,6 @@ def process_event_queue(events):
)
def import_module(name):
__import__(name)
return sys.modules[name]
def module_member(name):
mod, member = name.rsplit(".", 1)
module = import_module(mod)
return getattr(module, member)
def flush_events(queue):
"""
Flush a list of object representation to RQ for webhook processing.
@ -158,7 +148,7 @@ def flush_events(queue):
if queue:
for name in settings.EVENTS_PIPELINE:
try:
func = module_member(name)
func = import_string(name)
func(queue)
except Exception as e:
logger.error(f"Cannot import events pipeline {name} error: {e}")