conditions.py - Change condition with self.negate to result XOR self.negate for compactness.
context_managers.py - added context manager suppress_event_exp to prevent exceptions from being lost in loops that handle events sequentially
events.py - using cm suppress_event_exp.
test_event_rules.py - added test to emulate the exception in process_event_function and verify that exception was logged properly.
This commit is contained in:
Petr Voronov 2025-07-10 17:31:06 +03:00
parent ca9b2947ba
commit f41da06d9b
4 changed files with 167 additions and 82 deletions

View File

@ -80,16 +80,15 @@ class Condition:
except TypeError:
# Invalid key path
value = None
if value is None:
# Handle comparison case when value is None.
if self.op in (self.GT, self.GTE, self.LT, self.LTE, self.IN, self.CONTAINS, self.REGEX):
result = False
else:
result = self.eval_func(value)
return False ^ self.negate
if self.negate:
return not result
return result
result = self.eval_func(value)
return result ^ self.negate
# Equivalency

View File

@ -11,6 +11,7 @@ from django_rq import get_queue
from core.events import *
from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT
from netbox.context_managers import suppress_event_exp
from netbox.registry import registry
from users.models import User
from utilities.api import get_serializer_for_model
@ -86,71 +87,73 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
for event_rule in event_rules:
# Evaluate event rule conditions (if any)
if not event_rule.eval_conditions(data):
continue
with suppress_event_exp(logger):
# Compile event data
event_data = event_rule.action_data or {}
event_data.update(data)
# Evaluate event rule conditions (if any)
if not event_rule.eval_conditions(data):
continue
# Webhooks
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
# Compile event data
event_data = event_rule.action_data or {}
event_data.update(data)
# Select the appropriate RQ queue
queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = get_queue(queue_name)
# Webhooks
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
# Compile the task parameters
params = {
"event_rule": event_rule,
"model_name": object_type.model,
"event_type": event_type,
"data": event_data,
"snapshots": snapshots,
"timestamp": timezone.now().isoformat(),
"username": username,
"retry": get_rq_retry()
}
if snapshots:
params["snapshots"] = snapshots
if request_id:
params["request_id"] = request_id
# Select the appropriate RQ queue
queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = get_queue(queue_name)
# Enqueue the task
rq_queue.enqueue(
"extras.webhooks.send_webhook",
**params
)
# Compile the task parameters
params = {
"event_rule": event_rule,
"model_name": object_type.model,
"event_type": event_type,
"data": event_data,
"snapshots": snapshots,
"timestamp": timezone.now().isoformat(),
"username": username,
"retry": get_rq_retry()
}
if snapshots:
params["snapshots"] = snapshots
if request_id:
params["request_id"] = request_id
# Scripts
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
# Resolve the script from action parameters
script = event_rule.action_object.python_class()
# Enqueue the task
rq_queue.enqueue(
"extras.webhooks.send_webhook",
**params
)
# Enqueue a Job to record the script's execution
from extras.jobs import ScriptJob
ScriptJob.enqueue(
instance=event_rule.action_object,
name=script.name,
user=user,
data=event_data
)
# Scripts
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
# Resolve the script from action parameters
script = event_rule.action_object.python_class()
# Notification groups
elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
# Bulk-create notifications for all members of the notification group
event_rule.action_object.notify(
object_type=object_type,
object_id=event_data['id'],
object_repr=event_data.get('display'),
event_type=event_type
)
# Enqueue a Job to record the script's execution
from extras.jobs import ScriptJob
ScriptJob.enqueue(
instance=event_rule.action_object,
name=script.name,
user=user,
data=event_data
)
else:
raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
action_type=event_rule.action_type
))
# Notification groups
elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
# Bulk-create notifications for all members of the notification group
event_rule.action_object.notify(
object_type=object_type,
object_id=event_data['id'],
object_repr=event_data.get('display'),
event_type=event_type
)
else:
raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
action_type=event_rule.action_type
))
def process_event_queue(events):
@ -160,27 +163,28 @@ def process_event_queue(events):
events_cache = defaultdict(dict)
for event in events:
event_type = event['event_type']
object_type = event['object_type']
with suppress_event_exp(logger):
event_type = event['event_type']
object_type = event['object_type']
# Cache applicable Event Rules
if object_type not in events_cache[event_type]:
events_cache[event_type][object_type] = EventRule.objects.filter(
event_types__contains=[event['event_type']],
object_types=object_type,
enabled=True
# Cache applicable Event Rules
if object_type not in events_cache[event_type]:
events_cache[event_type][object_type] = EventRule.objects.filter(
event_types__contains=[event['event_type']],
object_types=object_type,
enabled=True
)
event_rules = events_cache[event_type][object_type]
process_event_rules(
event_rules=event_rules,
object_type=object_type,
event_type=event['event_type'],
data=event['data'],
username=event['username'],
snapshots=event['snapshots'],
request_id=event['request_id']
)
event_rules = events_cache[event_type][object_type]
process_event_rules(
event_rules=event_rules,
object_type=object_type,
event_type=event['event_type'],
data=event['data'],
username=event['username'],
snapshots=event['snapshots'],
request_id=event['request_id']
)
def flush_events(events):

View File

@ -390,6 +390,70 @@ class EventRuleTest(APITestCase):
with patch.object(Session, 'send', dummy_send):
send_webhook(**job.kwargs)
def test_suppression_flush_events(self):
    """
    Verify that an exception raised while processing an event rule is suppressed
    (the API request still succeeds and the queue stays empty) and that exactly
    one ERROR record with the expected format is emitted by the
    'netbox.events_processor' logger.
    """
    # assertLogs both captures the records and fails the test if no ERROR is logged
    with self.assertLogs('netbox.events_processor', level='ERROR') as cm:
        # Get the known event
        event = EventRule.objects.get(name='Event Rule 2')
        site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_PLANNED)
        # Update an object via the REST API
        data = {
            'name': 'Site X',
            'status': SiteStatusChoices.STATUS_ACTIVE,
        }
        url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
        self.add_permissions('dcim.change_site')
        response = self.client.patch(url, data, format='json', **self.header)
        self.assertHttpStatus(response, status.HTTP_200_OK)
        # Verify that a background task was queued for the updated object
        self.assertEqual(self.queue.count, 1)
        job = self.queue.jobs[0]
        self.assertEqual(job.kwargs['event_rule'], event)
        self.assertEqual(job.kwargs['event_type'], OBJECT_UPDATED)
        self.assertEqual(job.kwargs['model_name'], 'site')
        self.assertEqual(job.kwargs['data']['id'], site.pk)
        self.assertEqual(job.kwargs['data']['status']['value'], SiteStatusChoices.STATUS_ACTIVE)
        self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
        self.assertEqual(job.kwargs['snapshots']['prechange']['status'], SiteStatusChoices.STATUS_PLANNED)
        self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
        self.assertEqual(job.kwargs['snapshots']['postchange']['status'], SiteStatusChoices.STATUS_ACTIVE)
        # Break the rule: an unknown action type makes process_event_rules raise ValueError
        event.action_type = 'non-existent-action-type'
        event.save()
        # Cleanup queue
        self.queue.empty()
        # Verify that a queue is empty
        self.assertEqual(self.queue.count, 0)
        # Update an object via the REST API (this triggers the broken rule)
        data = {
            'name': 'Site X',
            'status': SiteStatusChoices.STATUS_PLANNED,
        }
        url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
        self.add_permissions('dcim.change_site')
        response = self.client.patch(url, data, format='json', **self.header)
        self.assertHttpStatus(response, status.HTTP_200_OK)
        # Verify that a queue is still empty (the exception was suppressed, nothing enqueued)
        self.assertEqual(self.queue.count, 0)
        # Verify that we have only one ERROR message in our log
        self.assertEqual(len(cm.output), 1)
        # Verify message format produced by suppress_event_exp
        pattern = (r"Error ValueError in ValueError at .*? - Unknown action type for"
                   r" an event rule: non-existent-action-type")
        self.assertRegex(cm.output[0], pattern)
