diff --git a/fixlib/fixlib/baseresources.py b/fixlib/fixlib/baseresources.py index 45ffb32d67..1eb7a53f15 100644 --- a/fixlib/fixlib/baseresources.py +++ b/fixlib/fixlib/baseresources.py @@ -116,8 +116,7 @@ def get(self) -> Dict[str, Any]: return changes -# todo: replace to StrEnum once resoto is on 3.11 -class MetricName(str, Enum): +class MetricName(StrEnum): def __str__(self) -> str: return self.value @@ -150,6 +149,8 @@ def __str__(self) -> str: # load balancers RequestCount = "request" # _count will be added to the end because of the unit + RequestBytesCount = "request_bytes" # _count will be added to the end because of the unit + ResponseBytesCount = "response_bytes" # _count will be added to the end because of the unit ActiveConnectionCount = "active_connection" # _count will be added to the end because of the unit ALBActiveConnectionCount = "alb_active_connection" # _count will be added to the end because of the unit ConnectionAttemptCount = "connection_attempt" # _count will be added to the end because of the unit @@ -195,6 +196,8 @@ def __str__(self) -> str: DiskQueueDepth = "disk_queue_depth" NetworkReceiveThroughput = "network_receive_throughput" NetworkTransmitThroughput = "network_transmit_throughput" + NetworkBytesSent = "network_bytes_sent" + NetworkBytesReceived = "network_bytes_received" # serverless Invocations = "invocations" diff --git a/plugins/aws/fix_plugin_aws/collector.py b/plugins/aws/fix_plugin_aws/collector.py index 01c27acd92..8ffabc2a10 100644 --- a/plugins/aws/fix_plugin_aws/collector.py +++ b/plugins/aws/fix_plugin_aws/collector.py @@ -242,11 +242,11 @@ def get_last_run() -> Optional[datetime]: try: log.info(f"[Aws:{self.account.id}] Collect usage metrics.") self.collect_usage_metrics(global_builder) + shared_queue.wait_for_submitted_work() except Exception as e: log.warning( f"Failed to collect usage metrics on account {self.account.id} in region {global_builder.region.id}: {e}" ) - shared_queue.wait_for_submitted_work() # call all registered after collect hooks for after_collect in global_builder.after_collect_actions: @@ -334,8 +334,9 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> None: continue # region can be overridden in the query: s3 is global, but need to be queried per region if region := cast(AwsRegion, resource.region()): - lookup_map[resource.id] = resource resource_queries: List[cloudwatch.AwsCloudwatchQuery] = resource.collect_usage_metrics(builder) + if resource_queries: + lookup_map[resource.id] = resource for query in resource_queries: query_region = query.region or region start = query.start_delta or builder.metrics_delta diff --git a/plugins/azure/fix_plugin_azure/resource/metrics.py b/plugins/azure/fix_plugin_azure/resource/metrics.py index c476dd26a0..e22b243f28 100644 --- a/plugins/azure/fix_plugin_azure/resource/metrics.py +++ b/plugins/azure/fix_plugin_azure/resource/metrics.py @@ -1,3 +1,4 @@ +from copy import deepcopy from datetime import datetime, timedelta from concurrent.futures import as_completed import logging @@ -271,12 +272,13 @@ def _query_for_single( interval: str, ) -> "Tuple[Optional[AzureMetricData], Optional[str]]": try: + local_api_spec = deepcopy(api_spec) # Set the path for the API call based on the instance ID of the query - api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics" + local_api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics" # Retrieve metric data from the API aggregation = ",".join(query.aggregation) part = builder.client.list( - api_spec, + local_api_spec, metricnames=query.metric_name, metricNamespace=query.metric_namespace, timespan=timespan, diff --git a/plugins/gcp/fix_plugin_gcp/__init__.py b/plugins/gcp/fix_plugin_gcp/__init__.py index 4be20f3dcd..538bdf09cb 100644 --- a/plugins/gcp/fix_plugin_gcp/__init__.py +++ b/plugins/gcp/fix_plugin_gcp/__init__.py @@ -10,6 +10,7 @@ from fixlib.core.actions import CoreFeedback from fixlib.graph import Graph, MaxNodesExceeded from fixlib.logger import log, setup_logger +from fixlib.types import Json from .collector import GcpProjectCollector from .config import GcpConfig from .resources.base import GcpProject @@ -77,10 +78,11 @@ def collect(self) -> None: project_id, feedback, cloud, + self.task_data or {}, max_resources_per_account=self.max_resources_per_account, **collect_args, ) - for project_id in credentials.keys() + for project_id in credentials ] for future in futures.as_completed(wait_for): project_graph = future.result() @@ -98,6 +100,7 @@ def collect_project( project_id: str, core_feedback: CoreFeedback, cloud: Cloud, + task_data: Json, args: Optional[Namespace] = None, running_config: Optional[RunningConfig] = None, credentials: Optional[Dict[str, Any]] = None, @@ -130,7 +133,7 @@ def collect_project( try: core_feedback.progress_done(project_id, 0, 1) - gpc = GcpProjectCollector(Config.gcp, cloud, project, core_feedback, max_resources_per_account) + gpc = GcpProjectCollector(Config.gcp, cloud, project, core_feedback, task_data, max_resources_per_account) try: gpc.collect() except MaxNodesExceeded as ex: diff --git a/plugins/gcp/fix_plugin_gcp/collector.py b/plugins/gcp/fix_plugin_gcp/collector.py index cdd59e4711..654432541b 100644 --- a/plugins/gcp/fix_plugin_gcp/collector.py +++ b/plugins/gcp/fix_plugin_gcp/collector.py @@ -1,6 +1,7 @@ import logging from concurrent.futures import ThreadPoolExecutor -from typing import Type, List, Any, Optional +from datetime import datetime, timezone +from typing import Type, List, Any, Optional, cast from fix_plugin_gcp.config import GcpConfig from fix_plugin_gcp.gcp_client import GcpApiSpec @@ -14,12 +15,15 @@ firestore, filestore, cloudfunctions, + monitoring, ) from fix_plugin_gcp.resources.base import GcpResource, GcpProject, ExecutorQueue, GraphBuilder, GcpRegion, GcpZone from fix_plugin_gcp.utils import Credentials from fixlib.baseresources import Cloud from fixlib.core.actions import CoreFeedback, ErrorAccumulator from fixlib.graph import Graph +from fixlib.json import value_in_path +from fixlib.types import Json log = logging.getLogger("fix.plugins.gcp") all_resources: List[Type[GcpResource]] = ( @@ -58,6 +62,7 @@ def __init__( cloud: Cloud, project: GcpProject, core_feedback: CoreFeedback, + task_data: Json, max_resources_per_account: Optional[int] = None, ) -> None: self.config = config @@ -67,6 +72,7 @@ def __init__( self.error_accumulator = ErrorAccumulator() self.graph = Graph(root=self.project, max_nodes=max_resources_per_account) self.credentials = Credentials.get(self.project.id) + self.task_data = task_data def collect(self) -> None: with ThreadPoolExecutor( @@ -77,7 +83,20 @@ def collect(self) -> None: # It should only be used in scenarios, where it is safe to do so. # This executor is shared between all regions. shared_queue = ExecutorQueue(executor, self.project.safe_name) + + def get_last_run() -> Optional[datetime]: + td = self.task_data + if not td: + return None + timestamp = value_in_path(td, ["timing", td.get("step", ""), "started_at"]) + + if timestamp is None: + return None + + return datetime.fromtimestamp(timestamp, timezone.utc) + project_global_region = GcpRegion.fallback_global_region(self.project) + last_run = get_last_run() global_builder = GraphBuilder( self.graph, self.cloud, @@ -87,6 +106,8 @@ def collect(self) -> None: self.core_feedback, self.error_accumulator, project_global_region, + config=self.config, + last_run_started_at=last_run, ) global_builder.add_node(project_global_region, {}) @@ -113,6 +134,13 @@ def collect(self) -> None: self.error_accumulator.report_all(global_builder.core_feedback) + if global_builder.config.collect_usage_metrics: + try: + log.info(f"[GCP:{self.project.id}] Collect usage metrics.") + self.collect_usage_metrics(global_builder) + global_builder.executor.wait_for_submitted_work() + except Exception as e: + log.warning(f"Failed to collect usage metrics in project {self.project.id}: {e}") log.info(f"[GCP:{self.project.id}] Connect resources and create edges.") # connect nodes for node, data in list(self.graph.nodes(data=True)): @@ -128,9 +156,19 @@ def collect(self) -> None: if isinstance(node, GcpResource): node.post_process_instance(global_builder, data.get("source", {})) + global_builder.executor.wait_for_submitted_work() + self.core_feedback.progress_done(self.project.id, 1, 1, context=[self.cloud.id]) log.info(f"[GCP:{self.project.id}] Collecting resources done.") + def collect_usage_metrics(self, builder: GraphBuilder) -> None: + for resource in builder.graph.nodes: + if isinstance(resource, GcpResource) and (mq := resource.collect_usage_metrics(builder)): + start_at = builder.created_at - builder.metrics_delta + region = cast(GcpRegion, resource.region()) + rb = builder.for_region(region) + monitoring.GcpMonitoringMetricData.query_for(rb, resource, mq, start_at, builder.created_at) + def remove_unconnected_nodes(self, builder: GraphBuilder) -> None: def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None: remove_nodes = set() diff --git a/plugins/gcp/fix_plugin_gcp/config.py b/plugins/gcp/fix_plugin_gcp/config.py index 28667303b6..e7373ccc75 100644 --- a/plugins/gcp/fix_plugin_gcp/config.py +++ b/plugins/gcp/fix_plugin_gcp/config.py @@ -1,6 +1,5 @@ -from fixlib.proc import num_default_threads from attrs import define, field -from typing import List, ClassVar +from typing import List, ClassVar, Optional @define @@ -17,7 +16,7 @@ class GcpConfig: metadata={"description": "GCP services to exclude (default: none)"}, ) project_pool_size: int = field( - factory=num_default_threads, + default=64, metadata={"description": "GCP project thread/process pool size"}, ) fork_process: bool = field( @@ -31,6 +30,10 @@ class GcpConfig: "If false, the error is logged and the resource is skipped." }, ) + collect_usage_metrics: Optional[bool] = field( + default=True, + metadata={"description": "Collect resource usage metrics via GCP Monitoring, enabled by default"}, + ) def should_collect(self, name: str) -> bool: if self.collect: diff --git a/plugins/gcp/fix_plugin_gcp/resources/aiplatform.py b/plugins/gcp/fix_plugin_gcp/resources/aiplatform.py index 7e21fcf38b..c8cd57dd4a 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/aiplatform.py +++ b/plugins/gcp/fix_plugin_gcp/resources/aiplatform.py @@ -50,8 +50,6 @@ class AIPlatformRegionFilter: def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]: # Default behavior: in case the class has an ApiSpec, call the api and call collect. if issubclass(cls, GcpResource): - region_name = "global" if not builder.region else builder.region.safe_name - log.info(f"[GCP:{builder.project.id}:{region_name}] Collecting {cls.kind}") if spec := cls.api_spec: expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) | {"HttpError:none:none"} with GcpErrorHandler( @@ -66,7 +64,9 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpReso if builder.region: items = builder.client.list(spec, **kwargs) collected_resources = cls.collect(items, builder) - log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}") + log.info( + f"[GCP:{builder.project.id}:{builder.region.safe_name}] finished collecting: {cls.kind}" + ) return collected_resources return [] diff --git a/plugins/gcp/fix_plugin_gcp/resources/base.py b/plugins/gcp/fix_plugin_gcp/resources/base.py index da56cbc517..2372a20e0d 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/base.py +++ b/plugins/gcp/fix_plugin_gcp/resources/base.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import datetime, timedelta import json import logging from concurrent.futures import Future @@ -8,9 +9,12 @@ from typing import Callable, List, ClassVar, Optional, TypeVar, Type, Any, Dict, Set, Tuple from attr import define, field +from attrs import frozen +from frozendict import frozendict from google.auth.credentials import Credentials as GoogleAuthCredentials from googleapiclient.errors import HttpError +from fix_plugin_gcp.config import GcpConfig from fix_plugin_gcp.gcp_client import GcpClient, GcpApiSpec, InternalZoneProp, RegionProp from fix_plugin_gcp.utils import Credentials from fixlib.baseresources import ( @@ -22,6 +26,9 @@ BaseZone, ModelReference, PhantomBaseResource, + MetricName, + MetricUnit, + StatName, ) from fixlib.config import Config from fixlib.core.actions import CoreFeedback, ErrorAccumulator @@ -30,6 +37,7 @@ from fixlib.json_bender import bend, Bender, S, Bend, MapDict, F from fixlib.threading import ExecutorQueue from fixlib.types import Json +from fixlib.utils import utc from fixinventorydata.cloud import regions as cloud_region_data log = logging.getLogger("fix.plugins.gcp") @@ -81,7 +89,9 @@ def __init__( core_feedback: CoreFeedback, error_accumulator: ErrorAccumulator, fallback_global_region: GcpRegion, + config: GcpConfig, region: Optional[GcpRegion] = None, + last_run_started_at: Optional[datetime] = None, graph_nodes_access: Optional[Lock] = None, graph_edges_access: Optional[Lock] = None, ) -> None: @@ -95,12 +105,39 @@ def __init__( self.core_feedback = core_feedback self.error_accumulator = error_accumulator self.fallback_global_region = fallback_global_region + self.config = config + self.created_at = utc() + self.last_run_started_at = last_run_started_at self.region_by_name: Dict[str, GcpRegion] = {} self.region_by_zone_name: Dict[str, GcpRegion] = {} self.zone_by_name: Dict[str, GcpZone] = {} self.graph_nodes_access = graph_nodes_access or Lock() self.graph_edges_access = graph_edges_access or Lock() + if last_run_started_at: + now = utc() + + # limit the metrics to the last 2 hours + if now - last_run_started_at > timedelta(hours=2): + start = now - timedelta(hours=2) + else: + start = last_run_started_at + + delta = now - start + + min_delta = max(delta, timedelta(seconds=60)) + # in case the last collection happened too quickly, raise the metrics timedelta to 60s, + if min_delta != delta: + start = now - min_delta + delta = min_delta + else: + now = utc() + delta = timedelta(hours=1) + start = now - delta + + self.metrics_start = start + self.metrics_delta = delta + def submit_work(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]: """ Use this method for work that can be done in parallel. @@ -172,6 +209,23 @@ def _standard_edges(self, node: GcpResourceType, source: Optional[Json] = None) self.add_edge(node, node=node._region, reverse=True) return True + parts = node.id.split("/", maxsplit=4) + if len(parts) > 3 and parts[0] == "projects": + if parts[2] in ["locations", "zones", "regions"]: + location_name = parts[3] + # Check for zone first + if zone := self.zone_by_name.get(location_name): + node._zone = zone + node._region = self.region_by_zone_name.get(zone.id) + self.add_edge(zone, node=node) + return True + + # Then check for region + if region := self.region_by_name.get(location_name): + node._region = region + self.add_edge(region, node=node) + return True + if source is not None: if InternalZoneProp in source: if zone := self.zone_by_name.get(source[InternalZoneProp]): @@ -275,12 +329,60 @@ def for_region(self, region: GcpRegion) -> GraphBuilder: self.core_feedback, self.error_accumulator, self.fallback_global_region, + self.config, region, + self.last_run_started_at, self.graph_nodes_access, self.graph_edges_access, ) +@frozen(kw_only=True) +class MetricNormalization: + unit: MetricUnit + normalize_value: Callable[[float], float] = lambda x: x + compute_stats: Callable[[List[float]], List[Tuple[float, Optional[StatName]]]] = lambda x: [(sum(x) / len(x), None)] + + +@define(hash=True, frozen=True) +class GcpMonitoringQuery: + metric_name: MetricName # final name of the metric + query_name: str # name of the metric (e.g., GCP metric type) + period: timedelta # period of the metric + ref_id: str # unique id of the resource + metric_id: str # unique metric identifier + stat: str # aggregation type, supports ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN + project_id: str # GCP project name + normalization: MetricNormalization # normalization info + metric_filters: frozendict[str, str] # filters for the metric + + @staticmethod + def create( + *, + query_name: str, + period: timedelta, + ref_id: str, + metric_name: MetricName, + stat: str, + project_id: str, + metric_filters: Dict[str, str], + normalization: MetricNormalization, + ) -> "GcpMonitoringQuery": + filter_suffix = "/" + "/".join(f"{key}={value}" for key, value in sorted(metric_filters.items())) + metric_id = f"{query_name}/{ref_id}/{stat}{filter_suffix}" + return GcpMonitoringQuery( + metric_name=metric_name, + query_name=query_name, + period=period, + ref_id=ref_id, + metric_id=metric_id, + stat=stat, + normalization=normalization, + project_id=project_id, + metric_filters=frozendict(metric_filters), + ) + + @define(eq=False, slots=False) class GcpResource(BaseResource): kind: ClassVar[str] = "gcp_resource" @@ -300,6 +402,16 @@ def _keys(self) -> Tuple[Any, ...]: return tuple(list(super()._keys()) + [self.link]) return super()._keys() + @property + def resource_raw_name(self) -> str: + """ + Extracts the last segment of the GCP resource ID. + + Returns: + str: The last segment of the resource ID (e.g., "function-1" from "projects/{project}/locations/{location}/functions/function-1"). + """ + return self.id.rsplit("/", maxsplit=1)[-1] + def delete(self, graph: Graph) -> bool: if not self.api_spec: return False @@ -374,13 +486,13 @@ def post_process_instance(self, builder: GraphBuilder, source: Json) -> None: """ pass + def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]: + # Default behavior: do nothing + return [] + @classmethod def collect_resources(cls: Type[GcpResource], builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]: # Default behavior: in case the class has an ApiSpec, call the api and call collect. - if kwargs: - log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind} with ({kwargs})") - else: - log.info(f"[GCP:{builder.project.id}] Collecting {cls.kind}") if spec := cls.api_spec: expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) with GcpErrorHandler( @@ -393,7 +505,9 @@ def collect_resources(cls: Type[GcpResource], builder: GraphBuilder, **kwargs: A ): items = builder.client.list(spec, **kwargs) resources = cls.collect(items, builder) - log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}") + log.info( + f"[GCP:{builder.project.id}:{builder.region.safe_name if builder.region else "global"}] finished collecting: {cls.kind}" + ) return resources return [] diff --git a/plugins/gcp/fix_plugin_gcp/resources/cloudfunctions.py b/plugins/gcp/fix_plugin_gcp/resources/cloudfunctions.py index 0fd5f113e2..857c2d3574 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/cloudfunctions.py +++ b/plugins/gcp/fix_plugin_gcp/resources/cloudfunctions.py @@ -4,8 +4,9 @@ from attr import define, field from fix_plugin_gcp.gcp_client import GcpApiSpec -from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus -from fixlib.baseresources import BaseServerlessFunction +from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery +from fix_plugin_gcp.resources.monitoring import normalizer_factory, STANDART_STAT_MAP, PERCENTILE_STAT_MAP +from fixlib.baseresources import BaseServerlessFunction, MetricName from fixlib.json_bender import Bender, S, Bend, ForallBend @@ -300,5 +301,70 @@ class GcpCloudFunction(GcpResource, BaseServerlessFunction): upgrade_info: Optional[GcpUpgradeInfo] = field(default=None) url: Optional[str] = field(default=None) + def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]: + queries: List[GcpMonitoringQuery] = [] + delta = builder.metrics_delta + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="cloudfunctions.googleapis.com/function/execution_count", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.Invocations, + normalization=normalizer_factory.count, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "metric.labels.status": "ok", + "resource.labels.function_name": self.resource_raw_name, + "resource.labels.region": self.region().id, + "resource.type": "cloud_function", + }, + ) + for stat in STANDART_STAT_MAP + ] + ) + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="cloudfunctions.googleapis.com/function/execution_count", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.Errors, + normalization=normalizer_factory.count, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "metric.labels.status": "error", + "resource.labels.function_name": self.resource_raw_name, + "resource.labels.region": self.region().id, + "resource.type": "cloud_function", + }, + ) + for stat in STANDART_STAT_MAP + ] + ) + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="cloudfunctions.googleapis.com/function/execution_times", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.Duration, + # convert nanoseconds to milliseconds + normalization=normalizer_factory.milliseconds(lambda x: round(x / 1_000_000, ndigits=4)), + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.function_name": self.resource_raw_name, + "resource.labels.region": self.region().id, + "resource.type": "cloud_function", + }, + ) + for stat in PERCENTILE_STAT_MAP + ] + ) + return queries + resources: List[Type[GcpResource]] = [GcpCloudFunction] diff --git a/plugins/gcp/fix_plugin_gcp/resources/compute.py b/plugins/gcp/fix_plugin_gcp/resources/compute.py index f8aedcd9a3..62621968d3 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/compute.py +++ b/plugins/gcp/fix_plugin_gcp/resources/compute.py @@ -1,3 +1,4 @@ +from collections import defaultdict import logging import ipaddress from datetime import datetime @@ -7,8 +8,9 @@ from attr import define, field from fix_plugin_gcp.gcp_client import GcpApiSpec, InternalZoneProp -from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder +from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery from fix_plugin_gcp.resources.billing import GcpSku +from fix_plugin_gcp.resources.monitoring import STANDART_STAT_MAP, PERCENTILE_STAT_MAP, normalizer_factory from fixlib.baseresources import ( BaseAutoScalingGroup, BaseBucket, @@ -24,6 +26,7 @@ BaseSubnet, BaseTunnel, BaseVolumeType, + MetricName, ModelReference, BaseVolume, VolumeStatus, @@ -1203,6 +1206,66 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: builder.dependant_node(self, clazz=GcpInstance, link=user, reverse=True, delete_same_as_default=False) builder.add_edge(self, reverse=True, clazz=GcpDiskType, link=self.volume_type) + def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]: + queries: List[GcpMonitoringQuery] = [] + delta = builder.metrics_delta + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="compute.googleapis.com/instance/disk/average_io_queue_depth", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.VolumeQueueLength, + normalization=normalizer_factory.count, + stat=stat, + project_id=builder.project.id, + metric_filters={"metric.labels.device_name": self.id, "resource.labels.zone": self.zone().id}, + ) + for stat in STANDART_STAT_MAP + ] + ) + + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=name, + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=metric_name, + normalization=normalizer_factory.iops, + stat=stat, + project_id=builder.project.id, + metric_filters={"metric.labels.device_name": self.id, "resource.labels.zone": self.zone().id}, + ) + for stat in STANDART_STAT_MAP + for name, metric_name in [ + ("compute.googleapis.com/instance/disk/read_ops_count", MetricName.DiskRead), + ("compute.googleapis.com/instance/disk/write_ops_count", MetricName.DiskWrite), + ] + ] + ) + + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=name, + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=metric_name, + normalization=normalizer_factory.bytes, + stat=stat, + project_id=builder.project.id, + metric_filters={"metric.labels.device_name": self.id}, + ) + for stat in STANDART_STAT_MAP + for name, metric_name in [ + ("compute.googleapis.com/instance/disk/read_bytes_count", MetricName.DiskRead), + ("compute.googleapis.com/instance/disk/write_bytes_count", MetricName.DiskWrite), + ] + ] + ) + return queries + @define(eq=False, slots=False) class GcpExternalVpnGatewayInterface: @@ -1669,9 +1732,73 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: GcpTargetPool, ) builder.add_edge(self, clazz=target_classes, link=self.target) - self._collect_backends(builder) - def _collect_backends(self, graph_builder: GraphBuilder) -> None: + def post_process_instance(self, builder: GraphBuilder, source: Json) -> None: + # Calculate the processed bytes + total_bytes: Dict[str, float] = defaultdict(float) + for metric_name, metric_values in self._resource_usage.items(): + if metric_name.endswith("_bytes_count"): + for name, value in metric_values.items(): + total_bytes[name] += value + if total_bytes: + self._resource_usage["processed_bytes"] = dict(total_bytes) + + self.collect_backends(builder) + + def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]: + queries: List[GcpMonitoringQuery] = [] + delta = builder.metrics_delta + if not self.load_balancing_scheme: + return [] + lb_type = "external/regional" if "EXTERNAL" in self.load_balancing_scheme else "internal" + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=name, + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=metric_name, + normalization=normalizer_factory.count, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.label.forwarding_rule_name": self.id, + "resource.labels.region": self.region().id, + }, + ) + for stat in STANDART_STAT_MAP + for name, metric_name in [ + (f"loadbalancing.googleapis.com/https/{lb_type}/request_count", MetricName.RequestCount), + (f"loadbalancing.googleapis.com/https/{lb_type}/request_bytes_count", MetricName.RequestBytesCount), + ( + f"loadbalancing.googleapis.com/https/{lb_type}/response_bytes_count", + MetricName.ResponseBytesCount, + ), + ] + ] + ) + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=f"loadbalancing.googleapis.com/https/{lb_type}/backend_latencies", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.Latency, + normalization=normalizer_factory.milliseconds(), + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.label.forwarding_rule_name": self.id, + "resource.labels.region": self.region().id, + }, + ) + for stat in PERCENTILE_STAT_MAP + ] + ) + + return queries + + def collect_backends(self, graph_builder: GraphBuilder) -> None: if not self.target: return backend_services = graph_builder.nodes(clazz=GcpBackendService) @@ -1713,7 +1840,7 @@ def fetch_instances(group: str) -> None: if vm_id := item.get("instance"): self.backends.append(vm_id) except Exception as e: - log.warning(f"An error occured while setting backends property: {e}") + log.warning(f"An error occurred while setting backends property: {e}") graph_builder.submit_work(fetch_instances, backend.group) @@ -3553,6 +3680,87 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: self, reverse=True, delete_same_as_default=True, clazz=GcpSubnetwork, link=nic.subnetwork ) + def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]: + if self.instance_status != InstanceStatus.RUNNING: + return [] + queries: List[GcpMonitoringQuery] = [] + delta = builder.metrics_delta + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="compute.googleapis.com/instance/cpu/utilization", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.CpuUtilization, + normalization=normalizer_factory.percent, + stat=stat, + project_id=builder.project.id, + metric_filters={"metric.labels.instance_name": self.id, "resource.labels.zone": self.zone().id}, + ) + for stat in STANDART_STAT_MAP + ] + ) + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=name, + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=metric_name, + normalization=normalizer_factory.bytes, + stat=stat, + project_id=builder.project.id, + metric_filters={"metric.labels.instance_name": self.id, "resource.labels.zone": self.zone().id}, + ) + for stat in STANDART_STAT_MAP + for name, metric_name in [ + ("compute.googleapis.com/instance/network/received_bytes_count", MetricName.NetworkIn), + ("compute.googleapis.com/instance/network/sent_bytes_count", MetricName.NetworkOut), + ] + ] + ) + + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=name, + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=metric_name, + normalization=normalizer_factory.iops, + stat=stat, + project_id=builder.project.id, + metric_filters={"metric.labels.instance_name": self.id, "resource.labels.zone": self.zone().id}, + ) + for stat in STANDART_STAT_MAP + for name, metric_name in [ + ("compute.googleapis.com/instance/disk/read_ops_count", MetricName.DiskRead), + ("compute.googleapis.com/instance/disk/write_ops_count", MetricName.DiskWrite), + ] + ] + ) + + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=name, + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=metric_name, + normalization=normalizer_factory.bytes, + stat=stat, + project_id=builder.project.id, + metric_filters={"metric.labels.instance_name": self.id, "resource.labels.zone": self.zone().id}, + ) + for stat in STANDART_STAT_MAP + for name, metric_name in [ + ("compute.googleapis.com/instance/disk/read_bytes_count", MetricName.DiskRead), + ("compute.googleapis.com/instance/disk/write_bytes_count", MetricName.DiskWrite), + ] + ] + ) + return queries + @classmethod def collect(cls: Type[GcpResource], raw: List[Json], builder: GraphBuilder) -> List[GcpResource]: # Additional behavior: iterate over list of collected GcpInstances and for each: diff --git a/plugins/gcp/fix_plugin_gcp/resources/filestore.py b/plugins/gcp/fix_plugin_gcp/resources/filestore.py index 7f7714fa15..309e037bc5 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/filestore.py +++ b/plugins/gcp/fix_plugin_gcp/resources/filestore.py @@ -289,7 +289,9 @@ def collect_snapshots() -> None: snapshots = GcpFilestoreInstanceSnapshot.collect(items, graph_builder) for snapshot in snapshots: graph_builder.add_edge(self, node=snapshot) - log.info(f"[GCP:{graph_builder.project.id}] finished collecting: {GcpFilestoreInstanceSnapshot.kind}") + log.info( + f"[GCP:{graph_builder.project.id}:{graph_builder.region.safe_name if graph_builder.region else "global"}] finished collecting: {GcpFilestoreInstanceSnapshot.kind}" + ) graph_builder.submit_work(collect_snapshots) diff --git a/plugins/gcp/fix_plugin_gcp/resources/firestore.py b/plugins/gcp/fix_plugin_gcp/resources/firestore.py index 5909408ce2..a9c3ac916c 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/firestore.py +++ b/plugins/gcp/fix_plugin_gcp/resources/firestore.py @@ -148,7 +148,9 @@ def collect_documents() -> None: documents = GcpFirestoreDocument.collect(items, graph_builder) for document in documents: graph_builder.add_edge(self, node=document) - log.info(f"[GCP:{graph_builder.project.id}] finished collecting: {GcpFirestoreDocument.kind}") + log.info( + f"[GCP:{graph_builder.project.id}:{graph_builder.region.safe_name if graph_builder.region else "global"}] finished collecting: {GcpFirestoreDocument.kind}" + ) graph_builder.submit_work(collect_documents) diff --git a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py new file mode 100644 index 0000000000..6219083e2e --- /dev/null +++ b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py @@ -0,0 +1,215 @@ +import logging +from copy import deepcopy +from datetime import datetime +from functools import cached_property, lru_cache +from typing import ClassVar, Dict, List, Optional, TypeVar, Callable + +from attr import define, field + +from fix_plugin_gcp.gcp_client import GcpApiSpec +from fix_plugin_gcp.resources.base import GraphBuilder, GcpMonitoringQuery, MetricNormalization, GcpResource +from fixlib.baseresources import MetricUnit, StatName, BaseResource +from fixlib.durations import duration_str +from fixlib.json import from_json +from fixlib.json_bender import S, Bender, ForallBend, bend, K, AsFloat +from fixlib.utils import utc_str + +service_name = "monitoring" +log = logging.getLogger("fix.plugins.gcp") +T = TypeVar("T") +V = TypeVar("V", bound=BaseResource) + +STANDART_STAT_MAP: Dict[str, StatName] = { + "ALIGN_MIN": StatName.min, + "ALIGN_MEAN": StatName.avg, + "ALIGN_MAX": StatName.max, +} +PERCENTILE_STAT_MAP: Dict[str, StatName] = { + "ALIGN_PERCENTILE_05": StatName.min, + "ALIGN_PERCENTILE_50": StatName.avg, + "ALIGN_PERCENTILE_99": StatName.max, +} + + +@define(eq=False, slots=False) +class GcpMonitoringMetricData: + kind: ClassVar[str] = "gcp_monitoring_metric_data" + mapping: ClassVar[Dict[str, Bender]] = { + "metric_values": S("points") + >> ForallBend((S("value", "doubleValue").or_else(S("value", "int64Value")) >> AsFloat())).or_else(K([])), + "metric_kind": S("metricKind"), + "value_type": S("valueType"), + "metric_type": S("metric", "type"), + } + metric_values: List[float] = field(factory=list) + metric_kind: Optional[str] = field(default=None) + value_type: Optional[str] = field(default=None) + metric_type: Optional[str] = field(default=None) + + @classmethod + def called_collect_apis(cls) -> List[GcpApiSpec]: + api_spec = GcpApiSpec( + service="monitoring", + version="v3", + accessors=["projects", "timeSeries"], + action="list", + request_parameter={ + "name": "projects/{project}", + }, + request_parameter_in={"project"}, + response_path="timeSeries", + ) + return [api_spec] + + @staticmethod + def query_for( + builder: GraphBuilder, + resource: GcpResource, + queries: List[GcpMonitoringQuery], + start_time: datetime, + end_time: datetime, + ) -> None: + if builder.region: + log.info( + f"[{builder.region.safe_name}|{start_time}|{duration_str(end_time - start_time)}] Query for {len(queries)} metrics." + ) + else: + log.info(f"[global|{start_time}|{duration_str(end_time - start_time)}] Query for {len(queries)} metrics.") + + api_spec = GcpApiSpec( + service="monitoring", + version="v3", + accessors=["projects", "timeSeries"], + action="list", + request_parameter={ + "name": "projects/{project}", + "interval_endTime": utc_str(end_time), + "interval_startTime": utc_str(start_time), + "aggregation_crossSeriesReducer": "REDUCE_NONE", + "view": "FULL", + # Below parameters are intended to be set dynamically + # "aggregation_alignmentPeriod": None, + # "aggregation_perSeriesAligner": None, + # "filter": None, + }, + request_parameter_in={"project"}, + response_path="timeSeries", + ) + + for query in queries: + builder.submit_work( + GcpMonitoringMetricData._query_for_chunk, + builder, + resource, + api_spec, + query, + ) + + @staticmethod + def _query_for_chunk( + builder: GraphBuilder, + resource: GcpResource, + api_spec: GcpApiSpec, + query: GcpMonitoringQuery, + ) -> None: + local_api_spec = deepcopy(api_spec) + + # Base filter + filters = [ + f'metric.type = "{query.query_name}"', + f'resource.labels.project_id="{query.project_id}"', + ] + + # Add additional filters + if query.metric_filters: + filters.extend(f'{key} = "{value}"' for key, value in query.metric_filters.items()) + + # Join filters with " AND " to form the final filter string + local_api_spec.request_parameter["filter"] = " AND ".join(filters) + local_api_spec.request_parameter["aggregation_alignmentPeriod"] = f"{int(query.period.total_seconds())}s" + local_api_spec.request_parameter["aggregation_perSeriesAligner"] = query.stat + + try: + part = builder.client.list(local_api_spec) + for single in part: + metric = from_json(bend(GcpMonitoringMetricData.mapping, single), GcpMonitoringMetricData) + update_resource_metrics(resource, query, metric) + except Exception as e: + log.warning(f"An error occurred while processing a metric data: {e}") + + +def update_resource_metrics( + resource: GcpResource, + query: GcpMonitoringQuery, + metric: GcpMonitoringMetricData, +) -> None: + if len(metric.metric_values) == 0: + return + normalizer = query.normalization + for metric_value, maybe_stat_name in normalizer.compute_stats(metric.metric_values): + try: + metric_name = query.metric_name + if not metric_name: + continue + name = metric_name + "_" + normalizer.unit + value = normalizer.normalize_value(metric_value) + stat_name = maybe_stat_name or STANDART_STAT_MAP.get(query.stat) or PERCENTILE_STAT_MAP.get(query.stat) + if stat_name: + resource._resource_usage[name][str(stat_name)] = value + except KeyError as e: + log.warning(f"An error occurred while setting metric values: {e}") + raise + + +class NormalizerFactory: + @cached_property + def count(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Count, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def bytes(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Bytes, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def bytes_per_second(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.BytesPerSecond, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def iops(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.IOPS, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @cached_property + def seconds(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Seconds, + normalize_value=lambda x: round(x, ndigits=4), + ) + + @lru_cache(maxsize=128) + def milliseconds(self, normalize_value: Optional[Callable[[float], float]] = None) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Milliseconds, + normalize_value=normalize_value or (lambda x: round(x, ndigits=4)), + ) + + @cached_property + def percent(self) -> MetricNormalization: + return MetricNormalization( + unit=MetricUnit.Percent, + normalize_value=lambda x: round(x, ndigits=4), + ) + + +normalizer_factory = NormalizerFactory() diff --git a/plugins/gcp/fix_plugin_gcp/resources/sqladmin.py b/plugins/gcp/fix_plugin_gcp/resources/sqladmin.py index 8786452307..2402768f9e 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/sqladmin.py +++ b/plugins/gcp/fix_plugin_gcp/resources/sqladmin.py @@ -5,9 +5,10 @@ from attr import define, field from fix_plugin_gcp.gcp_client import GcpApiSpec -from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder +from fix_plugin_gcp.resources.base import GcpResource, GcpDeprecationStatus, GraphBuilder, GcpMonitoringQuery from fix_plugin_gcp.resources.compute import GcpSslCertificate -from fixlib.baseresources import BaseDatabase, DatabaseInstanceStatus, ModelReference +from fix_plugin_gcp.resources.monitoring import normalizer_factory, STANDART_STAT_MAP +from fixlib.baseresources import BaseDatabase, DatabaseInstanceStatus, MetricName, ModelReference from fixlib.json_bender import F, Bender, S, Bend, ForallBend, K, MapEnum, AsInt from fixlib.types import Json @@ -766,6 +767,74 @@ def collect_sql_resources(spec: GcpApiSpec, clazz: Type[GcpResource]) -> None: graph_builder.submit_work(collect_sql_resources, spec, cls) + def collect_usage_metrics(self, builder: GraphBuilder) -> List[GcpMonitoringQuery]: + queries: List[GcpMonitoringQuery] = [] + delta = builder.metrics_delta + queries.extend( + [ + GcpMonitoringQuery.create( + query_name="cloudsql.googleapis.com/database/cpu/utilization", + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=MetricName.CpuUtilization, + normalization=normalizer_factory.percent, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.database_id": f"{builder.project.id}:{self.id}", + "resource.labels.region": self.region().id, + }, + ) + for stat in STANDART_STAT_MAP + ] + ) + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=name, + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=metric_name, + normalization=normalizer_factory.count, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.database_id": f"{builder.project.id}:{self.id}", + "resource.labels.region": self.region().id, + }, + ) + for stat in STANDART_STAT_MAP + for name, metric_name in [ + ("cloudsql.googleapis.com/database/network/connections", MetricName.DatabaseConnections), + ("cloudsql.googleapis.com/database/network/sent_bytes_count", MetricName.NetworkBytesSent), + ("cloudsql.googleapis.com/database/network/received_bytes_count", MetricName.NetworkBytesReceived), + ] + ] + ) + queries.extend( + [ + GcpMonitoringQuery.create( + query_name=name, + period=delta, + ref_id=f"{self.kind}/{self.id}/{self.region().id}", + metric_name=metric_name, + normalization=normalizer_factory.iops, + stat=stat, + project_id=builder.project.id, + metric_filters={ + "resource.labels.database_id": f"{builder.project.id}:{self.id}", + "resource.labels.region": self.region().id, + }, + ) + for stat in STANDART_STAT_MAP + for name, metric_name in [ + ("cloudsql.googleapis.com/database/disk/read_ops_count", MetricName.DiskRead), + ("cloudsql.googleapis.com/database/disk/write_ops_count", MetricName.DiskWrite), + ] + ] + ) + return queries + @classmethod def called_collect_apis(cls) -> List[GcpApiSpec]: return [ diff --git a/plugins/gcp/fix_plugin_gcp/utils.py b/plugins/gcp/fix_plugin_gcp/utils.py index f4fa418796..9f9c6063f8 100644 --- a/plugins/gcp/fix_plugin_gcp/utils.py +++ b/plugins/gcp/fix_plugin_gcp/utils.py @@ -1,7 +1,7 @@ import os import socket from datetime import datetime -from typing import Iterable, List, Union, Callable, Any, Dict, Optional +from typing import Iterable, TypeVar, List, Union, Callable, Any, Dict, Optional from google.oauth2 import service_account from googleapiclient import discovery @@ -19,6 +19,7 @@ log = fixlib.logger.getLogger("fix." + __name__) fixlib.logger.getLogger("googleapiclient").setLevel(fixlib.logger.ERROR) +T = TypeVar("T") SCOPES = ["https://www.googleapis.com/auth/cloud-platform"] diff --git a/plugins/gcp/test/conftest.py b/plugins/gcp/test/conftest.py index ec6b291b83..050ec77963 100644 --- a/plugins/gcp/test/conftest.py +++ b/plugins/gcp/test/conftest.py @@ -32,7 +32,15 @@ def random_builder() -> Iterator[GraphBuilder]: project_global_region = GcpRegion.fallback_global_region(project) credentials = AnonymousCredentials() # type: ignore builder = GraphBuilder( - Graph(), Cloud(id="gcp"), project, credentials, queue, feedback, accumulator, project_global_region + Graph(), + Cloud(id="gcp"), + project, + credentials, + queue, + feedback, + accumulator, + project_global_region, + GcpConfig(), ) builder.add_node(project_global_region, {}) # add predefined regions and zones diff --git a/plugins/gcp/test/test_collector.py b/plugins/gcp/test/test_collector.py index 2f6524a96e..3dfb5e5c80 100644 --- a/plugins/gcp/test/test_collector.py +++ b/plugins/gcp/test/test_collector.py @@ -24,6 +24,7 @@ def collector_with_graph(graph: Graph) -> GcpProjectCollector: cloud=Cloud(id="gcp"), project=GcpProject(id="test"), core_feedback=CoreFeedback("test", "test", "test", Queue()), + task_data={}, ) collector.graph = graph return collector @@ -32,7 +33,9 @@ def collector_with_graph(graph: Graph) -> GcpProjectCollector: def test_project_collection(random_builder: GraphBuilder) -> None: # create the collector from the builder values config: GcpConfig = current_config().gcp - project = GcpProjectCollector(config, random_builder.cloud, random_builder.project, random_builder.core_feedback) + project = GcpProjectCollector( + config, random_builder.cloud, random_builder.project, random_builder.core_feedback, {} + ) # use the graph provided by the random builder - it already has regions and zones # the random builder will not create new regions or zones during the test project.graph = random_builder.graph diff --git a/plugins/gcp/test/test_compute.py b/plugins/gcp/test/test_compute.py index d36beb554d..eb1046f35e 100644 --- a/plugins/gcp/test/test_compute.py +++ b/plugins/gcp/test/test_compute.py @@ -1,9 +1,19 @@ +from concurrent.futures import ThreadPoolExecutor +from datetime import timedelta, datetime import json import os +from typing import List + +from fix_plugin_gcp.resources.base import GraphBuilder, GcpRegion from fix_plugin_gcp.resources.compute import * from fix_plugin_gcp.resources.billing import GcpSku +from fix_plugin_gcp.resources.monitoring import GcpMonitoringMetricData +from fixlib.threading import ExecutorQueue +from fixlib.baseresources import InstanceStatus + +from google.auth.credentials import AnonymousCredentials + from .random_client import roundtrip, connect_resource, FixturedClient -from fix_plugin_gcp.resources.base import GraphBuilder, GcpRegion def test_gcp_accelerator_type(random_builder: GraphBuilder) -> None: @@ -168,6 +178,43 @@ def test_gcp_instance_custom_machine_type(random_builder: GraphBuilder) -> None: assert only_machine_type._region +def test_gcp_instance_usage_metrics(random_builder: GraphBuilder) -> None: + gcp_instance = roundtrip(GcpInstance, random_builder) + gcp_instance.instance_status = InstanceStatus.RUNNING + + random_builder.region = GcpRegion(id="us-east1", name="us-east1") + random_builder.created_at = datetime(2020, 5, 30, 15, 45, 30) + random_builder.metrics_delta = timedelta(hours=1) + + queries = gcp_instance.collect_usage_metrics(random_builder) + with ThreadPoolExecutor(max_workers=1) as executor: + queue = ExecutorQueue(executor, tasks_per_key=lambda _: 10, name="test") + g_builder = GraphBuilder( + random_builder.graph, + random_builder.cloud, + random_builder.project, + AnonymousCredentials(), # type: ignore + queue, + random_builder.core_feedback, + random_builder.error_accumulator, + GcpRegion(id="global", name="global"), + random_builder.config, + last_run_started_at=random_builder.last_run_started_at, + ) + GcpMonitoringMetricData.query_for( + g_builder, + gcp_instance, + queries, + random_builder.created_at, + random_builder.created_at + random_builder.metrics_delta, + ) + g_builder.executor.wait_for_submitted_work() + + assert gcp_instance._resource_usage["cpu_utilization_percent"]["avg"] > 0.0 + assert gcp_instance._resource_usage["network_in_bytes"]["avg"] > 0.0 + assert gcp_instance._resource_usage["disk_read_iops"]["avg"] > 0.0 + + def test_machine_type_ondemand_cost(random_builder: GraphBuilder) -> None: # Cross-checking with pricing calculated on https://gcpinstances.doit-intl.com/ known_prices_linux_ondemand_hourly = [ diff --git a/plugins/gcp/test/test_config.py b/plugins/gcp/test/test_config.py index 5fc9cae0ba..1ce445aead 100644 --- a/plugins/gcp/test/test_config.py +++ b/plugins/gcp/test/test_config.py @@ -1,4 +1,3 @@ -from fixlib.proc import num_default_threads from fixlib.config import Config from fix_plugin_gcp import GCPCollectorPlugin @@ -11,5 +10,5 @@ def test_args() -> None: assert len(Config.gcp.project) == 0 assert len(Config.gcp.collect) == 0 assert len(Config.gcp.no_collect) == 0 - assert Config.gcp.project_pool_size == num_default_threads() + assert Config.gcp.project_pool_size == 64 assert Config.gcp.fork_process is True diff --git a/plugins/gcp/test/test_monitoring.py b/plugins/gcp/test/test_monitoring.py new file mode 100644 index 0000000000..374fe9d4c2 --- /dev/null +++ b/plugins/gcp/test/test_monitoring.py @@ -0,0 +1,37 @@ +from datetime import timedelta, datetime, timezone + +from fix_plugin_gcp.resources.base import GraphBuilder, GcpMonitoringQuery +from fix_plugin_gcp.resources.monitoring import GcpMonitoringMetricData, normalizer_factory +from fix_plugin_gcp.resources.compute import GcpInstance +from fixlib.baseresources import MetricName + + +def test_metric(random_builder: GraphBuilder) -> None: + now = datetime(2020, 3, 1, tzinfo=timezone.utc) + earlier = now - timedelta(days=60) + read = GcpMonitoringQuery.create( + query_name="compute.googleapis.com/instance/disk/read_ops_count", + period=timedelta(hours=1), + ref_id="gcp_instance/random_instance/global", + metric_name=MetricName.DiskRead, + normalization=normalizer_factory.iops, + stat="ALIGN_MIN", + project_id=random_builder.project.id, + metric_filters={"metric.labels.instance_name": "random_instance", "resource.labels.zone": "global"}, + ) + write = GcpMonitoringQuery.create( + query_name="compute.googleapis.com/instance/disk/write_ops_count", + period=timedelta(hours=1), + ref_id="gcp_instance/random_instance/global", + metric_name=MetricName.DiskWrite, + normalization=normalizer_factory.iops, + stat="ALIGN_MIN", + project_id=random_builder.project.id, + metric_filters={"metric.labels.instance_name": "random_instance", "resource.labels.zone": "global"}, + ) + gcp_instance = GcpInstance(id="random_instance") + GcpMonitoringMetricData.query_for(random_builder, gcp_instance, [read, write], earlier, now) + random_builder.executor.wait_for_submitted_work() + usages = list(gcp_instance._resource_usage.keys()) + assert usages[0] == f"{read.metric_name}_{read.normalization.unit}" + assert usages[1] == f"{write.metric_name}_{write.normalization.unit}"