Module ploigos_step_runner.config.config
Representation of configuration for workflow.
Classes
class Config (config=None)
-
Representation of configuration for Ploigos workflow.
Parameters
config
:dict, list, str (file
ordirectory)
, optional- A dictionary that is a valid configuration, or a string that is a path to a YAML or JSON file that is a valid configuration, or a string that is a path to a directory containing one or more files that are valid YAML or JSON files that are valid configurations, or a list of any of the former.
Attributes
__global_defaults
:dict
__global_environment_defaults
:dict
__step_configs
:dict
ofstr (step names) to StepConfig
Raises
ValueError
- If given config is not of expected type.
AssertionError
- If given config contains any invalid configurations.
Expand source code
class Config: """Representation of configuration for Ploigos workflow. Parameters ---------- config : dict, list, str (file or directory), optional A dictionary that is a valid configuration, or a string that is a path to a YAML or JSON file that is a valid configuration, or a string that is a path to a directory containing one or more files that are valid YAML or JSON files that are valid configurations, or a list of any of the former. Attributes ---------- __global_defaults : dict __global_environment_defaults : dict __step_configs : dict of str (step names) to StepConfig Raises ------ ValueError If given config is not of expected type. AssertionError If given config contains any invalid configurations. """ CONFIG_KEY = 'step-runner-config' CONFIG_KEY_GLOBAL_DEFAULTS = 'global-defaults' CONFIG_KEY_GLOBAL_ENVIRONMENT_DEFAULTS = 'global-environment-defaults' CONFIG_KEY_ENVIRONMENT_NAME = 'environment-name' CONFIG_KEY_CONTINUE_SUB_STEPS_ON_FAILURE = 'continue-sub-steps-on-failure' CONFIG_KEY_STEP_IMPLEMENTER = 'implementer' CONFIG_KEY_SUB_STEP_NAME = 'name' CONFIG_KEY_SUB_STEP_CONFIG = 'config' CONFIG_KEY_SUB_STEP_ENVIRONMENT_CONFIG = 'environment-config' CONFIG_KEY_DECRYPTORS = 'config-decryptors' CONFIG_KEY_DECRYPTOR_IMPLEMENTER = 'implementer' CONFIG_KEY_DECRYPTOR_CONFIG = 'config' def __init__(self, config=None): self.__global_defaults = {} self.__global_environment_defaults = {} self.__step_configs = {} if config is not None: self.add_config(config) @property def global_defaults(self): """Get a deep copy of the global defaults. Returns ------- dict Deep copy of the global defaults. """ return copy.deepcopy(self.__global_defaults) @property def global_environment_defaults(self): """Deep copy of all global environment defaults for all environments. Returns ------- dict deep copy of all global environment defaults. """ return copy.deepcopy(self.__global_environment_defaults) @property def step_configs(self): """ Returns ------- list of StepConfig """ return self.__step_configs def get_global_environment_defaults_for_environment(self, env): """Get a deep copy of all of the global environment defaults or for a given an environment. Parameters ---------- env : str The global environment defaults for the given environment. If given environment name does not exist in environment defaults then empty dict. Returns ------- dict Deep copy of the global environment defaults for the given environment or empty dict if no environment given or environment does not exist in the defaults """ if env is not None: if env in self.__global_environment_defaults: global_environment_defaults = copy.deepcopy(self.__global_environment_defaults[env]) else: global_environment_defaults = {} else: global_environment_defaults = {} return global_environment_defaults def get_step_config(self, step_name): """Get the step config for a given step name. Parameters ---------- step_name : str Name of the step to get the step configuration for. Returns ------- StepConfig Step configuration for the given step name or None if does not exist """ if step_name in self.step_configs: step_config = self.step_configs[step_name] else: step_config = None return step_config def get_sub_step_configs(self, step_name): """Gets lit of configured sub step configurations for a step with the given name. Parameters ---------- step_name : str Name of step to get configured sub steps for. Returns ------- list of SubStepConfig List of configured sub step configurations for the step with the given name. """ if step_name in self.step_configs: sub_step_configs = self.step_configs[step_name].sub_steps else: sub_step_configs = [] return sub_step_configs def add_config(self, config): """Parses, validates, and adds a given config to this Config. Parameters ---------- config : dict, list, str (file or directory) A dictionary that is a valid configuration, or aa string that is a path to a YAML or JSON file that is a valid configuration, or a string that is a path to a directory containing one or more files that are valid YAML or JSON files that are valid configurations, or a list of any of the former. Raises ------ ValueError If given config is not of expected type. AssertionError If given config contains any invalid configurations. """ if isinstance(config, dict): # add the config self.__add_config_dict(config) elif isinstance(config, list): # for each item in the list, treat it as a config to add for _config in config: self.add_config(_config) elif isinstance(config, str): if os.path.isfile(config): self.__add_config_file(config) elif os.path.isdir(config): # for each recursively found file in the directory add the config file config_dir_files = glob.glob(config + '/**', recursive=True) found_nested_file = False for config_dir_file in config_dir_files: if os.path.isfile(config_dir_file): found_nested_file = True self.__add_config_file(config_dir_file) if not found_nested_file: raise ValueError( f"Given config string ({config}) is a directory" + " with no recursive children files." ) else: raise ValueError( f"Given config string ({config}) is not a valid path." ) else: raise ValueError( f"Given config ({config}) is unexpected type ({type(config)}) " + "not a dictionary, string, or list of former." ) def set_step_config_overrides(self, step_name, step_config_overrides): """Sets configuration overrides for all sub steps of a given step. Notes ----- If step configuration overrides are already set for the given step then the existing overides will be replaced with the newly given step configuration overrides. Parameters ---------- step_name : str Name of step to add configuration overrides for all sub steps for. step_config_overrides : dict Overrides for all sub steps for the step with the given name. """ if step_name not in self.step_configs: self.step_configs[step_name] = StepConfig(self, step_name) self.step_configs[step_name].step_config_overrides = step_config_overrides def __add_config_file(self, config_file): """Adds a JSON or YAML file as config to this Config. Parameters ---------- config_file : str (file path) A string that is a path to an existing YAML or JSON file to parse and validate as a configuration to add to this Config. Raises ------ ValueError If can not parse given file as YAML or JSON AssertionError If dictionary parsed from given YAML or JSON file is not a valid config. """ # parse the configuration file try: parsed_config_file = parse_yaml_or_json_file(config_file) except ValueError as error: raise ValueError( f"Error parsing config file ({config_file}) as json or yaml" ) from error # add the config parsed from file try: self.__add_config_dict(parsed_config_file, config_file) except AssertionError as error: raise AssertionError( f"Failed to add parsed configuration file ({config_file}): {error}" ) from error def __add_config_dict(self, config_dict, source_file_path=None): # pylint: disable=too-many-locals, too-many-branches, too-many-statements """Add a configuration dictionary to the list of configuration dictionaries. Parameters ---------- config_dict : dict A dictionary to validate as a configuration and to add to this Config. source_file_path : str, optional File path to the file from which the given config_dict came from. Raises ------ AssertionError If the given config_dict is not a valid configuration dictionary. If attempt to update an existing sub step and new and existing sub step implementers do not match. If sub step does not define a step implementer. ValueError If duplicative leaf keys when merging global defaults If duplicative leaf keys when merging global env defaults If step config is not of type dict or list If new sub step configuration has duplicative leaf keys to existing sub step configuration. If new sub step environment configuration has duplicative leaf keys to existing sub step environment configuration. """ assert Config.CONFIG_KEY in config_dict, \ "Failed to add invalid config. " + \ f"Missing expected top level key ({Config.CONFIG_KEY}): " + \ f"{config_dict}" # if file path given use that as the source when creating ConfigValue objects # else use a copy of the given configuration dictionary if source_file_path is not None: parent_source = source_file_path else: parent_source = copy.deepcopy(config_dict) # convert all the leaves of the configuration dictionary under # the Config.CONFIG_KEY to ConfigValue objects config_values = ConfigValue.convert_leaves_to_config_values( values=copy.deepcopy(config_dict[Config.CONFIG_KEY]), parent_source=parent_source, path_parts=[Config.CONFIG_KEY] ) for key, value in config_values.items(): # if global default key # else if global env defaults key # else assume step config if key == Config.CONFIG_KEY_GLOBAL_DEFAULTS: try: self.__global_defaults = deep_merge( copy.deepcopy(self.__global_defaults), copy.deepcopy(value) ) except ValueError as error: raise ValueError( f"Error merging global defaults: {error}" ) from error elif key == Config.CONFIG_KEY_GLOBAL_ENVIRONMENT_DEFAULTS: for env, env_config in value.items(): if env not in self.__global_environment_defaults: self.__global_environment_defaults[env] = { Config.CONFIG_KEY_ENVIRONMENT_NAME: env } try: self.__global_environment_defaults[env] = deep_merge( copy.deepcopy(self.__global_environment_defaults[env]), copy.deepcopy(env_config) ) except ValueError as error: raise ValueError( f"Error merging global environment ({env}) defaults: {error}" ) from error elif key == Config.CONFIG_KEY_DECRYPTORS: config_decryptor_definitions = ConfigValue.convert_leaves_to_values(value) Config.parse_and_register_decryptors_definitions(config_decryptor_definitions) else: step_name = key step_config = value # if step_config is dict then assume step with single sub step if isinstance(step_config, dict): sub_steps = [step_config] elif isinstance(step_config, list): sub_steps = step_config else: raise ValueError( f"Expected step ({step_name}) to have have step config ({step_config})" + f" of type dict or list but got: {type(step_config)}" ) for sub_step in sub_steps: assert Config.CONFIG_KEY_STEP_IMPLEMENTER in sub_step, \ f"Step ({step_name}) defines a single sub step with values " + \ f"({sub_step}) but is missing value for key: " + \ f"{Config.CONFIG_KEY_STEP_IMPLEMENTER}" sub_step_implementer_name = \ sub_step[Config.CONFIG_KEY_STEP_IMPLEMENTER].value # if sub step name given # else if no sub step name given use step implementer as sub step name if Config.CONFIG_KEY_SUB_STEP_NAME in sub_step: sub_step_name = sub_step[Config.CONFIG_KEY_SUB_STEP_NAME].value else: sub_step_name = sub_step_implementer_name # determine sub step config if Config.CONFIG_KEY_SUB_STEP_CONFIG in sub_step: sub_step_config_dict = copy.deepcopy( sub_step[Config.CONFIG_KEY_SUB_STEP_CONFIG]) else: sub_step_config_dict = {} # determine sub step environment config if Config.CONFIG_KEY_SUB_STEP_ENVIRONMENT_CONFIG in sub_step: sub_step_env_config = copy.deepcopy( sub_step[Config.CONFIG_KEY_SUB_STEP_ENVIRONMENT_CONFIG]) else: sub_step_env_config = {} # determine if continue sub steps on this sub step failure sub_step_contine_sub_steps_on_failure = False if Config.CONFIG_KEY_CONTINUE_SUB_STEPS_ON_FAILURE in sub_step: sub_step_contine_sub_steps_on_failure = sub_step[ Config.CONFIG_KEY_CONTINUE_SUB_STEPS_ON_FAILURE ] if isinstance(sub_step_contine_sub_steps_on_failure.value, bool): sub_step_contine_sub_steps_on_failure = \ sub_step_contine_sub_steps_on_failure.value else: sub_step_contine_sub_steps_on_failure = bool( strtobool(sub_step_contine_sub_steps_on_failure.value) ) self.add_or_update_step_config( step_name=step_name, sub_step_name=sub_step_name, sub_step_implementer_name=sub_step_implementer_name, sub_step_config_dict=sub_step_config_dict, sub_step_env_config=sub_step_env_config, sub_step_contine_sub_steps_on_failure=sub_step_contine_sub_steps_on_failure ) @staticmethod def parse_and_register_decryptors_definitions(decryptors_definitions): """Parse decryptor definitions from a list and then register them with the DecryptionUtils. Parameters ---------- decryptors_definitions : list of dicts List of decryptor definitions. Each element should be a dict with at least an 'implementer' key with a string value and optionally a 'config' key with a dict value. Raises ------ AssertionError If decryptors_definitions is not a list. If a decryptor definition does not have a Config.CONFIG_KEY_DECRYPTOR_IMPLEMENTER key. """ assert isinstance(decryptors_definitions, list), \ f"Decryptors configuration ({decryptors_definitions}) must be of type " + \ f"(list) got: {type(decryptors_definitions)}" for decryptor_definition in decryptors_definitions: assert Config.CONFIG_KEY_DECRYPTOR_IMPLEMENTER in decryptor_definition, \ "Decryptor configuration is missing key " + \ f"({Config.CONFIG_KEY_DECRYPTOR_IMPLEMENTER}): {decryptor_definition}" decryptor_implementer_name = \ decryptor_definition[Config.CONFIG_KEY_DECRYPTOR_IMPLEMENTER] if Config.CONFIG_KEY_DECRYPTOR_CONFIG in decryptor_definition: decryptor_config = decryptor_definition[Config.CONFIG_KEY_DECRYPTOR_CONFIG] else: decryptor_config = {} DecryptionUtils.create_and_register_config_value_decryptor( decryptor_implementer_name, decryptor_config ) def add_or_update_step_config( # pylint: disable=too-many-arguments self, step_name, sub_step_name, sub_step_implementer_name, sub_step_config_dict, sub_step_env_config, sub_step_contine_sub_steps_on_failure=False ): """Adds a new step configuration with a single new sub step or updates an existing step with new or updated sub step. Parameters ---------- step_name : str Name of step to create or update. sub_step_name : str Name of the sub step to add or update on the new or updated step. sub_step_implementer_name : str Name of the sub step implementer for the sub step being added or updated. If updating this can not be different then existing sub step with the same name. sub_step_config_dict : dict, optional Sub step configuration to add or update for named sub step on the new or updated step. If updating this can not have any duplicative leaf keys to the existing sub step configuration. sub_step_env_config : dict, optional Sub step environment configuration to add or update for named sub step on the new or updated step. If updating this can not have any duplicative leaf keys to the existing sub step environment configuration. sub_step_contine_sub_steps_on_failure : bool True to continue executing other sub steps in current step if this sub step fails. False to fail all step execution if this sub step fails. Raises ------ AssertionError If attempt to update an existing sub step and new and existing sub step implementers do not match. ValueError If new sub step configuration has duplicative leaf keys to existing sub step configuration. If new sub step environment configuration has duplicative leaf keys to existing sub step environment configuration. """ if step_name not in self.step_configs: self.step_configs[step_name] = StepConfig(self, step_name) step_config = self.step_configs[step_name] step_config.add_or_update_sub_step_config( sub_step_name=sub_step_name, sub_step_implementer_name=sub_step_implementer_name, sub_step_config_dict=sub_step_config_dict, sub_step_env_config=sub_step_env_config, sub_step_contine_sub_steps_on_failure=sub_step_contine_sub_steps_on_failure )
Class variables
var CONFIG_KEY
var CONFIG_KEY_CONTINUE_SUB_STEPS_ON_FAILURE
var CONFIG_KEY_DECRYPTORS
var CONFIG_KEY_DECRYPTOR_CONFIG
var CONFIG_KEY_DECRYPTOR_IMPLEMENTER
var CONFIG_KEY_ENVIRONMENT_NAME
var CONFIG_KEY_GLOBAL_DEFAULTS
var CONFIG_KEY_GLOBAL_ENVIRONMENT_DEFAULTS
var CONFIG_KEY_STEP_IMPLEMENTER
var CONFIG_KEY_SUB_STEP_CONFIG
var CONFIG_KEY_SUB_STEP_ENVIRONMENT_CONFIG
var CONFIG_KEY_SUB_STEP_NAME
Static methods
def parse_and_register_decryptors_definitions(decryptors_definitions)
-
Parse decryptor definitions from a list and then register them with the DecryptionUtils.
Parameters
decryptors_definitions
:list
ofdicts
- List of decryptor definitions. Each element should be a dict with at least an 'implementer' key with a string value and optionally a 'config' key with a dict value.
Raises
AssertionError
- If decryptors_definitions is not a list. If a decryptor definition does not have a Config.CONFIG_KEY_DECRYPTOR_IMPLEMENTER key.
Instance variables
prop global_defaults
-
Get a deep copy of the global defaults.
Returns
dict
- Deep copy of the global defaults.
Expand source code
@property def global_defaults(self): """Get a deep copy of the global defaults. Returns ------- dict Deep copy of the global defaults. """ return copy.deepcopy(self.__global_defaults)
prop global_environment_defaults
-
Deep copy of all global environment defaults for all environments.
Returns
dict
- deep copy of all global environment defaults.
Expand source code
@property def global_environment_defaults(self): """Deep copy of all global environment defaults for all environments. Returns ------- dict deep copy of all global environment defaults. """ return copy.deepcopy(self.__global_environment_defaults)
prop step_configs
-
Returns
list
ofStepConfig
Expand source code
@property def step_configs(self): """ Returns ------- list of StepConfig """ return self.__step_configs
Methods
def add_config(self, config)
-
Parses, validates, and adds a given config to this Config.
Parameters
config
:dict, list, str (file
ordirectory)
- A dictionary that is a valid configuration, or aa string that is a path to a YAML or JSON file that is a valid configuration, or a string that is a path to a directory containing one or more files that are valid YAML or JSON files that are valid configurations, or a list of any of the former.
Raises
ValueError
- If given config is not of expected type.
AssertionError
- If given config contains any invalid configurations.
def add_or_update_step_config(self, step_name, sub_step_name, sub_step_implementer_name, sub_step_config_dict, sub_step_env_config, sub_step_contine_sub_steps_on_failure=False)
-
Adds a new step configuration with a single new sub step or updates an existing step with new or updated sub step.
Parameters
step_name
:str
- Name of step to create or update.
sub_step_name
:str
- Name of the sub step to add or update on the new or updated step.
sub_step_implementer_name
:str
- Name of the sub step implementer for the sub step being added or updated. If updating this can not be different then existing sub step with the same name.
sub_step_config_dict
:dict
, optional- Sub step configuration to add or update for named sub step on the new or updated step. If updating this can not have any duplicative leaf keys to the existing sub step configuration.
sub_step_env_config
:dict
, optional- Sub step environment configuration to add or update for named sub step on the new or updated step. If updating this can not have any duplicative leaf keys to the existing sub step environment configuration.
sub_step_contine_sub_steps_on_failure
:bool
- True to continue executing other sub steps in current step if this sub step fails. False to fail all step execution if this sub step fails.
Raises
AssertionError
- If attempt to update an existing sub step and new and existing sub step implementers do not match.
ValueError
- If new sub step configuration has duplicative leaf keys to existing sub step configuration. If new sub step environment configuration has duplicative leaf keys to existing sub step environment configuration.
def get_global_environment_defaults_for_environment(self, env)
-
Get a deep copy of all of the global environment defaults or for a given an environment.
Parameters
env
:str
- The global environment defaults for the given environment. If given environment name does not exist in environment defaults then empty dict.
Returns
dict
- Deep copy of the global environment defaults for the given environment or empty dict if no environment given or environment does not exist in the defaults
def get_step_config(self, step_name)
-
Get the step config for a given step name.
Parameters
step_name
:str
- Name of the step to get the step configuration for.
Returns
StepConfig
- Step configuration for the given step name or None if does not exist
def get_sub_step_configs(self, step_name)
-
Gets lit of configured sub step configurations for a step with the given name.
Parameters
step_name
:str
- Name of step to get configured sub steps for.
Returns
list
ofSubStepConfig
- List of configured sub step configurations for the step with the given name.
def set_step_config_overrides(self, step_name, step_config_overrides)
-
Sets configuration overrides for all sub steps of a given step.
Notes
If step configuration overrides are already set for the given step then the existing overides will be replaced with the newly given step configuration overrides.
Parameters
step_name
:str
- Name of step to add configuration overrides for all sub steps for.
step_config_overrides
:dict
- Overrides for all sub steps for the step with the given name.