diff --git a/checkov/arm/graph_builder/graph_components/block_types.py b/checkov/arm/graph_builder/graph_components/block_types.py index bf8022f4e54..73d46082d8c 100644 --- a/checkov/arm/graph_builder/graph_components/block_types.py +++ b/checkov/arm/graph_builder/graph_components/block_types.py @@ -8,4 +8,5 @@ @dataclass class BlockType(CommonBlockType): - PARAMETER: Literal["parameter"] = "parameter" + PARAMETER: Literal["parameters"] = "parameters" + VARIABLE: Literal["variables"] = "variables" diff --git a/checkov/arm/graph_builder/local_graph.py b/checkov/arm/graph_builder/local_graph.py index 35c37fede82..93ce8fab980 100644 --- a/checkov/arm/graph_builder/local_graph.py +++ b/checkov/arm/graph_builder/local_graph.py @@ -1,12 +1,14 @@ from __future__ import annotations import logging +import re from typing import Any, TYPE_CHECKING from checkov.arm.graph_builder.graph_components.block_types import BlockType from checkov.arm.graph_builder.graph_components.blocks import ArmBlock +from checkov.arm.graph_builder.variable_rendering.renderer import ArmVariableRenderer from checkov.arm.utils import ArmElements -from checkov.common.graph.graph_builder import CustomAttributes +from checkov.common.graph.graph_builder import CustomAttributes, Edge from checkov.common.graph.graph_builder.local_graph import LocalGraph from checkov.common.util.consts import START_LINE, END_LINE from checkov.common.util.data_structures_utils import pickle_deepcopy @@ -21,6 +23,7 @@ def __init__(self, definitions: dict[str, dict[str, Any]]) -> None: self.vertices: list[ArmBlock] = [] self.definitions = definitions self.vertices_by_path_and_id: dict[tuple[str, str], int] = {} + self.vertices_by_name: dict[str, int] = {} def build_graph(self, render_variables: bool = False) -> None: self._create_vertices() @@ -28,16 +31,49 @@ def build_graph(self, render_variables: bool = False) -> None: self._create_edges() logging.debug(f"[ArmLocalGraph] created {len(self.edges)} edges") + if render_variables: + renderer = ArmVariableRenderer(self) + renderer.render_variables_from_local_graph() def _create_vertices(self) -> None: for file_path, definition in self.definitions.items(): self._create_parameter_vertices(file_path=file_path, parameters=definition.get(ArmElements.PARAMETERS)) self._create_resource_vertices(file_path=file_path, resources=definition.get(ArmElements.RESOURCES)) + self._create_variables_vertices(file_path=file_path, variables=definition.get(ArmElements.VARIABLES)) for i, vertex in enumerate(self.vertices): self.vertices_by_block_type[vertex.block_type].append(i) self.vertices_block_name_map[vertex.block_type][vertex.name].append(i) self.vertices_by_path_and_id[(vertex.path, vertex.id)] = i + self.vertices_by_name[vertex.name] = i + + self.in_edges[i] = [] + self.out_edges[i] = [] + + def _create_variables_vertices(self, file_path: str, variables: dict[str, dict[str, Any]] | None) -> None: + if not variables: + return + + for name, conf in variables.items(): + if name in [START_LINE, END_LINE]: + continue + if not isinstance(conf, dict): + full_conf = {"value": pickle_deepcopy(conf)} + else: + full_conf = conf + config = pickle_deepcopy(full_conf) + attributes = pickle_deepcopy(full_conf) + + self.vertices.append( + ArmBlock( + name=name, + config=config, + path=file_path, + block_type=BlockType.VARIABLE, + attributes=attributes, + id=f"{BlockType.VARIABLE}.{name}", + ) + ) def _create_parameter_vertices(self, file_path: str, parameters: dict[str, dict[str, Any]] | None) -> None: if not parameters: @@ -90,8 +126,35 @@ def _create_resource_vertices(self, file_path: str, resources: list[dict[str, An ) def _create_edges(self) -> None: - # no edges yet - pass + self._create_vars_and_parameters_edges() + # todo add explicit references edges + + def _create_edge(self, element_name: str, origin_vertex_index: int, label: str) -> None: + vertex_name = element_name + if "." in vertex_name: + # special case for bicep and arm elements, when properties are accessed + vertex_name = vertex_name.split(".")[0] + + dest_vertex_index = self.vertices_by_name.get(vertex_name) + if dest_vertex_index or dest_vertex_index == 0: + if origin_vertex_index == dest_vertex_index: + return + edge = Edge(origin_vertex_index, dest_vertex_index, label) + self.edges.append(edge) + self.out_edges[origin_vertex_index].append(edge) + self.in_edges[dest_vertex_index].append(edge) + + def _create_vars_and_parameters_edges(self) -> None: + pattern = r"(variables|parameters)\('(\w+)'\)" + for origin_vertex_index, vertex in enumerate(self.vertices): + for attr_key, attr_value in vertex.attributes.items(): + if not isinstance(attr_value, str): + continue + if ArmElements.VARIABLES in attr_value or ArmElements.PARAMETERS in attr_value: + matches = re.findall(pattern, attr_value) + for match in matches: + var_name = match[1] + self._create_edge(var_name, origin_vertex_index, attr_key) def update_vertices_configs(self) -> None: # not used diff --git a/checkov/arm/graph_builder/variable_rendering/__init__.py b/checkov/arm/graph_builder/variable_rendering/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/checkov/arm/graph_builder/variable_rendering/renderer.py b/checkov/arm/graph_builder/variable_rendering/renderer.py new file mode 100644 index 00000000000..14f0c72c30c --- /dev/null +++ b/checkov/arm/graph_builder/variable_rendering/renderer.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from checkov.arm.graph_builder.graph_components.block_types import BlockType +from checkov.common.graph.graph_builder import Edge +from checkov.common.graph.graph_builder.utils import adjust_value +from checkov.common.graph.graph_builder.variable_rendering.renderer import VariableRenderer +from checkov.common.util.data_structures_utils import pickle_deepcopy + +if TYPE_CHECKING: + from checkov.arm.graph_builder.local_graph import ArmLocalGraph + + +class ArmVariableRenderer(VariableRenderer["ArmLocalGraph"]): + def __init__(self, local_graph: ArmLocalGraph) -> None: + super().__init__(local_graph) + + def _render_variables_from_vertices(self) -> None: + # need to add rendering to function like format, reference etc + pass + + def evaluate_vertex_attribute_from_edge(self, edge_list: list[Edge]) -> None: + origin_vertex_attributes = self.local_graph.vertices[edge_list[0].origin].attributes + val_to_eval = pickle_deepcopy(origin_vertex_attributes.get(edge_list[0].label, "")) + attr_path = None + for edge in edge_list: + attr_path, attr_value = self.extract_dest_attribute_path_and_value(dest_index=edge.dest, + origin_value=val_to_eval) + '''if the arg start with '[parameters'/ '[variables' its mean we need to eval the all attribute + like here - "addressPrefix": "[parameters('subnetAddressPrefix')]" ''' + if len(edge_list) == 1 and val_to_eval.startswith(("[parameters", "[variables")): + val_to_eval = attr_value + continue + ''' + if the value i need to eval is part of the full attribute like "[format('{0}/{1}', parameters('vnetName'), variables('subnetName'))]" + or "[resourceId('Microsoft.Network/networkProfiles', variables('networkProfileName'))]". + vertices[edge.dest].id = variables.networkProfileName -> variables('networkProfileName') + ''' + val_to_replace = self.local_graph.vertices[edge.dest].id.replace(".", "('") + "')" + val_to_eval = val_to_eval.replace(val_to_replace, attr_value) + + self.local_graph.update_vertex_attribute( + vertex_index=edge_list[0].origin, + attribute_key=edge_list[0].label, + attribute_value=val_to_eval, + change_origin_id=edge_list[0].dest, + attribute_at_dest=attr_path, + ) + + def extract_dest_attribute_path_and_value(self, dest_index: int, origin_value: Any) -> tuple[str, Any] | tuple[None, None]: + vertex = self.local_graph.vertices[dest_index] + if vertex.block_type == BlockType.PARAMETER: + new_value = vertex.attributes.get("defaultValue") + if new_value: + new_value = adjust_value(element_name=origin_value, value=new_value) + return "defaultValue", new_value + elif vertex.block_type == BlockType.VARIABLE: + new_value = adjust_value(element_name=origin_value, value=vertex.attributes["value"]) + return "value", new_value + return None, None + + def evaluate_non_rendered_values(self) -> None: + pass diff --git a/checkov/bicep/graph_builder/local_graph.py b/checkov/bicep/graph_builder/local_graph.py index 0bf98d1cb13..ea5efde60a0 100644 --- a/checkov/bicep/graph_builder/local_graph.py +++ b/checkov/bicep/graph_builder/local_graph.py @@ -11,10 +11,10 @@ from checkov.bicep.graph_builder.graph_components.block_types import BlockType from checkov.bicep.graph_builder.graph_components.blocks import BicepBlock from checkov.bicep.graph_builder.variable_rendering.renderer import BicepVariableRenderer -from checkov.bicep.utils import adjust_value from checkov.common.graph.graph_builder.graph_components.edge import Edge from checkov.common.graph.graph_builder.local_graph import LocalGraph from checkov.common.graph.graph_builder.utils import filter_sub_keys +from checkov.common.graph.graph_builder.utils import adjust_value from checkov.common.util.data_structures_utils import pickle_deepcopy from checkov.common.util.type_forcers import force_int diff --git a/checkov/bicep/graph_builder/variable_rendering/renderer.py b/checkov/bicep/graph_builder/variable_rendering/renderer.py index 589513d827d..d8ec3d22924 100644 --- a/checkov/bicep/graph_builder/variable_rendering/renderer.py +++ b/checkov/bicep/graph_builder/variable_rendering/renderer.py @@ -5,8 +5,8 @@ from pycep.transformer import BicepElement from checkov.bicep.graph_builder.graph_components.block_types import BlockType -from checkov.bicep.utils import adjust_value from checkov.common.graph.graph_builder import Edge +from checkov.common.graph.graph_builder.utils import adjust_value from checkov.common.graph.graph_builder.variable_rendering.renderer import VariableRenderer from checkov.common.util.data_structures_utils import pickle_deepcopy diff --git a/checkov/bicep/utils.py b/checkov/bicep/utils.py index fa2016c1a0f..4eee4a70b35 100644 --- a/checkov/bicep/utils.py +++ b/checkov/bicep/utils.py @@ -5,7 +5,7 @@ import re from collections.abc import Collection from pathlib import Path -from typing import Any, TYPE_CHECKING +from typing import TYPE_CHECKING from checkov.common.runners.base_runner import filter_ignored_paths from checkov.runner_filter import RunnerFilter @@ -88,26 +88,3 @@ def create_definitions( logging.warning(f"[bicep] found errors while parsing definitions: {parsing_errors}") return definitions, definitions_raw - - -def adjust_value(element_name: str, value: Any) -> Any: - """Adjusts the value, if the 'element_name' references a nested key - - Ex: - element_name = publicKey.keyData - value = {"keyData": "key-data", "path": "path"} - - returns new_value = "key-data" - """ - - if "." in element_name and isinstance(value, dict): - key_parts = element_name.split(".") - new_value = value.get(key_parts[1]) - - if new_value is None: - # couldn't find key in in value object - return None - - return adjust_value(".".join(key_parts[1:]), new_value) - - return value diff --git a/checkov/common/graph/graph_builder/utils.py b/checkov/common/graph/graph_builder/utils.py index 40ebf864ef6..319f24229ea 100644 --- a/checkov/common/graph/graph_builder/utils.py +++ b/checkov/common/graph/graph_builder/utils.py @@ -44,3 +44,26 @@ def filter_sub_keys(key_list: list[str]) -> list[str]: def is_include_dup_dynamic(key: str, list_keys: list[str]) -> bool: return f"dynamic.{key.split('.')[0]}" not in list_keys + + +def adjust_value(element_name: str, value: Any) -> Any: + """Adjusts the value, if the 'element_name' references a nested key + + Ex: + element_name = publicKey.keyData + value = {"keyData": "key-data", "path": "path"} + + returns new_value = "key-data" + """ + + if "." in element_name and isinstance(value, dict): + key_parts = element_name.split(".") + new_value = value.get(key_parts[1]) + + if new_value is None: + # couldn't find key in in value object + return None + + return adjust_value(".".join(key_parts[1:]), new_value) + + return value diff --git a/tests/arm/graph_builder/test_local_graph.py b/tests/arm/graph_builder/test_local_graph.py index bae80932eeb..17ef9342efe 100644 --- a/tests/arm/graph_builder/test_local_graph.py +++ b/tests/arm/graph_builder/test_local_graph.py @@ -18,8 +18,9 @@ def test_build_graph(): local_graph.build_graph(render_variables=False) # then - assert len(local_graph.vertices) == 15 - assert len(local_graph.edges) == 0 + assert len(local_graph.vertices) == 18 + assert len(local_graph.edges) == 20 assert len(local_graph.vertices_by_block_type[BlockType.PARAMETER]) == 11 assert len(local_graph.vertices_by_block_type[BlockType.RESOURCE]) == 4 + assert len(local_graph.vertices_by_block_type[BlockType.VARIABLE]) == 3 diff --git a/tests/arm/rendering/__init__.py b/tests/arm/rendering/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/arm/rendering/test_rendering.json b/tests/arm/rendering/test_rendering.json new file mode 100644 index 00000000000..cb7228a2aef --- /dev/null +++ b/tests/arm/rendering/test_rendering.json @@ -0,0 +1,48 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "metadata": { + "_generator": { + "name": "bicep", + "version": "0.5.6.12127", + "templateHash": "9027093124117826122" + } + }, + "parameters": { + "vnetName": { + "type": "string", + "defaultValue": "aci-vnet", + "metadata": { + "description": "VNet name" + } + }, + "vnetAddressPrefix": { + "type": "string", + "defaultValue": "10.0.0.0/16", + "metadata": { + "description": "Address prefix" + } + } + }, + "variables": { + "networkProfileName": "aci-networkProfile", + "location": "eth0" + + }, + "resources": [ + { + "type": "Microsoft.Network/virtualNetworks", + "apiVersion": "2020-11-01", + "name": "[format('{0}/{1}', parameters('vnetName'), variables('networkProfileName'))]", + "location": "[variables('location')]", + "id": "[resourceId('Microsoft.Network/networkProfiles', variables('networkProfileName'))]", + "properties": { + "addressSpace": { + "addressPrefixes": [ + "[parameters('vnetAddressPrefix')]" + ] + } + } + } + ] +} \ No newline at end of file diff --git a/tests/arm/rendering/test_rendering.py b/tests/arm/rendering/test_rendering.py new file mode 100644 index 00000000000..1c6d5e725b1 --- /dev/null +++ b/tests/arm/rendering/test_rendering.py @@ -0,0 +1,24 @@ +from pathlib import Path + +from checkov.arm.graph_builder.local_graph import ArmLocalGraph +from checkov.arm.utils import get_files_definitions + +EXAMPLES_DIR = Path(__file__).parent + +def test_rander_vars(): + # given + test_file = EXAMPLES_DIR / "test_rendering.json" + definitions, _, _ = get_files_definitions([str(test_file)]) + local_graph = ArmLocalGraph(definitions=definitions) + # when + local_graph.build_graph(render_variables=True) + + # then + assert len(local_graph.vertices) == 5 + assert len(local_graph.edges) == 5 + assert local_graph.vertices[2].attributes['name'] == "[format('{0}/{1}', aci-vnet, aci-networkProfile)]" + assert local_graph.vertices[2].attributes['id'] == "[resourceId('Microsoft.Network/networkProfiles', aci-networkProfile)]" + assert local_graph.vertices[2].attributes['location'] == "eth0" + assert local_graph.vertices[2].attributes['properties.addressSpace.addressPrefixes.0'] == "10.0.0.0/16" + assert local_graph.vertices[2].attributes['properties']['addressSpace']['addressPrefixes'][0] == "10.0.0.0/16" + diff --git a/tests/arm/test_graph_manager.py b/tests/arm/test_graph_manager.py index 21df6ebe102..4471bcf1839 100644 --- a/tests/arm/test_graph_manager.py +++ b/tests/arm/test_graph_manager.py @@ -32,8 +32,8 @@ def test_build_graph_from_definitions(graph_connector): local_graph = graph_manager.build_graph_from_definitions(definitions=definitions) # then - assert len(local_graph.vertices) == 15 - assert len(local_graph.edges) == 0 + assert len(local_graph.vertices) == 18 + assert len(local_graph.edges) == 20 # resource name will change, when variable rendering is supported container_idx = local_graph.vertices_by_path_and_id[(test_file, "Microsoft.ContainerInstance/containerGroups.[parameters('containerGroupName')]")]