Module ploigos_step_runner.results.workflow_result
Abstract class and helper constants for WorkflowResult
Classes
class WorkflowResult
-
Class to manage a list of StepResults. The WorkflowResult represents ALL previous results.
Expand source code
class WorkflowResult:
    """Class to manage a list of StepResults.

    The WorkflowResult represents ALL previous results.
    """

    def __init__(self):
        # Name-mangled backing list of StepResult objects; exposed read-only
        # via the workflow_list property.
        self.__workflow_list = []

    @property
    def workflow_list(self):
        """list of StepResult: all accumulated step results, in insertion order."""
        return self.__workflow_list

    def get_artifact_value(
            self,
            artifact,
            step_name=None,
            sub_step_name=None,
            environment=None
    ):  # pylint: disable=too-many-boolean-expressions
        """Search for an artifact value.

        The workflow list is searched in REVERSE order, so when no filters
        are provided the artifact comes from the LAST step that returned it.

        1. if step_name is provided, look for the artifact only in that step
        2. if sub_step_name is also provided, look only in that step/sub step
        3. else, search ALL steps, newest first, for the artifact

        Parameters
        ----------
        artifact : str
            The artifact name to search for.
        step_name : str, optional
            Optionally search only in one step.
        sub_step_name : str, optional
            Optionally search only in one sub step.
        environment : str, optional
            Environment to get the step result for.

        Returns
        -------
        str
            The artifact value (eg: 'v1.0.2'), or None if not found.
        """
        value = None
        for step_result in reversed(self.workflow_list):
            if (
                    (not step_name or step_result.step_name == step_name) and
                    (not sub_step_name or step_result.sub_step_name == sub_step_name) and
                    (not environment or step_result.environment == environment)
            ):
                value = step_result.get_artifact_value(name=artifact)
                if value is not None:
                    break
        return value

    def get_evidence_value(
            self,
            evidence,
            step_name=None,
            sub_step_name=None,
            environment=None
    ):  # pylint: disable=too-many-boolean-expressions
        """Search for an evidence value.

        Unlike get_artifact_value, the workflow list is searched in FORWARD
        order, so when no filters are provided the evidence comes from the
        FIRST step that returned it.

        1. if step_name is provided, look for the evidence only in that step
        2. if sub_step_name is also provided, look only in that step/sub step
        3. else, search ALL steps, oldest first, for the evidence

        Parameters
        ----------
        evidence : str
            The evidence name to search for.
        step_name : str, optional
            Optionally search only in one step.
        sub_step_name : str, optional
            Optionally search only in one sub step.
        environment : str, optional
            Environment to get the step result for.

        Returns
        -------
        str
            The evidence value (eg: 'v1.0.2'), or None if not found.
        """
        value = None
        for step_result in self.workflow_list:
            if (
                    (not step_name or step_result.step_name == step_name) and
                    (not sub_step_name or step_result.sub_step_name == sub_step_name) and
                    (not environment or step_result.environment == environment)
            ):
                value = step_result.get_evidence_value(name=evidence)
                if value is not None:
                    break
        return value

    def add_step_result(self, step_result):
        """Add a single step_result to the workflow list.

        Duplicate step results (same step name, sub step name, and
        environment) are NOT merged; attempting to add one raises.

        Parameters
        ----------
        step_result : StepResult
            A StepResult object to add to the list.

        Raises
        ------
        StepRunnerException
            If step_result is not a StepResult instance, or if a StepResult
            with the same step name, sub step name, and environment already
            exists in the list.
        """
        if not isinstance(step_result, StepResult):
            raise StepRunnerException('expect StepResult instance type')
        if self.__step_result_exists(step_result):
            raise StepRunnerException(
                f'Can not add duplicate StepResult for step ({step_result.step_name}),'
                f' sub step ({step_result.sub_step_name}),'
                f' and environment ({step_result.environment}).'
            )
        self.workflow_list.append(step_result)

    # ARTIFACT helpers:
    def write_results_to_yml_file(self, yml_filename):
        """Write the workflow list in a yaml format to file.

        Parameters
        ----------
        yml_filename : str
            Name of file to write (eg: step-runner-results/step-runner-results.yml)

        Raises
        ------
        RuntimeError
            If the file cannot be dumped.
        """
        try:
            create_parent_dir(yml_filename)
            with open(yml_filename, 'w', encoding='utf-8') as file:
                results = self.__get_all_step_results_dict()
                yaml.dump(results, file, indent=4)
        except Exception as error:
            raise RuntimeError(f'error dumping {yml_filename}: {error}') from error

    def write_results_to_json_file(self, json_filename):
        """Write the workflow list in a json format to file.

        Parameters
        ----------
        json_filename : str
            Name of file to write (eg: step-runner-results.json)

        Raises
        ------
        RuntimeError
            If the file cannot be dumped.
        """
        try:
            create_parent_dir(json_filename)
            with open(json_filename, 'w', encoding='utf-8') as file:
                results = self.__get_all_step_results_dict()
                json.dump(results, file, indent=4)
        except Exception as error:
            raise RuntimeError(f'error dumping {json_filename}: {error}') from error

    # File handlers
    @staticmethod
    def load_from_pickle_file(pickle_filename):
        """Return the contents of a pickled file.

        The file is expected to contain a WorkflowResult instance. A missing
        or empty file yields a new, empty WorkflowResult.

        Parameters
        ----------
        pickle_filename : str
            Name of the file to load.

        Returns
        -------
        WorkflowResult
            The unpickled WorkflowResult, or an empty one if the file is
            missing or empty.

        Raises
        ------
        StepRunnerException
            If the file cannot be loaded, or contains a non-WorkflowResult
            instance.
        """
        try:
            create_parent_dir(pickle_filename)

            # if the file does not exist return empty object
            if not os.path.isfile(pickle_filename):
                return WorkflowResult()

            # if the file is empty return empty object
            if os.path.getsize(pickle_filename) == 0:
                return WorkflowResult()

            # check that the file has a WorkflowResult object
            with open(pickle_filename, 'rb') as file:
                workflow_result = pickle.load(file)
                if not isinstance(workflow_result, WorkflowResult):
                    raise StepRunnerException(f'error {pickle_filename} has invalid data')
                return workflow_result
        except Exception as error:
            raise StepRunnerException(f'error loading {pickle_filename}: {error}') from error

    def merge_with_pickle_file(self, pickle_filename):
        """Merge our workflow list with that stored on disk.

        When we find overlaps, our in-memory values win.

        Note: any locking of the pickle file is the responsibility of the
        caller.

        Parameters
        ----------
        pickle_filename : str
            The on-disk path to the pickle file to merge with.
        """
        on_disk_wfr = WorkflowResult.load_from_pickle_file(pickle_filename)
        on_disk_results = on_disk_wfr.workflow_list

        merged_workflow_list = []
        for in_mem_step_result in self.__workflow_list:
            on_disk_step_result = on_disk_wfr.get_step_result(
                step_name=in_mem_step_result.step_name,
                sub_step_name=in_mem_step_result.sub_step_name,
                environment=in_mem_step_result.environment
            )
            if on_disk_step_result:
                on_disk_results.remove(on_disk_step_result)
                # in-memory values win if the two results are unequal
                if on_disk_step_result != in_mem_step_result:
                    on_disk_step_result.merge(in_mem_step_result)
                merged_workflow_list.append(on_disk_step_result)
            else:
                merged_workflow_list.append(in_mem_step_result)

        # on_disk_results at this point will only have the step results
        # that were on disk, but not in memory.
        merged_workflow_list += on_disk_results
        self.__workflow_list = merged_workflow_list

    def write_to_pickle_file(self, pickle_filename):
        """Write the workflow list in a pickle format to file.

        Note: any locking of the pickle file is the responsibility of the
        caller.

        Parameters
        ----------
        pickle_filename : str
            Name of file to write (eg: step-runner-results.pkl)

        Raises
        ------
        RuntimeError
            If the file cannot be dumped.
        """
        try:
            create_parent_dir(pickle_filename)
            with open(pickle_filename, 'wb') as file:
                pickle.dump(self, file)
        except Exception as error:
            raise RuntimeError(f'error dumping {pickle_filename}: {error}') from error

    def __step_result_exists(self, step_result):
        """Return True if the provided StepResult exists in our workflow
        list, False otherwise.

        Parameters
        ----------
        step_result : StepResult
            A StepResult object to search for.

        Returns
        -------
        bool
            True if an equivalent StepResult exists, False otherwise.
        """
        existing_step_result = self.get_step_result(
            step_name=step_result.step_name,
            sub_step_name=step_result.sub_step_name,
            environment=step_result.environment
        )
        # Return an actual bool rather than the StepResult-or-None, so the
        # method matches its documented True/False contract.
        return existing_step_result is not None

    def __get_all_step_results_dict(self):
        """Get a dictionary of all of the recorded StepResults.

        Returns
        -------
        dict
            Results of all steps from the list, rooted under the
            'step-runner-results' key.
        """
        all_results = {}
        for step_result in self.workflow_list:
            all_results = deep_merge(
                dest=all_results,
                source=step_result.get_step_result_dict(),
                overwrite_duplicate_keys=True
            )
        step_runner_results = {
            'step-runner-results': all_results
        }
        return step_runner_results

    def get_step_result(
            self,
            step_name,
            sub_step_name=None,
            environment=None
    ):  # pylint: disable=too-many-boolean-expressions
        """Helper method to return a step result.

        Searches the workflow list in FORWARD order and returns the first
        match.

        Parameters
        ----------
        step_name : str
            Name of step to search for.
        sub_step_name : str, optional
            Name of sub step to search for.
        environment : str, optional
            Environment to get the step result for.

        Returns
        -------
        StepResult
            The first matching StepResult, or None if no match.
        """
        for step_result in self.workflow_list:
            if (
                    (not step_name or step_result.step_name == step_name) and
                    (not sub_step_name or step_result.sub_step_name == sub_step_name) and
                    (not environment or step_result.environment == environment)
            ):
                return step_result
        return None
