diff --git a/docs/manual/developer/06_contributing_with_content.md b/docs/manual/developer/06_contributing_with_content.md index 36e664a49b8..17e37535668 100644 --- a/docs/manual/developer/06_contributing_with_content.md +++ b/docs/manual/developer/06_contributing_with_content.md @@ -1150,31 +1150,45 @@ At the moment, only the CPE mechanism is supported. ### Applicability by CPE The CPEs defined by the project are declared in -`shared/applicability/cpes.yml`. +`shared/applicability/*.yml`, one CPE per file. + +The id of the CPE is inferred from the file name. Syntax is as follows (using examples of existing CPEs): - cpes: - - machine: ## The id of the CPE + machine.yml: ## The id of the CPE is 'machine' name: "cpe:/a:machine" ## The CPE Name as defined by the CPE standard title: "Bare-metal or Virtual Machine" ## Human readable title for the CPE check_id: installed_env_is_a_machine ## ID of OVAL implementing the applicability check - - gdm: - name: "cpe:/a:gdm" - title: "Package gdm is installed" - check_id: installed_env_has_gdm_package -The first entry above defines a CPE whose `id` is `machine`, this CPE + package.yml: + name: "cpe:/a:{arg}" + title: "Package {pkgname} is installed" + check_id: cond_package_{arg} + bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}} ## The conditional expression for Bash remediations + ansible_conditional: {{{ ansible_pkg_conditional("{pkgname}") }}} ## The conditional expression for Ansible remediations + template: ## Instead of static OVAL checks a CPE can use templates + name: cond_package ## Name of the template with OVAL applicability check + args: ## CPEs can be parametrized: 'package[*]'. + ntp: ## This is the map of substitution values for 'package[ntp]' + pkgname: ntp ## "Package {pkgname} is installed" -> "Package ntp is installed" + title: NTP daemon and utilities + +The first file above defines a CPE whose `id` is `machine`, this CPE is used for rules not applicable to containers. A rule or profile with `platform: machine` will be evaluated only if the targeted scan environment is either bare-metal or virtual machine. -The second entry defines a CPE for GDM. -By setting the `platform` to `gdm`, the rule will have its applicability -restricted to only environments which have `gdm` package installed. +The second file defines a parametrized CPE. This allows us to define multiple +similar CPEs that differ in their argument. In our example, we define +the `package` CPE. Within the `args` key we configure a set of its possible +arguments and their values. In our example, there is a single possible value: `ntp`. + +By setting the `platform` to `package[ntp]`, the rule will have its applicability +restricted to only environments which have `ntp` package installed. The OVAL checks for the CPE need to be of `inventory` class, and must be -under `shared/checks/oval/`. +under `shared/checks/oval/` or have a template under `shared/templates/`. #### Setting a product's default CPE diff --git a/shared/applicability/package.yml b/shared/applicability/package.yml new file mode 100644 index 00000000000..de1d67f5256 --- /dev/null +++ b/shared/applicability/package.yml @@ -0,0 +1,11 @@ +name: "cpe:/a:{arg}" +title: "Package {pkgname} is installed" +bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}} +ansible_conditional: {{{ ansible_pkg_conditional("{pkgname}") }}} +check_id: platform_package_{arg} +template: + name: platform_package +args: + ntp: + pkgname: ntp + title: NTP daemon and utilities diff --git a/shared/templates/platform_package/oval.template b/shared/templates/platform_package/oval.template new file mode 100644 index 00000000000..5d15cbad39e --- /dev/null +++ b/shared/templates/platform_package/oval.template @@ -0,0 +1,11 @@ + + + {{{ oval_metadata("The " + pkg_system|upper + " package " + PKGNAME + " should be installed.", affected_platforms=["multi_platform_all"]) }}} + + + + +{{{ oval_test_package_installed(package=PKGNAME, test_id="platform_test_" + _RULE_ID + "_installed") }}} + diff --git a/shared/templates/platform_package/template.yml b/shared/templates/platform_package/template.yml new file mode 100644 index 00000000000..2f6f2d2c7cb --- /dev/null +++ b/shared/templates/platform_package/template.yml @@ -0,0 +1,2 @@ +supported_languages: + - oval diff --git a/ssg/boolean_expression.py b/ssg/boolean_expression.py index e0ca79e574d..206aa088815 100644 --- a/ssg/boolean_expression.py +++ b/ssg/boolean_expression.py @@ -94,12 +94,16 @@ def name(self): return self.requirement.project_name @staticmethod - def cpe_id_is_parametrized(cpe_id): - return re.search(r'^\w+\[\w+\]$', cpe_id) + def is_parametrized(name): + return bool(pkg_resources.Requirement.parse(name).extras) @staticmethod - def is_cpe_name(cpe_id_or_name): - return cpe_id_or_name.startswith("cpe:") + def get_base_of_parametrized_name(name): + """ + If given a parametrized platform name such as package[test], + it returns the package part only. + """ + return pkg_resources.Requirement.parse(name).project_name class Algebra(boolean.BooleanAlgebra): @@ -125,11 +129,3 @@ def __init__(self, symbol_cls, function_cls): super(Algebra, self).__init__(allowed_in_token=VERSION_SYMBOLS+SPEC_SYMBOLS, Symbol_class=symbol_cls, NOT_class=not_cls, AND_class=and_cls, OR_class=or_cls) - - -def get_base_name_of_parametrized_platform(name): - """ - If given a parametrized platform name such as package[test], - it returns the package part only. - """ - return pkg_resources.Requirement.parse(name).project_name diff --git a/ssg/build_cpe.py b/ssg/build_cpe.py index 0a694490ffe..ffcb9f4ae07 100644 --- a/ssg/build_cpe.py +++ b/ssg/build_cpe.py @@ -12,8 +12,7 @@ from .utils import required_key, apply_formatting_on_dict_values from .xml import ElementTree as ET from .boolean_expression import Algebra, Symbol, Function -from .boolean_expression import get_base_name_of_parametrized_platform -from .entities.common import XCCDFEntity +from .entities.common import XCCDFEntity, Templatable from .yaml import convert_string_to_bool @@ -82,16 +81,27 @@ def add_cpe_item(self, cpe_item): def get_cpe(self, cpe_id_or_name): try: - if Symbol.is_cpe_name(cpe_id_or_name): + if CPEItem.is_cpe_name(cpe_id_or_name): return self.cpes_by_name[cpe_id_or_name] else: - if Symbol.cpe_id_is_parametrized(cpe_id_or_name): - cpe_id_or_name = get_base_name_of_parametrized_platform( + if CPEALFactRef.cpe_id_is_parametrized(cpe_id_or_name): + cpe_id_or_name = CPEALFactRef.get_base_name_of_parametrized_cpe_id( cpe_id_or_name) return self.cpes_by_id[cpe_id_or_name] except KeyError: raise CPEDoesNotExist("CPE %s is not defined" % cpe_id_or_name) + def add_resolved_cpe_items_from_platform(self, platform): + for fact_ref in platform.test.get_symbols(): + if fact_ref.arg: + cpe = self.get_cpe(fact_ref.cpe_name) + new_cpe = cpe.create_resolved_cpe_item_for_fact_ref(fact_ref) + self.add_cpe_item(new_cpe) + fact_ref.cpe_name = new_cpe.name + + def get_cpe_for_fact_ref(self, fact_ref): + return self.get_cpe(fact_ref.as_id()) + def get_cpe_name(self, cpe_id): cpe = self.get_cpe(cpe_id) return cpe.name @@ -133,7 +143,7 @@ def to_file(self, file_name, cpe_oval_file): tree.write(file_name, encoding="utf-8") -class CPEItem(XCCDFEntity): +class CPEItem(XCCDFEntity, Templatable): """ Represents the cpe-item element from the CPE standard. """ @@ -144,8 +154,10 @@ class CPEItem(XCCDFEntity): bash_conditional=lambda: "", ansible_conditional=lambda: "", is_product_cpe=lambda: False, + args=lambda: {}, ** XCCDFEntity.KEYS ) + KEYS.update(**Templatable.KEYS) MANDATORY_KEYS = [ "name", @@ -177,6 +189,29 @@ def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None): cpe_item.is_product_cpe = convert_string_to_bool(cpe_item.is_product_cpe) return cpe_item + def set_template_variables(self, *sources): + if self.is_templated(): + self.template["vars"] = {} + for source in sources: + self.template["vars"].update(source) + + def create_resolved_cpe_item_for_fact_ref(self, fact_ref): + resolved_parameters = self.args[fact_ref.arg] + resolved_parameters.update(fact_ref.as_dict()) + cpe_item_as_dict = self.represent_as_dict() + cpe_item_as_dict["args"] = None + cpe_item_as_dict["id_"] = fact_ref.as_id() + new_associated_cpe_item_as_dict = apply_formatting_on_dict_values( + cpe_item_as_dict, resolved_parameters) + new_associated_cpe_item = CPEItem.get_instance_from_full_dict( + new_associated_cpe_item_as_dict) + new_associated_cpe_item.set_template_variables(resolved_parameters) + return new_associated_cpe_item + + @staticmethod + def is_cpe_name(cpe_id_or_name): + return cpe_id_or_name.startswith("cpe:") + class CPEALLogicalTest(Function): @@ -199,10 +234,6 @@ def enrich_with_cpe_info(self, cpe_products): for arg in self.args: arg.enrich_with_cpe_info(cpe_products) - def pass_parameters(self, product_cpes): - for arg in self.args: - arg.pass_parameters(product_cpes) - def to_bash_conditional(self): child_bash_conds = [ a.to_bash_conditional() for a in self.args @@ -262,17 +293,6 @@ def enrich_with_cpe_info(self, cpe_products): self.ansible_conditional = cpe_products.get_cpe(self.cpe_name).ansible_conditional self.cpe_name = cpe_products.get_cpe_name(self.cpe_name) - def pass_parameters(self, product_cpes): - if self.arg: - associated_cpe_item_as_dict = product_cpes.get_cpe(self.cpe_name).represent_as_dict() - new_associated_cpe_item_as_dict = apply_formatting_on_dict_values( - associated_cpe_item_as_dict, self.as_dict()) - new_associated_cpe_item_as_dict["id_"] = self.as_id() - new_associated_cpe_item = CPEItem.get_instance_from_full_dict( - new_associated_cpe_item_as_dict) - product_cpes.add_cpe_item(new_associated_cpe_item) - self.cpe_name = new_associated_cpe_item.name - def to_xml_element(self): cpe_factref = ET.Element("{%s}fact-ref" % CPEALFactRef.ns) cpe_factref.set('name', self.cpe_name) @@ -284,6 +304,14 @@ def to_bash_conditional(self): def to_ansible_conditional(self): return self.ansible_conditional + @staticmethod + def cpe_id_is_parametrized(cpe_id): + return Symbol.is_parametrized(cpe_id) + + @staticmethod + def get_base_name_of_parametrized_cpe_id(cpe_id): + return Symbol.get_base_of_parametrized_name(cpe_id) + def extract_subelement(objects, sub_elem_type): """ diff --git a/ssg/build_yaml.py b/ssg/build_yaml.py index c87ccf8444c..9024a758196 100644 --- a/ssg/build_yaml.py +++ b/ssg/build_yaml.py @@ -1428,6 +1428,8 @@ def load_benchmark(self, directory): self.benchmark.unselect_empty_groups() def load_compiled_content(self): + self.product_cpes.load_cpes_from_directory_tree(self.resolved_cpe_items_dir, self.env_yaml) + self.fixes = ssg.build_remediations.load_compiled_remediations(self.fixes_dir) filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml")) @@ -1443,8 +1445,6 @@ def load_compiled_content(self): self.load_entities_by_id(filenames, self.platforms, Platform) self.product_cpes.platforms = self.platforms - self.product_cpes.load_cpes_from_directory_tree(self.resolved_cpe_items_dir, self.env_yaml) - for g in self.groups.values(): g.load_entities(self.rules, self.values, self.groups) @@ -1512,14 +1512,13 @@ class Platform(XCCDFEntity): def from_text(cls, expression, product_cpes): if not product_cpes: return None - test = product_cpes.algebra.parse( - expression, simplify=True) - id = test.as_id() - platform = cls(id) + test = product_cpes.algebra.parse(expression, simplify=True) + id_ = test.as_id() + platform = cls(id_) platform.test = test - platform.test.pass_parameters(product_cpes) + product_cpes.add_resolved_cpe_items_from_platform(platform) platform.test.enrich_with_cpe_info(product_cpes) - platform.name = id + platform.name = id_ platform.original_expression = expression platform.xml_content = platform.get_xml() platform.bash_conditional = platform.test.to_bash_conditional() @@ -1529,8 +1528,8 @@ def from_text(cls, expression, product_cpes): def get_xml(self): cpe_platform = ET.Element("{%s}platform" % Platform.ns) cpe_platform.set('id', self.name) - # in case the platform contains only single CPE name, fake the logical test - # we have to athere to CPE specification + # In case the platform contains only single CPE name, fake the logical test + # we have to adhere to CPE specification if isinstance(self.test, CPEALFactRef): cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns) cpe_test.set('operator', 'AND') @@ -1557,11 +1556,11 @@ def get_remediation_conditional(self, language): def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None): platform = super(Platform, cls).from_yaml(yaml_file, env_yaml) platform.xml_content = ET.fromstring(platform.xml_content) - # if we did receive a product_cpes, we can restore also the original test object + # If we received a product_cpes, we can restore also the original test object # it can be later used e.g. for comparison if product_cpes: - platform.test = product_cpes.algebra.parse( - platform.original_expression, simplify=True) + platform.test = product_cpes.algebra.parse(platform.original_expression, simplify=True) + product_cpes.add_resolved_cpe_items_from_platform(platform) return platform def __eq__(self, other): diff --git a/ssg/templates.py b/ssg/templates.py index 268c30cabc9..7e2cc63a136 100644 --- a/ssg/templates.py +++ b/ssg/templates.py @@ -207,6 +207,20 @@ def build_lang_for_templatable(self, templatable, lang): filled_template = self.get_lang_contents_for_templatable(templatable, lang) self.write_lang_contents_for_templatable(filled_template, lang, templatable) + def build_cpe(self, cpe): + for lang in self.get_resolved_langs_to_generate(cpe): + self.build_lang_for_templatable(cpe, lang) + + def build_platform(self, platform): + """ + Builds templated content of a given Platform (all CPEs/Symbols) for all available + languages, writing the output to the correct build directories. + """ + for fact_ref in platform.test.get_symbols(): + cpe = self.product_cpes.get_cpe_for_fact_ref(fact_ref) + if cpe.is_templated(): + self.build_cpe(cpe) + def build_rule(self, rule): """ Builds templated content of a given Rule for all available languages, @@ -230,6 +244,13 @@ def build_extra_ovals(self): }) self.build_lang_for_templatable(rule, LANGUAGES["oval"]) + def build_all_platforms(self): + for platform_file in sorted(os.listdir(self.platforms_dir)): + platform_path = os.path.join(self.platforms_dir, platform_file) + platform = ssg.build_yaml.Platform.from_yaml(platform_path, self.env_yaml, + self.product_cpes) + self.build_platform(platform) + def build_all_rules(self): for rule_file in sorted(os.listdir(self.resolved_rules_dir)): rule_path = os.path.join(self.resolved_rules_dir, rule_file) @@ -252,3 +273,4 @@ def build(self): self.build_extra_ovals() self.build_all_rules() + self.build_all_platforms() diff --git a/tests/unit/ssg-module/data/applicability/package.yml b/tests/unit/ssg-module/data/applicability/package.yml index 4c842bfd164..86015f36d69 100644 --- a/tests/unit/ssg-module/data/applicability/package.yml +++ b/tests/unit/ssg-module/data/applicability/package.yml @@ -1,3 +1,10 @@ name: "cpe:/a:{arg}" -title: "Package {arg} is installed" +title: "Package {pkgname} is installed" +bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}} check_id: installed_env_has_{arg}_package +template: + name: package_installed +args: + ntp: + pkgname: ntp + title: NTP diff --git a/tests/unit/ssg-module/data/package_ntp.yml b/tests/unit/ssg-module/data/package_ntp.yml new file mode 100644 index 00000000000..cfcc7f7f258 --- /dev/null +++ b/tests/unit/ssg-module/data/package_ntp.yml @@ -0,0 +1,9 @@ +name: package_ntp +original_expression: package[ntp] +xml_content: +bash_conditional: ( ( rpm --quiet -q ntp ) ) +ansible_conditional: ( ( "ntp" in ansible_facts.packages ) ) +definition_location: '' +documentation_complete: true +title: 'NTP Package' diff --git a/tests/unit/ssg-module/data/templates/package_installed/template.py b/tests/unit/ssg-module/data/templates/package_installed/template.py deleted file mode 100644 index cfb47b7af5d..00000000000 --- a/tests/unit/ssg-module/data/templates/package_installed/template.py +++ /dev/null @@ -1,12 +0,0 @@ -import re - - -def preprocess(data, lang): - if "evr" in data: - evr = data["evr"] - if evr and not re.match(r'\d:\d[\d\w+.]*-\d[\d\w+.]*', evr, 0): - raise RuntimeError( - "ERROR: input violation: evr key should be in " - "epoch:version-release format, but package {0} has set " - "evr to {1}".format(data["pkgname"], evr)) - return data diff --git a/tests/unit/ssg-module/test_build_yaml.py b/tests/unit/ssg-module/test_build_yaml.py index d4229a93d85..b99f0d410f9 100644 --- a/tests/unit/ssg-module/test_build_yaml.py +++ b/tests/unit/ssg-module/test_build_yaml.py @@ -362,21 +362,22 @@ def test_platform_as_dict(product_cpes): assert d["bash_conditional"] == "( rpm --quiet -q chrony )" assert "xml_content" in d + def test_platform_get_invalid_conditional_language(product_cpes): platform = ssg.build_yaml.Platform.from_text("ntp or chrony", product_cpes) with pytest.raises(AttributeError): assert platform.get_remediation_conditional("foo") + def test_parametrized_platform(product_cpes): - platform = ssg.build_yaml.Platform.from_text("package[test]", product_cpes) + platform = ssg.build_yaml.Platform.from_text("package[ntp]", product_cpes) assert platform.test.cpe_name != "cpe:/a:{arg}" - assert platform.test.cpe_name == "cpe:/a:test" + assert platform.test.cpe_name == "cpe:/a:ntp" cpe_item = product_cpes.get_cpe(platform.test.cpe_name) - assert cpe_item.name == "cpe:/a:test" - assert cpe_item.title == "Package test is installed" - assert cpe_item.check_id == "installed_env_has_test_package" - - + assert cpe_item.id_ == "package_ntp" + assert cpe_item.name == "cpe:/a:ntp" + assert cpe_item.title == "Package ntp is installed" + assert cpe_item.check_id == "installed_env_has_ntp_package" def test_derive_id_from_file_name(): diff --git a/tests/unit/ssg-module/test_templates.py b/tests/unit/ssg-module/test_templates.py index 723e749df1a..a24bda79cad 100644 --- a/tests/unit/ssg-module/test_templates.py +++ b/tests/unit/ssg-module/test_templates.py @@ -13,6 +13,7 @@ ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) DATADIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "data")) templates_dir = os.path.join(DATADIR, "templates") +platforms_dir = os.path.join(DATADIR, ".") cpe_items_dir = os.path.join(DATADIR, "applicability") build_config_yaml = os.path.join(ssg_root, "build", "build_config.yml") @@ -33,6 +34,32 @@ def test_render_extra_ovals(): "title": oval_def_id, "template": template, }) - oval_content = builder.get_lang_contents_for_templatable(rule, - ssg.templates.LANGUAGES["oval"]) - assert "%s" % (oval_def_id,) in oval_content + + oval_content = builder.get_lang_contents_for_templatable( + rule, ssg.templates.LANGUAGES["oval"]) + + assert 'id="package_%s_installed"' % (rule.template['vars']['pkgname']) \ + in oval_content + + assert "%s" % (oval_def_id,) \ + in oval_content + + +def test_platform_templates(): + builder = ssg.templates.Builder( + env_yaml, '', templates_dir, + '', '', platforms_dir, cpe_items_dir) + + platform_path = os.path.join(builder.platforms_dir, "package_ntp.yml") + platform = ssg.build_yaml.Platform.from_yaml(platform_path, builder.env_yaml, + builder.product_cpes) + for fact_ref in platform.test.get_symbols(): + cpe = builder.product_cpes.get_cpe_for_fact_ref(fact_ref) + oval_content = builder.get_lang_contents_for_templatable( + cpe, ssg.templates.LANGUAGES["oval"]) + + assert 'id="package_%s"' % (cpe.template['vars']['pkgname']) \ + in oval_content + + assert "Package %s is installed" % (cpe.template['vars']['pkgname'],) \ + in oval_content