mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-13 16:47:34 -06:00
Merge f41da06d9b
into 9c2cd66162
This commit is contained in:
commit
b4dc90b182
@ -59,6 +59,7 @@ class Condition:
|
||||
if op not in self.TYPES[type(value)]:
|
||||
raise ValueError(_("Invalid type for {op} operation: {value}").format(op=op, value=type(value)))
|
||||
|
||||
self.op = op
|
||||
self.attr = attr
|
||||
self.value = value
|
||||
self.eval_func = getattr(self, f'eval_{op}')
|
||||
@ -79,11 +80,15 @@ class Condition:
|
||||
except TypeError:
|
||||
# Invalid key path
|
||||
value = None
|
||||
|
||||
if value is None:
|
||||
# Handle comparison case when value is None.
|
||||
if self.op in (self.GT, self.GTE, self.LT, self.LTE, self.IN, self.CONTAINS, self.REGEX):
|
||||
return False ^ self.negate
|
||||
|
||||
result = self.eval_func(value)
|
||||
|
||||
if self.negate:
|
||||
return not result
|
||||
return result
|
||||
return result ^ self.negate
|
||||
|
||||
# Equivalency
|
||||
|
||||
|
@ -11,6 +11,7 @@ from django_rq import get_queue
|
||||
from core.events import *
|
||||
from netbox.config import get_config
|
||||
from netbox.constants import RQ_QUEUE_DEFAULT
|
||||
from netbox.context_managers import suppress_event_exp
|
||||
from netbox.registry import registry
|
||||
from users.models import User
|
||||
from utilities.api import get_serializer_for_model
|
||||
@ -86,71 +87,73 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
|
||||
|
||||
for event_rule in event_rules:
|
||||
|
||||
# Evaluate event rule conditions (if any)
|
||||
if not event_rule.eval_conditions(data):
|
||||
continue
|
||||
with suppress_event_exp(logger):
|
||||
|
||||
# Compile event data
|
||||
event_data = event_rule.action_data or {}
|
||||
event_data.update(data)
|
||||
# Evaluate event rule conditions (if any)
|
||||
if not event_rule.eval_conditions(data):
|
||||
continue
|
||||
|
||||
# Webhooks
|
||||
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
|
||||
# Compile event data
|
||||
event_data = event_rule.action_data or {}
|
||||
event_data.update(data)
|
||||
|
||||
# Select the appropriate RQ queue
|
||||
queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
|
||||
rq_queue = get_queue(queue_name)
|
||||
# Webhooks
|
||||
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
|
||||
|
||||
# Compile the task parameters
|
||||
params = {
|
||||
"event_rule": event_rule,
|
||||
"model_name": object_type.model,
|
||||
"event_type": event_type,
|
||||
"data": event_data,
|
||||
"snapshots": snapshots,
|
||||
"timestamp": timezone.now().isoformat(),
|
||||
"username": username,
|
||||
"retry": get_rq_retry()
|
||||
}
|
||||
if snapshots:
|
||||
params["snapshots"] = snapshots
|
||||
if request_id:
|
||||
params["request_id"] = request_id
|
||||
# Select the appropriate RQ queue
|
||||
queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
|
||||
rq_queue = get_queue(queue_name)
|
||||
|
||||
# Enqueue the task
|
||||
rq_queue.enqueue(
|
||||
"extras.webhooks.send_webhook",
|
||||
**params
|
||||
)
|
||||
# Compile the task parameters
|
||||
params = {
|
||||
"event_rule": event_rule,
|
||||
"model_name": object_type.model,
|
||||
"event_type": event_type,
|
||||
"data": event_data,
|
||||
"snapshots": snapshots,
|
||||
"timestamp": timezone.now().isoformat(),
|
||||
"username": username,
|
||||
"retry": get_rq_retry()
|
||||
}
|
||||
if snapshots:
|
||||
params["snapshots"] = snapshots
|
||||
if request_id:
|
||||
params["request_id"] = request_id
|
||||
|
||||
# Scripts
|
||||
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
|
||||
# Resolve the script from action parameters
|
||||
script = event_rule.action_object.python_class()
|
||||
# Enqueue the task
|
||||
rq_queue.enqueue(
|
||||
"extras.webhooks.send_webhook",
|
||||
**params
|
||||
)
|
||||
|
||||
# Enqueue a Job to record the script's execution
|
||||
from extras.jobs import ScriptJob
|
||||
ScriptJob.enqueue(
|
||||
instance=event_rule.action_object,
|
||||
name=script.name,
|
||||
user=user,
|
||||
data=event_data
|
||||
)
|
||||
# Scripts
|
||||
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
|
||||
# Resolve the script from action parameters
|
||||
script = event_rule.action_object.python_class()
|
||||
|
||||
# Notification groups
|
||||
elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
|
||||
# Bulk-create notifications for all members of the notification group
|
||||
event_rule.action_object.notify(
|
||||
object_type=object_type,
|
||||
object_id=event_data['id'],
|
||||
object_repr=event_data.get('display'),
|
||||
event_type=event_type
|
||||
)
|
||||
# Enqueue a Job to record the script's execution
|
||||
from extras.jobs import ScriptJob
|
||||
ScriptJob.enqueue(
|
||||
instance=event_rule.action_object,
|
||||
name=script.name,
|
||||
user=user,
|
||||
data=event_data
|
||||
)
|
||||
|
||||
else:
|
||||
raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
|
||||
action_type=event_rule.action_type
|
||||
))
|
||||
# Notification groups
|
||||
elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
|
||||
# Bulk-create notifications for all members of the notification group
|
||||
event_rule.action_object.notify(
|
||||
object_type=object_type,
|
||||
object_id=event_data['id'],
|
||||
object_repr=event_data.get('display'),
|
||||
event_type=event_type
|
||||
)
|
||||
|
||||
else:
|
||||
raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
|
||||
action_type=event_rule.action_type
|
||||
))
|
||||
|
||||
|
||||
def process_event_queue(events):
|
||||
@ -160,27 +163,28 @@ def process_event_queue(events):
|
||||
events_cache = defaultdict(dict)
|
||||
|
||||
for event in events:
|
||||
event_type = event['event_type']
|
||||
object_type = event['object_type']
|
||||
with suppress_event_exp(logger):
|
||||
event_type = event['event_type']
|
||||
object_type = event['object_type']
|
||||
|
||||
# Cache applicable Event Rules
|
||||
if object_type not in events_cache[event_type]:
|
||||
events_cache[event_type][object_type] = EventRule.objects.filter(
|
||||
event_types__contains=[event['event_type']],
|
||||
object_types=object_type,
|
||||
enabled=True
|
||||
# Cache applicable Event Rules
|
||||
if object_type not in events_cache[event_type]:
|
||||
events_cache[event_type][object_type] = EventRule.objects.filter(
|
||||
event_types__contains=[event['event_type']],
|
||||
object_types=object_type,
|
||||
enabled=True
|
||||
)
|
||||
event_rules = events_cache[event_type][object_type]
|
||||
|
||||
process_event_rules(
|
||||
event_rules=event_rules,
|
||||
object_type=object_type,
|
||||
event_type=event['event_type'],
|
||||
data=event['data'],
|
||||
username=event['username'],
|
||||
snapshots=event['snapshots'],
|
||||
request_id=event['request_id']
|
||||
)
|
||||
event_rules = events_cache[event_type][object_type]
|
||||
|
||||
process_event_rules(
|
||||
event_rules=event_rules,
|
||||
object_type=object_type,
|
||||
event_type=event['event_type'],
|
||||
data=event['data'],
|
||||
username=event['username'],
|
||||
snapshots=event['snapshots'],
|
||||
request_id=event['request_id']
|
||||
)
|
||||
|
||||
|
||||
def flush_events(events):
|
||||
|
@ -62,65 +62,89 @@ class ConditionTestCase(TestCase):
|
||||
|
||||
def test_eq(self):
|
||||
c = Condition('x', 1, 'eq')
|
||||
self.assertTrue(c.eval({'x': 1}))
|
||||
self.assertFalse(c.eval({'x': 2}))
|
||||
self.assertTrue(c.eval({'x': 1})) # 1 == 1 → True
|
||||
self.assertFalse(c.eval({'x': 2})) # 2 == 1 → False
|
||||
self.assertFalse(c.eval({'x': None})) # None == 1 → False
|
||||
self.assertFalse(c.eval({'z': 1})) # Missing 'x' → treated as None → False
|
||||
|
||||
def test_eq_negated(self):
|
||||
c = Condition('x', 1, 'eq', negate=True)
|
||||
self.assertFalse(c.eval({'x': 1}))
|
||||
self.assertTrue(c.eval({'x': 2}))
|
||||
self.assertFalse(c.eval({'x': 1})) # not (1 == 1) → False
|
||||
self.assertTrue(c.eval({'x': 2})) # not (2 == 1) → True
|
||||
self.assertTrue(c.eval({'x': None})) # not (None == 1) → True
|
||||
self.assertTrue(c.eval({'z': 1})) # Missing 'x' → treated as None → True
|
||||
|
||||
def test_gt(self):
|
||||
c = Condition('x', 1, 'gt')
|
||||
self.assertTrue(c.eval({'x': 2}))
|
||||
self.assertFalse(c.eval({'x': 1}))
|
||||
self.assertTrue(c.eval({'x': 2})) # 2 > 1 → True
|
||||
self.assertFalse(c.eval({'x': 1})) # 1 > 1 → False
|
||||
self.assertFalse(c.eval({'x': None})) # None > 1 → False (safe handling)
|
||||
self.assertFalse(c.eval({'z': 1})) # Missing 'x' → treated as None → False
|
||||
|
||||
def test_gte(self):
|
||||
c = Condition('x', 1, 'gte')
|
||||
self.assertTrue(c.eval({'x': 2}))
|
||||
self.assertTrue(c.eval({'x': 1}))
|
||||
self.assertFalse(c.eval({'x': 0}))
|
||||
self.assertTrue(c.eval({'x': 2})) # 2 >= 1 → True
|
||||
self.assertTrue(c.eval({'x': 1})) # 1 >= 1 → True
|
||||
self.assertFalse(c.eval({'x': 0})) # 0 >= 1 → False
|
||||
self.assertFalse(c.eval({'x': None})) # None >= 1 → False
|
||||
self.assertFalse(c.eval({'z': 1})) # Missing 'x' → False
|
||||
|
||||
def test_lt(self):
|
||||
c = Condition('x', 2, 'lt')
|
||||
self.assertTrue(c.eval({'x': 1}))
|
||||
self.assertFalse(c.eval({'x': 2}))
|
||||
self.assertTrue(c.eval({'x': 1})) # 1 < 2 → True
|
||||
self.assertFalse(c.eval({'x': 2})) # 2 < 2 → False
|
||||
self.assertFalse(c.eval({'x': None})) # None < 2 → False
|
||||
self.assertFalse(c.eval({'z': 1})) # Missing 'x' → False
|
||||
|
||||
def test_lte(self):
|
||||
c = Condition('x', 2, 'lte')
|
||||
self.assertTrue(c.eval({'x': 1}))
|
||||
self.assertTrue(c.eval({'x': 2}))
|
||||
self.assertFalse(c.eval({'x': 3}))
|
||||
self.assertTrue(c.eval({'x': 1})) # 1 <= 2 → True
|
||||
self.assertTrue(c.eval({'x': 2})) # 2 <= 2 → True
|
||||
self.assertFalse(c.eval({'x': 3})) # 3 <= 2 → False
|
||||
self.assertFalse(c.eval({'x': None})) # None <= 2 → False
|
||||
self.assertFalse(c.eval({'z': 1})) # Missing 'x' → False
|
||||
|
||||
def test_in(self):
|
||||
c = Condition('x', [1, 2, 3], 'in')
|
||||
self.assertTrue(c.eval({'x': 1}))
|
||||
self.assertFalse(c.eval({'x': 9}))
|
||||
self.assertTrue(c.eval({'x': 1})) # 1 in [1,2,3] → True
|
||||
self.assertFalse(c.eval({'x': 9})) # 9 in [1,2,3] → False
|
||||
self.assertFalse(c.eval({'x': None})) # None in [1,2,3] → False
|
||||
self.assertFalse(c.eval({'z': 1})) # Missing 'x' → False
|
||||
|
||||
def test_in_negated(self):
|
||||
c = Condition('x', [1, 2, 3], 'in', negate=True)
|
||||
self.assertFalse(c.eval({'x': 1}))
|
||||
self.assertTrue(c.eval({'x': 9}))
|
||||
self.assertFalse(c.eval({'x': 1})) # not (1 in [1,2,3]) → False
|
||||
self.assertTrue(c.eval({'x': 9})) # not (9 in [1,2,3]) → True
|
||||
self.assertTrue(c.eval({'x': None})) # not (None in [1,2,3]) → True
|
||||
self.assertTrue(c.eval({'z': 1})) # Missing 'x' → True
|
||||
|
||||
def test_contains(self):
|
||||
c = Condition('x', 1, 'contains')
|
||||
self.assertTrue(c.eval({'x': [1, 2, 3]}))
|
||||
self.assertFalse(c.eval({'x': [2, 3, 4]}))
|
||||
self.assertTrue(c.eval({'x': [1, 2, 3]})) # 1 in [1,2,3] → True
|
||||
self.assertFalse(c.eval({'x': [2, 3, 4]})) # 1 in [2,3,4] → False
|
||||
self.assertFalse(c.eval({'x': None})) # 1 in None → False
|
||||
self.assertFalse(c.eval({'z': [1, 2, 3]})) # Missing 'x' → False
|
||||
|
||||
def test_contains_negated(self):
|
||||
c = Condition('x', 1, 'contains', negate=True)
|
||||
self.assertFalse(c.eval({'x': [1, 2, 3]}))
|
||||
self.assertTrue(c.eval({'x': [2, 3, 4]}))
|
||||
self.assertFalse(c.eval({'x': [1, 2, 3]})) # not (1 in [1,2,3]) → False
|
||||
self.assertTrue(c.eval({'x': [2, 3, 4]})) # not (1 in [2,3,4]) → True
|
||||
self.assertTrue(c.eval({'x': None})) # not (1 in None) → True
|
||||
self.assertTrue(c.eval({'z': [1, 2, 3]})) # Missing 'x' → True
|
||||
|
||||
def test_regex(self):
|
||||
c = Condition('x', '[a-z]+', 'regex')
|
||||
self.assertTrue(c.eval({'x': 'abc'}))
|
||||
self.assertFalse(c.eval({'x': '123'}))
|
||||
self.assertTrue(c.eval({'x': 'abc'})) # 'abc' matches regex → True
|
||||
self.assertFalse(c.eval({'x': '123'})) # '123' doesn't match → False
|
||||
self.assertFalse(c.eval({'x': None})) # None doesn't match → False
|
||||
self.assertFalse(c.eval({'z': 'abc'})) # Missing 'x' → False
|
||||
|
||||
def test_regex_negated(self):
|
||||
c = Condition('x', '[a-z]+', 'regex', negate=True)
|
||||
self.assertFalse(c.eval({'x': 'abc'}))
|
||||
self.assertTrue(c.eval({'x': '123'}))
|
||||
self.assertFalse(c.eval({'x': 'abc'})) # not (match) → False
|
||||
self.assertTrue(c.eval({'x': '123'})) # not (no match) → True
|
||||
self.assertTrue(c.eval({'x': None})) # not (None match) → True
|
||||
self.assertTrue(c.eval({'z': 'abc'})) # Missing 'x' → True
|
||||
|
||||
|
||||
class ConditionSetTest(TestCase):
|
||||
|
@ -390,6 +390,70 @@ class EventRuleTest(APITestCase):
|
||||
with patch.object(Session, 'send', dummy_send):
|
||||
send_webhook(**job.kwargs)
|
||||
|
||||
def test_suppression_flush_events(self):
|
||||
|
||||
# Use context manager to verify that exception was logged as we expected
|
||||
with self.assertLogs('netbox.events_processor', level='ERROR') as cm:
|
||||
|
||||
# Get the known event
|
||||
event = EventRule.objects.get(name='Event Rule 2')
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_PLANNED)
|
||||
|
||||
# Update an object via the REST API
|
||||
data = {
|
||||
'name': 'Site X',
|
||||
'status': SiteStatusChoices.STATUS_ACTIVE,
|
||||
}
|
||||
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
|
||||
self.add_permissions('dcim.change_site')
|
||||
response = self.client.patch(url, data, format='json', **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_200_OK)
|
||||
|
||||
# Verify that a background task was queued for the updated object
|
||||
self.assertEqual(self.queue.count, 1)
|
||||
job = self.queue.jobs[0]
|
||||
self.assertEqual(job.kwargs['event_rule'], event)
|
||||
self.assertEqual(job.kwargs['event_type'], OBJECT_UPDATED)
|
||||
self.assertEqual(job.kwargs['model_name'], 'site')
|
||||
self.assertEqual(job.kwargs['data']['id'], site.pk)
|
||||
self.assertEqual(job.kwargs['data']['status']['value'], SiteStatusChoices.STATUS_ACTIVE)
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['status'], SiteStatusChoices.STATUS_PLANNED)
|
||||
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
|
||||
self.assertEqual(job.kwargs['snapshots']['postchange']['status'], SiteStatusChoices.STATUS_ACTIVE)
|
||||
|
||||
# Update the event with non-existent
|
||||
event.action_type = 'non-existent-action-type'
|
||||
event.save()
|
||||
|
||||
# Cleanup queue
|
||||
self.queue.empty()
|
||||
|
||||
# Verify that a queue is empty
|
||||
self.assertEqual(self.queue.count, 0)
|
||||
|
||||
# Update an object via the REST API
|
||||
data = {
|
||||
'name': 'Site X',
|
||||
'status': SiteStatusChoices.STATUS_PLANNED,
|
||||
}
|
||||
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
|
||||
self.add_permissions('dcim.change_site')
|
||||
response = self.client.patch(url, data, format='json', **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_200_OK)
|
||||
|
||||
# Verify that a queue is still empty
|
||||
self.assertEqual(self.queue.count, 0)
|
||||
|
||||
# Verify that we have only one ERROR message in our log
|
||||
self.assertEqual(len(cm.output), 1)
|
||||
|
||||
# Verify message format
|
||||
pattern = (r"Error ValueError in ValueError at .*? - Unknown action type for"
|
||||
r" an event rule: non-existent-action-type")
|
||||
self.assertRegex(cm.output[0], pattern)
|
||||
|
||||
def test_duplicate_triggers(self):
|
||||
"""
|
||||
Test for erroneous duplicate event triggers resulting from saving an object multiple times
|
||||
|
@ -1,8 +1,8 @@
|
||||
import traceback
|
||||
from contextlib import contextmanager
|
||||
|
||||
from netbox.context import current_request, events_queue
|
||||
from netbox.utils import register_request_processor
|
||||
from extras.events import flush_events
|
||||
|
||||
|
||||
@register_request_processor
|
||||
@ -14,6 +14,8 @@ def event_tracking(request):
|
||||
|
||||
:param request: WSGIRequest object with a unique `id` set
|
||||
"""
|
||||
from extras.events import flush_events
|
||||
|
||||
current_request.set(request)
|
||||
events_queue.set({})
|
||||
|
||||
@ -26,3 +28,19 @@ def event_tracking(request):
|
||||
# Clear context vars
|
||||
current_request.set(None)
|
||||
events_queue.set({})
|
||||
|
||||
|
||||
@contextmanager
|
||||
def suppress_event_exp(logger=None):
|
||||
"""
|
||||
Suppress exceptions that may occur during event handling.
|
||||
"""
|
||||
try:
|
||||
yield
|
||||
except Exception as e:
|
||||
if logger:
|
||||
tb = e.__traceback__
|
||||
last_frame = tb.tb_frame if tb else None
|
||||
filename = last_frame.f_code.co_filename if last_frame else 'unknown'
|
||||
lineno = tb.tb_lineno if tb else 0
|
||||
logger.error(f"Error {e.__class__.__name__} in {e.__class__.__name__} at {filename}:{lineno} - {str(e)}")
|
||||
|
Loading…
Reference in New Issue
Block a user