Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gcp][feat] Add SCC service collection #2291

Merged
merged 6 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/gcp/fix_plugin_gcp/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
compute,
container,
billing,
scc,
sqladmin,
storage,
aiplatform,
Expand Down Expand Up @@ -38,6 +39,7 @@
+ filestore.resources
+ cloudfunctions.resources
+ pubsub.resources
+ scc.resources
)


Expand Down Expand Up @@ -134,6 +136,10 @@ def get_last_run() -> Optional[datetime]:
global_builder.submit_work(self.collect_region, global_builder.for_region(region))
global_builder.executor.wait_for_submitted_work()

# call all registered after collect hooks
for after_collect in global_builder.after_collect_actions:
after_collect()

self.error_accumulator.report_all(global_builder.core_feedback)

if global_builder.config.collect_usage_metrics:
Expand Down
3 changes: 3 additions & 0 deletions plugins/gcp/fix_plugin_gcp/resources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(
last_run_started_at: Optional[datetime] = None,
graph_nodes_access: Optional[Lock] = None,
graph_edges_access: Optional[Lock] = None,
after_collect_actions: Optional[List[Callable[[], Any]]] = None,
) -> None:
self.graph = graph
self.cloud = cloud
Expand All @@ -113,6 +114,7 @@ def __init__(
self.zone_by_name: Dict[str, GcpZone] = {}
self.graph_nodes_access = graph_nodes_access or Lock()
self.graph_edges_access = graph_edges_access or Lock()
self.after_collect_actions = after_collect_actions if after_collect_actions is not None else []

if last_run_started_at:
now = utc()
Expand Down Expand Up @@ -349,6 +351,7 @@ def for_region(self, region: GcpRegion) -> GraphBuilder:
self.last_run_started_at,
self.graph_nodes_access,
self.graph_edges_access,
after_collect_actions=self.after_collect_actions,
)


Expand Down
2 changes: 1 addition & 1 deletion plugins/gcp/fix_plugin_gcp/resources/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -5419,7 +5419,7 @@ class GcpNotificationEndpointGrpcSettings:


@define(eq=False, slots=False)
class GcpNotificationEndpoint(GcpResource):
class GcpNotificationEndpoint(GcpResource, PhantomBaseResource):
kind: ClassVar[str] = "gcp_notification_endpoint"
_kind_display: ClassVar[str] = "GCP Notification Endpoint"
_kind_description: ClassVar[str] = "GCP Notification Endpoint is a Google Cloud Platform service that receives and processes notifications from various GCP resources. It acts as a central point for collecting and routing alerts, updates, and event data. Users can configure endpoints to direct notifications to specific destinations like email, SMS, or third-party applications for monitoring and response purposes." # fmt: skip
Expand Down
240 changes: 240 additions & 0 deletions plugins/gcp/fix_plugin_gcp/resources/scc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
from datetime import datetime
from functools import partial
from typing import ClassVar, Dict, Optional, List, Tuple, Type, Any

from attr import define, field

from fix_plugin_gcp.gcp_client import GcpApiSpec
from fix_plugin_gcp.resources.base import (
GcpRegion,
GcpResource,
GcpZone,
GraphBuilder,
GcpErrorHandler,
GcpProject,
GcpExpectedErrorCodes,
)
from fixlib.baseresources import SEVERITY_MAPPING, Finding, Severity
from fixlib.json_bender import Bender, S, Bend
from fixlib.types import Json


@define(eq=False, slots=False)
class GcpSourceProperties:
kind: ClassVar[str] = "gcp_source_properties"
mapping: ClassVar[Dict[str, Bender]] = {
"recommendation": S("Recommendation"),
"explanation": S("Explanation"),
}
recommendation: Optional[str] = field(default=None)
explanation: Optional[str] = field(default=None)


