From 4fbdaeb6d5e9423f92c22de209e29980bb74ae53 Mon Sep 17 00:00:00 2001 From: Omri Yoffe Date: Sun, 27 Oct 2024 14:52:33 +0200 Subject: [PATCH] review fixes --- checkov/arm/graph_builder/local_graph.py | 21 ++++++++++++--------- checkov/arm/utils.py | 2 +- tests/arm/test_utils.py | 16 +++++++++++++++- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/checkov/arm/graph_builder/local_graph.py b/checkov/arm/graph_builder/local_graph.py index 69b87d69c8b..01454207686 100644 --- a/checkov/arm/graph_builder/local_graph.py +++ b/checkov/arm/graph_builder/local_graph.py @@ -16,6 +16,9 @@ if TYPE_CHECKING: from checkov.common.graph.graph_builder.local_graph import _Block +DEPENDS_ON_FIELD = 'dependsOn' +RESOURCE_ID_FUNC = 'resourceId(' +REFERENCE_FUNC = 'reference(' class ArmLocalGraph(LocalGraph[ArmBlock]): def __init__(self, definitions: dict[str, dict[str, Any]]) -> None: @@ -27,7 +30,7 @@ def __init__(self, definitions: dict[str, dict[str, Any]]) -> None: def build_graph(self, render_variables: bool = False) -> None: self._create_vertices() - logging.debug(f"[ArmLocalGraph] created {len(self.vertices)} vertices") + logging.warning(f"[ArmLocalGraph] created {len(self.vertices)} vertices") self._create_vars_and_parameters_edges() if render_variables: @@ -35,7 +38,7 @@ def build_graph(self, render_variables: bool = False) -> None: renderer.render_variables_from_local_graph() self._create_edges() - logging.debug(f"[ArmLocalGraph] created {len(self.edges)} edges") + logging.warning(f"[ArmLocalGraph] created {len(self.edges)} edges") def _create_vertices(self) -> None: for file_path, definition in self.definitions.items(): @@ -85,7 +88,7 @@ def _create_parameter_vertices(self, file_path: str, parameters: dict[str, dict[ if name in (START_LINE, END_LINE): continue if not isinstance(config, dict): - logging.debug(f"[ArmLocalGraph] parameter {name} has wrong type {type(config)}") + logging.warning(f"[ArmLocalGraph] parameter {name} has wrong type {type(config)}") continue attributes = pickle_deepcopy(config) @@ -129,13 +132,13 @@ def _create_resource_vertices(self, file_path: str, resources: list[dict[str, An def _create_edges(self) -> None: for origin_vertex_index, vertex in enumerate(self.vertices): - if 'dependsOn' in vertex.attributes: + if DEPENDS_ON_FIELD in vertex.attributes: self._create_explicit_edge(origin_vertex_index, vertex.name, vertex.attributes['dependsOn']) self._create_implicit_edges(origin_vertex_index, vertex.name, vertex.attributes) def _create_explicit_edge(self, origin_vertex_index: int, resource_name: str, deps: list[str]) -> None: for dep in deps: - if 'resourceId' in dep: + if RESOURCE_ID_FUNC in dep: processed_dep = extract_resource_name_from_resource_id_func(dep) else: processed_dep = dep.split('/')[-1] @@ -144,7 +147,7 @@ def _create_explicit_edge(self, origin_vertex_index: int, resource_name: str, de self._create_edge(processed_dep, origin_vertex_index, f'{resource_name}->{processed_dep}') else: # Dependency not found - logging.debug(f"[ArmLocalGraph] resource dependency {processed_dep} defined in {dep} for resource" + logging.warning(f"[ArmLocalGraph] resource dependency {processed_dep} defined in {dep} for resource" f" {resource_name} not found") continue @@ -169,10 +172,10 @@ def _create_edge(self, element_name: str, origin_vertex_index: int, label: str) self.out_edges[origin_vertex_index].append(edge) self.in_edges[dest_vertex_index].append(edge) - def _create_implicit_edges(self, origin_vertex_index: int, resource_name: str, d: dict[str, Any]) -> None: - for _, value in d.items(): + def _create_implicit_edges(self, origin_vertex_index: int, resource_name: str, resource: dict[str, Any]) -> None: + for value in resource.values(): if isinstance(value, str): - if 'reference(' in value: + if REFERENCE_FUNC in value: self._create_implicit_edge(origin_vertex_index, resource_name, value) def _create_implicit_edge(self, origin_vertex_index: int, resource_name: str, reference_string: str) -> None: diff --git a/checkov/arm/utils.py b/checkov/arm/utils.py index 65e3bb8f994..bf4da4f272c 100644 --- a/checkov/arm/utils.py +++ b/checkov/arm/utils.py @@ -79,7 +79,7 @@ def extract_resource_name_from_reference_func(reference: str) -> str: reference(resourceId('storageResourceGroup', 'Microsoft.Storage/storageAccounts', 'storageAccountName')), '2022-09-01') -> storageAccountName reference(resourceId('Microsoft.Network/publicIPAddresses', 'ipAddressName')) -> ipAddressName ''' - resource_name = ''.join(reference.split('reference(', 1)[1].split(')')[:-1]) + resource_name = ')'.join(reference.split('reference(', 1)[1].split(')')[:-1]) if 'resourceId' in resource_name: return clean_string( ''.join(resource_name.split('resourceId(', 1)[1].split(')')[0]).split(',')[-1]) diff --git a/tests/arm/test_utils.py b/tests/arm/test_utils.py index b71e44561a2..63363fcd06c 100644 --- a/tests/arm/test_utils.py +++ b/tests/arm/test_utils.py @@ -1,6 +1,6 @@ from pathlib import Path -from checkov.arm.utils import get_files_definitions +from checkov.arm.utils import get_files_definitions, extract_resource_name_from_reference_func def test_get_files_definitions_with_parsing_error(): @@ -15,3 +15,17 @@ def test_get_files_definitions_with_parsing_error(): assert definitions_raw == {} assert len(parsing_errors) == 1 assert parsing_errors[0].endswith("parser/examples/json/with_comments.json") + + +def test_extract_resource_name_from_reference_func(): + test_cases = ["reference('storageAccountName')", + "reference('myStorage').primaryEndpoints", + "reference('myStorage', '2022-09-01', 'Full').location", + "reference(resourceId('storageResourceGroup', 'Microsoft.Storage/storageAccounts', " + "'storageAccountName')), '2022-09-01')", + "reference(resourceId('Microsoft.Network/publicIPAddresses', 'ipAddressName'))"] + + expected = ["storageAccountName", "myStorage", "myStorage", "storageAccountName", "ipAddressName"] + + for i, test_case in enumerate(test_cases): + assert extract_resource_name_from_reference_func(test_case) == expected[i]