diff --git a/netbox/extras/conditions.py b/netbox/extras/conditions.py new file mode 100644 index 000000000..6aa6e776f --- /dev/null +++ b/netbox/extras/conditions.py @@ -0,0 +1,122 @@ +import functools + +__all__ = ( + 'Condition', + 'ConditionSet', +) + + +LOGIC_TYPES = ( + 'and', + 'or' +) + + +def is_ruleset(data): + """ + Determine whether the given dictionary looks like a rule set. + """ + return type(data) is dict and len(data) == 1 and list(data.keys())[0] in LOGIC_TYPES + + +class Condition: + """ + An individual conditional rule that evaluates a single attribute and its value. + + :param attr: The name of the attribute being evaluated + :param value: The value being compared + :param op: The logical operation to use when evaluating the value (default: 'eq') + """ + EQ = 'eq' + NEQ = 'neq' + GT = 'gt' + GTE = 'gte' + LT = 'lt' + LTE = 'lte' + IN = 'in' + CONTAINS = 'contains' + + OPERATORS = ( + EQ, NEQ, GT, GTE, LT, LTE, IN, CONTAINS + ) + + def __init__(self, attr, value, op=EQ): + self.attr = attr + self.value = value + if op not in self.OPERATORS: + raise ValueError(f"Unknown operator: {op}") + self.eval_func = getattr(self, f'eval_{op}') + + def eval(self, data): + """ + Evaluate the provided data to determine whether it matches the condition. + """ + value = functools.reduce(dict.get, self.attr.split('.'), data) + return self.eval_func(value) + + # Equivalency + + def eval_eq(self, value): + return value == self.value + + def eval_neq(self, value): + return value != self.value + + # Numeric comparisons + + def eval_gt(self, value): + return value > self.value + + def eval_gte(self, value): + return value >= self.value + + def eval_lt(self, value): + return value < self.value + + def eval_lte(self, value): + return value <= self.value + + # Membership + + def eval_in(self, value): + return value in self.value + + def eval_contains(self, value): + return self.value in value + + +class ConditionSet: + """ + A set of one or more Condition to be evaluated per the prescribed logic (AND or OR). Example: + + {"and": [ + {"attr": "foo", "op": "eq", "value": 1}, + {"attr": "bar", "op": "neq", "value": 2} + ]} + + :param ruleset: A dictionary mapping a logical operator to a list of conditional rules + """ + def __init__(self, ruleset): + if type(ruleset) is not dict: + raise ValueError(f"Ruleset must be a dictionary, not {type(ruleset)}.") + if len(ruleset) != 1: + raise ValueError(f"Ruleset must have exactly one logical operator (found {len(ruleset)})") + + # Determine the logic type + logic = list(ruleset.keys())[0] + if type(logic) is not str or logic.lower() not in LOGIC_TYPES: + raise ValueError(f"Invalid logic type: {logic} (must be 'and' or 'or')") + self.logic = logic.lower() + + # Compile the set of Conditions + self.conditions = [ + ConditionSet(rule) if is_ruleset(rule) else Condition(**rule) + for rule in ruleset[self.logic] + ] + + def eval(self, data): + """ + Evaluate the provided data to determine whether it matches this set of conditions. + """ + func = any if self.logic == 'or' else all + return func(d.eval(data) for d in self.conditions) diff --git a/netbox/extras/tests/test_conditions.py b/netbox/extras/tests/test_conditions.py new file mode 100644 index 000000000..7defca5b5 --- /dev/null +++ b/netbox/extras/tests/test_conditions.py @@ -0,0 +1,160 @@ +from django.test import TestCase + +from extras.conditions import Condition, ConditionSet + + +class ConditionTestCase(TestCase): + + def test_dotted_path_access(self): + c = Condition('a.b.c', 1, 'eq') + self.assertTrue(c.eval({'a': {'b': {'c': 1}}})) + self.assertFalse(c.eval({'a': {'b': {'c': 2}}})) + self.assertFalse(c.eval({'a': {'b': {'x': 1}}})) + + def test_undefined_attr(self): + c = Condition('x', 1, 'eq') + self.assertFalse(c.eval({})) + self.assertTrue(c.eval({'x': 1})) + + # + # Operator tests + # + + def test_default_operator(self): + c = Condition('x', 1) + self.assertEqual(c.eval_func, c.eval_eq) + + def test_eq(self): + c = Condition('x', 1, 'eq') + self.assertTrue(c.eval({'x': 1})) + self.assertFalse(c.eval({'x': 2})) + + def test_neq(self): + c = Condition('x', 1, 'neq') + self.assertFalse(c.eval({'x': 1})) + self.assertTrue(c.eval({'x': 2})) + + def test_gt(self): + c = Condition('x', 1, 'gt') + self.assertTrue(c.eval({'x': 2})) + self.assertFalse(c.eval({'x': 1})) + + def test_gte(self): + c = Condition('x', 1, 'gte') + self.assertTrue(c.eval({'x': 2})) + self.assertTrue(c.eval({'x': 1})) + self.assertFalse(c.eval({'x': 0})) + + def test_lt(self): + c = Condition('x', 2, 'lt') + self.assertTrue(c.eval({'x': 1})) + self.assertFalse(c.eval({'x': 2})) + + def test_lte(self): + c = Condition('x', 2, 'lte') + self.assertTrue(c.eval({'x': 1})) + self.assertTrue(c.eval({'x': 2})) + self.assertFalse(c.eval({'x': 3})) + + def test_in(self): + c = Condition('x', [1, 2, 3], 'in') + self.assertTrue(c.eval({'x': 1})) + self.assertFalse(c.eval({'x': 9})) + + def test_contains(self): + c = Condition('x', 1, 'contains') + self.assertTrue(c.eval({'x': [1, 2, 3]})) + self.assertFalse(c.eval({'x': [2, 3, 4]})) + + +class ConditionSetTest(TestCase): + + def test_empty(self): + with self.assertRaises(ValueError): + ConditionSet({}) + + def test_invalid_logic(self): + with self.assertRaises(ValueError): + ConditionSet({'foo': []}) + + def test_and_single_depth(self): + cs = ConditionSet({ + 'and': [ + {'attr': 'a', 'value': 1, 'op': 'eq'}, + {'attr': 'b', 'value': 2, 'op': 'eq'}, + ] + }) + self.assertTrue(cs.eval({'a': 1, 'b': 2})) + self.assertFalse(cs.eval({'a': 1, 'b': 3})) + + def test_or_single_depth(self): + cs = ConditionSet({ + 'or': [ + {'attr': 'a', 'value': 1, 'op': 'eq'}, + {'attr': 'b', 'value': 1, 'op': 'eq'}, + ] + }) + self.assertTrue(cs.eval({'a': 1, 'b': 2})) + self.assertTrue(cs.eval({'a': 2, 'b': 1})) + self.assertFalse(cs.eval({'a': 2, 'b': 2})) + + def test_and_multi_depth(self): + cs = ConditionSet({ + 'and': [ + {'attr': 'a', 'value': 1, 'op': 'eq'}, + {'and': [ + {'attr': 'b', 'value': 2, 'op': 'eq'}, + {'attr': 'c', 'value': 3, 'op': 'eq'}, + ]} + ] + }) + self.assertTrue(cs.eval({'a': 1, 'b': 2, 'c': 3})) + self.assertFalse(cs.eval({'a': 9, 'b': 2, 'c': 3})) + self.assertFalse(cs.eval({'a': 1, 'b': 9, 'c': 3})) + self.assertFalse(cs.eval({'a': 1, 'b': 2, 'c': 9})) + + def test_or_multi_depth(self): + cs = ConditionSet({ + 'or': [ + {'attr': 'a', 'value': 1, 'op': 'eq'}, + {'or': [ + {'attr': 'b', 'value': 2, 'op': 'eq'}, + {'attr': 'c', 'value': 3, 'op': 'eq'}, + ]} + ] + }) + self.assertTrue(cs.eval({'a': 1, 'b': 9, 'c': 9})) + self.assertTrue(cs.eval({'a': 9, 'b': 2, 'c': 9})) + self.assertTrue(cs.eval({'a': 9, 'b': 9, 'c': 3})) + self.assertFalse(cs.eval({'a': 9, 'b': 9, 'c': 9})) + + def test_mixed_and(self): + cs = ConditionSet({ + 'and': [ + {'attr': 'a', 'value': 1, 'op': 'eq'}, + {'or': [ + {'attr': 'b', 'value': 2, 'op': 'eq'}, + {'attr': 'c', 'value': 3, 'op': 'eq'}, + ]} + ] + }) + self.assertTrue(cs.eval({'a': 1, 'b': 2, 'c': 9})) + self.assertTrue(cs.eval({'a': 1, 'b': 9, 'c': 3})) + self.assertFalse(cs.eval({'a': 1, 'b': 9, 'c': 9})) + self.assertFalse(cs.eval({'a': 9, 'b': 2, 'c': 3})) + + def test_mixed_or(self): + cs = ConditionSet({ + 'or': [ + {'attr': 'a', 'value': 1, 'op': 'eq'}, + {'and': [ + {'attr': 'b', 'value': 2, 'op': 'eq'}, + {'attr': 'c', 'value': 3, 'op': 'eq'}, + ]} + ] + }) + self.assertTrue(cs.eval({'a': 1, 'b': 9, 'c': 9})) + self.assertTrue(cs.eval({'a': 9, 'b': 2, 'c': 3})) + self.assertTrue(cs.eval({'a': 1, 'b': 2, 'c': 9})) + self.assertFalse(cs.eval({'a': 9, 'b': 2, 'c': 9})) + self.assertFalse(cs.eval({'a': 9, 'b': 9, 'c': 3}))