Skip to content

Commit

Permalink
Implement AwsSpanMetricsProcessor and MetricsAttributeGenerator (#8)
Browse files Browse the repository at this point in the history
In this commit, we are implementing AwsSpanMetricsProcessor and
AwsSpanMetricsProcessorBuilder. We needed to also implement the
MetricsAttributeGenerator interface, since it is a dependency of
AwsSpanMetricsProcessor, and we needed to implement a stub for
AwsMetricsAttributeGenerator, since it is a dependency of
AwsSpanMetricsProcessorBuilder. As much as possible, we are attempting
to mirror the implementation of these clases found in
https://github.com/aws-observability/aws-otel-java-instrumentation

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
  • Loading branch information
thpierce authored Jan 10, 2024
2 parents dc54290 + 29b75ad commit 51c9ec0
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from metric_attribute_generator import MetricAttributeGenerator

from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan


class AwsMetricAttributeGenerator(MetricAttributeGenerator):
"""AwsMetricAttributeGenerator generates specific metric attributes for incoming and outgoing traffic.
AwsMetricAttributeGenerator generates very specific metric attributes based on low-cardinality span and resource
attributes. If such attributes are not present, we fallback to default values.
The goal of these particular metric attributes is to get metrics for incoming and outgoing traffic for a service.
Namely, SpanKind#SERVER and SpanKind#CONSUMER spans represent "incoming" traffic, SpanKind#CLIENT and
SpanKind#PRODUCER spans represent "outgoing" traffic, and SpanKind#INTERNAL spans are ignored.
"""

@staticmethod
def generate_metric_attributes_dict_from_span(span: ReadableSpan, resource: Resource) -> [str, BoundedAttributes]:
"""This method is used by the AwsSpanMetricsProcessor to generate service and dependency metrics"""
# TODO
return {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Optional

from metric_attribute_generator import MetricAttributeGenerator
from typing_extensions import override

from opentelemetry.context import Context
from opentelemetry.metrics import Histogram
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan, Span, SpanProcessor, StatusCode
from opentelemetry.semconv.trace import SpanAttributes

_HTTP_STATUS_CODE = SpanAttributes.HTTP_STATUS_CODE
_NANOS_TO_MILLIS: float = 1_000_000.0

# Constants for deriving error and fault metrics
_ERROR_CODE_LOWER_BOUND: int = 400
_ERROR_CODE_UPPER_BOUND: int = 499
_FAULT_CODE_LOWER_BOUND: int = 500
_FAULT_CODE_UPPER_BOUND: int = 599


class AwsSpanMetricsProcessor(SpanProcessor):
"""AwsSpanMetricsProcessor is SpanProcessor that generates metrics from spans
This processor will generate metrics based on span data. It depends on a MetricAttributeGenerator being provided on
instantiation, which will provide a means to determine attributes which should be used to create metrics. A Resource
must also be provided, which is used to generate metrics. Finally, three Histogram must be provided, which will be
used to actually create desired metrics (see below)
AwsSpanMetricsProcessor produces metrics for errors (e.g. HTTP 4XX status codes), faults (e.g. HTTP 5XX status
codes), and latency (in Milliseconds). Errors and faults are counted, while latency is measured with a histogram.
Metrics are emitted with attributes derived from span attributes.
For highest fidelity metrics, this processor should be coupled with the AlwaysRecordSampler, which will result in
100% of spans being sent to the processor.
"""

# Metric instruments
_error_histogram: Histogram
_fault_histogram: Histogram
_latency_histogram: Histogram

_generator: MetricAttributeGenerator
_resource: Resource

def __init__(
self,
error_histogram: Histogram,
fault_histogram: Histogram,
latency_histogram: Histogram,
generator: MetricAttributeGenerator,
resource: Resource,
):
self._error_histogram = error_histogram
self._fault_histogram = fault_histogram
self._latency_histogram = latency_histogram
self._generator = generator
self._resource = resource

# pylint: disable=no-self-use
@override
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
return

@override
def on_end(self, span: ReadableSpan) -> None:
attribute_dict: dict[str, BoundedAttributes] = self._generator.generate_metric_attributes_dict_from_span(
span, self._resource
)
map(lambda attributes: self._record_metrics(span, attributes), attribute_dict.values())

@override
def shutdown(self) -> None:
self.force_flush()

# pylint: disable=no-self-use
@override
def force_flush(self, timeout_millis: int = None) -> bool:
return True

def _record_metrics(self, span: ReadableSpan, attributes: BoundedAttributes) -> None:
# Only record metrics if non-empty attributes are returned.
if len(attributes) > 0:
self._record_error_or_fault(span, attributes)
self._record_latency(span, attributes)

def _record_error_or_fault(self, span: ReadableSpan, attributes: BoundedAttributes) -> None:
# The logic to record error and fault should be kept in sync with the aws-xray exporter whenever possible except
# for the throttle.
# https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awsxrayexporter/internal/translator/cause.go#L121-L160
http_status_code: int = span.attributes.get(_HTTP_STATUS_CODE)
status_code: StatusCode = span.status.status_code

if http_status_code is None:
http_status_code = attributes.get(_HTTP_STATUS_CODE)

if _is_not_error_or_fault(http_status_code):
if StatusCode.ERROR == status_code:
self._error_histogram.record(0, attributes)
self._fault_histogram.record(1, attributes)
else:
self._error_histogram.record(0, attributes)
self._fault_histogram.record(0, attributes)
elif _ERROR_CODE_LOWER_BOUND <= http_status_code <= _ERROR_CODE_UPPER_BOUND:
self._error_histogram.record(1, attributes)
self._fault_histogram.record(0, attributes)
elif _FAULT_CODE_LOWER_BOUND <= http_status_code <= _FAULT_CODE_UPPER_BOUND:
self._error_histogram.record(0, attributes)
self._fault_histogram.record(1, attributes)

def _record_latency(self, span: ReadableSpan, attributes: BoundedAttributes) -> None:
nanos: int = span.end_time - span.start_time
millis: float = nanos / _NANOS_TO_MILLIS
self._latency_histogram.record(millis, attributes)


def _is_not_error_or_fault(http_status_code: int) -> bool:
return (
http_status_code is None
or http_status_code < _ERROR_CODE_LOWER_BOUND
or http_status_code > _FAULT_CODE_UPPER_BOUND
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from aws_metric_attribute_generator import AwsMetricAttributeGenerator
from aws_span_metrics_processor import AwsSpanMetricsProcessor
from metric_attribute_generator import MetricAttributeGenerator

from opentelemetry.sdk.metrics import Histogram, Meter, MeterProvider
from opentelemetry.sdk.resources import Resource

# Metric instrument configuration constants
_ERROR: str = "Error"
_FAULT: str = "Fault"
_LATENCY: str = "Latency"
_LATENCY_UNITS: str = "Milliseconds"

# Defaults
_DEFAULT_GENERATOR: MetricAttributeGenerator = AwsMetricAttributeGenerator()
_DEFAULT_SCOPE_NAME: str = "AwsSpanMetricsProcessor"


class AwsSpanMetricsProcessorBuilder:
"""A builder for AwsSpanMetricsProcessor"""

# Required builder elements
_meter_provider: MeterProvider
_resource: Resource

# Optional builder elements
_generator: MetricAttributeGenerator = _DEFAULT_GENERATOR
_scope_name: str = _DEFAULT_SCOPE_NAME

def __init__(self, meter_provider: MeterProvider, resource: Resource):
self.meter_provider = meter_provider
self.resource = resource

def set_generator(self, generator: MetricAttributeGenerator) -> "AwsSpanMetricsProcessorBuilder":
"""
Sets the generator used to generate attributes used in metrics produced by span metrics processor. If unset,
defaults to _DEFAULT_GENERATOR. Must not be None.
"""
if generator is None:
raise ValueError("generator must not be None")
self._generator = generator
return self

def set_scope_name(self, scope_name: str) -> "AwsSpanMetricsProcessorBuilder":
"""
Sets the scope name used in the creation of metrics by the span metrics processor. If unset, defaults to
_DEFAULT_SCOPE_NAME. Must not be None.
"""
if scope_name is None:
raise ValueError("scope_name must not be None")
self._scope_name = scope_name
return self

def build(self) -> AwsSpanMetricsProcessor:
meter: Meter = self._meter_provider.get_meter(self._scope_name)
error_histogram: Histogram = meter.create_histogram(_ERROR)
fault_histogram: Histogram = meter.create_histogram(_FAULT)
latency_histogram: Histogram = meter.create_histogram(_LATENCY, unit=_LATENCY_UNITS)

return AwsSpanMetricsProcessor(
error_histogram, fault_histogram, latency_histogram, self._generator, self._resource
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan


class MetricAttributeGenerator:
"""MetricAttributeGenerator is an interface for generating metric attributes from a span.
Metric attribute generator defines an interface for classes that can generate specific attributes to be used by an
AwsSpanMetricsProcessor to produce metrics and by AwsMetricAttributesSpanExporter to wrap the original span.
"""

SERVICE_METRIC: str = "Service"
DEPENDENCY_METRIC: str = "Dependency"

@staticmethod
def generate_metric_attributes_dict_from_span(span: ReadableSpan, resource: Resource) -> [str, BoundedAttributes]:
"""Generate metric attributes from a span.
Given a span and associated resource, produce meaningful metric attributes for metrics produced from the span.
If no metrics should be generated from this span, return empty attributes.
Args:
span - ReadableSpan to be used to generate metric attributes.
resource - Resource associated with Span to be used to generate metric attributes.
Returns:
A dictionary of Attributes objects with values assigned to key "Service" or "Dependency". It will contain
either 0, 1, or 2 items.
"""

0 comments on commit 51c9ec0

Please sign in to comment.