Fixes #19633: Log all evaluations of invalid event rule conditions (#19885)
Some checks are pending
CI / build (20.x, 3.10) (push) Waiting to run
CI / build (20.x, 3.11) (push) Waiting to run
CI / build (20.x, 3.12) (push) Waiting to run

* flush_events() should catch only import errors

* Fixes #19633: Log all evaluations of invalid event rule conditions

* Correct comment
This commit is contained in:
Jeremy Stretch 2025-07-15 11:25:25 -04:00 committed by GitHub
parent f777bfee2e
commit e5d6c71171
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 48 additions and 21 deletions

View File

@ -158,6 +158,7 @@ LOGGING = {
* `netbox.<app>.<model>` - Generic form for model-specific log messages * `netbox.<app>.<model>` - Generic form for model-specific log messages
* `netbox.auth.*` - Authentication events * `netbox.auth.*` - Authentication events
* `netbox.api.views.*` - Views which handle business logic for the REST API * `netbox.api.views.*` - Views which handle business logic for the REST API
* `netbox.event_rules` - Event rules
* `netbox.reports.*` - Report execution (`module.name`) * `netbox.reports.*` - Report execution (`module.name`)
* `netbox.scripts.*` - Custom script execution (`module.name`) * `netbox.scripts.*` - Custom script execution (`module.name`)
* `netbox.views.*` - Views which handle business logic for the web UI * `netbox.views.*` - Views which handle business logic for the web UI

View File

@ -1,13 +1,14 @@
import functools import functools
import operator
import re import re
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
__all__ = ( __all__ = (
'Condition', 'Condition',
'ConditionSet', 'ConditionSet',
'InvalidCondition',
) )
AND = 'and' AND = 'and'
OR = 'or' OR = 'or'
@ -19,6 +20,10 @@ def is_ruleset(data):
return type(data) is dict and len(data) == 1 and list(data.keys())[0] in (AND, OR) return type(data) is dict and len(data) == 1 and list(data.keys())[0] in (AND, OR)
class InvalidCondition(Exception):
pass
class Condition: class Condition:
""" """
An individual conditional rule that evaluates a single attribute and its value. An individual conditional rule that evaluates a single attribute and its value.
@ -61,6 +66,7 @@ class Condition:
self.attr = attr self.attr = attr
self.value = value self.value = value
self.op = op
self.eval_func = getattr(self, f'eval_{op}') self.eval_func = getattr(self, f'eval_{op}')
self.negate = negate self.negate = negate
@ -70,16 +76,17 @@ class Condition:
""" """
def _get(obj, key): def _get(obj, key):
if isinstance(obj, list): if isinstance(obj, list):
return [dict.get(i, key) for i in obj] return [operator.getitem(item or {}, key) for item in obj]
return operator.getitem(obj or {}, key)
return dict.get(obj, key)
try: try:
value = functools.reduce(_get, self.attr.split('.'), data) value = functools.reduce(_get, self.attr.split('.'), data)
except TypeError: except KeyError:
# Invalid key path raise InvalidCondition(f"Invalid key path: {self.attr}")
value = None try:
result = self.eval_func(value) result = self.eval_func(value)
except TypeError as e:
raise InvalidCondition(f"Invalid data type at '{self.attr}' for '{self.op}' evaluation: {e}")
if self.negate: if self.negate:
return not result return not result

View File

@ -192,5 +192,5 @@ def flush_events(events):
try: try:
func = import_string(name) func = import_string(name)
func(events) func(events)
except Exception as e: except ImportError as e:
logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e)) logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))

View File