Static methods
def load_from_pickle_file(pickle_filename)
-
Return the contents of a pickled file.
The file is expected to contain WorkflowResult instances
Parameters
pickle_filename
:str
Name of the file to load
Raises
Raises a StepRunnerException if the file cannot be loaded
Raises a StepRunnerException if the file contains non WorkflowResult instances
Instance variables
prop workflow_list
-
Return workflow_list
Expand source code
@property
def workflow_list(self):
    """Return workflow_list.

    Returns
    -------
    list of StepResult
        All accumulated step results, in insertion order.
    """
    return self.__workflow_list
Methods
def add_step_result(self, step_result)
-
Add a single step_result to the workflow list.
If the new step result is not already in the list, it is simply appended. Otherwise a StepRunnerException is raised: a StepResult with the same step name, sub step name, and environment may only be added once.
Parameters
step_result
:StepResult
An StepResult object to add to the list
Raises
Raises a StepRunnerException if an instance other than
StepResult is passed as a parameter
def get_artifact_value(self, artifact, step_name=None, sub_step_name=None, environment=None)
-
Search for an artifact.
If step_name, sub_step_name, or environment are not provided ensure the artifact comes from the last step that returned that artifact.
- if step_name is provided, look for the artifact in the step
- elif step_name and sub_step_name is provided, look for the artifact in the step/sub_step
- else, search ALL steps for the FIRST match of the artifact.
Parameters
artifact
:str
- The artifact name to search for
step_name
:str optional
- Optionally search only in one step
sub_step_name
:str optional
- Optionally search only in one step
environment
:str
- Optional. Environment to get the step result for.
Returns
Str
'v1.0.2'
def get_evidence_value(self, evidence, step_name=None, sub_step_name=None, environment=None)
-
Search for an evidence.
If step_name, sub_step_name, and environment are not provided, the evidence comes from the first step that returned that evidence.
- if step_name is provided, look for the evidence in the step
- elif step_name and sub_step_name is provided, look for the evidence in the step/sub_step
- else, search ALL steps for the FIRST match of the evidence.
Parameters
evidence
:str
- The evidence name to search for
step_name
:str optional
- Optionally search only in one step
sub_step_name
:str optional
- Optionally search only in one step
environment
:str
- Optional. Environment to get the step result for.
Returns
Str
'v1.0.2'
def get_step_result(self, step_name, sub_step_name=None, environment=None)
-
Helper method to return a step result.
Parameters
step_name
:str
- Name of step to search for
sub_step_name
:str
- Name of sub step to search for
environment
:str
- Optional. Environment to get the step result for.
Returns
StepResult
def merge_with_pickle_file(self, pickle_filename)
-
Merge our workflow list with that stored on disk. When we find overlaps, our in-memory values win.
Note: any locking of the pickle file is the responsibility of the caller.
Parameters
pickle_filename : the on-disk path to the pickle file to merge with.
def write_results_to_json_file(self, json_filename)
-
Write the workflow list in a json format to file.
Parameters
json_filename
:str
- Name of file to write (eg: step-runner-results.json)
Raises
Raises a RuntimeError if the file cannot be dumped
def write_results_to_yml_file(self, yml_filename)
-
Write the workflow list in a yaml format to file
Parameters
yml_filename
:str
- Name of file to write (eg: step-runner-results/step-runner-results.yml)
Raises
Raises a RuntimeError if the file cannot be dumped
def write_to_pickle_file(self, pickle_filename)
-
Write the workflow list in a pickle format to file.
Note: any locking of the pickle file is the responsibility of the caller.
Parameters
pickle_filename
:str
- Name of file to write (eg: step-runner-results.pkl)
Raises
Raises a RuntimeError if the file cannot be dumped