From 1b8849a43b8e485c1857aeeb69b0da995c9c6d09 Mon Sep 17 00:00:00 2001 From: Arthur Date: Thu, 25 Jan 2024 15:18:58 -0800 Subject: [PATCH] 12510 move reports to use BaseScript --- netbox/extras/reports.py | 187 ++------------------------------------- netbox/extras/scripts.py | 147 ++++++++++++++++++++++++++---- 2 files changed, 134 insertions(+), 200 deletions(-) diff --git a/netbox/extras/reports.py b/netbox/extras/reports.py index 90641cc84..64f7d50a5 100644 --- a/netbox/extras/reports.py +++ b/netbox/extras/reports.py @@ -11,6 +11,7 @@ from core.choices import JobStatusChoices from core.models import Job from .choices import LogLevelChoices from .models import ReportModule +from .scripts import BaseScript __all__ = ( 'Report', @@ -58,191 +59,15 @@ def run_report(job, *args, **kwargs): ) -class Report(object): - """ - NetBox users can extend this object to write custom reports to be used for validating data within NetBox. Each - report must have one or more test methods named `test_*`. - - The `_results` attribute of a completed report will take the following form: - - { - 'test_bar': { - 'failures': 42, - 'log': [ - (, , , ), - ... - ] - }, - 'test_foo': { - 'failures': 0, - 'log': [ - (, , , ), - ... - ] - } - } - """ - description = None - scheduling_enabled = True - job_timeout = None - - def __init__(self): - - self._results = {} - self.active_test = None - self.failed = False - - self.logger = logging.getLogger(f"netbox.reports.{self.__module__}.{self.__class__.__name__}") - - # Compile test methods and initialize results skeleton - test_methods = [] - for method in dir(self): - if method.startswith('test_') and callable(getattr(self, method)): - test_methods.append(method) - self._results[method] = { - 'success': 0, - 'info': 0, - 'warning': 0, - 'failure': 0, - 'log': [], - } - self.test_methods = test_methods - - @classproperty - def module(self): - return self.__module__ - - @classproperty - def class_name(self): - return self.__name__ - - @classproperty - def full_name(self): - return f'{self.module}.{self.class_name}' - - @property - def name(self): - """ - Override this attribute to set a custom display name. - """ - return self.class_name - - @property - def filename(self): - return inspect.getfile(self.__class__) - - @property - def source(self): - return inspect.getsource(self.__class__) - - @property - def is_valid(self): - """ - Indicates whether the report can be run. - """ - return bool(self.test_methods) - - # - # Logging methods - # - - def _log(self, obj, message, level=LogLevelChoices.LOG_DEFAULT): - """ - Log a message from a test method. Do not call this method directly; use one of the log_* wrappers below. - """ - if level not in LogLevelChoices.values(): - raise Exception(f"Unknown logging level: {level}") - self._results[self.active_test]['log'].append(( - timezone.now().isoformat(), - level, - str(obj) if obj else None, - obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None, - message, - )) - - def log(self, message): - """ - Log a message which is not associated with a particular object. - """ - self._log(None, message, level=LogLevelChoices.LOG_DEFAULT) - self.logger.info(message) - +class Report(BaseScript): def log_success(self, obj, message=None): - """ - Record a successful test against an object. Logging a message is optional. - """ - if message: - self._log(obj, message, level=LogLevelChoices.LOG_SUCCESS) - self._results[self.active_test]['success'] += 1 - self.logger.info(f"Success | {obj}: {message}") + super().log_success(message, obj) def log_info(self, obj, message): - """ - Log an informational message. - """ - self._log(obj, message, level=LogLevelChoices.LOG_INFO) - self._results[self.active_test]['info'] += 1 - self.logger.info(f"Info | {obj}: {message}") + super().log_info(message, obj) def log_warning(self, obj, message): - """ - Log a warning. - """ - self._log(obj, message, level=LogLevelChoices.LOG_WARNING) - self._results[self.active_test]['warning'] += 1 - self.logger.info(f"Warning | {obj}: {message}") + super().log_warning(message, obj) def log_failure(self, obj, message): - """ - Log a failure. Calling this method will automatically mark the report as failed. - """ - self._log(obj, message, level=LogLevelChoices.LOG_FAILURE) - self._results[self.active_test]['failure'] += 1 - self.logger.info(f"Failure | {obj}: {message}") - self.failed = True - - # - # Run methods - # - - def run(self, job): - """ - Run the report and save its results. Each test method will be executed in order. - """ - self.logger.info(f"Running report") - - # Perform any post-run tasks - self.pre_run() - - try: - for method_name in self.test_methods: - self.active_test = method_name - test_method = getattr(self, method_name) - test_method() - job.data = self._results - if self.failed: - self.logger.warning("Report failed") - job.terminate(status=JobStatusChoices.STATUS_FAILED) - else: - self.logger.info("Report completed successfully") - job.terminate() - except Exception as e: - stacktrace = traceback.format_exc() - self.log_failure(None, f"An exception occurred: {type(e).__name__}: {e}
{stacktrace}
") - logger.error(f"Exception raised during report execution: {e}") - job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e)) - - # Perform any post-run tasks - self.post_run() - - def pre_run(self): - """ - Extend this method to include any tasks which should execute *before* the report is run. - """ - pass - - def post_run(self): - """ - Extend this method to include any tasks which should execute *after* the report is run. - """ - pass + super().log_failure(message, obj) diff --git a/netbox/extras/scripts.py b/netbox/extras/scripts.py index f28465547..0994bc66f 100644 --- a/netbox/extras/scripts.py +++ b/netbox/extras/scripts.py @@ -10,6 +10,7 @@ from django import forms from django.conf import settings from django.core.validators import RegexValidator from django.db import transaction +from django.utils import timezone from django.utils.functional import classproperty from core.choices import JobStatusChoices @@ -257,7 +258,7 @@ class IPNetworkVar(ScriptVariable): # Scripts # -class BaseScript: +class BaseScript(object): """ Base model for custom scripts. User classes should inherit from this model if they want to extend Script functionality for use in other subclasses. @@ -270,6 +271,9 @@ class BaseScript: pass def __init__(self): + self._results = {} + self.failed = False + self._current_method = 'main' # Initiate the log self.logger = logging.getLogger(f"netbox.scripts.{self.__module__}.{self.__class__.__name__}") @@ -278,9 +282,26 @@ class BaseScript: # Declare the placeholder for the current request self.request = None - # Grab some info about the script - self.filename = inspect.getfile(self.__class__) - self.source = inspect.getsource(self.__class__) + # Compile test methods and initialize results skeleton + self._results['main'] = { + 'success': 0, + 'info': 0, + 'warning': 0, + 'failure': 0, + 'log': [], + } + test_methods = [] + for method in dir(self): + if method.startswith('test_') and callable(getattr(self, method)): + test_methods.append(method) + self._results[method] = { + 'success': 0, + 'info': 0, + 'warning': 0, + 'failure': 0, + 'log': [], + } + self.test_methods = test_methods def __str__(self): return self.name @@ -331,6 +352,21 @@ class BaseScript: def scheduling_enabled(self): return getattr(self.Meta, 'scheduling_enabled', True) + @property + def filename(self): + return inspect.getfile(self.__class__) + + @property + def source(self): + return inspect.getsource(self.__class__) + + @property + def is_valid(self): + """ + Indicates whether the report can be run. + """ + return bool(self.test_methods) + @classmethod def _get_vars(cls): vars = {} @@ -399,25 +435,53 @@ class BaseScript: # Logging - def log_debug(self, message): - self.logger.log(logging.DEBUG, message) - self.log.append((LogLevelChoices.LOG_DEFAULT, str(message))) + def _log(self, message, obj=None, log_level=LogLevelChoices.LOG_DEFAULT, level=logging.INFO): + """ + Log a message from a test method. Do not call this method directly; use one of the log_* wrappers below. + """ + if log_level not in LogLevelChoices.values(): + raise Exception(f"Unknown logging level: {log_level}") - def log_success(self, message): - self.logger.log(logging.INFO, message) # No syslog equivalent for SUCCESS - self.log.append((LogLevelChoices.LOG_SUCCESS, str(message))) + if message: + self._results[self._current_method]['log'].append(( + timezone.now().isoformat(), + log_level, + str(obj) if obj else None, + obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None, + message, + )) - def log_info(self, message): - self.logger.log(logging.INFO, message) - self.log.append((LogLevelChoices.LOG_INFO, str(message))) + if log_level != LogLevelChoices.LOG_DEFAULT: + self._results[self._current_method][log_level] += 1 - def log_warning(self, message): - self.logger.log(logging.WARNING, message) - self.log.append((LogLevelChoices.LOG_WARNING, str(message))) + if self._current_method != 'main': + self._results['main'][log_level] += 1 - def log_failure(self, message): - self.logger.log(logging.ERROR, message) - self.log.append((LogLevelChoices.LOG_FAILURE, str(message))) + if obj: + self.logger.log(level, f"{log_level.capitalize()} | {obj}: {message}") + else: + self.logger.log(level, message) # No syslog equivalent for SUCCESS + + def log(self, message): + """ + Log a message which is not associated with a particular object. + """ + self._log(str(message), None, log_level=LogLevelChoices.LOG_DEFAULT, level=logging.INFO) + + def log_debug(self, message, obj=None): + self._log(str(message), obj, log_level=LogLevelChoices.LOG_DEFAULT, level=logging.DEBUG) + + def log_success(self, message=None, obj=None): + self._log(str(message), obj, log_level=LogLevelChoices.LOG_SUCCESS, level=logging.INFO) + + def log_info(self, message, obj=None): + self._log(str(message), obj, log_level=LogLevelChoices.LOG_INFO, level=logging.INFO) + + def log_warning(self, message, obj=None): + self._log(str(message), obj, log_level=LogLevelChoices.LOG_WARNING, level=logging.WARNING) + + def log_failure(self, message, obj=None): + self._log(str(message), obj, log_level=LogLevelChoices.LOG_FAILURE, level=logging.ERROR) # Convenience functions @@ -446,6 +510,51 @@ class BaseScript: return data + def run_test_scripts(self, job): + """ + Run the report and save its results. Each test method will be executed in order. + """ + self.logger.info(f"Running report") + + # Perform any post-run tasks + self.pre_run() + + try: + for method_name in self.test_methods: + self._current_method = method_name + test_method = getattr(self, method_name) + test_method() + job.data = self._results + if self.failed: + self.logger.warning("Report failed") + job.terminate(status=JobStatusChoices.STATUS_FAILED) + else: + self.logger.info("Report completed successfully") + job.terminate() + except Exception as e: + stacktrace = traceback.format_exc() + self.log_failure(None, f"An exception occurred: {type(e).__name__}: {e}
{stacktrace}
") + logger.error(f"Exception raised during report execution: {e}") + job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e)) + + # Perform any post-run tasks + self.post_run() + + def run(self, job): + self.run_test_scripts(job) + + def pre_run(self): + """ + Extend this method to include any tasks which should execute *before* the report is run. + """ + pass + + def post_run(self): + """ + Extend this method to include any tasks which should execute *after* the report is run. + """ + pass + class Script(BaseScript): """