Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arm): add variable and parameters edges and rendering #6787

Merged
merged 15 commits into from
Oct 22, 2024
3 changes: 2 additions & 1 deletion checkov/arm/graph_builder/graph_components/block_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@

@dataclass
class BlockType(CommonBlockType):
PARAMETER: Literal["parameter"] = "parameter"
PARAMETER: Literal["parameters"] = "parameters"
VARIABLE: Literal["variables"] = "variables"
69 changes: 66 additions & 3 deletions checkov/arm/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

import logging
import re
from typing import Any, TYPE_CHECKING

from checkov.arm.graph_builder.graph_components.block_types import BlockType
from checkov.arm.graph_builder.graph_components.blocks import ArmBlock
from checkov.arm.graph_builder.variable_rendering.renderer import ArmVariableRenderer
from checkov.arm.utils import ArmElements
from checkov.common.graph.graph_builder import CustomAttributes
from checkov.common.graph.graph_builder import CustomAttributes, Edge
from checkov.common.graph.graph_builder.local_graph import LocalGraph
from checkov.common.util.consts import START_LINE, END_LINE
from checkov.common.util.data_structures_utils import pickle_deepcopy
Expand All @@ -21,23 +23,57 @@ def __init__(self, definitions: dict[str, dict[str, Any]]) -> None:
self.vertices: list[ArmBlock] = []
self.definitions = definitions
self.vertices_by_path_and_id: dict[tuple[str, str], int] = {}
self.vertices_by_name: dict[str, int] = {}

def build_graph(self, render_variables: bool = False) -> None:
self._create_vertices()
logging.debug(f"[ArmLocalGraph] created {len(self.vertices)} vertices")

self._create_edges()
logging.debug(f"[ArmLocalGraph] created {len(self.edges)} edges")
if render_variables:
renderer = ArmVariableRenderer(self)
renderer.render_variables_from_local_graph()

def _create_vertices(self) -> None:
for file_path, definition in self.definitions.items():
self._create_parameter_vertices(file_path=file_path, parameters=definition.get(ArmElements.PARAMETERS))
self._create_resource_vertices(file_path=file_path, resources=definition.get(ArmElements.RESOURCES))
self._create_variables_vertices(file_path=file_path, variables=definition.get(ArmElements.VARIABLES))

for i, vertex in enumerate(self.vertices):
self.vertices_by_block_type[vertex.block_type].append(i)
self.vertices_block_name_map[vertex.block_type][vertex.name].append(i)
self.vertices_by_path_and_id[(vertex.path, vertex.id)] = i
self.vertices_by_name[vertex.name] = i

self.in_edges[i] = []
self.out_edges[i] = []

def _create_variables_vertices(self, file_path: str, variables: dict[str, dict[str, Any]] | None) -> None:
if not variables:
return

for name, conf in variables.items():
if name in ['__startline__', '__endline__']:
lirshindalman marked this conversation as resolved.
Show resolved Hide resolved
continue
if not isinstance(conf, dict):
full_conf = {"value": pickle_deepcopy(conf)}
else:
full_conf = conf
config = pickle_deepcopy(full_conf)
attributes = pickle_deepcopy(full_conf)

self.vertices.append(
ArmBlock(
name=name,
config=config,
path=file_path,
block_type=BlockType.VARIABLE,
attributes=attributes,
id=f"{BlockType.VARIABLE}.{name}",
)
)

