Skip to content

Commit

Permalink
Merge pull request #9906 from evgenyz/add_platform_templates_v2
Browse files Browse the repository at this point in the history
Introduce templated platforms (CPEs)
  • Loading branch information
jan-cerny authored Dec 2, 2022
2 parents 5d12bbc + 11c1a07 commit e27ff7d
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 81 deletions.
38 changes: 26 additions & 12 deletions docs/manual/developer/06_contributing_with_content.md
Original file line number Diff line number Diff line change
Expand Up @@ -1150,31 +1150,45 @@ At the moment, only the CPE mechanism is supported.
### Applicability by CPE
The CPEs defined by the project are declared in
`shared/applicability/cpes.yml`.
`shared/applicability/*.yml`, one CPE per file.
The id of the CPE is inferred from the file name.
Syntax is as follows (using examples of existing CPEs):
cpes:
- machine: ## The id of the CPE
machine.yml: ## The id of the CPE is 'machine'
name: "cpe:/a:machine" ## The CPE Name as defined by the CPE standard
title: "Bare-metal or Virtual Machine" ## Human readable title for the CPE
check_id: installed_env_is_a_machine ## ID of OVAL implementing the applicability check
- gdm:
name: "cpe:/a:gdm"
title: "Package gdm is installed"
check_id: installed_env_has_gdm_package
The first entry above defines a CPE whose `id` is `machine`, this CPE
package.yml:
name: "cpe:/a:{arg}"
title: "Package {pkgname} is installed"
check_id: cond_package_{arg}
bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}} ## The conditional expression for Bash remediations
ansible_conditional: {{{ ansible_pkg_conditional("{pkgname}") }}} ## The conditional expression for Ansible remediations
template: ## Instead of static OVAL checks a CPE can use templates
name: cond_package ## Name of the template with OVAL applicability check
args: ## CPEs can be parametrized: 'package[*]'.
ntp: ## This is the map of substitution values for 'package[ntp]'
pkgname: ntp ## "Package {pkgname} is installed" -> "Package ntp is installed"
title: NTP daemon and utilities
The first file above defines a CPE whose `id` is `machine`, this CPE
is used for rules not applicable to containers.
A rule or profile with `platform: machine` will be evaluated only if the
targeted scan environment is either bare-metal or virtual machine.
The second entry defines a CPE for GDM.
By setting the `platform` to `gdm`, the rule will have its applicability
restricted to only environments which have `gdm` package installed.
The second file defines a parametrized CPE. This allows us to define multiple
similar CPEs that differ in their argument. In our example, we define
the `package` CPE. Within the `args` key we configure a set of its possible
arguments and their values. In our example, there is a single possible value: `ntp`.
By setting the `platform` to `package[ntp]`, the rule will have its applicability
restricted to only environments which have `ntp` package installed.
The OVAL checks for the CPE need to be of `inventory` class, and must be
under `shared/checks/oval/`.
under `shared/checks/oval/` or have a template under `shared/templates/`.
#### Setting a product's default CPE

Expand Down
11 changes: 11 additions & 0 deletions shared/applicability/package.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: "cpe:/a:{arg}"
title: "Package {pkgname} is installed"
bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}}
ansible_conditional: {{{ ansible_pkg_conditional("{pkgname}") }}}
check_id: platform_package_{arg}
template:
name: platform_package
args:
ntp:
pkgname: ntp
title: NTP daemon and utilities
11 changes: 11 additions & 0 deletions shared/templates/platform_package/oval.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<def-group>
<definition class="inventory" id="platform_{{{ _RULE_ID }}}"
version="1">
{{{ oval_metadata("The " + pkg_system|upper + " package " + PKGNAME + " should be installed.", affected_platforms=["multi_platform_all"]) }}}
<criteria>
<criterion comment="Package {{{ PKGNAME }}} is installed"
test_ref="platform_test_{{{ _RULE_ID }}}_installed" />
</criteria>
</definition>
{{{ oval_test_package_installed(package=PKGNAME, test_id="platform_test_" + _RULE_ID + "_installed") }}}
</def-group>
2 changes: 2 additions & 0 deletions shared/templates/platform_package/template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
supported_languages:
- oval
20 changes: 8 additions & 12 deletions ssg/boolean_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,16 @@ def name(self):
return self.requirement.project_name

