Replace JobResult.set_status() with terminate()

This commit is contained in:
jeremystretch 2023-02-28 15:19:54 -05:00 committed by Jeremy Stretch
parent 697feed257
commit a8c331f88a
5 changed files with 46 additions and 59 deletions

View File

@ -22,8 +22,9 @@ def sync_datasource(job_result, *args, **kwargs):
# Update the search cache for DataFiles belonging to this source # Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator()) search_backend.cache(datasource.datafiles.iterator())
job_result.terminate()
except SyncError as e: except SyncError as e:
job_result.set_status(JobResultStatusChoices.STATUS_ERRORED) job_result.terminate(status=JobResultStatusChoices.STATUS_ERRORED)
job_result.save()
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED) DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
logging.error(e) logging.error(e)

View File

@ -40,17 +40,17 @@ class Command(BaseCommand):
Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with
the change_logging context manager (which is bypassed if commit == False). the change_logging context manager (which is bypassed if commit == False).
""" """
try:
try: try:
with transaction.atomic(): with transaction.atomic():
script.output = script.run(data=data, commit=commit) script.output = script.run(data=data, commit=commit)
job_result.set_status(JobResultStatusChoices.STATUS_COMPLETED)
if not commit: if not commit:
raise AbortTransaction() raise AbortTransaction()
except AbortTransaction: except AbortTransaction:
script.log_info("Database changes have been reverted automatically.") script.log_info("Database changes have been reverted automatically.")
clear_webhooks.send(request) clear_webhooks.send(request)
job_result.data = ScriptOutputSerializer(script).data
job_result.terminate()
except Exception as e: except Exception as e:
stacktrace = traceback.format_exc() stacktrace = traceback.format_exc()
script.log_failure( script.log_failure(
@ -58,11 +58,9 @@ class Command(BaseCommand):
) )
script.log_info("Database changes have been reverted due to error.") script.log_info("Database changes have been reverted due to error.")
logger.error(f"Exception raised during script execution: {e}") logger.error(f"Exception raised during script execution: {e}")
job_result.set_status(JobResultStatusChoices.STATUS_ERRORED)
clear_webhooks.send(request) clear_webhooks.send(request)
finally:
job_result.data = ScriptOutputSerializer(script).data job_result.data = ScriptOutputSerializer(script).data
job_result.save() job_result.terminate(status=JobResultStatusChoices.STATUS_ERRORED)
logger.info(f"Script completed in {job_result.duration}") logger.info(f"Script completed in {job_result.duration}")

View File

@ -694,14 +694,16 @@ class JobResult(models.Model):
self.status = JobResultStatusChoices.STATUS_RUNNING self.status = JobResultStatusChoices.STATUS_RUNNING
JobResult.objects.filter(pk=self.pk).update(started=self.started, status=self.status) JobResult.objects.filter(pk=self.pk).update(started=self.started, status=self.status)
def set_status(self, status): def terminate(self, status=JobResultStatusChoices.STATUS_COMPLETED):
""" """
Helper method to change the status of the job result. If the target status is terminal, the completion Mark the job as completed, optionally specifying a particular termination status.
time is also set.
""" """
valid_statuses = JobResultStatusChoices.TERMINAL_STATE_CHOICES
if status not in valid_statuses:
raise ValueError(f"Invalid status for job termination. Choices are: {', '.join(valid_statuses)}")
self.status = status self.status = status
if status in JobResultStatusChoices.TERMINAL_STATE_CHOICES:
self.completed = timezone.now() self.completed = timezone.now()
JobResult.objects.filter(pk=self.pk).update(status=self.status, completed=self.completed)
@classmethod @classmethod
def enqueue_job(cls, func, name, obj_type, user, schedule_at=None, interval=None, *args, **kwargs): def enqueue_job(cls, func, name, obj_type, user, schedule_at=None, interval=None, *args, **kwargs):

View File

