mirror of
https://github.com/netbox-community/netbox.git
synced 2025-08-18 13:38:16 -06:00
14729 add tests
This commit is contained in:
parent
0c07a144dd
commit
74bb0fca01
@ -1,6 +1,25 @@
|
|||||||
|
import uuid
|
||||||
|
from datetime import datetime
|
||||||
|
from django.contrib.auth.models import User
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
from django_rq import get_queue
|
||||||
|
from django.test import override_settings
|
||||||
|
from django.test.client import Client
|
||||||
|
from django_rq.settings import QUEUES_MAP, QUEUES_LIST
|
||||||
|
from django_rq.workers import get_worker
|
||||||
|
from django.urls import reverse
|
||||||
|
from rq import get_current_job
|
||||||
|
from rq.job import Job as RQ_Job
|
||||||
|
from rq.registry import (
|
||||||
|
DeferredJobRegistry,
|
||||||
|
FailedJobRegistry,
|
||||||
|
FinishedJobRegistry,
|
||||||
|
ScheduledJobRegistry,
|
||||||
|
StartedJobRegistry,
|
||||||
|
)
|
||||||
|
|
||||||
|
from utilities.testing import TestCase, ViewTestCases, create_tags
|
||||||
|
|
||||||
from utilities.testing import ViewTestCases, create_tags
|
|
||||||
from ..models import *
|
from ..models import *
|
||||||
|
|
||||||
|
|
||||||
@ -87,3 +106,148 @@ class DataFileTestCase(
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
DataFile.objects.bulk_create(data_files)
|
DataFile.objects.bulk_create(data_files)
|
||||||
|
|
||||||
|
|
||||||
|
def test_job_default():
|
||||||
|
return 'TestNBJob'
|
||||||
|
|
||||||
|
|
||||||
|
def test_job_high():
|
||||||
|
return 'TestNBJob'
|
||||||
|
|
||||||
|
|
||||||
|
class BackgroundTaskTestCase(TestCase):
|
||||||
|
user_permissions = ()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
self.user.is_staff = True
|
||||||
|
self.user.is_active = True
|
||||||
|
self.user.save()
|
||||||
|
get_queue('default').connection.flushall()
|
||||||
|
get_queue('high').connection.flushall()
|
||||||
|
get_queue('low').connection.flushall()
|
||||||
|
|
||||||
|
def test_background_queue_list(self):
|
||||||
|
response = self.client.get(reverse('core:background_queue_list'))
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
self.assertIn('default', str(response.content))
|
||||||
|
self.assertIn('high', str(response.content))
|
||||||
|
self.assertIn('low', str(response.content))
|
||||||
|
|
||||||
|
def test_background_queue_list_no_perm(self):
|
||||||
|
self.user.is_staff = False
|
||||||
|
self.user.save()
|
||||||
|
response = self.client.get(reverse('core:background_queue_list'))
|
||||||
|
self.assertEqual(response.status_code, 403)
|
||||||
|
self.user.is_staff = True
|
||||||
|
self.user.save()
|
||||||
|
|
||||||
|
def test_background_tasks_list_default(self):
|
||||||
|
queue = get_queue('default')
|
||||||
|
job = queue.enqueue(test_job_default)
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
|
||||||
|
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
self.assertIn('core.tests.test_views.test_job_default', str(response.content))
|
||||||
|
|
||||||
|
def test_background_tasks_list_high(self):
|
||||||
|
queue = get_queue('high')
|
||||||
|
job = queue.enqueue(test_job_high)
|
||||||
|
queue_index = QUEUES_MAP['high']
|
||||||
|
|
||||||
|
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
self.assertIn('core.tests.test_views.test_job_high', str(response.content))
|
||||||
|
|
||||||
|
def test_background_tasks_list_finished(self):
|
||||||
|
queue = get_queue('default')
|
||||||
|
job = queue.enqueue(test_job_default)
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
|
||||||
|
registry = FinishedJobRegistry(queue.name, queue.connection)
|
||||||
|
registry.add(job, 2)
|
||||||
|
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'finished']))
|
||||||
|
self.assertIn('core.tests.test_views.test_job_default', str(response.content))
|
||||||
|
|
||||||
|
def test_background_tasks_list_failed(self):
|
||||||
|
queue = get_queue('default')
|
||||||
|
job = queue.enqueue(test_job_default)
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
|
||||||
|
registry = FailedJobRegistry(queue.name, queue.connection)
|
||||||
|
registry.add(job, 2)
|
||||||
|
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'failed']))
|
||||||
|
self.assertIn('core.tests.test_views.test_job_default', str(response.content))
|
||||||
|
|
||||||
|
def test_background_tasks_scheduled(self):
|
||||||
|
queue = get_queue('default')
|
||||||
|
job = queue.enqueue_at(datetime.now(), test_job_default)
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
|
||||||
|
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'scheduled']))
|
||||||
|
self.assertIn('core.tests.test_views.test_job_default', str(response.content))
|
||||||
|
|
||||||
|
def test_background_tasks_list_deferred(self):
|
||||||
|
queue = get_queue('default')
|
||||||
|
job = queue.enqueue(test_job_default)
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
|
||||||
|
registry = DeferredJobRegistry(queue.name, queue.connection)
|
||||||
|
registry.add(job, 2)
|
||||||
|
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'deferred']))
|
||||||
|
self.assertIn('core.tests.test_views.test_job_default', str(response.content))
|
||||||
|
|
||||||
|
def test_background_task(self):
|
||||||
|
queue = get_queue('default')
|
||||||
|
job = queue.enqueue(test_job_default)
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
|
||||||
|
response = self.client.get(reverse('core:background_task', args=[job.id]))
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
self.assertIn('Background Tasks', str(response.content))
|
||||||
|
self.assertIn(str(job.id), str(response.content))
|
||||||
|
self.assertIn('Callable', str(response.content))
|
||||||
|
self.assertIn('Meta', str(response.content))
|
||||||
|
self.assertIn('Kwargs', str(response.content))
|
||||||
|
|
||||||
|
def test_background_task_delete(self):
|
||||||
|
queue = get_queue('default')
|
||||||
|
job = queue.enqueue(test_job_default)
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
|
||||||
|
response = self.client.post(reverse('core:background_task_delete', args=[job.id]), {'confirm': True})
|
||||||
|
self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
|
||||||
|
self.assertNotIn(job.id, queue.job_ids)
|
||||||
|
|
||||||
|
def test_background_task_requeue(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def test_background_task_enqueue(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def test_background_task_stop(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def test_worker_list(self):
|
||||||
|
worker1 = get_worker('default', name=uuid.uuid4().hex)
|
||||||
|
worker1.register_birth()
|
||||||
|
|
||||||
|
worker2 = get_worker('high')
|
||||||
|
worker2.register_birth()
|
||||||
|
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
response = self.client.get(reverse('core:worker_list', args=[queue_index]))
|
||||||
|
self.assertIn(str(worker1.name), str(response.content))
|
||||||
|
self.assertNotIn(str(worker2.name), str(response.content))
|
||||||
|
|
||||||
|
def test_worker(self):
|
||||||
|
worker1 = get_worker('default', name=uuid.uuid4().hex)
|
||||||
|
worker1.register_birth()
|
||||||
|
|
||||||
|
queue_index = QUEUES_MAP['default']
|
||||||
|
response = self.client.get(reverse('core:worker', args=[worker1.name]))
|
||||||
|
self.assertIn(str(worker1.name), str(response.content))
|
||||||
|
self.assertIn('Birth', str(response.content))
|
||||||
|
self.assertIn('Total working time (seconds)', str(response.content))
|
||||||
|
@ -311,7 +311,7 @@ class BackgroundTaskListView(BaseTaskListView):
|
|||||||
|
|
||||||
for job_id in job_ids:
|
for job_id in job_ids:
|
||||||
try:
|
try:
|
||||||
jobs.append(Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
|
jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
|
||||||
except NoSuchJobError:
|
except NoSuchJobError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user