@ -13,7 +13,7 @@ from rest_framework.utils.encoders import JSONEncoder
from core.models import ObjectType from core.models import ObjectType
from extras.choices import * from extras.choices import *
from extras.conditions import ConditionSet from extras.conditions import ConditionSet, InvalidCondition
from extras.constants import * from extras.constants import *
from extras.utils import image_upload from extras.utils import image_upload
from extras.models.mixins import RenderTemplateMixin from extras.models.mixins import RenderTemplateMixin
@ -142,7 +142,15 @@ class EventRule(CustomFieldsMixin, ExportTemplatesMixin, TagsMixin, ChangeLogged
if not self.conditions: if not self.conditions:
return True return True
return ConditionSet(self.conditions).eval(data) logger = logging.getLogger('netbox.event_rules')
try:
result = ConditionSet(self.conditions).eval(data)
logger.debug(f'{self.name}: Evaluated as {result}')
return result
except InvalidCondition as e:
logger.error(f"{self.name}: Evaluation failed. {e}")
return False
class Webhook(CustomFieldsMixin, ExportTemplatesMixin, TagsMixin, ChangeLoggedModel): class Webhook(CustomFieldsMixin, ExportTemplatesMixin, TagsMixin, ChangeLoggedModel):

View File

@ -4,7 +4,7 @@ from django.test import TestCase
from core.events import * from core.events import *
from dcim.choices import SiteStatusChoices from dcim.choices import SiteStatusChoices
from dcim.models import Site from dcim.models import Site
from extras.conditions import Condition, ConditionSet from extras.conditions import Condition, ConditionSet, InvalidCondition
from extras.events import serialize_for_event from extras.events import serialize_for_event
from extras.forms import EventRuleForm from extras.forms import EventRuleForm
from extras.models import EventRule, Webhook from extras.models import EventRule, Webhook
@ -12,16 +12,11 @@ from extras.models import EventRule, Webhook
class ConditionTestCase(TestCase): class ConditionTestCase(TestCase):
def test_dotted_path_access(self):
c = Condition('a.b.c', 1, 'eq')
self.assertTrue(c.eval({'a': {'b': {'c': 1}}}))
self.assertFalse(c.eval({'a': {'b': {'c': 2}}}))
self.assertFalse(c.eval({'a': {'b': {'x': 1}}}))
def test_undefined_attr(self): def test_undefined_attr(self):
c = Condition('x', 1, 'eq') c = Condition('x', 1, 'eq')
self.assertFalse(c.eval({}))
self.assertTrue(c.eval({'x': 1})) self.assertTrue(c.eval({'x': 1}))
with self.assertRaises(InvalidCondition):
c.eval({})
# #
# Validation tests # Validation tests
@ -37,10 +32,13 @@ class ConditionTestCase(TestCase):
# dict type is unsupported # dict type is unsupported
Condition('x', 1, dict()) Condition('x', 1, dict())
def test_invalid_op_type(self): def test_invalid_op_types(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
# 'gt' supports only numeric values # 'gt' supports only numeric values
Condition('x', 'foo', 'gt') Condition('x', 'foo', 'gt')
with self.assertRaises(ValueError):
# 'in' supports only iterable values
Condition('x', 123, 'in')
# #
# Nested attrs tests # Nested attrs tests
@ -50,7 +48,10 @@ class ConditionTestCase(TestCase):
c = Condition('x.y.z', 1) c = Condition('x.y.z', 1)
self.assertTrue(c.eval({'x': {'y': {'z': 1}}})) self.assertTrue(c.eval({'x': {'y': {'z': 1}}}))
self.assertFalse(c.eval({'x': {'y': {'z': 2}}})) self.assertFalse(c.eval({'x': {'y': {'z': 2}}}))
self.assertFalse(c.eval({'a': {'b': {'c': 1}}})) with self.assertRaises(InvalidCondition):
c.eval({'x': {'y': None}})
with self.assertRaises(InvalidCondition):
c.eval({'x': {'y': {'a': 1}}})
# #
# Operator tests # Operator tests
@ -74,23 +75,31 @@ class ConditionTestCase(TestCase):
c = Condition('x', 1, 'gt') c = Condition('x', 1, 'gt')
self.assertTrue(c.eval({'x': 2})) self.assertTrue(c.eval({'x': 2}))
self.assertFalse(c.eval({'x': 1})) self.assertFalse(c.eval({'x': 1}))
with self.assertRaises(InvalidCondition):
c.eval({'x': 'foo'}) # Invalid type
def test_gte(self): def test_gte(self):
c = Condition('x', 1, 'gte') c = Condition('x', 1, 'gte')
self.assertTrue(c.eval({'x': 2})) self.assertTrue(c.eval({'x': 2}))
self.assertTrue(c.eval({'x': 1})) self.assertTrue(c.eval({'x': 1}))
self.assertFalse(c.eval({'x': 0})) self.assertFalse(c.eval({'x': 0}))
with self.assertRaises(InvalidCondition):
c.eval({'x': 'foo'}) # Invalid type
def test_lt(self): def test_lt(self):
c = Condition('x', 2, 'lt') c = Condition('x', 2, 'lt')
self.assertTrue(c.eval({'x': 1})) self.assertTrue(c.eval({'x': 1}))
self.assertFalse(c.eval({'x': 2})) self.assertFalse(c.eval({'x': 2}))
with self.assertRaises(InvalidCondition):
c.eval({'x': 'foo'}) # Invalid type
def test_lte(self): def test_lte(self):
c = Condition('x', 2, 'lte') c = Condition('x', 2, 'lte')
self.assertTrue(c.eval({'x': 1})) self.assertTrue(c.eval({'x': 1}))
self.assertTrue(c.eval({'x': 2})) self.assertTrue(c.eval({'x': 2}))
self.assertFalse(c.eval({'x': 3})) self.assertFalse(c.eval({'x': 3}))
with self.assertRaises(InvalidCondition):
c.eval({'x': 'foo'}) # Invalid type
def test_in(self): def test_in(self):
c = Condition('x', [1, 2, 3], 'in') c = Condition('x', [1, 2, 3], 'in')
@ -106,6 +115,8 @@ class ConditionTestCase(TestCase):
c = Condition('x', 1, 'contains') c = Condition('x', 1, 'contains')
self.assertTrue(c.eval({'x': [1, 2, 3]})) self.assertTrue(c.eval({'x': [1, 2, 3]}))
self.assertFalse(c.eval({'x': [2, 3, 4]})) self.assertFalse(c.eval({'x': [2, 3, 4]}))
with self.assertRaises(InvalidCondition):
c.eval({'x': 123}) # Invalid type
def test_contains_negated(self): def test_contains_negated(self):
c = Condition('x', 1, 'contains', negate=True) c = Condition('x', 1, 'contains', negate=True)