Misc cleanup

This commit is contained in:
Jeremy Stretch 2023-11-30 14:54:09 -05:00
parent 78c2dba68f
commit c45bdf4884
6 changed files with 52 additions and 33 deletions

View File

@ -15,7 +15,7 @@ __all__ = [
'NestedImageAttachmentSerializer',
'NestedJournalEntrySerializer',
'NestedSavedFilterSerializer',
'NestedScriptModuleSerializer',
'NestedScriptSerializer',
'NestedTagSerializer', # Defined in netbox.api.serializers
'NestedWebhookSerializer',
]
@ -117,7 +117,7 @@ class NestedJournalEntrySerializer(WritableNestedSerializer):
fields = ['id', 'url', 'display', 'created']
class NestedScriptModuleSerializer(WritableNestedSerializer):
class NestedScriptSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='extras-api:script-detail',
lookup_field='full_name',
@ -127,7 +127,7 @@ class NestedScriptModuleSerializer(WritableNestedSerializer):
display = serializers.SerializerMethodField(read_only=True)
class Meta:
model = models.ScriptModule
model = models.Script
fields = ['id', 'url', 'display', 'name']
def get_display(self, obj):

View File

@ -1,17 +1,17 @@
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from core.api.serializers import JobSerializer
from core.api.nested_serializers import NestedDataSourceSerializer, NestedDataFileSerializer, NestedJobSerializer
from core.api.serializers import JobSerializer
from core.models import ContentType
from dcim.api.nested_serializers import (
NestedDeviceRoleSerializer, NestedDeviceTypeSerializer, NestedLocationSerializer, NestedPlatformSerializer,
NestedRegionSerializer, NestedSiteSerializer, NestedSiteGroupSerializer,
)
from dcim.models import DeviceRole, DeviceType, Location, Platform, Region, Site, SiteGroup
from drf_spectacular.utils import extend_schema_field
from drf_spectacular.types import OpenApiTypes
from extras.choices import *
from extras.models import *
from netbox.api.exceptions import SerializerNotFound
@ -84,17 +84,17 @@ class EventRuleSerializer(NetBoxModelSerializer):
@extend_schema_field(OpenApiTypes.OBJECT)
def get_action_object(self, instance):
context = {'request': self.context['request']}
if instance.action_type == EventRuleActionChoices.WEBHOOK:
# We need to manually instantiate the serializer for scripts
if instance.action_type == EventRuleActionChoices.SCRIPT:
module_id, script_name = instance.action_parameters['script_choice'].split(":", maxsplit=1)
script = instance.action_object.scripts[script_name]()
return NestedScriptSerializer(script, context=context).data
else:
serializer = get_serializer_for_model(
model=instance.action_object_type.model_class(),
prefix=NESTED_SERIALIZER_PREFIX
)
return serializer(instance.action_object, context=context).data
elif instance.action_type == EventRuleActionChoices.SCRIPT:
from extras.api.nested_serializers import NestedScriptModuleSerializer
module_id, script_name = instance.action_parameters['script_choice'].split(":", maxsplit=1)
script = instance.action_object.scripts[script_name]()
return NestedScriptModuleSerializer(script, context=context).data
#

View File

@ -1,11 +1,10 @@
import logging
import sys
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django_rq import get_queue
from django.utils import timezone
from django.utils.module_loading import import_string
from django_rq import get_queue
from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT

View File

@ -73,10 +73,12 @@ class EventRuleTest(APITestCase):
Tag(name='Baz', slug='baz'),
))
def test_event_rule_conditions(self):
# Create a conditional Webhook
def test_eventrule_conditions(self):
"""
Test evaluation of EventRule conditions.
"""
event_rule = EventRule(
name='Conditional Webhook',
name='Event Rule 1',
type_create=True,
type_update=True,
conditions={
@ -103,7 +105,10 @@ class EventRuleTest(APITestCase):
# Evaluate the conditions (status='active')
self.assertTrue(event_rule.eval_conditions(data))
def test_enqueue_webhook_create(self):
def test_single_create_process_eventrule(self):
"""
Check that creating an object with an applicable EventRule queues a background task for the rule's action.
"""
# Create an object via the REST API
data = {
'name': 'Site 1',
@ -120,7 +125,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(Site.objects.count(), 1)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a job was queued for the object creation webhook
# Verify that a background task was queued for the new object
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
@ -131,7 +136,11 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_bulk_create(self):
def test_bulk_create_process_eventrule(self):
"""
Check that bulk creating multiple objects with an applicable EventRule queues a background task for each
new object.
"""
# Create multiple objects via the REST API
data = [
{
@ -166,7 +175,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(Site.objects.count(), 3)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a webhook was queued for each object
# Verify that a background task was queued for each new object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
@ -177,7 +186,10 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_update(self):
def test_single_update_process_eventrule(self):
"""
Check that updating an object with an applicable EventRule queues a background task for the rule's action.
"""
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
@ -194,7 +206,7 @@ class EventRuleTest(APITestCase):
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a job was queued for the object update webhook
# Verify that a background task was queued for the updated object
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
@ -207,7 +219,11 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_enqueue_webhook_bulk_update(self):
def test_bulk_update_process_eventrule(self):
"""
Check that bulk updating multiple objects with an applicable EventRule queues a background task for each
updated object.
"""
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
@ -246,7 +262,7 @@ class EventRuleTest(APITestCase):
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a job was queued for the object update webhook
# Verify that a background task was queued for each updated object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
@ -259,7 +275,10 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_enqueue_webhook_delete(self):
def test_single_delete_process_eventrule(self):
"""
Check that deleting an object with an applicable EventRule queues a background task for the rule's action.
"""
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
@ -269,7 +288,7 @@ class EventRuleTest(APITestCase):
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a job was queued for the object update webhook
# Verify that a task was queued for the deleted object
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
@ -279,7 +298,11 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_bulk_delete(self):
def test_bulk_delete_process_eventrule(self):
"""
Check that bulk deleting multiple objects with an applicable EventRule queues a background task for each
deleted object.
"""
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
@ -298,7 +321,7 @@ class EventRuleTest(APITestCase):
response = self.client.delete(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a job was queued for the object update webhook
# Verify that a background task was queued for each deleted object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
@ -309,7 +332,6 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_webhooks_worker(self):
request_id = uuid.uuid4()
def dummy_send(_, request, **kwargs):

View File

@ -1,4 +1,3 @@
import json
import urllib.parse
import uuid
@ -11,7 +10,6 @@ from extras.choices import *
from extras.models import *
from utilities.testing import ViewTestCases, TestCase
User = get_user_model()