Move ObjectChange creation into signal receivers

This commit is contained in:
Jeremy Stretch 2020-08-14 17:03:45 -04:00
parent b4299241fe
commit 4ee8e473eb
6 changed files with 89 additions and 141 deletions

View File

@ -1,64 +1,9 @@
import random
import threading
import uuid import uuid
from copy import deepcopy
from datetime import timedelta
from django.conf import settings from django.db.models.signals import m2m_changed, pre_delete, post_save
from django.contrib import messages
from django.db.models.signals import pre_delete, post_save
from django.utils import timezone
from django_prometheus.models import model_deletes, model_inserts, model_updates
from redis.exceptions import RedisError
from extras.utils import is_taggable from utilities.utils import curry
from utilities.api import is_api_request from .signals import _handle_changed_object, _handle_deleted_object
from utilities.querysets import DummyQuerySet
from .choices import ObjectChangeActionChoices
from .models import ObjectChange
from .signals import purge_changelog
from .webhooks import enqueue_webhooks
_thread_locals = threading.local()
def handle_changed_object(sender, instance, **kwargs):
"""
Fires when an object is created or updated.
"""
# Queue the object for processing once the request completes
action = ObjectChangeActionChoices.ACTION_CREATE if kwargs['created'] else ObjectChangeActionChoices.ACTION_UPDATE
_thread_locals.changed_objects.append(
(instance, action)
)
def handle_deleted_object(sender, instance, **kwargs):
"""
Fires when an object is deleted.
"""
# Cache custom fields prior to copying the instance
if hasattr(instance, 'cache_custom_fields'):
instance.cache_custom_fields()
# Create a copy of the object being deleted
copy = deepcopy(instance)
# Preserve tags
if is_taggable(instance):
copy.tags = DummyQuerySet(instance.tags.all())
# Queue the copy of the object for processing once the request completes
_thread_locals.changed_objects.append(
(copy, ObjectChangeActionChoices.ACTION_DELETE)
)
def purge_objectchange_cache(sender, **kwargs):
"""
Delete any queued object changes waiting to be written.
"""
_thread_locals.changed_objects = []
class ObjectChangeMiddleware(object): class ObjectChangeMiddleware(object):
@ -80,73 +25,35 @@ class ObjectChangeMiddleware(object):
def __call__(self, request): def __call__(self, request):
# Initialize an empty list to cache objects being saved.
_thread_locals.changed_objects = []
# Assign a random unique ID to the request. This will be used to associate multiple object changes made during # Assign a random unique ID to the request. This will be used to associate multiple object changes made during
# the same request. # the same request.
request.id = uuid.uuid4() request.id = uuid.uuid4()
# Curry signals receivers to pass the current request
handle_changed_object = curry(_handle_changed_object, request)
handle_deleted_object = curry(_handle_deleted_object, request)
# Connect our receivers to the post_save and post_delete signals. # Connect our receivers to the post_save and post_delete signals.
post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object') post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.connect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.connect(handle_deleted_object, dispatch_uid='handle_deleted_object') pre_delete.connect(handle_deleted_object, dispatch_uid='handle_deleted_object')
# Provide a hook for purging the change cache
purge_changelog.connect(purge_objectchange_cache)
# Process the request # Process the request
response = self.get_response(request) response = self.get_response(request)
# If the change cache is empty, there's nothing more we need to do. # Disconnect change logging signals. This is necessary to avoid recording any errant
if not _thread_locals.changed_objects: # changes during test cleanup.
return response
# Disconnect our receivers from the post_save and post_delete signals.
post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object') post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object') pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object')
# Create records for any cached objects that were changed.
redis_failed = False
for instance, action in _thread_locals.changed_objects:
# Refresh cached custom field values
if action in [ObjectChangeActionChoices.ACTION_CREATE, ObjectChangeActionChoices.ACTION_UPDATE]:
if hasattr(instance, 'cache_custom_fields'):
instance.cache_custom_fields()
# Record an ObjectChange if applicable
if hasattr(instance, 'to_objectchange'):
objectchange = instance.to_objectchange(action)
objectchange.user = request.user
objectchange.request_id = request.id
objectchange.save()
# Enqueue webhooks
try:
enqueue_webhooks(instance, request.user, request.id, action)
except RedisError as e:
if not redis_failed and not is_api_request(request):
messages.error(
request,
"There was an error processing webhooks for this request. Check that the Redis service is "
"running and reachable. The full error details were: {}".format(e)
)
redis_failed = True
# Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE:
model_inserts.labels(instance._meta.model_name).inc()
elif action == ObjectChangeActionChoices.ACTION_UPDATE:
model_updates.labels(instance._meta.model_name).inc()
elif action == ObjectChangeActionChoices.ACTION_DELETE:
model_deletes.labels(instance._meta.model_name).inc()
# Housekeeping: 1% chance of clearing out expired ObjectChanges. This applies only to requests which result in
# one or more changes being logged.
if settings.CHANGELOG_RETENTION and random.randint(1, 100) == 1:
cutoff = timezone.now() - timedelta(days=settings.CHANGELOG_RETENTION)
purged_count, _ = ObjectChange.objects.filter(
time__lt=cutoff
).delete()
return response return response
# TODO: Put this somewhere
# # Housekeeping: 1% chance of clearing out expired ObjectChanges. This applies only to requests which result in
# # one or more changes being logged.
# if settings.CHANGELOG_RETENTION and random.randint(1, 100) == 1:
# cutoff = timezone.now() - timedelta(days=settings.CHANGELOG_RETENTION)
# purged_count, _ = ObjectChange.objects.filter(
# time__lt=cutoff
# ).delete()

