Remove testing code

This commit is contained in:
Brian Tiemann 2024-11-21 11:29:15 -05:00
parent 4595eafd38
commit d4fbce5718

View File

@ -1,11 +1,8 @@
import time
from datetime import timedelta from datetime import timedelta
from redis import Redis
from django.test import TestCase from django.test import TestCase
from django.utils import timezone from django.utils import timezone
from django_rq import get_queue from django_rq import get_queue
from rq.job import Job as RQJob, InvalidJobOperation
from ..jobs import * from ..jobs import *
from core.models import DataSource, Job from core.models import DataSource, Job
@ -124,18 +121,7 @@ class EnqueueTest(JobRunnerTestCase):
def test_enqueue_once_after_enqueue(self): def test_enqueue_once_after_enqueue(self):
instance = DataSource() instance = DataSource()
redis = Redis()
job1 = TestJobRunner.enqueue(instance, schedule_at=self.get_schedule_at()) job1 = TestJobRunner.enqueue(instance, schedule_at=self.get_schedule_at())
job1_rq = RQJob.fetch(str(job1.job_id), connection=redis)
job1_status = None
max_sleep = 5
sleep_count = 0
while job1_status is None and sleep_count < max_sleep:
try:
job1_status = job1_rq.get_status()
except InvalidJobOperation:
time.sleep(1)
sleep_count += 1
job2 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at(2)) job2 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at(2))
self.assertNotEqual(job1, job2) self.assertNotEqual(job1, job2)