Skip to content

Commit

Permalink
feat(arm): add variable and parameters edges and rendering (#6787)
Browse files Browse the repository at this point in the history
* vars and parameters edges

* vars and parameters edges

* add tests

* add tests

* add tests

* lint

* lint

* lint

* lint

* lint

* lint

* .

* .

* .
  • Loading branch information
lirshindalman authored Oct 22, 2024
1 parent 1da7e33 commit e2fa39d
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 34 deletions.
3 changes: 2 additions & 1 deletion checkov/arm/graph_builder/graph_components/block_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@

@dataclass
class BlockType(CommonBlockType):
PARAMETER: Literal["parameter"] = "parameter"
PARAMETER: Literal["parameters"] = "parameters"
VARIABLE: Literal["variables"] = "variables"
69 changes: 66 additions & 3 deletions checkov/arm/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

import logging
import re
from typing import Any, TYPE_CHECKING

from checkov.arm.graph_builder.graph_components.block_types import BlockType
from checkov.arm.graph_builder.graph_components.blocks import ArmBlock
from checkov.arm.graph_builder.variable_rendering.renderer import ArmVariableRenderer
from checkov.arm.utils import ArmElements
from checkov.common.graph.graph_builder import CustomAttributes
from checkov.common.graph.graph_builder import CustomAttributes, Edge
from checkov.common.graph.graph_builder.local_graph import LocalGraph
from checkov.common.util.consts import START_LINE, END_LINE
from checkov.common.util.data_structures_utils import pickle_deepcopy
Expand All @@ -21,23 +23,57 @@ def __init__(self, definitions: dict[str, dict[str, Any]]) -> None:
self.vertices: list[ArmBlock] = []
self.definitions = definitions
self.vertices_by_path_and_id: dict[tuple[str, str], int] = {}
self.vertices_by_name: dict[str, int] = {}

def build_graph(self, render_variables: bool = False) -> None:
self._create_vertices()
logging.debug(f"[ArmLocalGraph] created {len(self.vertices)} vertices")

self._create_edges()
logging.debug(f"[ArmLocalGraph] created {len(self.edges)} edges")
if render_variables:
renderer = ArmVariableRenderer(self)
renderer.render_variables_from_local_graph()

def _create_vertices(self) -> None:
for file_path, definition in self.definitions.items():
self._create_parameter_vertices(file_path=file_path, parameters=definition.get(ArmElements.PARAMETERS))
self._create_resource_vertices(file_path=file_path, resources=definition.get(ArmElements.RESOURCES))
self._create_variables_vertices(file_path=file_path, variables=definition.get(ArmElements.VARIABLES))

for i, vertex in enumerate(self.vertices):
self.vertices_by_block_type[vertex.block_type].append(i)
self.vertices_block_name_map[vertex.block_type][vertex.name].append(i)
self.vertices_by_path_and_id[(vertex.path, vertex.id)] = i
self.vertices_by_name[vertex.name] = i

self.in_edges[i] = []
self.out_edges[i] = []

def _create_variables_vertices(self, file_path: str, variables: dict[str, dict[str, Any]] | None) -> None:
if not variables:
return

for name, conf in variables.items():
if name in [START_LINE, END_LINE]:
continue
if not isinstance(conf, dict):
full_conf = {"value": pickle_deepcopy(conf)}
else:
full_conf = conf
config = pickle_deepcopy(full_conf)
attributes = pickle_deepcopy(full_conf)

self.vertices.append(
ArmBlock(
name=name,
config=config,
path=file_path,
block_type=BlockType.VARIABLE,
attributes=attributes,
id=f"{BlockType.VARIABLE}.{name}",
)
)

def _create_parameter_vertices(self, file_path: str, parameters: dict[str, dict[str, Any]] | None) -> None:
if not parameters:
Expand Down Expand Up @@ -90,8 +126,35 @@ def _create_resource_vertices(self, file_path: str, resources: list[dict[str, An
)

def _create_edges(self) -> None:
# no edges yet
pass
self._create_vars_and_parameters_edges()
# todo add explicit references edges

def _create_edge(self, element_name: str, origin_vertex_index: int, label: str) -> None:
vertex_name = element_name
if "." in vertex_name:
# special case for bicep and arm elements, when properties are accessed
vertex_name = vertex_name.split(".")[0]

dest_vertex_index = self.vertices_by_name.get(vertex_name)
if dest_vertex_index or dest_vertex_index == 0:
if origin_vertex_index == dest_vertex_index:
return
edge = Edge(origin_vertex_index, dest_vertex_index, label)
self.edges.append(edge)
self.out_edges[origin_vertex_index].append(edge)
self.in_edges[dest_vertex_index].append(edge)

def _create_vars_and_parameters_edges(self) -> None:
pattern = r"(variables|parameters)\('(\w+)'\)"
for origin_vertex_index, vertex in enumerate(self.vertices):
for attr_key, attr_value in vertex.attributes.items():
if not isinstance(attr_value, str):
continue
if ArmElements.VARIABLES in attr_value or ArmElements.PARAMETERS in attr_value:
matches = re.findall(pattern, attr_value)
for match in matches:
var_name = match[1]
self._create_edge(var_name, origin_vertex_index, attr_key)

def update_vertices_configs(self) -> None:
# not used
Expand Down
Empty file.
64 changes: 64 additions & 0 deletions checkov/arm/graph_builder/variable_rendering/renderer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from checkov.arm.graph_builder.graph_components.block_types import BlockType
from checkov.common.graph.graph_builder import Edge
from checkov.common.graph.graph_builder.utils import adjust_value
from checkov.common.graph.graph_builder.variable_rendering.renderer import VariableRenderer
from checkov.common.util.data_structures_utils import pickle_deepcopy

if TYPE_CHECKING:
from checkov.arm.graph_builder.local_graph import ArmLocalGraph


class ArmVariableRenderer(VariableRenderer["ArmLocalGraph"]):
def __init__(self, local_graph: ArmLocalGraph) -> None:
super().__init__(local_graph)

def _render_variables_from_vertices(self) -> None:
# need to add rendering to function like format, reference etc
pass

def evaluate_vertex_attribute_from_edge(self, edge_list: list[Edge]) -> None:
origin_vertex_attributes = self.local_graph.vertices[edge_list[0].origin].attributes
val_to_eval = pickle_deepcopy(origin_vertex_attributes.get(edge_list[0].label, ""))
attr_path = None
for edge in edge_list:
attr_path, attr_value = self.extract_dest_attribute_path_and_value(dest_index=edge.dest,
origin_value=val_to_eval)
'''if the arg start with '[parameters'/ '[variables' its mean we need to eval the all attribute
like here - "addressPrefix": "[parameters('subnetAddressPrefix')]" '''
if len(edge_list) == 1 and val_to_eval.startswith(("[parameters", "[variables")):
val_to_eval = attr_value
continue
'''
if the value i need to eval is part of the full attribute like "[format('{0}/{1}', parameters('vnetName'), variables('subnetName'))]"
or "[resourceId('Microsoft.Network/networkProfiles', variables('networkProfileName'))]".
vertices[edge.dest].id = variables.networkProfileName -> variables('networkProfileName')
'''
val_to_replace = self.local_graph.vertices[edge.dest].id.replace(".", "('") + "')"
val_to_eval = val_to_eval.replace(val_to_replace, attr_value)

self.local_graph.update_vertex_attribute(
vertex_index=edge_list[0].origin,
attribute_key=edge_list[0].label,
attribute_value=val_to_eval,
change_origin_id=edge_list[0].dest,
attribute_at_dest=attr_path,
)

def extract_dest_attribute_path_and_value(self, dest_index: int, origin_value: Any) -> tuple[str, Any] | tuple[None, None]:
vertex = self.local_graph.vertices[dest_index]
if vertex.block_type == BlockType.PARAMETER:
new_value = vertex.attributes.get("defaultValue")
if new_value:
new_value = adjust_value(element_name=origin_value, value=new_value)
return "defaultValue", new_value
elif vertex.block_type == BlockType.VARIABLE:
new_value = adjust_value(element_name=origin_value, value=vertex.attributes["value"])
return "value", new_value
return None, None

def evaluate_non_rendered_values(self) -> None:
pass
2 changes: 1 addition & 1 deletion checkov/bicep/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from checkov.bicep.graph_builder.graph_components.block_types import BlockType
from checkov.bicep.graph_builder.graph_components.blocks import BicepBlock
from checkov.bicep.graph_builder.variable_rendering.renderer import BicepVariableRenderer
from checkov.bicep.utils import adjust_value
from checkov.common.graph.graph_builder.graph_components.edge import Edge
from checkov.common.graph.graph_builder.local_graph import LocalGraph
from checkov.common.graph.graph_builder.utils import filter_sub_keys
from checkov.common.graph.graph_builder.utils import adjust_value
from checkov.common.util.data_structures_utils import pickle_deepcopy
from checkov.common.util.type_forcers import force_int

Expand Down
2 changes: 1 addition & 1 deletion checkov/bicep/graph_builder/variable_rendering/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from pycep.transformer import BicepElement

from checkov.bicep.graph_builder.graph_components.block_types import BlockType
from checkov.bicep.utils import adjust_value
from checkov.common.graph.graph_builder import Edge
from checkov.common.graph.graph_builder.utils import adjust_value
from checkov.common.graph.graph_builder.variable_rendering.renderer import VariableRenderer
from checkov.common.util.data_structures_utils import pickle_deepcopy

Expand Down
25 changes: 1 addition & 24 deletions checkov/bicep/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import re
from collections.abc import Collection
from pathlib import Path
from typing import Any, TYPE_CHECKING
from typing import TYPE_CHECKING

from checkov.common.runners.base_runner import filter_ignored_paths
from checkov.runner_filter import RunnerFilter
Expand Down Expand Up @@ -88,26 +88,3 @@ def create_definitions(
logging.warning(f"[bicep] found errors while parsing definitions: {parsing_errors}")

return definitions, definitions_raw


def adjust_value(element_name: str, value: Any) -> Any:
"""Adjusts the value, if the 'element_name' references a nested key
Ex:
element_name = publicKey.keyData
value = {"keyData": "key-data", "path": "path"}
returns new_value = "key-data"
"""

if "." in element_name and isinstance(value, dict):
key_parts = element_name.split(".")
new_value = value.get(key_parts[1])

if new_value is None:
# couldn't find key in in value object
return None

return adjust_value(".".join(key_parts[1:]), new_value)

return value
23 changes: 23 additions & 0 deletions checkov/common/graph/graph_builder/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,26 @@ def filter_sub_keys(key_list: list[str]) -> list[str]:

def is_include_dup_dynamic(key: str, list_keys: list[str]) -> bool:
return f"dynamic.{key.split('.')[0]}" not in list_keys


def adjust_value(element_name: str, value: Any) -> Any:
"""Adjusts the value, if the 'element_name' references a nested key
Ex:
element_name = publicKey.keyData
value = {"keyData": "key-data", "path": "path"}
returns new_value = "key-data"
"""

if "." in element_name and isinstance(value, dict):
key_parts = element_name.split(".")
new_value = value.get(key_parts[1])

if new_value is None:
# couldn't find key in in value object
return None

return adjust_value(".".join(key_parts[1:]), new_value)

return value
5 changes: 3 additions & 2 deletions tests/arm/graph_builder/test_local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ def test_build_graph():
local_graph.build_graph(render_variables=False)

# then
assert len(local_graph.vertices) == 15
assert len(local_graph.edges) == 0
assert len(local_graph.vertices) == 18
assert len(local_graph.edges) == 20

assert len(local_graph.vertices_by_block_type[BlockType.PARAMETER]) == 11
assert len(local_graph.vertices_by_block_type[BlockType.RESOURCE]) == 4
assert len(local_graph.vertices_by_block_type[BlockType.VARIABLE]) == 3
Empty file added tests/arm/rendering/__init__.py
Empty file.
48 changes: 48 additions & 0 deletions tests/arm/rendering/test_rendering.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"metadata": {
"_generator": {
"name": "bicep",
"version": "0.5.6.12127",
"templateHash": "9027093124117826122"
}
},
"parameters": {
"vnetName": {
"type": "string",
"defaultValue": "aci-vnet",
"metadata": {
"description": "VNet name"
}
},
"vnetAddressPrefix": {
"type": "string",
"defaultValue": "10.0.0.0/16",
"metadata": {
"description": "Address prefix"
}
}
},
"variables": {
"networkProfileName": "aci-networkProfile",
"location": "eth0"

},
"resources": [
{
"type": "Microsoft.Network/virtualNetworks",
"apiVersion": "2020-11-01",
"name": "[format('{0}/{1}', parameters('vnetName'), variables('networkProfileName'))]",
"location": "[variables('location')]",
"id": "[resourceId('Microsoft.Network/networkProfiles', variables('networkProfileName'))]",
"properties": {
"addressSpace": {
"addressPrefixes": [
"[parameters('vnetAddressPrefix')]"
]
}
}
}
]
}
24 changes: 24 additions & 0 deletions tests/arm/rendering/test_rendering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from pathlib import Path

from checkov.arm.graph_builder.local_graph import ArmLocalGraph
from checkov.arm.utils import get_files_definitions

EXAMPLES_DIR = Path(__file__).parent

def test_rander_vars():
# given
test_file = EXAMPLES_DIR / "test_rendering.json"
definitions, _, _ = get_files_definitions([str(test_file)])
local_graph = ArmLocalGraph(definitions=definitions)
# when
local_graph.build_graph(render_variables=True)

# then
assert len(local_graph.vertices) == 5
assert len(local_graph.edges) == 5
assert local_graph.vertices[2].attributes['name'] == "[format('{0}/{1}', aci-vnet, aci-networkProfile)]"
assert local_graph.vertices[2].attributes['id'] == "[resourceId('Microsoft.Network/networkProfiles', aci-networkProfile)]"
assert local_graph.vertices[2].attributes['location'] == "eth0"
assert local_graph.vertices[2].attributes['properties.addressSpace.addressPrefixes.0'] == "10.0.0.0/16"
assert local_graph.vertices[2].attributes['properties']['addressSpace']['addressPrefixes'][0] == "10.0.0.0/16"

4 changes: 2 additions & 2 deletions tests/arm/test_graph_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def test_build_graph_from_definitions(graph_connector):
local_graph = graph_manager.build_graph_from_definitions(definitions=definitions)

# then
assert len(local_graph.vertices) == 15
assert len(local_graph.edges) == 0
assert len(local_graph.vertices) == 18
assert len(local_graph.edges) == 20

# resource name will change, when variable rendering is supported
container_idx = local_graph.vertices_by_path_and_id[(test_file, "Microsoft.ContainerInstance/containerGroups.[parameters('containerGroupName')]")]
Expand Down

0 comments on commit e2fa39d

Please sign in to comment.