Module ploigos_step_runner.decryption_utils

Shared utilities for doing decryption.

Classes

class DecryptionUtils

Shared utilities for doing decryption.

Any values that are decrypted are added to the given list of TextIOSelectiveObfuscator of strings to obfuscate.

Attributes

__obfuscation_streams : list of TextIOSelectiveObfuscator
TextIOSelectiveObfuscators to be sure that any decrypted values are obfuscated on those streams.
__config_value_decryptors : list of ConfigValueDecryptor
ConfigValueDecryptors that can be used to decrypt given ConfigValue.
Expand source code
class DecryptionUtils:
    """Shared utilities for doing decryption.

    Any values that are decrypted are added to the given list of TextIOSelectiveObfuscator
    of strings to obfuscate.

    Attributes
    ----------
    __obfuscation_streams : list of TextIOSelectiveObfuscator
        TextIOSelectiveObfuscators to be sure that any decrypted values are obfuscated
        on those streams.
    __config_value_decryptors : list of ConfigValueDecryptor
        ConfigValueDecryptors that can be used to decrypt given ConfigValue.
    """

    __DEFAULT_DECRYPTORS_MODULE = 'ploigos_step_runner.config.decryptors'

    __obfuscation_streams = []
    __config_value_decryptors = []

    @staticmethod
    def register_obfuscation_stream(obfuscator_stream):
        """Add a TextIOSelectiveObfuscator to obfuscate any decrypted values on.

        Parameters
        ----------
        obfuscator_stream : TextIOSelectiveObfuscator
            TextIOSelectiveObfuscator to be sure that any decrypted values are obfuscated on.

        Raises
        ------
        AssertionError
            If given obfuscator_stream is not a type of TextIOSelectiveObfuscator
        """
        assert isinstance(obfuscator_stream, TextIOSelectiveObfuscator)
        DecryptionUtils.__obfuscation_streams.append(obfuscator_stream)

    @staticmethod
    def register_config_value_decryptor(config_value_decryptor):
        """Add a ConfigValueDecryptor that can be used to decrypt ConfigValues.

        Parameters
        ----------
        config_value_decryptor : ConfigValueDecryptor
            ConfigValueDecryptor to add to the list of decryptors that can be
            used to decrypt ConfigValues.

        Raises
        ------
        AssertionError
            If given config_value_decryptor is not a type of ConfigValueDecryptor
        """
        assert isinstance(config_value_decryptor, ConfigValueDecryptor)
        DecryptionUtils.__config_value_decryptors.append(config_value_decryptor)

    @staticmethod
    def create_and_register_config_value_decryptor(
        config_value_decryptor_implementer_name,
        config_value_decryptor_constructor_params=None
    ):
        """Attempts to dynamically create a ConfigValueDecryptor and register it.

        Parameters
        ----------
        config_value_decryptor_implementer_name : str
            Name of the ConfigValueDecryptor to dynamically load and create.
        config_value_decryptor_constructor_params : dict, optional
            Dictionary of constructor parameters to expand and pass to the ConfigValueDecryptor
            when it is created.

        Raises
        ------
        StepRunnerException
            If could not find class to load
            If loaded class is not a subclass of ConfigValueDecryptor

        See Also
        --------
        register_config_value_decryptor
        """
        if config_value_decryptor_constructor_params is None:
            config_value_decryptor_constructor_params = {}

        decryptor_class = DecryptionUtils.__get_decryption_class(
            config_value_decryptor_implementer_name
        )
        try:
            config_value_decryptor = decryptor_class(**config_value_decryptor_constructor_params)
            DecryptionUtils.register_config_value_decryptor(config_value_decryptor)
        except TypeError as error:
            raise ValueError(
                f"Loaded decryptor class ({decryptor_class}) failed to construct with given" +
                f" constructor arguments ({config_value_decryptor_constructor_params}): {error}"
            ) from error

    @staticmethod
    def decrypt(config_value):
        """If possible decrypt the given ConfigValue using one of the
        registered ConfigValueDecryptors.

        Parameters
        ----------
        config_value : ConfigValue
            ConfigValue to decrypt the value of with one of the
            registered ConfigValueDecryptors if possible.

        Returns
        -------
        obj or None
            Decrypted value of the given ConfigValue or
            None if none of the registered ConfigValueDecryptors can decrypt the given
            ConfigValue.
        """

        decrypted_value = None
        for config_value_decryptor in DecryptionUtils.__config_value_decryptors:
            if config_value_decryptor.can_decrypt(config_value):
                decrypted_value = config_value_decryptor.decrypt(config_value)
                break

        DecryptionUtils.__add_obfuscation_targets(decrypted_value)

        return decrypted_value

    @staticmethod
    def __add_obfuscation_targets(targets):
        if targets is not None:
            for obfuscator_stream in DecryptionUtils.__obfuscation_streams:
                obfuscator_stream.add_obfuscation_targets(targets)

    @staticmethod
    def __get_decryption_class(decryptor_implementer_name):
        """Given a decryptor implementer name dynamically loads the associated Class.

        Parameters
        ----------
        decryptor_implementer_name : str
            Either the short name of a ConfigValueDecryptor class which will be dynamically
            loaded from the 'ploigos_step_runner.config.decryptors' module or
            A class name that includes a dot seperated module name to load the Class from.

        Returns
        -------
        ConfigValueDecryptor
            Dynamically loaded subclass of ConfigValueDecryptor for given decryptor name.

        Raises
        ------
        StepRunnerException
            If could not find class to load
            If loaded class is not a subclass of ConfigValueDecryptor
        """
        parts = decryptor_implementer_name.split('.')
        class_name = parts.pop()
        module_name = '.'.join(parts)

        if not module_name:
            module_name = DecryptionUtils.__DEFAULT_DECRYPTORS_MODULE

        clazz = import_and_get_class(module_name, class_name)
        if not clazz:
            raise StepRunnerException(
                "Could not dynamically load decryptor implementer" +
                f" ({decryptor_implementer_name}) from module ({module_name})" +
                f" with class name ({class_name})"
            )
        if not issubclass(clazz, ConfigValueDecryptor):
            raise StepRunnerException(
                f"For decryptor implementer ({decryptor_implementer_name})" +
                f" dynamically loaded class ({clazz}) which is not sub class of" +
                f" ({ConfigValueDecryptor}) and should be."
            )

        return clazz

