#8958: Declare constants for event types

This commit is contained in:
jeremystretch 2023-03-20 13:04:05 -04:00
parent 6e93c3574c
commit 9219397208
2 changed files with 15 additions and 8 deletions

View File

@ -1,14 +1,20 @@
from django.contrib.contenttypes.models import ContentType # Events
EVENT_CREATE = 'create'
EVENT_UPDATE = 'update'
EVENT_DELETE = 'delete'
EVENT_JOB_START = 'job_start'
EVENT_JOB_END = 'job_end'
# Webhooks # Webhooks
HTTP_CONTENT_TYPE_JSON = 'application/json' HTTP_CONTENT_TYPE_JSON = 'application/json'
WEBHOOK_EVENT_TYPES = { WEBHOOK_EVENT_TYPES = {
'create': 'created', EVENT_CREATE: 'created',
'update': 'updated', EVENT_UPDATE: 'updated',
'delete': 'deleted', EVENT_DELETE: 'deleted',
'job_start': 'job_started', EVENT_JOB_START: 'job_started',
'job_end': 'job_ended', EVENT_JOB_END: 'job_ended',
} }
# Dashboard # Dashboard

View File

@ -5,6 +5,7 @@ from django.conf import settings
from django.contrib import admin from django.contrib import admin
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache from django.core.cache import cache
from django.core.validators import MinValueValidator, ValidationError from django.core.validators import MinValueValidator, ValidationError
from django.db import models from django.db import models
@ -698,7 +699,7 @@ class JobResult(models.Model):
JobResult.objects.filter(pk=self.pk).update(started=self.started, status=self.status) JobResult.objects.filter(pk=self.pk).update(started=self.started, status=self.status)
# Handle webhooks # Handle webhooks
self.trigger_webhooks(event='job_start') self.trigger_webhooks(event=EVENT_JOB_START)
def terminate(self, status=JobResultStatusChoices.STATUS_COMPLETED): def terminate(self, status=JobResultStatusChoices.STATUS_COMPLETED):
""" """
@ -714,7 +715,7 @@ class JobResult(models.Model):
JobResult.objects.filter(pk=self.pk).update(status=self.status, completed=self.completed) JobResult.objects.filter(pk=self.pk).update(status=self.status, completed=self.completed)
# Handle webhooks # Handle webhooks
self.trigger_webhooks(event='job_end') self.trigger_webhooks(event=EVENT_JOB_END)
@classmethod @classmethod
def enqueue_job(cls, func, name, obj_type, user, schedule_at=None, interval=None, *args, **kwargs): def enqueue_job(cls, func, name, obj_type, user, schedule_at=None, interval=None, *args, **kwargs):