Module ploigos_step_runner.results.workflow_result

Abstract class and helper constants for WorkflowResult

Classes

class WorkflowResult

Class to manage a list of StepResults. The WorkflowResult represents ALL previous results.

Expand source code
class WorkflowResult:
    """
    Class to manage a list of StepResults.
    The WorkflowResult represents ALL previous results.

    Results are kept in insertion order. A result is identified by its
    step name, sub step name, and environment.
    """

    def __init__(self):
        # NOTE: this attribute name is part of the pickled state of existing
        # result files, so it must not be renamed.
        self.__workflow_list = []

    @property
    def workflow_list(self):
        """Return the underlying list of step results (not a copy).
        """
        return self.__workflow_list

    @staticmethod
    def __matches(step_result, step_name, sub_step_name, environment):
        """Return True if the step result satisfies every provided filter.

        A filter that is falsy (None, empty string) is treated as
        "not provided" and matches any step result.
        """
        if step_name and step_result.step_name != step_name:
            return False
        if sub_step_name and step_result.sub_step_name != sub_step_name:
            return False
        if environment and step_result.environment != environment:
            return False
        return True

    def get_artifact_value(
        self,
        artifact,
        step_name=None,
        sub_step_name=None,
        environment=None
    ):
        """Search for an artifact.

        Step results are searched from the most recently added to the oldest,
        so the value comes from the LAST matching step result that has the
        artifact. Each of step_name, sub_step_name, and environment narrows
        the search when provided and is ignored when not provided.

        Parameters
        ----------
        artifact: str
           The artifact name to search for
        step_name: str optional
           Optionally search only in one step
        sub_step_name: str optional
            Optionally search only in one sub step
        environment : str
            Optional. Environment to get the step result for.

        Returns
        -------
        The artifact value (eg: 'v1.0.2') or None if no matching step result
        has a non-None value for the artifact.
        """
        for step_result in reversed(self.workflow_list):
            if not self.__matches(step_result, step_name, sub_step_name, environment):
                continue

            value = step_result.get_artifact_value(name=artifact)
            if value is not None:
                return value

        return None

    def get_evidence_value(
        self,
        evidence,
        step_name=None,
        sub_step_name=None,
        environment=None
    ):
        """Search for an evidence.

        Step results are searched from the oldest to the most recently added,
        so the value comes from the FIRST matching step result that has the
        evidence. Each of step_name, sub_step_name, and environment narrows
        the search when provided and is ignored when not provided.

        Parameters
        ----------
        evidence: str
           The evidence name to search for
        step_name: str optional
           Optionally search only in one step
        sub_step_name: str optional
            Optionally search only in one sub step
        environment : str
            Optional. Environment to get the step result for.

        Returns
        -------
        The evidence value (eg: 'v1.0.2') or None if no matching step result
        has a non-None value for the evidence.
        """
        for step_result in self.workflow_list:
            if not self.__matches(step_result, step_name, sub_step_name, environment):
                continue

            value = step_result.get_evidence_value(name=evidence)
            if value is not None:
                return value

        return None

    def add_step_result(self, step_result):
        """Add a single step_result to the workflow list.

        The result is appended only if no existing result matches its step
        name, sub step name, and environment. Existing results are never
        merged or replaced by this method.

        Parameters
        ----------
        step_result : StepResult
           An StepResult object to add to the list

        Raises
        ------
        Raises a StepRunnerException if an instance other than
        StepResult is passed as a parameter
        Raises a StepRunnerException if a matching StepResult is already
        in the list
        """
        if not isinstance(step_result, StepResult):
            raise StepRunnerException('expect StepResult instance type')

        if self.__step_result_exists(step_result):
            raise StepRunnerException(
                f'Can not add duplicate StepResult for step ({step_result.step_name}),'
                f' sub step ({step_result.sub_step_name}),'
                f' and environment ({step_result.environment}).'
            )

        self.workflow_list.append(step_result)

    # File handlers

    def write_results_to_yml_file(self, yml_filename):
        """Write the workflow list in a yaml format to file

        Parameters
        ----------
        yml_filename : str
             Name of file to write (eg: step-runner-results/step-runner-results.yml)

        Raises
        ------
        Raises a RuntimeError if the file cannot be dumped
        """
        try:
            create_parent_dir(yml_filename)
            with open(yml_filename, 'w', encoding='utf-8') as file:
                results = self.__get_all_step_results_dict()
                yaml.dump(results, file, indent=4)
        except Exception as error:
            raise RuntimeError(f'error dumping {yml_filename}: {error}') from error

    def write_results_to_json_file(self, json_filename):
        """Write the workflow list in a json format to file.

        Parameters
        ----------
        json_filename : str
             Name of file to write (eg: step-runner-results.json)

        Raises
        ------
        Raises a RuntimeError if the file cannot be dumped
        """
        try:
            create_parent_dir(json_filename)
            with open(json_filename, 'w', encoding='utf-8') as file:
                results = self.__get_all_step_results_dict()
                json.dump(results, file, indent=4)
        except Exception as error:
            raise RuntimeError(f'error dumping {json_filename}: {error}') from error

    @staticmethod
    def load_from_pickle_file(pickle_filename):
        """Return the contents of a pickled file.

        The file is expected to contain a WorkflowResult instance. A missing
        or empty file yields a new, empty WorkflowResult.

        Parameters
        ----------
        pickle_filename: str
           Name of the file to load

        Returns
        -------
        WorkflowResult

        Raises
        ------
        Raises a StepRunnerException if the file cannot be loaded
        Raises a StepRunnerException if the file contains non WorkflowResult instances
        """
        try:
            create_parent_dir(pickle_filename)

            # a missing or zero-length file is treated as "no results yet"
            if not os.path.isfile(pickle_filename) or os.path.getsize(pickle_filename) == 0:
                return WorkflowResult()

            with open(pickle_filename, 'rb') as file:
                # SECURITY: unpickling can execute arbitrary code. Only load
                # files written by write_to_pickle_file from a trusted location.
                workflow_result = pickle.load(file)

            if not isinstance(workflow_result, WorkflowResult):
                raise StepRunnerException(f'error {pickle_filename} has invalid data')
            return workflow_result

        except Exception as error:
            # every failure, including the invalid-data error raised above, is
            # reported with the same 'error loading' prefix
            raise StepRunnerException(f'error loading {pickle_filename}: {error}') from error

    def merge_with_pickle_file(self, pickle_filename):
        """Merge our workflow list with that stored on disk.
        When we find overlaps, our in-memory values win.

        The merged list holds, in in-memory order, one entry per in-memory
        result (the on-disk counterpart merged with the in-memory result when
        one exists), followed by the results that exist only on disk.

        Note: any locking of the pickle file is the responsibility
        of the caller.

        Parameters
        ----------
        pickle_filename : the on-disk path to the pickle file to merge with.

        Raises
        ------
        Raises a StepRunnerException if the pickle file cannot be loaded
        """
        on_disk_wfr = WorkflowResult.load_from_pickle_file(pickle_filename)
        on_disk_results = on_disk_wfr.workflow_list
        merged_workflow_list = []

        for in_mem_step_result in self.__workflow_list:
            on_disk_step_result = on_disk_wfr.get_step_result(
                step_name=in_mem_step_result.step_name,
                sub_step_name=in_mem_step_result.sub_step_name,
                environment=in_mem_step_result.environment
            )

            if on_disk_step_result is None:
                merged_workflow_list.append(in_mem_step_result)
                continue

            # consume the on-disk entry so it is not appended again below
            on_disk_results.remove(on_disk_step_result)

            # in-memory values win if the two results are unequal
            if on_disk_step_result != in_mem_step_result:
                on_disk_step_result.merge(in_mem_step_result)

            merged_workflow_list.append(on_disk_step_result)

        # on_disk_results at this point will only have the step results
        # that were on disk, but not in memory.
        merged_workflow_list += on_disk_results

        self.__workflow_list = merged_workflow_list

    def write_to_pickle_file(self, pickle_filename):
        """Write this WorkflowResult in a pickle format to file.

        Note: any locking of the pickle file is the responsibility
        of the caller.

        Parameters
        ----------
        pickle_filename : str
             Name of file to write (eg: step-runner-results.pkl)

        Raises
        ------
        Raises a RuntimeError if the file cannot be dumped
        """
        try:
            create_parent_dir(pickle_filename)
            with open(pickle_filename, 'wb') as file:
                pickle.dump(self, file)
        except Exception as error:
            raise RuntimeError(f'error dumping {pickle_filename}: {error}') from error

    def __step_result_exists(self, step_result):
        """Return True if the provided StepResult exists in our
        workflow list, False otherwise.

        Existence is decided by step name, sub step name, and environment,
        using the same matching rules as get_step_result.

        Parameters
        ----------
        step_result - a StepResult object to search for

        Return value
        ------------
            True if StepResult exists, False otherwise
        """
        existing_step_result = self.get_step_result(
            step_name=step_result.step_name,
            sub_step_name=step_result.sub_step_name,
            environment=step_result.environment
        )
        # compare against None so the answer is a real bool and does not
        # depend on the truthiness of the found object
        return existing_step_result is not None

    def __get_all_step_results_dict(self):
        """Get a dictionary of all of the recorded StepResults.

        The dictionaries of the individual step results are deep merged in
        list order, later results overwriting duplicate keys.

        Returns
        -------
        results: dict
            results of all steps from list, under the
            'step-runner-results' key
        """
        all_results = {}
        for step_result in self.workflow_list:
            all_results = deep_merge(
                dest=all_results,
                source=step_result.get_step_result_dict(),
                overwrite_duplicate_keys=True
            )

        return {
            'step-runner-results': all_results
        }

    def get_step_result(
        self,
        step_name,
        sub_step_name=None,
        environment=None
    ):
        """Helper method to return a step result.

        Returns the first result in the list that satisfies every provided
        filter. A filter that is None or empty matches any result.

        Parameters
        ----------
        step_name: str
            Name of step to search for
        sub_step_name: str
            Name of sub step to search for
        environment : str
            Optional. Environment to get the step result for.

        Returns
        -------
        StepResult
            The first matching step result, or None if there is no match.
        """
        for step_result in self.workflow_list:
            if self.__matches(step_result, step_name, sub_step_name, environment):
                return step_result

        return None

Static methods

def load_from_pickle_file(pickle_filename)

Return the contents of a pickled file.

The file is expected to contain WorkflowResult instances

Parameters

pickle_filename : str
 

Name of the file to load

Raises

Raises a StepRunnerException if the file cannot be loaded
 
Raises a StepRunnerException if the file contains non WorkflowResult instances
 

Instance variables

prop workflow_list

Return workflow_list

Expand source code
@property
def workflow_list(self):
    """Return the list of step results held by this workflow result."""
    step_results = self.__workflow_list
    return step_results

Methods

def add_step_result(self, step_result)

Add a single step_result to the workflow list.

If no step result with the same step name, sub step name, and environment is already in the list, the new step result is simply appended. Otherwise a StepRunnerException is raised; existing step results are never merged or replaced.

Parameters

step_result : StepResult
 

An StepResult object to add to the list

Raises

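Raises a StepRunnerException if an instance other than StepResult is passed as a parameter.

Raises a StepRunnerException if a StepResult for the same step, sub step, and environment is already in the list.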
 
def get_artifact_value(self, artifact, step_name=None, sub_step_name=None, environment=None)

Search for an artifact.

If step_name, sub_step_name, or environment are not provided ensure the artifact comes from the last step that returned that artifact.

  1. if step_name is provided, look for the artifact in the step
  2. elif step_name and sub_step_name is provided, look for the artifact in the step/sub_step
  3. else, search ALL steps, most recently added first, for the LAST match of the artifact.

Parameters

artifact : str
 
The artifact name to search for
step_name : str optional
 
Optionally search only in one step
sub_step_name : str optional
Optionally search only in one sub step
environment : str
Optional. Environment to get the step result for.

Returns

Str
 

'v1.0.2'

def get_evidence_value(self, evidence, step_name=None, sub_step_name=None, environment=None)

Search for an evidence.

If step_name, sub_step_name, or environment are provided, the search is limited to matching step results; the evidence comes from the first matching step result that has it.

  1. if step_name is provided, look for the evidence in the step
  2. elif step_name and sub_step_name is provided, look for the evidence in the step/sub_step
  3. else, search ALL steps for the FIRST match of the evidence.

Parameters

evidence : str
 
The evidence name to search for
step_name : str optional
 
Optionally search only in one step
sub_step_name : str optional
Optionally search only in one sub step
environment : str
Optional. Environment to get the step result for.

Returns

Str
 

'v1.0.2'

def get_step_result(self, step_name, sub_step_name=None, environment=None)

Helper method to return a step result.

Parameters

step_name : str
Name of step to search for
sub_step_name : str
Name of sub step to search for
environment : str
Optional. Environment to get the step result for.

Returns

StepResult
 
def merge_with_pickle_file(self, pickle_filename)

Merge our workflow list with that stored on disk. When we find overlaps, our in-memory values win.

Note: any locking of the pickle file is the responsibility of the caller.

Parameters

pickle_filename : the on-disk path to the pickle file to merge with.

def write_results_to_json_file(self, json_filename)

Write the workflow list in a json format to file.

Parameters

json_filename : str
Name of file to write (eg: step-runner-results.json)

Raises

Raises a RuntimeError if the file cannot be dumped
 
def write_results_to_yml_file(self, yml_filename)

Write the workflow list in a yaml format to file

Parameters

yml_filename : str
Name of file to write (eg: step-runner-results/step-runner-results.yml)

Raises

Raises a RuntimeError if the file cannot be dumped
 
def write_to_pickle_file(self, pickle_filename)

Write the workflow list in a pickle format to file.

Note: any locking of the pickle file is the responsibility of the caller.

Parameters

pickle_filename : str
Name of file to write (eg: step-runner-results.pkl)

Raises

Raises a RuntimeError if the file cannot be dumped