@define(eq=False, slots=False)
class GcpFinding:
kind: ClassVar[str] = "gcp_finding"
mapping: ClassVar[Dict[str, Bender]] = {
"severity": S("severity"),
"source_properties": S("sourceProperties", default={}) >> Bend(GcpSourceProperties.mapping),
"description": S("description"),
"event_time": S("eventTime"),
"parent_display_name": S("parentDisplayName"),
# "access": S("access", default={}) >> Bend(GcpAccess.mapping),
# "application": S("application", default={}) >> Bend(GcpApplication.mapping),
# "attack_exposure": S("attackExposure", default={}) >> Bend(GcpAttackExposure.mapping),
# "backup_disaster_recovery": S("backupDisasterRecovery", default={}) >> Bend(GcpBackupDisasterRecovery.mapping),
# "canonical_name": S("canonicalName"),
# "category": S("category"),
# "cloud_armor": S("cloudArmor", default={}) >> Bend(GcpCloudArmor.mapping),
# "cloud_dlp_data_profile": S("cloudDlpDataProfile", default={}) >> Bend(GcpCloudDlpDataProfile.mapping),
# "cloud_dlp_inspection": S("cloudDlpInspection", default={}) >> Bend(GcpCloudDlpInspection.mapping),
# "compliances": S("compliances", default=[]) >> ForallBend(GcpCompliance.mapping),
# "connections": S("connections", default=[]) >> ForallBend(GcpConnection.mapping),
# "contacts": S("contacts", default={}) >> MapDict(value_bender=Bend(GcpContactDetails.mapping)),
# "containers": S("containers", default=[]) >> ForallBend(GcpContainer.mapping),
# "create_time": S("createTime"),
# "data_access_events": S("dataAccessEvents", default=[]) >> ForallBend(GcpDataAccessEvent.mapping),
# "data_flow_events": S("dataFlowEvents", default=[]) >> ForallBend(GcpDataFlowEvent.mapping),
# "database": S("database", default={}) >> Bend(GcpDatabase.mapping),
# "exfiltration": S("exfiltration", default={}) >> Bend(GcpExfiltration.mapping),
# "external_systems": S("externalSystems", default={})
# >> MapDict(value_bender=Bend(GcpGoogleCloudSecuritycenterV1ExternalSystem.mapping)),
# "external_uri": S("externalUri"),
# "files": S("files", default=[]) >> ForallBend(GcpFile.mapping),
# "finding_class": S("findingClass"),
# "group_memberships": S("groupMemberships", default=[]) >> ForallBend(GcpGroupMembership.mapping),
# "iam_bindings": S("iamBindings", default=[]) >> ForallBend(GcpIamBinding.mapping),
# "indicator": S("indicator", default={}) >> Bend(GcpIndicator.mapping),
# "kernel_rootkit": S("kernelRootkit", default={}) >> Bend(GcpKernelRootkit.mapping),
# "kubernetes": S("kubernetes", default={}) >> Bend(GcpKubernetes.mapping),
# "load_balancers": S("loadBalancers", default=[]) >> ForallBend(S("name")),
# "log_entries": S("logEntries", default=[]) >> ForallBend(GcpLogEntry.mapping),
# "mitre_attack": S("mitreAttack", default={}) >> Bend(GcpMitreAttack.mapping),
# "module_name": S("moduleName"),
# "mute": S("mute"),
# "mute_info": S("muteInfo", default={}) >> Bend(GcpMuteInfo.mapping),
# "mute_initiator": S("muteInitiator"),
# "mute_update_time": S("muteUpdateTime"),
# "name": S("name"),
# "next_steps": S("nextSteps"),
# "notebook": S("notebook", default={}) >> Bend(GcpNotebook.mapping),
# "org_policies": S("orgPolicies", default=[]) >> ForallBend(S("name")),
# "parent": S("parent"),
# "processes": S("processes", default=[]) >> ForallBend(GcpProcess.mapping),
# "resource_name": S("resourceName"),
# "security_marks": S("securityMarks", default={}) >> Bend(GcpSecurityMarks.mapping),
# "security_posture": S("securityPosture", default={}) >> Bend(GcpSecurityPosture.mapping),
# "state": S("state"),
# "toxic_combination": S("toxicCombination", default={}) >> Bend(GcpToxicCombination.mapping),
# "vulnerability": S("vulnerability", default={}) >> Bend(GcpVulnerability.mapping),
}
description: Optional[str] = field(default=None)
event_time: Optional[datetime] = field(default=None)
parent_display_name: Optional[str] = field(default=None)
severity: Optional[str] = field(default=None)
source_properties: Optional[GcpSourceProperties] = field(default=None)


