Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
omriyoffe-panw committed Oct 27, 2024
1 parent fe5a7e9 commit 4fbdaeb
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
21 changes: 12 additions & 9 deletions checkov/arm/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
if TYPE_CHECKING:
from checkov.common.graph.graph_builder.local_graph import _Block

DEPENDS_ON_FIELD = 'dependsOn'
RESOURCE_ID_FUNC = 'resourceId('
REFERENCE_FUNC = 'reference('

class ArmLocalGraph(LocalGraph[ArmBlock]):
def __init__(self, definitions: dict[str, dict[str, Any]]) -> None:
Expand All @@ -27,15 +30,15 @@ def __init__(self, definitions: dict[str, dict[str, Any]]) -> None:

def build_graph(self, render_variables: bool = False) -> None:
self._create_vertices()
logging.debug(f"[ArmLocalGraph] created {len(self.vertices)} vertices")
logging.warning(f"[ArmLocalGraph] created {len(self.vertices)} vertices")

self._create_vars_and_parameters_edges()
if render_variables:
renderer = ArmVariableRenderer(self)
renderer.render_variables_from_local_graph()

self._create_edges()
logging.debug(f"[ArmLocalGraph] created {len(self.edges)} edges")
logging.warning(f"[ArmLocalGraph] created {len(self.edges)} edges")

def _create_vertices(self) -> None:
for file_path, definition in self.definitions.items():
Expand Down Expand Up @@ -85,7 +88,7 @@ def _create_parameter_vertices(self, file_path: str, parameters: dict[str, dict[
if name in (START_LINE, END_LINE):
continue
if not isinstance(config, dict):
logging.debug(f"[ArmLocalGraph] parameter {name} has wrong type {type(config)}")
logging.warning(f"[ArmLocalGraph] parameter {name} has wrong type {type(config)}")
continue

attributes = pickle_deepcopy(config)
Expand Down Expand Up @@ -129,13 +132,13 @@ def _create_resource_vertices(self, file_path: str, resources: list[dict[str, An

def _create_edges(self) -> None:
for origin_vertex_index, vertex in enumerate(self.vertices):
if 'dependsOn' in vertex.attributes:
if DEPENDS_ON_FIELD in vertex.attributes:
self._create_explicit_edge(origin_vertex_index, vertex.name, vertex.attributes['dependsOn'])
self._create_implicit_edges(origin_vertex_index, vertex.name, vertex.attributes)

def _create_explicit_edge(self, origin_vertex_index: int, resource_name: str, deps: list[str]) -> None:
for dep in deps:
if 'resourceId' in dep:
if RESOURCE_ID_FUNC in dep:
processed_dep = extract_resource_name_from_resource_id_func(dep)
else:
processed_dep = dep.split('/')[-1]
Expand All @@ -144,7 +147,7 @@ def _create_explicit_edge(self, origin_vertex_index: int, resource_name: str, de
self._create_edge(processed_dep, origin_vertex_index, f'{resource_name}->{processed_dep}')
else:
# Dependency not found
logging.debug(f"[ArmLocalGraph] resource dependency {processed_dep} defined in {dep} for resource"
logging.warning(f"[ArmLocalGraph] resource dependency {processed_dep} defined in {dep} for resource"
f" {resource_name} not found")
continue

Expand All @@ -169,10 +172,10 @@ def _create_edge(self, element_name: str, origin_vertex_index: int, label: str)
self.out_edges[origin_vertex_index].append(edge)
self.in_edges[dest_vertex_index].append(edge)

def _create_implicit_edges(self, origin_vertex_index: int, resource_name: str, d: dict[str, Any]) -> None:
for _, value in d.items():
def _create_implicit_edges(self, origin_vertex_index: int, resource_name: str, resource: dict[str, Any]) -> None:
for value in resource.values():
if isinstance(value, str):
if 'reference(' in value:
if REFERENCE_FUNC in value:
self._create_implicit_edge(origin_vertex_index, resource_name, value)

def _create_implicit_edge(self, origin_vertex_index: int, resource_name: str, reference_string: str) -> None:
Expand Down
2 changes: 1 addition & 1 deletion checkov/arm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def extract_resource_name_from_reference_func(reference: str) -> str:
reference(resourceId('storageResourceGroup', 'Microsoft.Storage/storageAccounts', 'storageAccountName')), '2022-09-01') -> storageAccountName
reference(resourceId('Microsoft.Network/publicIPAddresses', 'ipAddressName')) -> ipAddressName
'''
resource_name = ''.join(reference.split('reference(', 1)[1].split(')')[:-1])
resource_name = ')'.join(reference.split('reference(', 1)[1].split(')')[:-1])
if 'resourceId' in resource_name:
return clean_string(
''.join(resource_name.split('resourceId(', 1)[1].split(')')[0]).split(',')[-1])
Expand Down
16 changes: 15 additions & 1 deletion tests/arm/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pathlib import Path

from checkov.arm.utils import get_files_definitions
from checkov.arm.utils import get_files_definitions, extract_resource_name_from_reference_func


def test_get_files_definitions_with_parsing_error():
Expand All @@ -15,3 +15,17 @@ def test_get_files_definitions_with_parsing_error():
assert definitions_raw == {}
assert len(parsing_errors) == 1
assert parsing_errors[0].endswith("parser/examples/json/with_comments.json")


def test_extract_resource_name_from_reference_func():
test_cases = ["reference('storageAccountName')",
"reference('myStorage').primaryEndpoints",
"reference('myStorage', '2022-09-01', 'Full').location",
"reference(resourceId('storageResourceGroup', 'Microsoft.Storage/storageAccounts', "
"'storageAccountName')), '2022-09-01')",
"reference(resourceId('Microsoft.Network/publicIPAddresses', 'ipAddressName'))"]

expected = ["storageAccountName", "myStorage", "myStorage", "storageAccountName", "ipAddressName"]

for i, test_case in enumerate(test_cases):
assert extract_resource_name_from_reference_func(test_case) == expected[i]

0 comments on commit 4fbdaeb

Please sign in to comment.