def _create_parameter_vertices(self, file_path: str, parameters: dict[str, dict[str, Any]] | None) -> None:
if not parameters:
Expand Down Expand Up @@ -90,8 +126,35 @@ def _create_resource_vertices(self, file_path: str, resources: list[dict[str, An
)

def _create_edges(self) -> None:
# no edges yet
pass
self._create_vars_and_parameters_edges()
# todo add explicit references edges

def _create_edge(self, element_name: str, origin_vertex_index: int, label: str) -> None:
vertex_name = element_name
if "." in vertex_name:
# special case for`bicep elements, when properties are accessed
lirshindalman marked this conversation as resolved.
Show resolved Hide resolved
vertex_name = vertex_name.split(".")[0]

dest_vertex_index = self.vertices_by_name.get(vertex_name)
if dest_vertex_index or dest_vertex_index == 0:
if origin_vertex_index == dest_vertex_index:
return
edge = Edge(origin_vertex_index, dest_vertex_index, label)
self.edges.append(edge)
self.out_edges[origin_vertex_index].append(edge)
self.in_edges[dest_vertex_index].append(edge)

def _create_vars_and_parameters_edges(self) -> None:
pattern = r"(variables|parameters)\('(\w+)'\)"
for origin_vertex_index, vertex in enumerate(self.vertices):
for attr_key, attr_value in vertex.attributes.items():
if not isinstance(attr_value, str):
continue
if ArmElements.VARIABLES in attr_value or ArmElements.PARAMETERS in attr_value:
matches = re.findall(pattern, attr_value)
for match in matches:
var_name = match[1]
self._create_edge(var_name, origin_vertex_index, attr_key)

def update_vertices_configs(self) -> None:
# not used
Expand Down
Empty file.
64 changes: 64 additions & 0 deletions checkov/arm/graph_builder/variable_rendering/renderer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from checkov.arm.graph_builder.graph_components.block_types import BlockType
from checkov.common.graph.graph_builder import Edge
from checkov.common.graph.graph_builder.utils import adjust_value
from checkov.common.graph.graph_builder.variable_rendering.renderer import VariableRenderer
from checkov.common.util.data_structures_utils import pickle_deepcopy

if TYPE_CHECKING:
from checkov.arm.graph_builder.local_graph import ArmLocalGraph


class ArmVariableRenderer(VariableRenderer["ArmLocalGraph"]):
def __init__(self, local_graph: ArmLocalGraph) -> None:
super().__init__(local_graph)

def _render_variables_from_vertices(self) -> None:
pass

def evaluate_vertex_attribute_from_edge(self, edge_list: list[Edge]) -> None:
origin_vertex_attributes = self.local_graph.vertices[edge_list[0].origin].attributes
val_to_eval = pickle_deepcopy(origin_vertex_attributes.get(edge_list[0].label, ""))
attr_path = ''
for edge in edge_list:
attr_path, attr_value = self.extract_dest_attribute_path_and_value(dest_index=edge.dest,
origin_value=val_to_eval)
'''if the arg start with '[parameters'/ '[variables' its mean we need to eval the all attribute
like here - "addressPrefix": "[parameters('subnetAddressPrefix')]" '''
if len(edge_list) == 1 and val_to_eval.startswith(("[parameters", "[variables")):
val_to_eval = attr_value
continue
'''
if the value i need to eval is part of the full attribute like "[format('{0}/{1}', parameters('vnetName'), variables('subnetName'))]"
or "[resourceId('Microsoft.Network/networkProfiles', variables('networkProfileName'))]".
vertices[edge.dest].id = variables.networkProfileName -> variables('networkProfileName')
'''
val_to_replace = self.local_graph.vertices[edge.dest].id.replace(".", "('") + "')"
val_to_eval = val_to_eval.replace(val_to_replace, attr_value)

self.local_graph.update_vertex_attribute(
vertex_index=edge_list[0].origin,
attribute_key=edge_list[0].label,
attribute_value=val_to_eval,
change_origin_id=edge_list[0].dest,
attribute_at_dest=attr_path,
)

def extract_dest_attribute_path_and_value(self, dest_index: int, origin_value: Any) -> tuple[str, Any] | tuple[None, None]:
vertex = self.local_graph.vertices[dest_index]
if vertex.block_type == BlockType.PARAMETER:
new_value = vertex.attributes.get("defaultValue")
if new_value:
new_value = adjust_value(element_name=origin_value, value=new_value)
return "defaultValue", new_value
elif vertex.block_type == BlockType.VARIABLE:
new_value = adjust_value(element_name=origin_value, value=vertex.attributes["value"])
return "value", new_value
return None, None

def evaluate_non_rendered_values(self) -> None:
lirshindalman marked this conversation as resolved.
Show resolved Hide resolved
# not used
pass
2 changes: 1 addition & 1 deletion checkov/bicep/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from checkov.bicep.graph_builder.graph_components.block_types import BlockType
from checkov.bicep.graph_builder.graph_components.blocks import BicepBlock
from checkov.bicep.graph_builder.variable_rendering.renderer import BicepVariableRenderer
from checkov.bicep.utils import adjust_value
from checkov.common.graph.graph_builder.graph_components.edge import Edge
from checkov.common.graph.graph_builder.local_graph import LocalGraph
from checkov.common.graph.graph_builder.utils import filter_sub_keys
from checkov.common.graph.graph_builder.utils import adjust_value
from checkov.common.util.data_structures_utils import pickle_deepcopy
from checkov.common.util.type_forcers import force_int

Expand Down
2 changes: 1 addition & 1 deletion checkov/bicep/graph_builder/variable_rendering/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from pycep.transformer import BicepElement

from checkov.bicep.graph_builder.graph_components.block_types import BlockType
from checkov.bicep.utils import adjust_value
from checkov.common.graph.graph_builder import Edge
from checkov.common.graph.graph_builder.utils import adjust_value
from checkov.common.graph.graph_builder.variable_rendering.renderer import VariableRenderer
from checkov.common.util.data_structures_utils import pickle_deepcopy

Expand Down
25 changes: 1 addition & 24 deletions checkov/bicep/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import re
from collections.abc import Collection
from pathlib import Path
from typing import Any, TYPE_CHECKING
from typing import TYPE_CHECKING

from checkov.common.runners.base_runner import filter_ignored_paths
from checkov.runner_filter import RunnerFilter
Expand Down Expand Up @@ -88,26 +88,3 @@ def create_definitions(
logging.warning(f"[bicep] found errors while parsing definitions: {parsing_errors}")

return definitions, definitions_raw


def adjust_value(element_name: str, value: Any) -> Any:
"""Adjusts the value, if the 'element_name' references a nested key

Ex:
element_name = publicKey.keyData
value = {"keyData": "key-data", "path": "path"}

returns new_value = "key-data"
"""

if "." in element_name and isinstance(value, dict):
key_parts = element_name.split(".")
new_value = value.get(key_parts[1])

if new_value is None:
# couldn't find key in in value object
return None

return adjust_value(".".join(key_parts[1:]), new_value)

return value
23 changes: 23 additions & 0 deletions checkov/common/graph/graph_builder/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,26 @@ def filter_sub_keys(key_list: list[str]) -> list[str]:

def is_include_dup_dynamic(key: str, list_keys: list[str]) -> bool:
return f"dynamic.{key.split('.')[0]}" not in list_keys


def adjust_value(element_name: str, value: Any) -> Any:
"""Adjusts the value, if the 'element_name' references a nested key

Ex:
element_name = publicKey.keyData
value = {"keyData": "key-data", "path": "path"}

returns new_value = "key-data"
"""

if "." in element_name and isinstance(value, dict):
key_parts = element_name.split(".")
new_value = value.get(key_parts[1])

if new_value is None:
# couldn't find key in in value object
return None

return adjust_value(".".join(key_parts[1:]), new_value)

return value
5 changes: 3 additions & 2 deletions tests/arm/graph_builder/test_local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ def test_build_graph():
local_graph.build_graph(render_variables=False)

# then
assert len(local_graph.vertices) == 15
assert len(local_graph.edges) == 0
assert len(local_graph.vertices) == 18
assert len(local_graph.edges) == 20

assert len(local_graph.vertices_by_block_type[BlockType.PARAMETER]) == 11
assert len(local_graph.vertices_by_block_type[BlockType.RESOURCE]) == 4
assert len(local_graph.vertices_by_block_type[BlockType.VARIABLE]) == 3
48 changes: 48 additions & 0 deletions tests/arm/randerer/test_rendering.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"metadata": {
"_generator": {
"name": "bicep",
"version": "0.5.6.12127",
"templateHash": "9027093124117826122"
}
},
"parameters": {
"vnetName": {
"type": "string",
"defaultValue": "aci-vnet",
"metadata": {
"description": "VNet name"
}
},
"vnetAddressPrefix": {
"type": "string",
"defaultValue": "10.0.0.0/16",
"metadata": {
"description": "Address prefix"
}
}
},
"variables": {
"networkProfileName": "aci-networkProfile",
"location": "eth0"

},
"resources": [
{
"type": "Microsoft.Network/virtualNetworks",
"apiVersion": "2020-11-01",
"name": "[format('{0}/{1}', parameters('vnetName'), variables('networkProfileName'))]",
"location": "[variables('location')]",
"id": "[resourceId('Microsoft.Network/networkProfiles', variables('networkProfileName'))]",
"properties": {
"addressSpace": {
"addressPrefixes": [
"[parameters('vnetAddressPrefix')]"
]
}
}
}
]
}
25 changes: 25 additions & 0 deletions tests/arm/randerer/test_rendering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import unittest

from checkov.arm.graph_builder.local_graph import ArmLocalGraph
from checkov.arm.utils import get_files_definitions


def test_rander_vars():
# given
test_file = "test_rendering.json"
definitions, _, _ = get_files_definitions([str(test_file)])

local_graph = ArmLocalGraph(definitions=definitions)

# when
local_graph.build_graph(render_variables=True)

# then
assert len(local_graph.vertices) == 5
assert len(local_graph.edges) == 5
assert local_graph.vertices[2].attributes['name'] == "[format('{0}/{1}', aci-vnet, aci-networkProfile)]"
assert local_graph.vertices[2].attributes['id'] == "[resourceId('Microsoft.Network/networkProfiles', aci-networkProfile)]"
assert local_graph.vertices[2].attributes['location'] == "eth0"
assert local_graph.vertices[2].attributes['properties.addressSpace.addressPrefixes.0'] == "10.0.0.0/16"
assert local_graph.vertices[2].attributes['properties']['addressSpace']['addressPrefixes'][0] == "10.0.0.0/16"

Loading