@staticmethod
def cpe_id_is_parametrized(cpe_id):
return re.search(r'^\w+\[\w+\]$', cpe_id)
def is_parametrized(name):
return bool(pkg_resources.Requirement.parse(name).extras)

@staticmethod
def is_cpe_name(cpe_id_or_name):
return cpe_id_or_name.startswith("cpe:")
def get_base_of_parametrized_name(name):
"""
If given a parametrized platform name such as package[test],
it returns the package part only.
"""
return pkg_resources.Requirement.parse(name).project_name


class Algebra(boolean.BooleanAlgebra):
Expand All @@ -125,11 +129,3 @@ def __init__(self, symbol_cls, function_cls):
super(Algebra, self).__init__(allowed_in_token=VERSION_SYMBOLS+SPEC_SYMBOLS,
Symbol_class=symbol_cls,
NOT_class=not_cls, AND_class=and_cls, OR_class=or_cls)


def get_base_name_of_parametrized_platform(name):
"""
If given a parametrized platform name such as package[test],
it returns the package part only.
"""
return pkg_resources.Requirement.parse(name).project_name
70 changes: 49 additions & 21 deletions ssg/build_cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from .utils import required_key, apply_formatting_on_dict_values
from .xml import ElementTree as ET
from .boolean_expression import Algebra, Symbol, Function
from .boolean_expression import get_base_name_of_parametrized_platform
from .entities.common import XCCDFEntity
from .entities.common import XCCDFEntity, Templatable
from .yaml import convert_string_to_bool


Expand Down Expand Up @@ -82,16 +81,27 @@ def add_cpe_item(self, cpe_item):

def get_cpe(self, cpe_id_or_name):
try:
if Symbol.is_cpe_name(cpe_id_or_name):
if CPEItem.is_cpe_name(cpe_id_or_name):
return self.cpes_by_name[cpe_id_or_name]
else:
if Symbol.cpe_id_is_parametrized(cpe_id_or_name):
cpe_id_or_name = get_base_name_of_parametrized_platform(
if CPEALFactRef.cpe_id_is_parametrized(cpe_id_or_name):
cpe_id_or_name = CPEALFactRef.get_base_name_of_parametrized_cpe_id(
cpe_id_or_name)
return self.cpes_by_id[cpe_id_or_name]
except KeyError:
raise CPEDoesNotExist("CPE %s is not defined" % cpe_id_or_name)

def add_resolved_cpe_items_from_platform(self, platform):
for fact_ref in platform.test.get_symbols():
if fact_ref.arg:
cpe = self.get_cpe(fact_ref.cpe_name)
new_cpe = cpe.create_resolved_cpe_item_for_fact_ref(fact_ref)
self.add_cpe_item(new_cpe)
fact_ref.cpe_name = new_cpe.name

def get_cpe_for_fact_ref(self, fact_ref):
return self.get_cpe(fact_ref.as_id())

def get_cpe_name(self, cpe_id):
cpe = self.get_cpe(cpe_id)
return cpe.name
Expand Down Expand Up @@ -133,7 +143,7 @@ def to_file(self, file_name, cpe_oval_file):
tree.write(file_name, encoding="utf-8")


class CPEItem(XCCDFEntity):
class CPEItem(XCCDFEntity, Templatable):
"""
Represents the cpe-item element from the CPE standard.
"""
Expand All @@ -144,8 +154,10 @@ class CPEItem(XCCDFEntity):
bash_conditional=lambda: "",
ansible_conditional=lambda: "",
is_product_cpe=lambda: False,
args=lambda: {},
** XCCDFEntity.KEYS
)
KEYS.update(**Templatable.KEYS)

