mirror of
https://github.com/netbox-community/netbox.git
synced 2026-01-24 04:22:41 -06:00
Compare commits
2 Commits
21259-obje
...
21260-even
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e69fc9a4b4 | ||
|
|
a6c6a58fb9 |
@@ -9,7 +9,6 @@ from django.db import connection, models
|
||||
from django.db.models import Q
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from netbox.context import object_types_cache
|
||||
from netbox.plugins import PluginConfig
|
||||
from netbox.registry import registry
|
||||
from utilities.string import title
|
||||
@@ -71,12 +70,6 @@ class ObjectTypeManager(models.Manager):
|
||||
"""
|
||||
from netbox.models.features import get_model_features, model_is_public
|
||||
|
||||
# Check the request cache before hitting the database
|
||||
cache = object_types_cache.get()
|
||||
if cache is not None:
|
||||
if ot := cache.get((model._meta.model, for_concrete_model)):
|
||||
return ot
|
||||
|
||||
# TODO: Remove this in NetBox v5.0
|
||||
# If the ObjectType table has not yet been provisioned (e.g. because we're in a pre-v4.4 migration),
|
||||
# fall back to ContentType.
|
||||
@@ -103,10 +96,6 @@ class ObjectTypeManager(models.Manager):
|
||||
features=get_model_features(model),
|
||||
)[0]
|
||||
|
||||
# Populate the request cache to avoid redundant lookups
|
||||
if cache is not None:
|
||||
cache[(model._meta.model, for_concrete_model)] = ot
|
||||
|
||||
return ot
|
||||
|
||||
def get_for_models(self, *models, for_concrete_models=True):
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from collections import UserDict, defaultdict
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils import timezone
|
||||
@@ -12,7 +12,6 @@ from core.models import ObjectType
|
||||
from netbox.config import get_config
|
||||
from netbox.constants import RQ_QUEUE_DEFAULT
|
||||
from netbox.models.features import has_feature
|
||||
from users.models import User
|
||||
from utilities.api import get_serializer_for_model
|
||||
from utilities.request import copy_safe_request
|
||||
from utilities.rqworker import get_rq_retry
|
||||
@@ -23,6 +22,19 @@ from .models import EventRule
|
||||
logger = logging.getLogger('netbox.events_processor')
|
||||
|
||||
|
||||
class EventContext(UserDict):
|
||||
"""
|
||||
A custom dictionary that automatically serializes its associated object on demand.
|
||||
"""
|
||||
|
||||
def __getitem__(self, item):
|
||||
if item == 'data' and 'data' not in self:
|
||||
data = serialize_for_event(self['object'])
|
||||
self.__setitem__('data', data)
|
||||
return data
|
||||
return super().__getitem__(item)
|
||||
|
||||
|
||||
def serialize_for_event(instance):
|
||||
"""
|
||||
Return a serialized representation of the given instance suitable for use in a queued event.
|
||||
@@ -66,37 +78,42 @@ def enqueue_event(queue, instance, request, event_type):
|
||||
assert instance.pk is not None
|
||||
key = f'{app_label}.{model_name}:{instance.pk}'
|
||||
if key in queue:
|
||||
queue[key]['data'] = serialize_for_event(instance)
|
||||
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
|
||||
# If the object is being deleted, update any prior "update" event to "delete"
|
||||
if event_type == OBJECT_DELETED:
|
||||
queue[key]['event_type'] = event_type
|
||||
else:
|
||||
queue[key] = {
|
||||
'object_type': ObjectType.objects.get_for_model(instance),
|
||||
'object_id': instance.pk,
|
||||
'event_type': event_type,
|
||||
'data': serialize_for_event(instance),
|
||||
'snapshots': get_snapshots(instance, event_type),
|
||||
'request': request,
|
||||
queue[key] = EventContext(
|
||||
object_type=ObjectType.objects.get_for_model(instance),
|
||||
object_id=instance.pk,
|
||||
object=instance,
|
||||
event_type=event_type,
|
||||
snapshots=get_snapshots(instance, event_type),
|
||||
request=request,
|
||||
user=request.user,
|
||||
# Legacy request attributes for backward compatibility
|
||||
'username': request.user.username,
|
||||
'request_id': request.id,
|
||||
}
|
||||
username=request.user.username,
|
||||
request_id=request.id,
|
||||
)
|
||||
# Force serialization of objects prior to them actually being deleted
|
||||
if event_type == OBJECT_DELETED:
|
||||
queue[key]['data'] = serialize_for_event(instance)
|
||||
|
||||
|
||||
def process_event_rules(event_rules, object_type, event_type, data, username=None, snapshots=None, request=None):
|
||||
user = None # To be resolved from the username if needed
|
||||
def process_event_rules(event_rules, object_type, event):
|
||||
"""
|
||||
Process a list of EventRules against an event.
|
||||
"""
|
||||
|
||||
for event_rule in event_rules:
|
||||
|
||||
# Evaluate event rule conditions (if any)
|
||||
if not event_rule.eval_conditions(data):
|
||||
if not event_rule.eval_conditions(event['data']):
|
||||
continue
|
||||
|
||||
# Compile event data
|
||||
event_data = event_rule.action_data or {}
|
||||
event_data.update(data)
|
||||
event_data.update(event['data'])
|
||||
|
||||
# Webhooks
|
||||
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
|
||||
@@ -109,50 +126,43 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
|
||||
params = {
|
||||
"event_rule": event_rule,
|
||||
"object_type": object_type,
|
||||
"event_type": event_type,
|
||||
"event_type": event['event_type'],
|
||||
"data": event_data,
|
||||
"snapshots": snapshots,
|
||||
"snapshots": event['snapshots'],
|
||||
"timestamp": timezone.now().isoformat(),
|
||||
"username": username,
|
||||
"username": event['username'],
|
||||
"retry": get_rq_retry()
|
||||
}
|
||||
if snapshots:
|
||||
params["snapshots"] = snapshots
|
||||
if request:
|
||||
if 'snapshots' in event:
|
||||
params['snapshots'] = event['snapshots']
|
||||
if 'request' in event:
|
||||
# Exclude FILES - webhooks don't need uploaded files,
|
||||
# which can cause pickle errors with Pillow.
|
||||
params["request"] = copy_safe_request(request, include_files=False)
|
||||
params['request'] = copy_safe_request(event['request'], include_files=False)
|
||||
|
||||
# Enqueue the task
|
||||
rq_queue.enqueue(
|
||||
"extras.webhooks.send_webhook",
|
||||
**params
|
||||
)
|
||||
rq_queue.enqueue('extras.webhooks.send_webhook', **params)
|
||||
|
||||
# Scripts
|
||||
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
|
||||
# Resolve the script from action parameters
|
||||
script = event_rule.action_object.python_class()
|
||||
|
||||
# Retrieve the User if not already resolved
|
||||
if user is None:
|
||||
user = User.objects.get(username=username)
|
||||
|
||||
# Enqueue a Job to record the script's execution
|
||||
from extras.jobs import ScriptJob
|
||||
params = {
|
||||
"instance": event_rule.action_object,
|
||||
"name": script.name,
|
||||
"user": user,
|
||||
"user": event['user'],
|
||||
"data": event_data
|
||||
}
|
||||
if snapshots:
|
||||
params["snapshots"] = snapshots
|
||||
if request:
|
||||
params["request"] = copy_safe_request(request)
|
||||
ScriptJob.enqueue(
|
||||
**params
|
||||
)
|
||||
if 'snapshots' in event:
|
||||
params['snapshots'] = event['snapshots']
|
||||
if 'request' in event:
|
||||
params['request'] = copy_safe_request(event['request'])
|
||||
|
||||
# Enqueue the job
|
||||
ScriptJob.enqueue(**params)
|
||||
|
||||
# Notification groups
|
||||
elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
|
||||
@@ -161,7 +171,7 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
|
||||
object_type=object_type,
|
||||
object_id=event_data['id'],
|
||||
object_repr=event_data.get('display'),
|
||||
event_type=event_type
|
||||
event_type=event['event_type']
|
||||
)
|
||||
|
||||
else:
|
||||
@@ -173,6 +183,8 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
|
||||
def process_event_queue(events):
|
||||
"""
|
||||
Flush a list of object representation to RQ for EventRule processing.
|
||||
|
||||
This is the default processor listed in EVENTS_PIPELINE.
|
||||
"""
|
||||
events_cache = defaultdict(dict)
|
||||
|
||||
@@ -192,11 +204,7 @@ def process_event_queue(events):
|
||||
process_event_rules(
|
||||
event_rules=event_rules,
|
||||
object_type=object_type,
|
||||
event_type=event['event_type'],
|
||||
data=event['data'],
|
||||
username=event['username'],
|
||||
snapshots=event['snapshots'],
|
||||
request=event['request'],
|
||||
event=event,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ from django.dispatch import receiver
|
||||
|
||||
from core.events import *
|
||||
from core.signals import job_end, job_start
|
||||
from extras.events import process_event_rules
|
||||
from extras.events import EventContext, process_event_rules
|
||||
from extras.models import EventRule, Notification, Subscription
|
||||
from netbox.config import get_config
|
||||
from netbox.models.features import has_feature
|
||||
@@ -102,14 +102,12 @@ def process_job_start_event_rules(sender, **kwargs):
|
||||
enabled=True,
|
||||
object_types=sender.object_type
|
||||
)
|
||||
username = sender.user.username if sender.user else None
|
||||
process_event_rules(
|
||||
event_rules=event_rules,
|
||||
object_type=sender.object_type,
|
||||
event = EventContext(
|
||||
event_type=JOB_STARTED,
|
||||
data=sender.data,
|
||||
username=username
|
||||
user=sender.user,
|
||||
)
|
||||
process_event_rules(event_rules, sender.object_type, event)
|
||||
|
||||
|
||||
@receiver(job_end)
|
||||
@@ -122,14 +120,12 @@ def process_job_end_event_rules(sender, **kwargs):
|
||||
enabled=True,
|
||||
object_types=sender.object_type
|
||||
)
|
||||
username = sender.user.username if sender.user else None
|
||||
process_event_rules(
|
||||
event_rules=event_rules,
|
||||
object_type=sender.object_type,
|
||||
event = EventContext(
|
||||
event_type=JOB_COMPLETED,
|
||||
data=sender.data,
|
||||
username=username
|
||||
user=sender.user,
|
||||
)
|
||||
process_event_rules(event_rules, sender.object_type, event)
|
||||
|
||||
|
||||
#
|
||||
|
||||
@@ -3,10 +3,8 @@ from contextvars import ContextVar
|
||||
__all__ = (
|
||||
'current_request',
|
||||
'events_queue',
|
||||
'object_types_cache',
|
||||
)
|
||||
|
||||
|
||||
current_request = ContextVar('current_request', default=None)
|
||||
events_queue = ContextVar('events_queue', default=dict())
|
||||
object_types_cache = ContextVar('object_types_cache', default=None)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from contextlib import contextmanager
|
||||
|
||||
from netbox.context import current_request, events_queue, object_types_cache
|
||||
from netbox.context import current_request, events_queue
|
||||
from netbox.utils import register_request_processor
|
||||
from extras.events import flush_events
|
||||
|
||||
@@ -16,7 +16,6 @@ def event_tracking(request):
|
||||
"""
|
||||
current_request.set(request)
|
||||
events_queue.set({})
|
||||
object_types_cache.set({})
|
||||
|
||||
yield
|
||||
|
||||
@@ -27,4 +26,3 @@ def event_tracking(request):
|
||||
# Clear context vars
|
||||
current_request.set(None)
|
||||
events_queue.set({})
|
||||
object_types_cache.set(None)
|
||||
|
||||
Reference in New Issue
Block a user