@ -85,8 +85,7 @@ def run_report(job_result, *args, **kwargs):
job_result.start() job_result.start()
report.run(job_result) report.run(job_result)
except Exception: except Exception:
job_result.set_status(JobResultStatusChoices.STATUS_ERRORED) job_result.terminate(status=JobResultStatusChoices.STATUS_ERRORED)
job_result.save()
logging.error(f"Error during execution of report {job_result.name}") logging.error(f"Error during execution of report {job_result.name}")
finally: finally:
# Schedule the next job if an interval has been set # Schedule the next job if an interval has been set
@ -241,28 +240,23 @@ class Report(object):
self.pre_run() self.pre_run()
try: try:
for method_name in self.test_methods: for method_name in self.test_methods:
self.active_test = method_name self.active_test = method_name
test_method = getattr(self, method_name) test_method = getattr(self, method_name)
test_method() test_method()
if self.failed: if self.failed:
self.logger.warning("Report failed") self.logger.warning("Report failed")
job_result.status = JobResultStatusChoices.STATUS_FAILED job_result.status = JobResultStatusChoices.STATUS_FAILED
else: else:
self.logger.info("Report completed successfully") self.logger.info("Report completed successfully")
job_result.status = JobResultStatusChoices.STATUS_COMPLETED job_result.status = JobResultStatusChoices.STATUS_COMPLETED
except Exception as e: except Exception as e:
stacktrace = traceback.format_exc() stacktrace = traceback.format_exc()
self.log_failure(None, f"An exception occurred: {type(e).__name__}: {e} <pre>{stacktrace}</pre>") self.log_failure(None, f"An exception occurred: {type(e).__name__}: {e} <pre>{stacktrace}</pre>")
logger.error(f"Exception raised during report execution: {e}") logger.error(f"Exception raised during report execution: {e}")
job_result.set_status(JobResultStatusChoices.STATUS_ERRORED) job_result.terminate(status=JobResultStatusChoices.STATUS_ERRORED)
finally:
job_result.data = self._results job_result.terminate()
job_result.completed = timezone.now()
job_result.save()
# Perform any post-run tasks # Perform any post-run tasks
self.post_run() self.post_run()

View File

@ -459,37 +459,29 @@ def run_script(data, request, commit=True, *args, **kwargs):
Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with
the change_logging context manager (which is bypassed if commit == False). the change_logging context manager (which is bypassed if commit == False).
""" """
try:
try: try:
with transaction.atomic(): with transaction.atomic():
script.output = script.run(data=data, commit=commit) script.output = script.run(data=data, commit=commit)
job_result.set_status(JobResultStatusChoices.STATUS_COMPLETED)
if not commit: if not commit:
raise AbortTransaction() raise AbortTransaction()
except AbortTransaction: except AbortTransaction:
script.log_info("Database changes have been reverted automatically.") script.log_info("Database changes have been reverted automatically.")
clear_webhooks.send(request) clear_webhooks.send(request)
except AbortScript as e:
script.log_failure(
f"Script aborted with error: {e}"
)
script.log_info("Database changes have been reverted due to error.")
logger.error(f"Script aborted with error: {e}")
job_result.set_status(JobResultStatusChoices.STATUS_ERRORED)
clear_webhooks.send(request)
except Exception as e:
stacktrace = traceback.format_exc()
script.log_failure(
f"An exception occurred: `{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
)
script.log_info("Database changes have been reverted due to error.")
logger.error(f"Exception raised during script execution: {e}")
job_result.set_status(JobResultStatusChoices.STATUS_ERRORED)
clear_webhooks.send(request)
finally:
job_result.data = ScriptOutputSerializer(script).data job_result.data = ScriptOutputSerializer(script).data
job_result.save() job_result.terminate()
except Exception as e:
if type(e) is AbortScript:
script.log_failure(f"Script aborted with error: {e}")
logger.error(f"Script aborted with error: {e}")
else:
stacktrace = traceback.format_exc()
script.log_failure(f"An exception occurred: `{type(e).__name__}: {e}`\n```\n{stacktrace}\n```")
logger.error(f"Exception raised during script execution: {e}")
script.log_info("Database changes have been reverted due to error.")
job_result.data = ScriptOutputSerializer(script).data
job_result.terminate(status=JobResultStatusChoices.STATUS_ERRORED)
clear_webhooks.send(request)
logger.info(f"Script completed in {job_result.duration}") logger.info(f"Script completed in {job_result.duration}")