MANDATORY_KEYS = [
"name",
Expand Down Expand Up @@ -177,6 +189,29 @@ def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
cpe_item.is_product_cpe = convert_string_to_bool(cpe_item.is_product_cpe)
return cpe_item

def set_template_variables(self, *sources):
if self.is_templated():
self.template["vars"] = {}
for source in sources:
self.template["vars"].update(source)

def create_resolved_cpe_item_for_fact_ref(self, fact_ref):
resolved_parameters = self.args[fact_ref.arg]
resolved_parameters.update(fact_ref.as_dict())
cpe_item_as_dict = self.represent_as_dict()
cpe_item_as_dict["args"] = None
cpe_item_as_dict["id_"] = fact_ref.as_id()
new_associated_cpe_item_as_dict = apply_formatting_on_dict_values(
cpe_item_as_dict, resolved_parameters)
new_associated_cpe_item = CPEItem.get_instance_from_full_dict(
new_associated_cpe_item_as_dict)
new_associated_cpe_item.set_template_variables(resolved_parameters)
return new_associated_cpe_item

@staticmethod
def is_cpe_name(cpe_id_or_name):
return cpe_id_or_name.startswith("cpe:")


class CPEALLogicalTest(Function):

Expand All @@ -199,10 +234,6 @@ def enrich_with_cpe_info(self, cpe_products):
for arg in self.args:
arg.enrich_with_cpe_info(cpe_products)

def pass_parameters(self, product_cpes):
for arg in self.args:
arg.pass_parameters(product_cpes)

def to_bash_conditional(self):
child_bash_conds = [
a.to_bash_conditional() for a in self.args
Expand Down Expand Up @@ -262,17 +293,6 @@ def enrich_with_cpe_info(self, cpe_products):
self.ansible_conditional = cpe_products.get_cpe(self.cpe_name).ansible_conditional
self.cpe_name = cpe_products.get_cpe_name(self.cpe_name)

def pass_parameters(self, product_cpes):
if self.arg:
associated_cpe_item_as_dict = product_cpes.get_cpe(self.cpe_name).represent_as_dict()
new_associated_cpe_item_as_dict = apply_formatting_on_dict_values(
associated_cpe_item_as_dict, self.as_dict())
new_associated_cpe_item_as_dict["id_"] = self.as_id()
new_associated_cpe_item = CPEItem.get_instance_from_full_dict(
new_associated_cpe_item_as_dict)
product_cpes.add_cpe_item(new_associated_cpe_item)
self.cpe_name = new_associated_cpe_item.name

def to_xml_element(self):
cpe_factref = ET.Element("{%s}fact-ref" % CPEALFactRef.ns)
cpe_factref.set('name', self.cpe_name)
Expand All @@ -284,6 +304,14 @@ def to_bash_conditional(self):
def to_ansible_conditional(self):
return self.ansible_conditional

@staticmethod
def cpe_id_is_parametrized(cpe_id):
return Symbol.is_parametrized(cpe_id)

@staticmethod
def get_base_name_of_parametrized_cpe_id(cpe_id):
return Symbol.get_base_of_parametrized_name(cpe_id)


def extract_subelement(objects, sub_elem_type):
"""
Expand Down
25 changes: 12 additions & 13 deletions ssg/build_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,8 @@ def load_benchmark(self, directory):
self.benchmark.unselect_empty_groups()

def load_compiled_content(self):
self.product_cpes.load_cpes_from_directory_tree(self.resolved_cpe_items_dir, self.env_yaml)

self.fixes = ssg.build_remediations.load_compiled_remediations(self.fixes_dir)

filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
Expand All @@ -1443,8 +1445,6 @@ def load_compiled_content(self):
self.load_entities_by_id(filenames, self.platforms, Platform)
self.product_cpes.platforms = self.platforms

self.product_cpes.load_cpes_from_directory_tree(self.resolved_cpe_items_dir, self.env_yaml)

for g in self.groups.values():
g.load_entities(self.rules, self.values, self.groups)

Expand Down Expand Up @@ -1512,14 +1512,13 @@ class Platform(XCCDFEntity):
def from_text(cls, expression, product_cpes):
if not product_cpes:
return None
test = product_cpes.algebra.parse(
expression, simplify=True)
id = test.as_id()
platform = cls(id)
test = product_cpes.algebra.parse(expression, simplify=True)
id_ = test.as_id()
platform = cls(id_)
platform.test = test
platform.test.pass_parameters(product_cpes)
product_cpes.add_resolved_cpe_items_from_platform(platform)
platform.test.enrich_with_cpe_info(product_cpes)
platform.name = id
platform.name = id_
platform.original_expression = expression
platform.xml_content = platform.get_xml()
platform.bash_conditional = platform.test.to_bash_conditional()
Expand All @@ -1529,8 +1528,8 @@ def from_text(cls, expression, product_cpes):
def get_xml(self):
cpe_platform = ET.Element("{%s}platform" % Platform.ns)
cpe_platform.set('id', self.name)
# in case the platform contains only single CPE name, fake the logical test
# we have to athere to CPE specification
# In case the platform contains only single CPE name, fake the logical test
# we have to adhere to CPE specification
if isinstance(self.test, CPEALFactRef):
cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
cpe_test.set('operator', 'AND')
Expand All @@ -1557,11 +1556,11 @@ def get_remediation_conditional(self, language):
def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
platform = super(Platform, cls).from_yaml(yaml_file, env_yaml)
platform.xml_content = ET.fromstring(platform.xml_content)
# if we did receive a product_cpes, we can restore also the original test object
# If we received a product_cpes, we can restore also the original test object
# it can be later used e.g. for comparison
if product_cpes:
platform.test = product_cpes.algebra.parse(
platform.original_expression, simplify=True)
platform.test = product_cpes.algebra.parse(platform.original_expression, simplify=True)
product_cpes.add_resolved_cpe_items_from_platform(platform)
return platform

def __eq__(self, other):
Expand Down
22 changes: 22 additions & 0 deletions ssg/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,20 @@ def build_lang_for_templatable(self, templatable, lang):
filled_template = self.get_lang_contents_for_templatable(templatable, lang)
self.write_lang_contents_for_templatable(filled_template, lang, templatable)

def build_cpe(self, cpe):
for lang in self.get_resolved_langs_to_generate(cpe):
self.build_lang_for_templatable(cpe, lang)

def build_platform(self, platform):
"""
Builds templated content of a given Platform (all CPEs/Symbols) for all available
languages, writing the output to the correct build directories.
"""
for fact_ref in platform.test.get_symbols():
cpe = self.product_cpes.get_cpe_for_fact_ref(fact_ref)
if cpe.is_templated():
self.build_cpe(cpe)

def build_rule(self, rule):
"""
Builds templated content of a given Rule for all available languages,
Expand All @@ -230,6 +244,13 @@ def build_extra_ovals(self):
})
self.build_lang_for_templatable(rule, LANGUAGES["oval"])

