Use SyncDataSourceJob for management command

Instead of maintaining two separate job execution logics, the same job
is now used for both background and interactive execution.
This commit is contained in:
Alexander Haase 2024-06-21 00:52:55 +02:00
parent 957bc3d3de
commit db591d422a
3 changed files with 18 additions and 10 deletions

View File

@ -34,13 +34,12 @@ class Command(BaseCommand):
for i, datasource in enumerate(datasources, start=1): for i, datasource in enumerate(datasources, start=1):
self.stdout.write(f"[{i}] Syncing {datasource}... ", ending='') self.stdout.write(f"[{i}] Syncing {datasource}... ", ending='')
self.stdout.flush() self.stdout.flush()
try:
datasource.sync() datasource.enqueue_sync_job()
self.stdout.write(datasource.get_status_display()) datasource.refresh_from_db()
self.stdout.flush()
except Exception as e: self.stdout.write(datasource.get_status_display())
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED) self.stdout.flush()
raise e
if len(options['name']) > 1: if len(options['name']) > 1:
self.stdout.write(f"Finished.") self.stdout.write(f"Finished.")

View File

@ -153,7 +153,7 @@ class DataSource(JobsMixin, PrimaryModel):
return objectchange return objectchange
def enqueue_sync_job(self, request): def enqueue_sync_job(self, request=None):
""" """
Enqueue a background job to synchronize the DataSource by calling sync(). Enqueue a background job to synchronize the DataSource by calling sync().
""" """
@ -165,7 +165,8 @@ class DataSource(JobsMixin, PrimaryModel):
SyncDataSourceJob = import_string('core.jobs.SyncDataSourceJob') SyncDataSourceJob = import_string('core.jobs.SyncDataSourceJob')
return SyncDataSourceJob.enqueue( return SyncDataSourceJob.enqueue(
instance=self, instance=self,
user=request.user user=(request.user if request else None),
run_now=(request is None),
) )
def get_backend(self): def get_backend(self):

View File

@ -198,7 +198,7 @@ class Job(models.Model):
job_end.send(self) job_end.send(self)
@classmethod @classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs): def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, run_now=False, **kwargs):
""" """
Create a Job instance and enqueue a job using the given callable Create a Job instance and enqueue a job using the given callable
@ -209,6 +209,8 @@ class Job(models.Model):
user: The user responsible for running the job user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes) interval: Recurrence interval (in minutes)
run_now: Run the job immediately without scheduling it in the background. Should be used for interactive
management commands only.
""" """
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model) rq_queue_name = get_queue_for_model(object_type.model)
@ -225,6 +227,12 @@ class Job(models.Model):
job_id=uuid.uuid4() job_id=uuid.uuid4()
) )
# Optionally, the job can be run immediately without being scheduled to run in the background.
if run_now:
func(job_id=str(job.job_id), job=job, **kwargs)
return job
# Schedule the job to run asynchronously in the background.
if schedule_at: if schedule_at:
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs) queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)
else: else: