Module ploigos_step_runner.step_runner

Constructs a given named StepImplementer using a given configuration, and runs it.

Classes

class StepRunner (config, results_file_name='step-runner-results.yml', work_dir_path='step-runner-working')

Enables the running of arbitrary Ploigos steps via StepImplementers.

Parameters

config : Config, dict, list, str (file or directory)
A Config object, or a dictionary that is a valid step runner configuration, or a string that is a path to a YAML or JSON file that is a valid step runner configuration, or a string that is a path to a directory containing one or more files that are valid YAML or JSON files that are valid configurations, or a list of any of the former.
results_file_name : str, optional
Path to the file for steps to write their results to Default: step-runner-results.yml
work_dir_path : str, optional
Path to the working folder for step_implementers for runtime files Default: step-runner-working

Raises

ValueError
If given config is not of expected type.
AssertionError
If given config contains any invalid configurations.

Returns

Bool
True if step completed successfully False if step returned an error message
Expand source code
class StepRunner:
    """Enables the running of arbitrary Ploigos steps via StepImplementers.

    Parameters
    ----------
    config : Config, dict, list, str (file or directory)
        A Config object,
        or a dictionary that is a valid step runner configuration,
        or a string that is a path to a YAML or JSON file that is
        a valid step runner configuration,
        or a string that is a path to a directory containing one or more
        files that are valid YAML or JSON files that are valid
        configurations,
        or a list of any of the former.
    results_file_name : str, optional
        Path to the file for steps to write their results to
        Default: step-runner-results.yml
    work_dir_path : str, optional
        Path to the working folder for step_implementers for runtime files
        Default: step-runner-working

    Raises
    ------
    ValueError
        If given config is not of expected type.
    AssertionError
        If given config contains any invalid configurations.

    Returns
    -------
    Bool
        True if step completed successfully
        False if step returned an error message
    """

    __DEFAULT_MODULE = 'ploigos_step_runner.step_implementers'

    def __init__(
        self,
        config,
        results_file_name='step-runner-results.yml',
        work_dir_path='step-runner-working'
    ):
        if isinstance(config, Config):
            self.__config = config
        else:
            self.__config = Config(config)

        self.__results_file_name = results_file_name
        self.__work_dir_path = work_dir_path

        self.__workflow_result = None

    @property
    def config(self):
        """
        Returns
        -------
        Config
            Configuration used by this factory.
        """
        return self.__config

    @property
    def results_file_path(self):
        """Get the full path to the results file.

        Returns
        -------
        str
            Full path to the results file.
        """
        return os.path.join(self.__work_dir_path, self.__results_file_name)

    @property
    def workflow_result_pickle_file_path(self):
        """
        Get the full path to the workflow result pickle file.
        (The 'pickle' file contains the serialized list of step results.)
        The name of the pickle file is the basename of the results_file_name.

        Returns
        -------
        str
           Full path to the workflow pickle (serialized) file.
        """
        pickle_filename = os.path.splitext(self.__results_file_name)[0] + '.pkl'
        return os.path.join(self.__work_dir_path, pickle_filename)

    @property
    def workflow_result(self):
        """
        Returns
        -------
        WorkflowResult
            Object containing a list of dictionary of step results
            from previous steps.
        """
        if not self.__workflow_result:
            self.__workflow_result = WorkflowResult.load_from_pickle_file(
                pickle_filename=self.workflow_result_pickle_file_path
            )
        return self.__workflow_result

    def run_step(self, step_name, environment=None):
        """
        Call the given step.

        Parameters
        ----------
        step_name : str
            Ploigos step to run.
        environment : str, optional
            Name of the environment the step is being run in. Used to determine environment
            specific global defaults and step configuration.

        Raises
        ------
        # todo: doc needs to be updated
        StepRunnerException
            If no StepImplementers have been registered for the given step_name
            If no specific StepImplementer name specified in sub step config
                and no default StepImplementer registered for given step_name.
            If no StepImplementer registered for given step with given implementer name.
        Returns
        -------
        Bool
           True if step completed successfully
           False if step returned an error message
        """

        sub_step_configs = self.config.get_sub_step_configs(step_name)
        assert len(sub_step_configs) != 0, \
            f"Can not run step ({step_name}) because no step configuration provided."

        # for each sub step in the step config get the step implementer and run it
        aggregate_success = True
        for sub_step_config in sub_step_configs:
            sub_step_implementer_name = sub_step_config.sub_step_implementer_name

            step_implementer_class = StepRunner.__get_step_implementer_class(
                step_name,
                sub_step_implementer_name)

            # create the StepImplementer instance
            sub_step = step_implementer_class(
                parent_work_dir_path=self.__work_dir_path,
                config=sub_step_config,
                environment=environment,
                workflow_result=self.workflow_result
            )

            # run the step
            step_result = sub_step.run_step()

            # save the step results
            self.workflow_result.add_step_result(
                step_result=step_result
            )

            # acquire the pickle lock
            with open(
                    self.workflow_result_pickle_file_path + '.lock',
                    'w',
                    encoding='utf-8'
            ) as pickle_lock:
                fcntl.flock(pickle_lock, fcntl.LOCK_EX)

                # we are locked
                try:
                    self.workflow_result.merge_with_pickle_file(
                        pickle_filename=self.workflow_result_pickle_file_path
                    )
                    self.workflow_result.write_to_pickle_file(
                        pickle_filename=self.workflow_result_pickle_file_path
                    )
                    self.workflow_result.write_results_to_yml_file(
                        yml_filename=self.results_file_path
                    )
                finally:
                    fcntl.flock(pickle_lock, fcntl.LOCK_UN)

            # aggregate success
            aggregate_success = (aggregate_success and step_result.success)

            # if this sub step failed and not configured to continue on failure, bail
            # else execute next sub step and continue aggregating success
            if (not step_result.success) and \
                    (not sub_step_config.sub_step_contine_sub_steps_on_failure):
                break

        return aggregate_success

    @staticmethod
    def __get_step_implementer_class(step_name, step_implementer_name):
        """Given a step name and a step implementer name dynamically loads the Class.

        Parameters
        ----------
        step_name : str
            Name of the step to load the given step implementer for.
            This is only used if the given step_implementer_name does not include
            a module path.
        step_implementer_name : str
            Either the short name of a StepImplementer class which will be dynamically
            loaded from the 'ploigos_step_runner.step_implementers.{step_name}' module or
            A class name that includes a dot seperated module name to load the Class from.

        Returns
        -------
        StepImplementer
            Dynamically loaded subclass of StepImplementer for given step name with
            given step implementer name.

        Raises
        ------
        StepRunnerException
            If could not find class to load
            If loaded class is not a subclass of StepImplementer
        """
        parts = step_implementer_name.split('.')
        class_name = parts.pop()
        module_name = '.'.join(parts)

        if not module_name:
            step_module_part = step_name.replace('-', '_')
            module_name = f"{StepRunner.__DEFAULT_MODULE}.{step_module_part}"

        clazz = import_and_get_class(module_name, class_name)
        if not clazz:
            raise StepRunnerException(
                f"Could not dynamically load step ({step_name}) step implementer" +
                f" ({step_implementer_name}) from module ({module_name})" +
                f" with class name ({class_name})"
            )
        if not issubclass(clazz, StepImplementer):
            raise StepRunnerException(
                f"Step ({step_name}) is configured to use step implementer" +
                f" ({step_implementer_name}) from module ({module_name}) with" +
                f" class name ({class_name}), and dynamically loads as class ({clazz})" +
                f" which is not a subclass of required parent class ({StepImplementer}).")

        return clazz

Instance variables

prop config

Returns

Config
Configuration used by this factory.
Expand source code
@property
def config(self):
    """
    Returns
    -------
    Config
        Configuration used by this factory.
    """
    return self.__config
prop results_file_path

Get the full path to the results file.

Returns

str
Full path to the results file.
Expand source code
@property
def results_file_path(self):
    """Get the full path to the results file.

    Returns
    -------
    str
        Full path to the results file.
    """
    return os.path.join(self.__work_dir_path, self.__results_file_name)
prop workflow_result

Returns

WorkflowResult
Object containing a list of dictionary of step results from previous steps.
Expand source code
@property
def workflow_result(self):
    """
    Returns
    -------
    WorkflowResult
        Object containing a list of dictionary of step results
        from previous steps.
    """
    if not self.__workflow_result:
        self.__workflow_result = WorkflowResult.load_from_pickle_file(
            pickle_filename=self.workflow_result_pickle_file_path
        )
    return self.__workflow_result
prop workflow_result_pickle_file_path

Get the full path to the workflow result pickle file. (The 'pickle' file contains the serialized list of step results.) The name of the pickle file is the basename of the results_file_name.

Returns

str
 

Full path to the workflow pickle (serialized) file.

Expand source code
@property
def workflow_result_pickle_file_path(self):
    """
    Get the full path to the workflow result pickle file.
    (The 'pickle' file contains the serialized list of step results.)
    The name of the pickle file is the basename of the results_file_name.

    Returns
    -------
    str
       Full path to the workflow pickle (serialized) file.
    """
    pickle_filename = os.path.splitext(self.__results_file_name)[0] + '.pkl'
    return os.path.join(self.__work_dir_path, pickle_filename)

Methods

def run_step(self, step_name, environment=None)

Call the given step.

Parameters

step_name : str
Ploigos step to run.
environment : str, optional
Name of the environment the step is being run in. Used to determine environment specific global defaults and step configuration.

Raises

todo: doc needs to be updated

StepRunnerException
If no StepImplementers have been registered for the given step_name If no specific StepImplementer name specified in sub step config and no default StepImplementer registered for given step_name. If no StepImplementer registered for given step with given implementer name.

Returns

Bool
 

True if step completed successfully False if step returned an error message