Skip to content

Commit

Permalink
Rework a few GCP detections to use deep_walk (#841)
Browse files Browse the repository at this point in the history
Co-authored-by: Ed⁦ <[email protected]>
  • Loading branch information
Evan Gibler and Ed⁦ authored Jul 10, 2023
1 parent 9bbd2e1 commit c03c858
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 53 deletions.
14 changes: 7 additions & 7 deletions global_helpers/panther_base_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def deep_get(dictionary: dict, *keys, default=None):

# pylint: disable=too-complex,too-many-return-statements
def deep_walk(
obj: Optional[Any], *keys: str, default: str = None, return_val: str = "all"
obj: Optional[Any], *keys: str, default: Optional[str] = None, return_val: str = "all"
) -> Union[Optional[Any], Optional[List[Any]]]:
"""Safely retrieve a value stored in complex dictionary structure
Expand Down Expand Up @@ -343,7 +343,7 @@ def _empty_list(sub_obj: Any):
return default if _empty_list(obj) else obj

current_key = keys[0]
found = OrderedDict()
found: OrderedDict = OrderedDict()

if isinstance(obj, Mapping):
next_key = obj.get(current_key, None)
Expand All @@ -362,13 +362,13 @@ def _empty_list(sub_obj: Any):
else:
found[value] = None

found = list(found.keys())
if not found:
found_list: list[Any] = list(found.keys())
if not found_list:
return default
return {
"first": found[0],
"last": found[-1],
"all": found[0] if len(found) == 1 else found,
"first": found_list[0],
"last": found_list[-1],
"all": found_list[0] if len(found_list) == 1 else found_list,
}.get(return_val, "all")


Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from panther_base_helpers import deep_get
from panther_base_helpers import deep_get, deep_walk


def rule(event):
if event.get("severity", "") == "ERROR":
if deep_get(event, "protoPayload", "status", "code") == 7:
details = deep_get(event, "protoPayload", "status", "details", default=[])
for detail in details:
violations = deep_get(detail, "violations", default=[])
for violation in violations:
if violation.get("type", "") == "VPC_SERVICE_CONTROLS":
return True
severity = deep_get(event, "severity", default="")
status_code = deep_get(event, "protoPayload", "status", "code", default="")
violation_types = deep_walk(
event, "protoPayload", "status", "details", "violations", "type", default=[]
)
if all(
[
severity == "ERROR",
status_code == 7,
"VPC_SERVICE_CONTROLS" in violation_types,
]
):
return True
return False


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ Tests:
violations:
- description: gBc-wuGVCapNMnTUePoHos_VyJmr3CsMKlr48kVa4b6XpsT_OWKRng
type: VPC_SERVICE_CONTROLS
- description: gCc-wuJa334DJ9940ssdiw_V8400skgjj3912500sldgjzh_LGJANr
type: OTHER_CONTROL_VIOLATION
message: 'Request is prohibited by organization''s policy. vpcServiceControlsUniqueIdentifier: gBc-wuGVCapNMnTUePoHos_VyJmr3CsMKlr48kVa4b6XpsT_OWKRng'
receiveTimestamp: "2023-03-09T16:28:42.567340480Z"
resource:
Expand Down
8 changes: 2 additions & 6 deletions rules/gcp_audit_rules/gcp_log_bucket_or_sink_deleted.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import re

from gcp_base_helpers import gcp_alert_context
from panther_base_helpers import deep_get
from panther_base_helpers import deep_get, deep_walk


def rule(event):
authenticated = deep_get(
deep_get(event, "protoPayload", "authorizationInfo", default=[{}])[0],
"granted",
default=False,
)
authenticated = deep_walk(event, "protoPayload", "authorizationInfo", "granted", default=False)
method_pattern = r"(?:\w+\.)*v\d\.(?:ConfigServiceV\d\.(?:Delete(Bucket|Sink)))"
match = re.search(method_pattern, deep_get(event, "protoPayload", "methodName", default=""))
return authenticated and match is not None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from panther_base_helpers import deep_get
from panther_base_helpers import deep_get, deep_walk

SERVICE_ACCOUNT_MANAGE_ROLES = [
"roles/iam.serviceAccountTokenCreator",
Expand All @@ -8,17 +8,27 @@

def rule(event):
if "SetIAMPolicy" in deep_get(event, "protoPayload", "methodName", default=""):
binding_deltas = deep_get(
event, "protoPayload", "serviceData", "policyDelta", "bindingDeltas", default=[{}]
role = deep_walk(
event,
"ProtoPayload",
"serviceData",
"policyDelta",
"bindingDeltas",
"role",
default="",
return_val="last",
)
for binding_delta in binding_deltas:
if all(
[
binding_delta.get("role", "") in SERVICE_ACCOUNT_MANAGE_ROLES,
binding_delta.get("action", "") == "ADD",
]
):
return True
action = deep_walk(
event,
"ProtoPayload",
"serviceData",
"policyDelta",
"bindingDeltas",
"action",
default="",
return_val="last",
)
return role in SERVICE_ACCOUNT_MANAGE_ROLES and action == "ADD"
return False


Expand Down
49 changes: 29 additions & 20 deletions rules/gcp_k8s_rules/gcp_k8s_exec_into_pod.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,42 @@
from gcp_base_helpers import get_k8s_info
from gcp_environment import PRODUCTION_PROJECT_IDS, rule_exceptions
from panther_base_helpers import deep_get
from panther_base_helpers import deep_walk


def rule(event):
# Defaults to False (no alert) unless method is exec and principal not allowed
if not deep_get(
event, "protoPayload", "methodName"
) == "io.k8s.core.v1.pods.exec.create" or not deep_get(event, "protoPayload", "resourceName"):
if not any(
[
deep_walk(event, "protoPayload", "methodName") == "io.k8s.core.v1.pods.exec.create",
deep_walk(event, "protoPayload", "resourceName"),
]
):
return False

k8s_info = get_k8s_info(event)
principal = k8s_info["principal"]
namespace = k8s_info["namespace"]
project_id = k8s_info["project_id"]
principal = deep_walk(k8s_info, "principal", default="<NO PRINCIPAL>")
namespace = deep_walk(k8s_info, "namespace", default="<NO NAMESPACE>")
project_id = deep_walk(k8s_info, "project_id", default="<NO PROJECT_ID>")
# rule_exceptions that are allowed temporarily are defined in gcp_environment.py
# Some execs have principal which is long numerical UUID, appears to be k8s internals
for allowed_principal in rule_exceptions["gcp_k8s_exec_into_pod"]["allowed_principals"]:
for allowed_principal in deep_walk(
rule_exceptions, "gcp_k8s_exec_into_pod", "allowed_principals", default=[]
):
allowed_principals = deep_walk(allowed_principal, "principals", default=[])
allowed_namespaces = deep_walk(allowed_principal, "namespaces", default=[])
allowed_project_ids = deep_walk(allowed_principal, "projects", default=[])
if (
principal in allowed_principal["principals"]
and (
not allowed_principal["namespaces"] or namespace in allowed_principal["namespaces"]
)
and (not allowed_principal["projects"] or project_id in allowed_principal["projects"])
principal in allowed_principals
and (namespace in allowed_namespaces or allowed_namespaces == [])
and (project_id in allowed_project_ids or allowed_project_ids == [])
):
# nested if since without we get linting error R0916
if principal.find("@") == -1:
if "@" not in principal:
return False
return True


def severity(event):
project_id = get_k8s_info(event)["project_id"]
project_id = deep_walk(get_k8s_info(event), "project_id", default="<NO PROJECT_ID>")
if project_id in PRODUCTION_PROJECT_IDS:
return "high"
return "info"
Expand All @@ -40,10 +45,14 @@ def severity(event):
def title(event):
# TODO: use unified data model field in title for actor
k8s_info = get_k8s_info(event)
principal = k8s_info["principal"]
project_id = k8s_info["project_id"]
pod = k8s_info["pod"]
namespace = k8s_info["namespace"]
principal = deep_walk(k8s_info, "principal", default="<NO PRINCIPAL>")
project_id = deep_walk(
k8s_info,
"project_id",
default="",
)
pod = deep_walk(k8s_info, "pod", default="")
namespace = deep_walk(k8s_info, "namespace", default="")
return f"Exec into pod namespace/{namespace}/pod/{pod} by {principal} in {project_id}"


Expand Down

0 comments on commit c03c858

Please sign in to comment.