def test_duplicate_triggers(self):
"""
Test for erroneous duplicate event triggers resulting from saving an object multiple times

View File

@ -1,8 +1,8 @@
import traceback
from contextlib import contextmanager
from netbox.context import current_request, events_queue
from netbox.utils import register_request_processor
from extras.events import flush_events
@register_request_processor
@ -14,6 +14,8 @@ def event_tracking(request):
:param request: WSGIRequest object with a unique `id` set
"""
from extras.events import flush_events
current_request.set(request)
events_queue.set({})
@ -26,3 +28,19 @@ def event_tracking(request):
# Clear context vars
current_request.set(None)
events_queue.set({})
@contextmanager
def suppress_event_exp(logger=None):
    """
    Suppress any exception raised inside the managed block so that one failing
    event (or event rule) cannot abort the processing of subsequent ones.

    :param logger: Optional logger; when given, the suppressed exception is
        recorded with logger.error(), including the file and line where it
        was actually raised.
    """
    try:
        yield
    except Exception as e:
        if logger:
            # Walk the traceback to the innermost frame — tb_lineno/tb_frame on
            # the first traceback entry point at the `with` block, not at the
            # statement that raised.
            tb = e.__traceback__
            while tb is not None and tb.tb_next is not None:
                tb = tb.tb_next
            filename = tb.tb_frame.f_code.co_filename if tb else 'unknown'
            lineno = tb.tb_lineno if tb else 0
            # NOTE: the "Error <type> in <type>" prefix is pinned by the regex in
            # test_suppression_flush_events — keep it if changing this format.
            logger.error(f"Error {e.__class__.__name__} in {e.__class__.__name__} at {filename}:{lineno} - {e}")