View File

@ -23,7 +23,6 @@ from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator,
from utilities.exceptions import AbortTransaction from utilities.exceptions import AbortTransaction
from utilities.forms import DynamicModelChoiceField, DynamicModelMultipleChoiceField from utilities.forms import DynamicModelChoiceField, DynamicModelMultipleChoiceField
from .forms import ScriptForm from .forms import ScriptForm
from .signals import purge_changelog
__all__ = [ __all__ = [
'BaseScript', 'BaseScript',
@ -465,7 +464,6 @@ def run_script(data, request, commit=True, *args, **kwargs):
if not commit: if not commit:
# Delete all pending changelog entries # Delete all pending changelog entries
purge_changelog.send(Script)
script.log_info( script.log_info(
"Database changes have been reverted automatically." "Database changes have been reverted automatically."
) )

View File

@ -1,7 +1,68 @@
from cacheops.signals import cache_invalidated, cache_read from cacheops.signals import cache_invalidated, cache_read
from django.dispatch import Signal from django_prometheus.models import model_deletes, model_inserts, model_updates
from prometheus_client import Counter from prometheus_client import Counter
from .choices import ObjectChangeActionChoices
from .webhooks import enqueue_webhooks
#
# Change logging/webhooks
#
def _handle_changed_object(request, sender, instance, **kwargs):
"""
Fires when an object is created or updated.
"""
# Queue the object for processing once the request completes
if kwargs.get('created'):
action = ObjectChangeActionChoices.ACTION_CREATE
elif 'created' in kwargs:
action = ObjectChangeActionChoices.ACTION_UPDATE
elif kwargs.get('action') in ['post_add', 'post_remove'] and kwargs['pk_set']:
# m2m_changed with objects added or removed
action = ObjectChangeActionChoices.ACTION_UPDATE
else:
return
# Cache any custom field values to ensure they are captured during serialization
if hasattr(instance, 'cache_custom_fields'):
instance.cache_custom_fields()
# Record an ObjectChange if applicable
if hasattr(instance, 'to_objectchange'):
objectchange = instance.to_objectchange(action)
objectchange.user = request.user
objectchange.request_id = request.id
objectchange.save()
# Enqueue webhooks
enqueue_webhooks(instance, request.user, request.id, action)
# Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE:
model_inserts.labels(instance._meta.model_name).inc()
elif action == ObjectChangeActionChoices.ACTION_UPDATE:
model_updates.labels(instance._meta.model_name).inc()
def _handle_deleted_object(request, sender, instance, **kwargs):
"""
Fires when an object is deleted.
"""
# Record an ObjectChange if applicable
if hasattr(instance, 'to_objectchange'):
objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE)
objectchange.user = request.user
objectchange.request_id = request.id
objectchange.save()
# Enqueue webhooks
enqueue_webhooks(instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
# Increment metric counters
model_deletes.labels(instance._meta.model_name).inc()
# #
# Caching # Caching
@ -25,10 +86,3 @@ def cache_invalidated_collector(sender, obj_dict, **kwargs):
cache_read.connect(cache_read_collector) cache_read.connect(cache_read_collector)
cache_invalidated.connect(cache_invalidated_collector) cache_invalidated.connect(cache_invalidated_collector)
#
# Change logging
#
purge_changelog = Signal()

View File

@ -3,7 +3,6 @@ import collections
from django.db.models import Q from django.db.models import Q
from django.utils.deconstruct import deconstructible from django.utils.deconstruct import deconstructible
from taggit.managers import _TaggableManager from taggit.managers import _TaggableManager
from utilities.querysets import DummyQuerySet
from extras.constants import EXTRAS_FEATURES from extras.constants import EXTRAS_FEATURES
from extras.registry import registry from extras.registry import registry
@ -16,9 +15,6 @@ def is_taggable(obj):
if hasattr(obj, 'tags'): if hasattr(obj, 'tags'):
if issubclass(obj.tags.__class__, _TaggableManager): if issubclass(obj.tags.__class__, _TaggableManager):
return True return True
# TaggableManager has been replaced with a DummyQuerySet prior to object deletion
if isinstance(obj.tags, DummyQuerySet):
return True
return False return False

View File

@ -3,20 +3,6 @@ from django.db.models import Q, QuerySet
from utilities.permissions import permission_is_exempt from utilities.permissions import permission_is_exempt
class DummyQuerySet:
"""
A fake QuerySet that can be used to cache relationships to objects that have been deleted.
"""
def __init__(self, queryset):
self._cache = [obj for obj in queryset.all()]
def __iter__(self):
return iter(self._cache)
def all(self):
return self._cache
class RestrictedQuerySet(QuerySet): class RestrictedQuerySet(QuerySet):
def restrict(self, user, action='view'): def restrict(self, user, action='view'):

View File

@ -276,6 +276,13 @@ def flatten_dict(d, prefix='', separator='.'):
return ret return ret
# Taken from django.utils.functional (<3.0)
def curry(_curried_func, *args, **kwargs):
def _curried(*moreargs, **morekwargs):
return _curried_func(*args, *moreargs, **{**kwargs, **morekwargs})
return _curried
# #
# Fake request object # Fake request object
# #