def build_all_platforms(self):
for platform_file in sorted(os.listdir(self.platforms_dir)):
platform_path = os.path.join(self.platforms_dir, platform_file)
platform = ssg.build_yaml.Platform.from_yaml(platform_path, self.env_yaml,
self.product_cpes)
self.build_platform(platform)

def build_all_rules(self):
for rule_file in sorted(os.listdir(self.resolved_rules_dir)):
rule_path = os.path.join(self.resolved_rules_dir, rule_file)
Expand All @@ -252,3 +273,4 @@ def build(self):

self.build_extra_ovals()
self.build_all_rules()
self.build_all_platforms()
9 changes: 8 additions & 1 deletion tests/unit/ssg-module/data/applicability/package.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
name: "cpe:/a:{arg}"
title: "Package {arg} is installed"
title: "Package {pkgname} is installed"
bash_conditional: {{{ bash_pkg_conditional("{pkgname}") }}}
check_id: installed_env_has_{arg}_package
template:
name: package_installed
args:
ntp:
pkgname: ntp
title: NTP
9 changes: 9 additions & 0 deletions tests/unit/ssg-module/data/package_ntp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: package_ntp
original_expression: package[ntp]
xml_content: <ns0:platform xmlns:ns0="http://cpe.mitre.org/language/2.0" id="package_ntp"><ns0:logical-test
operator="AND" negate="false"><ns0:fact-ref name="cpe:/a:ntp" /></ns0:logical-test></ns0:platform>
bash_conditional: ( ( rpm --quiet -q ntp ) )
ansible_conditional: ( ( "ntp" in ansible_facts.packages ) )
definition_location: ''
documentation_complete: true
title: 'NTP Package'
12 changes: 0 additions & 12 deletions tests/unit/ssg-module/data/templates/package_installed/template.py

This file was deleted.

Loading

0 comments on commit e27ff7d

Please sign in to comment.