@define(eq=False, slots=False)
class GcpFindingResource:
kind: ClassVar[str] = "gcp_fingding_resource"
mapping: ClassVar[Dict[str, Bender]] = {
"cloud_provider": S("cloudProvider"),
"display_name": S("displayName"),
"location": S("location"),
# "aws_metadata": S("awsMetadata", default={}) >> Bend(GcpAwsMetadata.mapping),
# "azure_metadata": S("azureMetadata", default={}) >> Bend(GcpAzureMetadata.mapping),
# "folders": S("folders", default=[]) >> ForallBend(GcpFolder.mapping),
# "name": S("name"),
# "organization": S("organization"),
# "parent_display_name": S("parentDisplayName"),
# "parent_name": S("parentName"),
# "project_display_name": S("projectDisplayName"),
# "project_name": S("projectName"),
# "resource_path": S("resourcePath", default={}) >> Bend(GcpResourcePath.mapping),
# "resource_path_string": S("resourcePathString"),
# "service": S("service"),
# "type": S("type"),
}
cloud_provider: Optional[str] = field(default=None)
display_name: Optional[str] = field(default=None)
location: Optional[str] = field(default=None)


@define(eq=False, slots=False)
class GcpSccFinding(GcpResource):
kind: ClassVar[str] = "gcp_scc_finding"
_model_export: ClassVar[bool] = False
api_spec: ClassVar[GcpApiSpec] = GcpApiSpec(
service="securitycenter",
version="v1",
accessors=["projects", "sources", "findings"],
action="list",
request_parameter={"parent": "projects/{project}/sources/-", "filter": 'state="ACTIVE"'},
request_parameter_in={"project"},
response_path="listFindingsResults",
response_regional_sub_path=None,
)
mapping: ClassVar[Dict[str, Bender]] = {
"id": S("finding", "name"),
"tags": S("labels", default={}),
"name": S("finding", "name"),
"ctime": S("creationTimestamp"),
"finding_information": S("finding", default={}) >> Bend(GcpFinding.mapping),
"resource_information": S("resource", default={}) >> Bend(GcpFindingResource.mapping),
"state_change": S("stateChange"),
}
finding_information: Optional[GcpFinding] = field(default=None)
resource_information: Optional[GcpFindingResource] = field(default=None)
state_change: Optional[str] = field(default=None)

def parse_finding(self, source: Json) -> Optional[Finding]:
if finding := self.finding_information:
description = finding.description
if finding.source_properties:
remediation = finding.source_properties.recommendation
title = finding.source_properties.explanation or "unknown"
else:
remediation = None
title = "unknown"
source_finding = source.get("finding", {})
source_resource = source.get("resource", {})
details = source_finding.get("sourceProperties", {})
aws_metadata = source_resource.get("awsMetadata", {})
azure_metadata = source_resource.get("azureMetadata", {})
severity = SEVERITY_MAPPING.get(finding.severity or "") or Severity.medium
return Finding(
title, severity, description, remediation, finding.event_time, details | aws_metadata | azure_metadata
)
return None

@classmethod
def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
def add_finding(
provider: str, finding: Finding, clazz: Optional[Type[GcpResource]] = None, **node: Any
) -> None:
if resource := builder.node(clazz=clazz or GcpResource, **node):
resource.add_finding(provider, finding)

