Use SyncDataSourceJob for management command

Instead of maintaining two separate job execution logics, the same job
is now used for both background and interactive execution.
This commit is contained in:
Alexander Haase 2024-06-21 00:52:55 +02:00
parent 957bc3d3de
commit db591d422a
3 changed files with 18 additions and 10 deletions

View File

@ -34,13 +34,12 @@ class Command(BaseCommand):
for i, datasource in enumerate(datasources, start=1):
self.stdout.write(f"[{i}] Syncing {datasource}... ", ending='')
self.stdout.flush()
try:
datasource.sync()
self.stdout.write(datasource.get_status_display())
self.stdout.flush()
except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
raise e
datasource.enqueue_sync_job()
datasource.refresh_from_db()
self.stdout.write(datasource.get_status_display())
self.stdout.flush()
if len(options['name']) > 1:
self.stdout.write(f"Finished.")

View File

@ -153,7 +153,7 @@ class DataSource(JobsMixin, PrimaryModel):
return objectchange
def enqueue_sync_job(self, request):
def enqueue_sync_job(self, request=None):
"""
Enqueue a background job to synchronize the DataSource by calling sync().
"""
@ -165,7 +165,8 @@ class DataSource(JobsMixin, PrimaryModel):
SyncDataSourceJob = import_string('core.jobs.SyncDataSourceJob')
return SyncDataSourceJob.enqueue(
instance=self,
user=request.user
user=(request.user if request else None),
run_now=(request is None),
)
def get_backend(self):

View File

@ -198,7 +198,7 @@ class Job(models.Model):
job_end.send(self)
@classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, run_now=False, **kwargs):
"""
Create a Job instance and enqueue a job using the given callable
@ -209,6 +209,8 @@ class Job(models.Model):
user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
run_now: Run the job immediately without scheduling it in the background. Should be used for interactive
management commands only.
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model)
@ -225,6 +227,12 @@ class Job(models.Model):
job_id=uuid.uuid4()
)
# Optionally, the job can be run immediately without being scheduled to run in the background.
if run_now:
func(job_id=str(job.job_id), job=job, **kwargs)
return job
# Schedule the job to run asynchronously in the background.
if schedule_at:
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)
else: