mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-16 04:02:52 -06:00
parent
88565e8f68
commit
51d046b1f5
@ -9,6 +9,7 @@ from rq.registry import FailedJobRegistry, StartedJobRegistry
|
|||||||
|
|
||||||
from users.models import Token, User
|
from users.models import Token, User
|
||||||
from utilities.testing import APITestCase, APIViewTestCases, TestCase
|
from utilities.testing import APITestCase, APIViewTestCases, TestCase
|
||||||
|
from utilities.testing.utils import disable_logging
|
||||||
from ..models import *
|
from ..models import *
|
||||||
|
|
||||||
|
|
||||||
@ -189,7 +190,8 @@ class BackgroundTaskTestCase(TestCase):
|
|||||||
# Enqueue & run a job that will fail
|
# Enqueue & run a job that will fail
|
||||||
job = queue.enqueue(self.dummy_job_failing)
|
job = queue.enqueue(self.dummy_job_failing)
|
||||||
worker = get_worker('default')
|
worker = get_worker('default')
|
||||||
worker.work(burst=True)
|
with disable_logging():
|
||||||
|
worker.work(burst=True)
|
||||||
self.assertTrue(job.is_failed)
|
self.assertTrue(job.is_failed)
|
||||||
|
|
||||||
# Re-enqueue the failed job and check that its status has been reset
|
# Re-enqueue the failed job and check that its status has been reset
|
||||||
@ -231,7 +233,8 @@ class BackgroundTaskTestCase(TestCase):
|
|||||||
self.assertEqual(job.get_status(), JobStatus.STARTED)
|
self.assertEqual(job.get_status(), JobStatus.STARTED)
|
||||||
response = self.client.post(reverse('core-api:rqtask-stop', args=[job.id]), **self.header)
|
response = self.client.post(reverse('core-api:rqtask-stop', args=[job.id]), **self.header)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
|
with disable_logging():
|
||||||
|
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
|
||||||
started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
|
started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
|
||||||
self.assertEqual(len(started_job_registry), 0)
|
self.assertEqual(len(started_job_registry), 0)
|
||||||
|
|
||||||
|
@ -14,7 +14,7 @@ from core.choices import ObjectChangeActionChoices
|
|||||||
from core.models import *
|
from core.models import *
|
||||||
from dcim.models import Site
|
from dcim.models import Site
|
||||||
from users.models import User
|
from users.models import User
|
||||||
from utilities.testing import TestCase, ViewTestCases, create_tags
|
from utilities.testing import TestCase, ViewTestCases, create_tags, disable_logging
|
||||||
|
|
||||||
|
|
||||||
class DataSourceTestCase(ViewTestCases.PrimaryObjectViewTestCase):
|
class DataSourceTestCase(ViewTestCases.PrimaryObjectViewTestCase):
|
||||||
@ -271,7 +271,8 @@ class BackgroundTaskTestCase(TestCase):
|
|||||||
# Enqueue & run a job that will fail
|
# Enqueue & run a job that will fail
|
||||||
job = queue.enqueue(self.dummy_job_failing)
|
job = queue.enqueue(self.dummy_job_failing)
|
||||||
worker = get_worker('default')
|
worker = get_worker('default')
|
||||||
worker.work(burst=True)
|
with disable_logging():
|
||||||
|
worker.work(burst=True)
|
||||||
self.assertTrue(job.is_failed)
|
self.assertTrue(job.is_failed)
|
||||||
|
|
||||||
# Re-enqueue the failed job and check that its status has been reset
|
# Re-enqueue the failed job and check that its status has been reset
|
||||||
@ -317,7 +318,8 @@ class BackgroundTaskTestCase(TestCase):
|
|||||||
self.assertEqual(len(started_job_registry), 1)
|
self.assertEqual(len(started_job_registry), 1)
|
||||||
response = self.client.get(reverse('core:background_task_stop', args=[job.id]))
|
response = self.client.get(reverse('core:background_task_stop', args=[job.id]))
|
||||||
self.assertEqual(response.status_code, 302)
|
self.assertEqual(response.status_code, 302)
|
||||||
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
|
with disable_logging():
|
||||||
|
worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
|
||||||
self.assertEqual(len(started_job_registry), 0)
|
self.assertEqual(len(started_job_registry), 0)
|
||||||
|
|
||||||
canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
|
canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
|
||||||
|
@ -14,7 +14,7 @@ from ipam.models import ASN, RIR, VLAN, VRF
|
|||||||
from netbox.api.serializers import GenericObjectSerializer
|
from netbox.api.serializers import GenericObjectSerializer
|
||||||
from tenancy.models import Tenant
|
from tenancy.models import Tenant
|
||||||
from users.models import User
|
from users.models import User
|
||||||
from utilities.testing import APITestCase, APIViewTestCases, create_test_device
|
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_logging
|
||||||
from virtualization.models import Cluster, ClusterType
|
from virtualization.models import Cluster, ClusterType
|
||||||
from wireless.choices import WirelessChannelChoices
|
from wireless.choices import WirelessChannelChoices
|
||||||
from wireless.models import WirelessLAN
|
from wireless.models import WirelessLAN
|
||||||
@ -1858,7 +1858,8 @@ class InterfaceTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase
|
|||||||
|
|
||||||
# Attempt to delete only the parent interface
|
# Attempt to delete only the parent interface
|
||||||
url = self._get_detail_url(interface1)
|
url = self._get_detail_url(interface1)
|
||||||
self.client.delete(url, **self.header)
|
with disable_logging():
|
||||||
|
self.client.delete(url, **self.header)
|
||||||
self.assertEqual(device.interfaces.count(), 4) # Parent was not deleted
|
self.assertEqual(device.interfaces.count(), 4) # Parent was not deleted
|
||||||
|
|
||||||
# Attempt to bulk delete parent & child together
|
# Attempt to bulk delete parent & child together
|
||||||
|
@ -2,7 +2,7 @@ import datetime
|
|||||||
|
|
||||||
from django.contrib.contenttypes.models import ContentType
|
from django.contrib.contenttypes.models import ContentType
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
from django.utils.timezone import make_aware
|
from django.utils.timezone import make_aware, now
|
||||||
from rest_framework import status
|
from rest_framework import status
|
||||||
|
|
||||||
from core.choices import ManagedFileRootPathChoices
|
from core.choices import ManagedFileRootPathChoices
|
||||||
@ -991,6 +991,10 @@ class SubscriptionTest(APIViewTestCases.APIViewTestCase):
|
|||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
cls.bulk_update_data = {
|
||||||
|
'user': users[3].pk,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class NotificationGroupTest(APIViewTestCases.APIViewTestCase):
|
class NotificationGroupTest(APIViewTestCases.APIViewTestCase):
|
||||||
model = NotificationGroup
|
model = NotificationGroup
|
||||||
@ -1072,6 +1076,9 @@ class NotificationGroupTest(APIViewTestCases.APIViewTestCase):
|
|||||||
class NotificationTest(APIViewTestCases.APIViewTestCase):
|
class NotificationTest(APIViewTestCases.APIViewTestCase):
|
||||||
model = Notification
|
model = Notification
|
||||||
brief_fields = ['display', 'event_type', 'id', 'object_id', 'object_type', 'read', 'url', 'user']
|
brief_fields = ['display', 'event_type', 'id', 'object_id', 'object_type', 'read', 'url', 'user']
|
||||||
|
bulk_update_data = {
|
||||||
|
'read': now(),
|
||||||
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpTestData(cls):
|
def setUpTestData(cls):
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
import logging
|
||||||
import tempfile
|
import tempfile
|
||||||
from datetime import date, datetime, timezone
|
from datetime import date, datetime, timezone
|
||||||
|
|
||||||
@ -7,6 +8,7 @@ from netaddr import IPAddress, IPNetwork
|
|||||||
|
|
||||||
from dcim.models import DeviceRole
|
from dcim.models import DeviceRole
|
||||||
from extras.scripts import *
|
from extras.scripts import *
|
||||||
|
from utilities.testing import disable_logging
|
||||||
|
|
||||||
CHOICES = (
|
CHOICES = (
|
||||||
('ff0000', 'Red'),
|
('ff0000', 'Red'),
|
||||||
@ -39,7 +41,8 @@ class ScriptTest(TestCase):
|
|||||||
datafile.write(bytes(YAML_DATA, 'UTF-8'))
|
datafile.write(bytes(YAML_DATA, 'UTF-8'))
|
||||||
datafile.seek(0)
|
datafile.seek(0)
|
||||||
|
|
||||||
data = Script().load_yaml(datafile.name)
|
with disable_logging(level=logging.WARNING):
|
||||||
|
data = Script().load_yaml(datafile.name)
|
||||||
self.assertEqual(data, {
|
self.assertEqual(data, {
|
||||||
'Foo': 123,
|
'Foo': 123,
|
||||||
'Bar': 456,
|
'Bar': 456,
|
||||||
@ -51,7 +54,8 @@ class ScriptTest(TestCase):
|
|||||||
datafile.write(bytes(JSON_DATA, 'UTF-8'))
|
datafile.write(bytes(JSON_DATA, 'UTF-8'))
|
||||||
datafile.seek(0)
|
datafile.seek(0)
|
||||||
|
|
||||||
data = Script().load_json(datafile.name)
|
with disable_logging(level=logging.WARNING):
|
||||||
|
data = Script().load_json(datafile.name)
|
||||||
self.assertEqual(data, {
|
self.assertEqual(data, {
|
||||||
'Foo': 123,
|
'Foo': 123,
|
||||||
'Bar': 456,
|
'Bar': 456,
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
|
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
from netaddr import IPNetwork
|
from netaddr import IPNetwork
|
||||||
@ -9,7 +10,7 @@ from ipam.choices import *
|
|||||||
from ipam.models import *
|
from ipam.models import *
|
||||||
from tenancy.models import Tenant
|
from tenancy.models import Tenant
|
||||||
from utilities.data import string_to_ranges
|
from utilities.data import string_to_ranges
|
||||||
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_warnings
|
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_logging
|
||||||
|
|
||||||
|
|
||||||
class AppTest(APITestCase):
|
class AppTest(APITestCase):
|
||||||
@ -1026,7 +1027,7 @@ class VLANTest(APIViewTestCases.APIViewTestCase):
|
|||||||
|
|
||||||
self.add_permissions('ipam.delete_vlan')
|
self.add_permissions('ipam.delete_vlan')
|
||||||
url = reverse('ipam-api:vlan-detail', kwargs={'pk': vlan.pk})
|
url = reverse('ipam-api:vlan-detail', kwargs={'pk': vlan.pk})
|
||||||
with disable_warnings('netbox.api.views.ModelViewSet'):
|
with disable_logging(level=logging.WARNING):
|
||||||
response = self.client.delete(url, **self.header)
|
response = self.client.delete(url, **self.header)
|
||||||
|
|
||||||
self.assertHttpStatus(response, status.HTTP_409_CONFLICT)
|
self.assertHttpStatus(response, status.HTTP_409_CONFLICT)
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from django.test import Client, TestCase, override_settings
|
from django.test import Client, TestCase, override_settings
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
|
from drf_spectacular.drainage import GENERATOR_STATS
|
||||||
from rest_framework import status
|
from rest_framework import status
|
||||||
|
|
||||||
from core.models import ObjectType
|
from core.models import ObjectType
|
||||||
@ -264,5 +265,6 @@ class APIDocsTestCase(TestCase):
|
|||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
url = reverse('schema')
|
url = reverse('schema')
|
||||||
response = self.client.get(url)
|
with GENERATOR_STATS.silence(): # Suppress schema generator warnings
|
||||||
|
response = self.client.get(url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
from django.test import tag
|
from django.test import tag
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
from netaddr import IPNetwork
|
from netaddr import IPNetwork
|
||||||
@ -10,7 +12,9 @@ from extras.choices import CustomFieldTypeChoices
|
|||||||
from extras.models import ConfigTemplate, CustomField
|
from extras.models import ConfigTemplate, CustomField
|
||||||
from ipam.choices import VLANQinQRoleChoices
|
from ipam.choices import VLANQinQRoleChoices
|
||||||
from ipam.models import Prefix, VLAN, VRF
|
from ipam.models import Prefix, VLAN, VRF
|
||||||
from utilities.testing import APITestCase, APIViewTestCases, create_test_device, create_test_virtualmachine
|
from utilities.testing import (
|
||||||
|
APITestCase, APIViewTestCases, create_test_device, create_test_virtualmachine, disable_logging,
|
||||||
|
)
|
||||||
from virtualization.choices import *
|
from virtualization.choices import *
|
||||||
from virtualization.models import *
|
from virtualization.models import *
|
||||||
|
|
||||||
@ -402,7 +406,8 @@ class VMInterfaceTest(APIViewTestCases.APIViewTestCase):
|
|||||||
|
|
||||||
# Attempt to delete only the parent interface
|
# Attempt to delete only the parent interface
|
||||||
url = self._get_detail_url(interface1)
|
url = self._get_detail_url(interface1)
|
||||||
self.client.delete(url, **self.header)
|
with disable_logging(level=logging.WARNING):
|
||||||
|
self.client.delete(url, **self.header)
|
||||||
self.assertEqual(virtual_machine.interfaces.count(), 4) # Parent was not deleted
|
self.assertEqual(virtual_machine.interfaces.count(), 4) # Parent was not deleted
|
||||||
|
|
||||||
# Attempt to bulk delete parent & child together
|
# Attempt to bulk delete parent & child together
|
||||||
|
Loading…
Reference in New Issue
Block a user