if spec := cls.api_spec:
with GcpErrorHandler(
spec.action,
builder.error_accumulator,
spec.service,
builder.region.safe_name if builder.region else None,
GcpExpectedErrorCodes,
f" in {builder.project.id} kind {cls.kind}",
):
for item in builder.client.list(spec, **kwargs):
if finding := GcpSccFinding.from_api(item, builder):
if (ri := finding.resource_information) and (r_name := ri.display_name):
provider = ri.cloud_provider or "google_cloud_scc"
parsed_finding = finding.parse_finding(item)
if not parsed_finding:
continue
if r_name == builder.project.id and ri.location is None:
builder.after_collect_actions.append(
partial(
add_finding,
provider.lower(),
parsed_finding,
GcpProject,
id=r_name,
)
)

def resolve_location(
builder: GraphBuilder, location: str
) -> Tuple[Optional[GcpZone], Optional[GcpRegion]]:
zone = builder.zone_by_name.get(location)
region = builder.region_by_name.get(location)
return zone, region

if ri.location:
zone, region = resolve_location(builder, ri.location)
if zone:
builder.after_collect_actions.append(
partial(
add_finding,
provider.lower(),
parsed_finding,
GcpResource,
id=r_name,
_zone=zone,
)
)
elif region:
builder.after_collect_actions.append(
partial(
add_finding,
provider.lower(),
parsed_finding,
GcpResource,
id=r_name,
_region=region,
)
)
return []


resources: List[Type[GcpResource]] = [GcpSccFinding]
2 changes: 2 additions & 0 deletions plugins/gcp/test/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def all_base_classes(cls: Type[Any]) -> Set[Type[Any]]:
expected_declared_properties = ["kind", "_kind_display"]
expected_props_in_hierarchy = ["_kind_service", "_metadata"]
for rc in all_resources:
if not rc._model_export:
continue
for prop in expected_declared_properties:
assert prop in rc.__dict__, f"{rc.__name__} missing {prop}"
with_bases = (all_base_classes(rc) | {rc}) - {GcpResource, BaseResource}
Expand Down
31 changes: 31 additions & 0 deletions plugins/gcp/test/test_scc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from functools import partial
from typing import Any

from fix_plugin_gcp.resources.base import GraphBuilder
from fix_plugin_gcp.resources.compute import GcpFirewall
from fix_plugin_gcp.resources.scc import GcpSccFinding
from .random_client import roundtrip


class DefaultDict(dict): # type: ignore
def __init__(self, default_value: Any, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.default_value = default_value

def get(self, key: str, default: Any = None) -> Any:
if key in self:
return super().get(key, default)
return self.default_value


def test_gcp_scc_findings(random_builder: GraphBuilder) -> None:
firewall = roundtrip(GcpFirewall, random_builder)
# for random location name we will use the default global location
random_builder.region_by_name = DefaultDict(random_builder.fallback_global_region)
GcpSccFinding.collect_resources(random_builder)

partial(random_builder.after_collect_actions[0], id=firewall.id)() # type: ignore

assert len(firewall._assessments) > 0
assert len(firewall._assessments[0].findings) > 0
assert firewall._assessments[0].findings[0].severity is not None
4 changes: 3 additions & 1 deletion plugins/gcp/tools/model_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ def generate_test_classes() -> None:
},
"firestore": {"parent": "projects/{project_id}/databases/{database_id}/documents", "collectionId": "", "name": ""},
"file": {"name": "", "parent": "projects/{projectId}/locations/-"},
"securitycenter": {"parent": "projects/{projectId}", "name": ""},
"pubsub": {"project": "projects/{project}", "parent": ""},
}

Expand All @@ -532,7 +533,8 @@ def generate_test_classes() -> None:
# ("aiplatform", "v1", "", []),
# ("firestore", "v1", "", []),
# ("cloudfunctions", "v2", "", []),
# ("file", "v1", "", []),
# # ("file", "v1", "", []),
# ("securitycenter", "v1", "", []),
("pubsub", "v1", "", [])
]

Expand Down
Loading