Module ploigos_step_runner.step_implementers.shared.openscap_generic
Step Implementer for the container-image-static-compliance-scan step for OpenSCAP.
Step Configuration
Step configuration expected as input to this step. Could come from either configuration file or from runtime configuration.
Configuration Key | Required? | Default | Description |
---|---|---|---|
container-image-tag |
Yes | Container image tag to scan. | |
oscap-input-definitions-uri |
Yes | URI to the OpenSCAP definitions file to do the evaluation with. Must use protocol file://, http://, or https:// and reference an .xml or .bz2 file. | |
oscap-profile |
No | OpenSCAP profile to evaluate. | |
oscap-tailoring-uri |
No | URI to OpenSCAP tailoring file to do the evaluation with. Must use protocol file:// | |
oscap-fetch-remote-resources |
No | True | For Source DataStream and XCCDF files
that have remote references fetch them if
True, else don't.
WARNING: evaluations will not be complete if the input definitions require remote resources and this is not True. For disconnected environments, the remote resources must be served from an internal mirror. |
oscap-severity | No | | Severity threshold for failing a step. Will fail step on any vulnerability at that severity or higher. Will fail on any severity if unset. Valid severities: low, moderate, important, critical. |
Results
Results output by this step.
Result Key | Description |
---|---|
html-report |
HTML report generated by oscap eval |
xml-report |
XML report generated by oscap eval |
stdout-report |
stdout report generated by oscap eval |
Classes
-
A generic OpenSCAP step implementer that can be used for more than one step.
Expected uses: * container-image-static-compliance-scan * container-image-static-vulnerability-scan
Expand source code
class OpenSCAPGeneric(StepImplementer):
    """A generic OpenSCAP step implementer that can be used for more than one step.

    Expected uses:
    * container-image-static-compliance-scan
    * container-image-static-vulnerability-scan
    """

    # Pattern used to split `oscap xccdf eval` stdout into rule blocks.
    #
    # Example Input:
    #   Title   RHSA-2020:4186: spice and spice-gtk security update (Important)
    #   Rule    xccdf_com.redhat.rhsa_rule_oval-com.redhat.rhsa-def-20204186
    #   Ident   RHSA-2020:4186
    #   Ident   CVE-2020-14355
    #   Result  pass
    #
    #   Title   RHSA-2020:3658: librepo security update (Important)
    #   Rule    xccdf_com.redhat.rhsa_rule_oval-com.redhat.rhsa-def-20203658
    #   Ident   RHSA-2020:3658
    #   Ident   CVE-2020-14352
    #   Result  fail
    #
    # Named Groups (one set per rule block):
    #   ruleblock  - the whole block from 'Title' through the 'Result' line
    #   severity   - text inside the parentheses of the title, e.g. 'Important' (optional)
    #   ruleresult - text following 'Result', e.g. 'pass' or 'fail'
    OSCAP_XCCDF_STDOUT_PATTERN = re.compile(
        r'(?P<ruleblock>Title.+?(\((?P<severity>.*?)\).+?)?Rule.+?Result\s+(?P<ruleresult>[^\n]+))\n', # pylint: disable=line-too-long
        re.DOTALL
    )
    OSCAP_XCCDF_STDOUT_FAIL_PATTERN = re.compile(r'fail')

    # NOTE: oval output is far less useful than xccdf output, but some content is only
    #       given in oval format and therefore supporting this is important.
    #
    # Example Input:
    #   Definition oval:com.redhat.rhsa:def:20202031: false
    #   Definition oval:com.redhat.rhsa:def:20201998: true
    #
    # Named Groups (one set per definition line):
    #   ruleblock  - the whole definition line
    #   ruleresult - 'true' or 'false'
    OSCAP_OVAL_STDOUT_PATTERN = re.compile(
        r'(?P<ruleblock>^.*:\s*(?P<ruleresult>true|false)\s*$)$',
        re.MULTILINE
    )
    # for oval a definition evaluating to 'true' is treated as a failure
    OSCAP_OVAL_STDOUT_FAIL_PATTERN = re.compile(r'true')

    # extracts the document type from the output of `oscap info`
    OSCAP_INFO_DOC_TYPE_PATTERN = re.compile(r'Document type: (?P<doctype>.+)')

    @staticmethod
    def step_implementer_config_defaults():
        """Getter for the StepImplementer's configuration defaults.

        Notes
        -----
        These are the lowest precedence configuration values.

        Returns
        -------
        dict
            Default values to use for step configuration values.
        """
        return DEFAULT_CONFIG

    @staticmethod
    def _required_config_or_result_keys():
        """Getter for step configuration or previous step result artifacts that are required before
        running this step.

        See Also
        --------
        _validate_required_config_or_previous_step_result_artifact_keys

        Returns
        -------
        array_list
            Array of configuration keys or previous step result artifacts
            that are required before running the step.
        """
        return REQUIRED_CONFIG_OR_PREVIOUS_STEP_RESULT_ARTIFACT_KEYS

    def _validate_required_config_or_previous_step_result_artifact_keys(self):
        """Validates that the required configuration keys or previous step result artifacts
        are set and have valid values.

        Validates that:
        * required configuration is given
        * oscap-input-definitions-uri
            - starts with file://|http://|https://
            - has an extension starting with .xml|.bz2
        * oscap-severity, if given, is one of low|moderate|important|critical

        Raises
        ------
        AssertionError
            If step configuration or previous step result artifacts have invalid required values
        """
        super()._validate_required_config_or_previous_step_result_artifact_keys() # pylint: disable=protected-access

        # validate that the given 'oscap-input-definitions-uri' starts with
        # file://|http://|https://
        oscap_input_definitions_uri = self.get_value('oscap-input-definitions-uri')
        assert (re.match(r'^file://|http://|https://', oscap_input_definitions_uri)), \
            f"Open SCAP input definitions source ({oscap_input_definitions_uri})" \
            f" must start with known protocol (file://|http://|https://)."

        # validate that the given 'oscap-input-definitions-uri' is an xml or bz2 file
        # NOTE: intentionally a prefix match on the extension so that anything trailing the
        #       extension in the URI does not fail validation
        oscap_input_definitions_uri_extension = os.path.splitext(oscap_input_definitions_uri)[1]
        assert (re.match(r'\.xml|\.bz2', oscap_input_definitions_uri_extension)), \
            f"Open SCAP input definitions source ({oscap_input_definitions_uri})" \
            f" must be of known type (xml|bz2), got: {oscap_input_definitions_uri_extension}"

        # validate that the given 'oscap-severity' is valid
        oscap_severity = self.get_value('oscap-severity')
        if oscap_severity is not None:
            oscap_severity_index = OpenSCAPGeneric.__parse_sev_to_int(
                oscap_severity=oscap_severity
            )
            assert (oscap_severity_index is not None), \
                f"Open SCAP severity ({oscap_severity})" \
                f" must be of know severity (low|moderate|important|critical)."

    def _run_step(self): # pylint: disable=too-many-locals,too-many-statements
        """Runs the OpenSCAP eval for a given input file against a given container.

        Returns
        -------
        StepResult
            Result of the step. On success of the scan machinery it carries the
            'html-report', 'xml-report' and 'stdout-report' artifacts. If a
            StepRunnerException or RuntimeError is raised along the way the result is
            marked as failed with the error as its message.
        """
        step_result = StepResult.from_step_implementer(self)

        # get config
        image_address = self.get_value([
            'container-image-build-address',
            'container-image-push-address',
            'container-image-pull-address',
            'container-image-address',
            'container-image-tag'
        ])
        oscap_profile = self.get_value('oscap-profile')
        oscap_fetch_remote_resources = self.get_value('oscap-fetch-remote-resources')
        oscap_severity_index = OpenSCAPGeneric.__parse_sev_to_int(
            oscap_severity=self.get_value('oscap-severity')
        )
        pull_repository_type = self.get_value([
            'container-image-pull-registry-type',
            'container-image-registry-type'
        ])

        try:
            # create container from image that can be mounted
            print(f"\nCreate container from image ({image_address})")
            container_name = create_container_from_image(
                image_address=image_address,
                repository_type=pull_repository_type
            )
            print(f"Created container ({container_name}) from image ({image_address})")

            # baking `buildah unshare` command to wrap other buildah commands with
            # so that container does not need to be running in a privileged mode to be able
            # to function
            buildah_unshare_command = sh.buildah.bake('unshare') # pylint: disable=no-member

            # mount the container filesystem and get mount path
            #
            # NOTE: run in the context of `buildah unshare` so that container does not
            #       need to be run in a privileged mode
            print(f"\nMount container: {container_name}")
            container_mount_path = mount_container(
                buildah_unshare_command=buildah_unshare_command,
                container_id=container_name
            )
            print(f"Mounted container ({container_name}) with mount path: '{container_mount_path}'")

            try:
                # download the open scap input file
                oscap_input_definitions_uri = self.get_value('oscap-input-definitions-uri')
                print(f"\nDownload input definitions: {oscap_input_definitions_uri}")
                oscap_input_file = download_and_decompress_source_to_destination(
                    source_uri=oscap_input_definitions_uri,
                    destination_dir=self.work_dir_path
                )
                print(f"Downloaded input definitions to: {oscap_input_file}")
            except (RuntimeError, ValueError) as error:
                raise StepRunnerException(
                    f"Error downloading OpenSCAP input file: {error}"
                ) from error

            try:
                # if specified download oscap tailoring file
                oscap_tailoring_file = None
                oscap_tailoring_file_uri = self.get_value('oscap-tailoring-uri')
                if oscap_tailoring_file_uri:
                    print(f"\nDownload oscap tailoring file: {oscap_tailoring_file_uri}")
                    oscap_tailoring_file = download_and_decompress_source_to_destination(
                        source_uri=oscap_tailoring_file_uri,
                        destination_dir=self.work_dir_path
                    )
                    print(f"Download oscap tailoring file to: {oscap_tailoring_file}")
            except (RuntimeError, ValueError) as error:
                raise StepRunnerException(
                    f"Error downloading OpenSCAP tailoring file: {error}"
                ) from error

            # determine oscap eval type based on document type
            print(f"\nDetermine OpenSCAP document type of input file: {oscap_input_file}")
            oscap_document_type = OpenSCAPGeneric.__get_oscap_document_type(
                oscap_input_file=oscap_input_file
            )
            print(
                "Determined OpenSCAP document type of input file"
                f" ({oscap_input_file}): {oscap_document_type}"
            )
            print(
                f"\nDetermine OpenSCAP eval type for input file ({oscap_input_file}) "
                f"of document type: {oscap_document_type}"
            )
            oscap_eval_type = OpenSCAPGeneric.__get_oscap_eval_type_based_on_document_type(
                oscap_document_type=oscap_document_type
            )
            print(
                "Determined OpenSCAP eval type of input file"
                f" ({oscap_input_file}): {oscap_eval_type}"
            )

            # Execute scan in the context of buildah unshare
            #
            # NOTE: run in the context of `buildah unshare` so that container does not
            #       need to be run in a privileged mode
            oscap_out_file_path = self.write_working_file(f'oscap-{oscap_eval_type}-out')
            oscap_xml_results_file_path = self.write_working_file(
                f'oscap-{oscap_eval_type}-results.xml'
            )
            oscap_html_report_path = self.write_working_file(f'oscap-{oscap_eval_type}-report.html')
            print("\nRun oscap scan")
            oscap_eval_success, \
            oscap_eval_fails, \
            oscap_failure_met_threshold = OpenSCAPGeneric.__run_oscap_scan(
                buildah_unshare_command=buildah_unshare_command,
                oscap_eval_type=oscap_eval_type,
                oscap_input_file=oscap_input_file,
                oscap_out_file_path=oscap_out_file_path,
                oscap_xml_results_file_path=oscap_xml_results_file_path,
                oscap_html_report_path=oscap_html_report_path,
                container_mount_path=container_mount_path,
                oscap_profile=oscap_profile,
                oscap_tailoring_file=oscap_tailoring_file,
                oscap_fetch_remote_resources=oscap_fetch_remote_resources,
                oscap_severity_index=oscap_severity_index
            )
            print(f"OpenSCAP scan completed with eval success: {oscap_eval_success}")

            # save scan results
            # the step passes if no rule failed, or if none of the failures met the
            # severity threshold
            step_result.success = oscap_eval_success or not oscap_failure_met_threshold

            # report all issues even if they did not meet threshold
            if not oscap_eval_success:
                step_result.message = f"OSCAP eval found issues:\n{oscap_eval_fails}"

            step_result.add_artifact(
                name='html-report',
                value=oscap_html_report_path
            )
            step_result.add_artifact(
                name='xml-report',
                value=oscap_xml_results_file_path
            )
            step_result.add_artifact(
                name='stdout-report',
                value=oscap_out_file_path
            )
        except (StepRunnerException, RuntimeError) as error:
            step_result.success = False
            step_result.message = str(error)

        return step_result

    @staticmethod
    def __get_oscap_document_type(oscap_input_file):
        """Gets the OpenSCAP document type for a given input file.

        Parameters
        ----------
        oscap_input_file : path
            Path to OSCAP file to determine the OpenSCAP document type of.

        Returns
        -------
        str
            OpenSCAP document type. For example:
            * Source Data Stream
            * XCCDF Checklist
            * OVAL Definitions

        Raises
        ------
        StepRunnerException
            If error getting document type of oscap input file, including when the
            `oscap info` output does not contain a 'Document type: ' line.
        """
        oscap_info_out_buff = StringIO()
        try:
            sh.oscap.info( # pylint: disable=no-member
                oscap_input_file,
                _out=oscap_info_out_buff
            )
        except sh.ErrorReturnCode as error:
            raise StepRunnerException(
                f"Error getting document type of oscap input file"
                f" ({oscap_input_file}): {error}"
            ) from error

        oscap_info_out = oscap_info_out_buff.getvalue().rstrip()
        oscap_document_type_match = OpenSCAPGeneric.OSCAP_INFO_DOC_TYPE_PATTERN.search(
            oscap_info_out
        )

        # search() returns None when nothing matches; surface that as a step error the
        # caller handles rather than an AttributeError it does not
        if oscap_document_type_match is None:
            raise StepRunnerException(
                f"Error getting document type of oscap input file"
                f" ({oscap_input_file}): no 'Document type' found in oscap info output"
            )

        return oscap_document_type_match.group('doctype')

    @staticmethod
    def __get_oscap_eval_type_based_on_document_type(oscap_document_type):
        """Given an OSCAP document type returns the type of oscap eval that should be used.

        Parameters
        ----------
        oscap_document_type : str
            OSCAP Document type to get the oscap eval type for.

        Returns
        -------
        str
            OSCAP eval type to perform on document with given oscap document type.
            None if the document type is not one of the known types.
        """
        oscap_eval_type = None

        if oscap_document_type == 'Source Data Stream':
            oscap_eval_type = 'xccdf'
        elif oscap_document_type == 'XCCDF Checklist':
            oscap_eval_type = 'xccdf'
        elif oscap_document_type == 'OVAL Definitions':
            oscap_eval_type = 'oval'

        return oscap_eval_type

    @staticmethod
    def __run_oscap_scan( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements
            buildah_unshare_command,
            oscap_eval_type,
            oscap_input_file,
            oscap_out_file_path,
            oscap_xml_results_file_path,
            oscap_html_report_path,
            container_mount_path,
            oscap_profile=None,
            oscap_tailoring_file=None,
            oscap_fetch_remote_resources=True,
            oscap_severity_index=-1
    ):
        """Run an oscap scan in the context of a buildah unshare to run "rootless".

        Parameters
        ----------
        buildah_unshare_command : sh.buildah.unshare.bake()
            A baked sh.buildah.unshare command to use to run this command in the context of
            so that this can be done "rootless".
        oscap_eval_type : str
            The type of oscap eval to perform. Must be a valid oscap eval type.
            EX: xccdf, oval
        oscap_input_file : str
            Path to rules file passed to the oscap command.
        oscap_out_file_path : str
            Path to write the stdout and stderr of running the oscap command to.
        oscap_xml_results_file_path : str
            Write the scan results into this file.
        oscap_html_report_path : str
            Write the human readable (HTML) report into this file.
        container_mount_path : str
            Path to the mounted container to scan.
        oscap_tailoring_file : str
            XCCDF Tailoring file.
            See:
            - https://www.open-scap.org/security-policies/customization/
            - https://www.open-scap.org/resources/documentation/customizing-scap-security-guide-for-your-use-case/ # pylint: disable=line-too-long
            - https://static.open-scap.org/openscap-1.2/oscap_user_manual.html#_how_to_tailor_source_data_stream # pylint: disable=line-too-long
        oscap_profile : str
            OpenSCAP profile to evaluate. Must be a valid profile in the given oscap_input_file.
            EX: if you perform an `oscap info oscap_input_file` the profile must be listed.
        oscap_fetch_remote_resources : bool or str
            If truthy, `--fetch-remote-resources` is passed to the eval.
            A str value is converted with strtobool first.
        oscap_severity_index : int
            Index of the severity level.
            -1 : undefined
            0 : low
            1 : moderate
            2 : important
            3 : critical

        Returns
        -------
        oscap_eval_success : bool
            True if oscap eval passed all rules
            False if oscap eval failed any rules
        oscap_eval_fails : str
            If oscap_eval_success is True then indeterminate.
            If oscap_eval_success is False then string of all of the failed rules.
        oscap_failure_met_threshold : bool
            True if oval type or if there were failures that were greater than or
            equal to the oscap_severity_index
            False if xccdf and there were no failures or if there were failures
            and they were less than oscap_severity_index

        Raises
        ------
        StepRunnerException
            If unexpected error running oscap scan.
        """
        # NOTE: flags left as None are passed through to the command as-is, exactly as
        #       the optional arguments were before they were given a value
        oscap_profile_flag = None
        if oscap_profile is not None:
            oscap_profile_flag = f"--profile={oscap_profile}"

        oscap_fetch_remote_resources_flag = None
        if isinstance(oscap_fetch_remote_resources, str):
            oscap_fetch_remote_resources = strtobool(oscap_fetch_remote_resources)
        if oscap_fetch_remote_resources:
            oscap_fetch_remote_resources_flag = "--fetch-remote-resources"

        oscap_tailoring_file_flag = None
        if oscap_tailoring_file is not None:
            oscap_tailoring_file_flag = f"--tailoring-file={oscap_tailoring_file}"

        # NOTE: oscap_eval_success holds True/False for an understood outcome, or the
        #       caught exception for an unexpected one, which is re-raised further down
        #       once the command output has been printed
        oscap_eval_success = None
        oscap_eval_out_buff = StringIO()
        oscap_eval_out = ""
        oscap_eval_fails = None
        oscap_failure_met_threshold = False
        try:
            oscap_chroot_command = buildah_unshare_command.bake("oscap-chroot")
            with open(oscap_out_file_path, 'w', encoding='utf-8') as oscap_out_file:
                # send both stdout and stderr to the in-memory buffer and the out file
                out_callback = create_sh_redirect_to_multiple_streams_fn_callback([
                    oscap_eval_out_buff,
                    oscap_out_file
                ])
                err_callback = create_sh_redirect_to_multiple_streams_fn_callback([
                    oscap_eval_out_buff,
                    oscap_out_file
                ])
                oscap_chroot_command(
                    container_mount_path,
                    oscap_eval_type,
                    'eval',
                    oscap_profile_flag,
                    oscap_fetch_remote_resources_flag,
                    oscap_tailoring_file_flag,
                    f'--results={oscap_xml_results_file_path}',
                    f'--report={oscap_html_report_path}',
                    oscap_input_file,
                    _out=out_callback,
                    _err=err_callback,
                    _tee='err'
                )
                oscap_eval_success = True
        except sh.ErrorReturnCode_1 as error: # pylint: disable=no-member
            oscap_eval_success = error
        except sh.ErrorReturnCode_2 as error: # pylint: disable=no-member
            # XCCDF: If there is at least one rule with either fail or unknown result,
            #           oscap-scan finishes with return code 2.
            # OVAL:  Never returned
            #
            # Source: https://www.systutorials.com/docs/linux/man/8-oscap/
            if oscap_eval_type == 'xccdf':
                oscap_eval_success = False
            else:
                oscap_eval_success = error
        except sh.ErrorReturnCode as error:
            oscap_eval_success = error

        # get the oscap output
        # NOTE: oscap puts carriage returns (\r / ^M) in its output, remove them
        oscap_eval_out = oscap_eval_out_buff.getvalue().replace('\r', '')

        # print the oscap output no matter the results
        print(oscap_eval_out)

        # if unexpected error throw error
        if isinstance(oscap_eval_success, Exception):
            raise StepRunnerException(
                f"Error running 'oscap {oscap_eval_type} eval': {oscap_eval_success} "
            ) from oscap_eval_success

        # NOTE: oscap oval eval returns exit code 0 whether or not any rules failed
        #       need to search output to determine if there were any rule failures
        if oscap_eval_type == 'oval' and oscap_eval_success:
            # oval does not contain severity in output so it always meets threshold
            oscap_failure_met_threshold = True

            failed_rule_blocks = []
            for match in OpenSCAPGeneric.OSCAP_OVAL_STDOUT_PATTERN.finditer(oscap_eval_out):
                # NOTE: need to do regex and not == because may contain xterm color chars
                if OpenSCAPGeneric.OSCAP_OVAL_STDOUT_FAIL_PATTERN.search(
                        match.group('ruleresult')
                ):
                    failed_rule_blocks.append(match.group('ruleblock') + "\n")
                    oscap_eval_success = False
            oscap_eval_fails = "".join(failed_rule_blocks)

        # if failed xccdf eval then parse out the fails and check if any are at or above
        # the severity threshold
        if oscap_eval_type == 'xccdf' and not oscap_eval_success:
            failed_rule_blocks = []
            for match in OpenSCAPGeneric.OSCAP_XCCDF_STDOUT_PATTERN.finditer(oscap_eval_out):
                # NOTE: need to do regex and not == because may contain xterm color chars
                if not OpenSCAPGeneric.OSCAP_XCCDF_STDOUT_FAIL_PATTERN.search(
                        match.group('ruleresult')
                ):
                    continue

                failed_rule_blocks.append("\n" + match.group('ruleblock') + "\n")

                rule_severity = match.group('severity')
                if oscap_severity_index is not None and rule_severity:
                    match_severity_index = OpenSCAPGeneric.__parse_sev_to_int(
                        oscap_severity=rule_severity
                    )
                    # if the rule severity is not a known severity, or it is the same or
                    # higher than the configured one, then the threshold is met
                    if (match_severity_index is None or
                            match_severity_index >= oscap_severity_index):
                        oscap_failure_met_threshold = True
                else:
                    # no threshold configured or no severity on the rule, so any failure
                    # meets the threshold
                    oscap_failure_met_threshold = True
            oscap_eval_fails = "".join(failed_rule_blocks)

        return oscap_eval_success, oscap_eval_fails, oscap_failure_met_threshold

    @staticmethod
    def __parse_sev_to_int(oscap_severity):
        """Converts a severity name to its index.

        Parameters
        ----------
        oscap_severity : str or None
            Severity name. Surrounding whitespace and letter case are ignored.

        Returns
        -------
        int or None
            0 for low, 1 for moderate, 2 for important, 3 for critical.
            None if given None or a name that is not one of those.
        """
        if oscap_severity is None:
            return None

        severity_dict = {
            'low': 0,
            'moderate': 1,
            'important': 2,
            'critical': 3
        }

        return severity_dict.get(oscap_severity.strip().lower())
Ancestors
- StepImplementer
- abc.ABC
Subclasses
Class variables
Inherited members
StepImplementer
:config
create_working_dir_sub_dir
environment
get_config_value
get_copy_of_runtime_step_config
get_result_value
get_value
global_config_defaults
global_environment_config_defaults
has_config_value
run_step
step_config
step_config_overrides
step_environment_config
step_implementer_config_defaults
step_name
sub_step_implementer_name
sub_step_name
work_dir_path
workflow_result
write_working_file