Static methods

def create_and_register_config_value_decryptor(config_value_decryptor_implementer_name, config_value_decryptor_constructor_params=None)

Attempts to dynamically create a ConfigValueDecryptor and register it.

Parameters

config_value_decryptor_implementer_name : str
Name of the ConfigValueDecryptor to dynamically load and create.
config_value_decryptor_constructor_params : dict, optional
Dictionary of constructor parameters to expand and pass to the ConfigValueDecryptor when it is created.

Raises

StepRunnerException
If could not find class to load If loaded class is not a subclass of ConfigValueDecryptor

See Also

register_config_value_decryptor

def decrypt(config_value)

If possible decrypt the given ConfigValue using one of the registered ConfigValueDecryptors.

Parameters

config_value : ConfigValue
ConfigValue to decrypt the value of with one of the registered ConfigValueDecryptors if possible.

Returns

obj or None
Decrypted value of the given ConfigValue or None if none of the registered ConfigValueDecryptors can decrypt the given ConfigValue.
def register_config_value_decryptor(config_value_decryptor)

Add a ConfigValueDecryptor that can be used to decrypt ConfigValues.

Parameters

config_value_decryptor : ConfigValueDecryptor
ConfigValueDecryptor to add to the list of decryptors that can be used to decrypt ConfigValues.

Raises

AssertionError
If given config_value_decryptor is not a type of ConfigValueDecryptor
def register_obfuscation_stream(obfuscator_stream)

Add a TextIOSelectiveObfuscator to obfuscate any decrypted values on.

Parameters

obfuscator_stream : TextIOSelectiveObfuscator
TextIOSelectiveObfuscator to be sure that any decrypted values are obfuscated on.

Raises

AssertionError
If given obfuscator_stream is not a type of TextIOSelectiveObfuscator