-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
AWS X-Ray Remote Sampler Part 1 - Initial Rules Poller Implementation (…
…#33) *Issue #, if available:* First PR of 3 parts for adding the X-Ray remote sampling support for OTel Python SDK. *Description of changes:* - Python Classes - `AwsXRayRemoteSampler` - extends `opentelemetry.sdk.trace.sampling.Sampler` and implements `should_sample`. - Upon initialization, starts polling for sampling rules by scheduling a threading.Timer to execute a poll after a configurable interval of time. After this interval, it will repeat this process indefinitely by scheduling the same threading.Timer upon completion of the previous timer. - OTel `resource`, Collector `endpoint`, rules `polling_interval` are configurable. - `AwsXRaySamplingClient` - client to call GetSamplingRules - `SamplingRule` - Class for SamplingRules type Testing Script to poll Sampling Rules every 5 seconds: ``` import logging import time from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from opentelemetry.sdk.resources import Resource logging.basicConfig(level=logging.INFO) sampler = AwsXRayRemoteSampler(Resource.get_empty(), polling_interval=5) time.sleep(15) ``` Output: ``` 88665a53c0dd:sampler jjllee$ python3 mytesting.py INFO:amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler:Got Sampling Rules: {'[{"Attributes": {}, "FixedRate": 0.05, "HTTPMethod": "*", "Host": "*", "Priority": 10000, "ReservoirSize": 100, "ResourceARN": "*", "RuleARN": "arn:aws:xray:us-east-1:999999999999:sampling-rule/Default", "RuleName": "Default", "ServiceName": "*", "ServiceType": "*", "URLPath": "*", "Version": 1}, {"Attributes": {"abc": "1234"}, "FixedRate": 0.11, "HTTPMethod": "*", "Host": "*", "Priority": 20, "ReservoirSize": 1, "ResourceARN": "*", "RuleARN": "arn:aws:xray:us-east-1:999999999999:sampling-rule/test", "RuleName": "test", "ServiceName": "*", "ServiceType": "*", "URLPath": "*", "Version": 1}]'} INFO:amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler:Got Sampling Rules: {'[{"Attributes": {}, "FixedRate": 0.05, "HTTPMethod": "*", "Host": "*", "Priority": 10000, "ReservoirSize": 100, "ResourceARN": "*", "RuleARN": "arn:aws:xray:us-east-1:999999999999:sampling-rule/Default", "RuleName": "Default", "ServiceName": "*", "ServiceType": "*", "URLPath": "*", "Version": 1}, {"Attributes": {"abc": "1234"}, "FixedRate": 0.11, "HTTPMethod": "*", "Host": "*", "Priority": 20, "ReservoirSize": 1, "ResourceARN": "*", "RuleARN": "arn:aws:xray:us-east-1:999999999999:sampling-rule/test", "RuleName": "test", "ServiceName": "*", "ServiceType": "*", "URLPath": "*", "Version": 1}]'} INFO:amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler:Got Sampling Rules: {'[{"Attributes": {}, "FixedRate": 0.05, "HTTPMethod": "*", "Host": "*", "Priority": 10000, "ReservoirSize": 100, "ResourceARN": "*", "RuleARN": "arn:aws:xray:us-east-1:999999999999:sampling-rule/Default", "RuleName": "Default", "ServiceName": "*", "ServiceType": "*", "URLPath": "*", "Version": 1}, {"Attributes": {"abc": "1234"}, "FixedRate": 0.11, "HTTPMethod": "*", "Host": "*", "Priority": 20, "ReservoirSize": 1, "ResourceARN": "*", "RuleARN": "arn:aws:xray:us-east-1:999999999999:sampling-rule/test", "RuleName": "test", "ServiceName": "*", "ServiceType": "*", "URLPath": "*", "Version": 1}]'} 88665a53c0dd:sampler jjllee$ ``` By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --------- Co-authored-by: Prashant Srivastava <[email protected]>
- Loading branch information
Showing
6 changed files
with
404 additions
and
0 deletions.
There are no files selected for viewing
51 changes: 51 additions & 0 deletions
51
...opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_aws_xray_sampling_client.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import json | ||
from logging import getLogger | ||
|
||
import requests | ||
|
||
from amazon.opentelemetry.distro.sampler._sampling_rule import _SamplingRule | ||
|
||
_logger = getLogger(__name__) | ||
|
||
|
||
class _AwsXRaySamplingClient: | ||
def __init__(self, endpoint=None, log_level=None): | ||
# Override default log level | ||
if log_level is not None: | ||
_logger.setLevel(log_level) | ||
|
||
if endpoint is None: | ||
_logger.error("endpoint must be specified") | ||
self.__get_sampling_rules_endpoint = endpoint + "/GetSamplingRules" | ||
|
||
def get_sampling_rules(self) -> [_SamplingRule]: | ||
sampling_rules = [] | ||
headers = {"content-type": "application/json"} | ||
|
||
try: | ||
xray_response = requests.post(url=self.__get_sampling_rules_endpoint, headers=headers, timeout=20) | ||
if xray_response is None: | ||
_logger.error("GetSamplingRules response is None") | ||
return [] | ||
sampling_rules_response = xray_response.json() | ||
if "SamplingRuleRecords" not in sampling_rules_response: | ||
_logger.error( | ||
"SamplingRuleRecords is missing in getSamplingRules response: %s", sampling_rules_response | ||
) | ||
return [] | ||
|
||
sampling_rules_records = sampling_rules_response["SamplingRuleRecords"] | ||
for record in sampling_rules_records: | ||
if "SamplingRule" not in record: | ||
_logger.error("SamplingRule is missing in SamplingRuleRecord") | ||
else: | ||
sampling_rules.append(_SamplingRule(**record["SamplingRule"])) | ||
|
||
except requests.exceptions.RequestException as req_err: | ||
_logger.error("Request error occurred: %s", req_err) | ||
except json.JSONDecodeError as json_err: | ||
_logger.error("Error in decoding JSON response: %s", json_err) | ||
|
||
return sampling_rules |
37 changes: 37 additions & 0 deletions
37
aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/_sampling_rule.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
|
||
# Disable snake_case naming style so this class can match the sampling rules response from X-Ray | ||
# pylint: disable=invalid-name | ||
class _SamplingRule: | ||
def __init__( | ||
self, | ||
Attributes: dict = None, | ||
FixedRate=None, | ||
HTTPMethod=None, | ||
Host=None, | ||
Priority=None, | ||
ReservoirSize=None, | ||
ResourceARN=None, | ||
RuleARN=None, | ||
RuleName=None, | ||
ServiceName=None, | ||
ServiceType=None, | ||
URLPath=None, | ||
Version=None, | ||
): | ||
self.Attributes = Attributes if Attributes is not None else {} | ||
self.FixedRate = FixedRate if FixedRate is not None else 0.0 | ||
self.HTTPMethod = HTTPMethod if HTTPMethod is not None else "" | ||
self.Host = Host if Host is not None else "" | ||
# Default to value with lower priority than default rule | ||
self.Priority = Priority if Priority is not None else 10001 | ||
self.ReservoirSize = ReservoirSize if ReservoirSize is not None else 0 | ||
self.ResourceARN = ResourceARN if ResourceARN is not None else "" | ||
self.RuleARN = RuleARN if RuleARN is not None else "" | ||
self.RuleName = RuleName if RuleName is not None else "" | ||
self.ServiceName = ServiceName if ServiceName is not None else "" | ||
self.ServiceType = ServiceType if ServiceType is not None else "" | ||
self.URLPath = URLPath if URLPath is not None else "" | ||
self.Version = Version if Version is not None else 0 |
101 changes: 101 additions & 0 deletions
101
aws-opentelemetry-distro/src/amazon/opentelemetry/distro/sampler/aws_xray_remote_sampler.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import json | ||
from logging import getLogger | ||
from threading import Timer | ||
from typing import Optional, Sequence | ||
|
||
from typing_extensions import override | ||
|
||
from amazon.opentelemetry.distro.sampler._aws_xray_sampling_client import _AwsXRaySamplingClient | ||
from opentelemetry.context import Context | ||
from opentelemetry.sdk.resources import Resource | ||
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF, Sampler, SamplingResult | ||
from opentelemetry.trace import Link, SpanKind | ||
from opentelemetry.trace.span import TraceState | ||
from opentelemetry.util.types import Attributes | ||
|
||
_logger = getLogger(__name__) | ||
|
||
DEFAULT_RULES_POLLING_INTERVAL_SECONDS = 300 | ||
DEFAULT_TARGET_POLLING_INTERVAL_SECONDS = 10 | ||
DEFAULT_SAMPLING_PROXY_ENDPOINT = "http://127.0.0.1:2000" | ||
|
||
|
||
class AwsXRayRemoteSampler(Sampler): | ||
""" | ||
Remote Sampler for OpenTelemetry that gets sampling configurations from AWS X-Ray | ||
Args: | ||
resource: OpenTelemetry Resource (Required) | ||
endpoint: proxy endpoint for AWS X-Ray Sampling (Optional) | ||
polling_interval: Polling interval for getSamplingRules call (Optional) | ||
log_level: custom log level configuration for remote sampler (Optional) | ||
""" | ||
|
||
__resource: Resource | ||
__polling_interval: int | ||
__xray_client: _AwsXRaySamplingClient | ||
|
||
def __init__( | ||
self, | ||
resource: Resource, | ||
endpoint=DEFAULT_SAMPLING_PROXY_ENDPOINT, | ||
polling_interval=DEFAULT_RULES_POLLING_INTERVAL_SECONDS, | ||
log_level=None, | ||
): | ||
# Override default log level | ||
if log_level is not None: | ||
_logger.setLevel(log_level) | ||
|
||
self.__xray_client = _AwsXRaySamplingClient(endpoint, log_level=log_level) | ||
self.__polling_interval = polling_interval | ||
|
||
# pylint: disable=unused-private-member | ||
if resource is not None: | ||
self.__resource = resource | ||
else: | ||
_logger.warning("OTel Resource provided is `None`. Defaulting to empty resource") | ||
self.__resource = Resource.get_empty() | ||
|
||
# Schedule the next rule poll now | ||
# Python Timers only run once, so they need to be recreated for every poll | ||
self._timer = Timer(0, self.__start_sampling_rule_poller) | ||
self._timer.daemon = True # Ensures that when the main thread exits, the Timer threads are killed | ||
self._timer.start() | ||
|
||
# pylint: disable=no-self-use | ||
@override | ||
def should_sample( | ||
self, | ||
parent_context: Optional["Context"], | ||
trace_id: int, | ||
name: str, | ||
kind: SpanKind = None, | ||
attributes: Attributes = None, | ||
links: Sequence["Link"] = None, | ||
trace_state: "TraceState" = None, | ||
) -> SamplingResult: | ||
# TODO: add sampling functionality | ||
return ALWAYS_OFF.should_sample( | ||
parent_context, trace_id, name, kind=kind, attributes=attributes, links=links, trace_state=trace_state | ||
) | ||
|
||
# pylint: disable=no-self-use | ||
@override | ||
def get_description(self) -> str: | ||
description = "AwsXRayRemoteSampler{remote sampling with AWS X-Ray}" | ||
return description | ||
|
||
def __get_and_update_sampling_rules(self): | ||
sampling_rules = self.__xray_client.get_sampling_rules() | ||
|
||
# TODO: Update sampling rules cache | ||
_logger.info("Got Sampling Rules: %s", {json.dumps([ob.__dict__ for ob in sampling_rules])}) | ||
|
||
def __start_sampling_rule_poller(self): | ||
self.__get_and_update_sampling_rules() | ||
# Schedule the next sampling rule poll | ||
self._timer = Timer(self.__polling_interval, self.__start_sampling_rule_poller) | ||
self._timer.daemon = True | ||
self._timer.start() |
65 changes: 65 additions & 0 deletions
65
...ro/tests/amazon/opentelemetry/distro/sampler/data/get-sampling-rules-response-sample.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
{ | ||
"NextToken": null, | ||
"SamplingRuleRecords": [ | ||
{ | ||
"CreatedAt": 1.67799933E9, | ||
"ModifiedAt": 1.67799933E9, | ||
"SamplingRule": { | ||
"Attributes": { | ||
"foo": "bar", | ||
"doo": "baz" | ||
}, | ||
"FixedRate": 0.05, | ||
"HTTPMethod": "*", | ||
"Host": "*", | ||
"Priority": 1000, | ||
"ReservoirSize": 10, | ||
"ResourceARN": "*", | ||
"RuleARN": "arn:aws:xray:us-west-2:123456789000:sampling-rule/Rule1", | ||
"RuleName": "Rule1", | ||
"ServiceName": "*", | ||
"ServiceType": "AWS::Foo::Bar", | ||
"URLPath": "*", | ||
"Version": 1 | ||
} | ||
}, | ||
{ | ||
"CreatedAt": 0.0, | ||
"ModifiedAt": 1.611564245E9, | ||
"SamplingRule": { | ||
"Attributes": {}, | ||
"FixedRate": 0.05, | ||
"HTTPMethod": "*", | ||
"Host": "*", | ||
"Priority": 10000, | ||
"ReservoirSize": 1, | ||
"ResourceARN": "*", | ||
"RuleARN": "arn:aws:xray:us-west-2:123456789000:sampling-rule/Default", | ||
"RuleName": "Default", | ||
"ServiceName": "*", | ||
"ServiceType": "*", | ||
"URLPath": "*", | ||
"Version": 1 | ||
} | ||
}, | ||
{ | ||
"CreatedAt": 1.676038494E9, | ||
"ModifiedAt": 1.676038494E9, | ||
"SamplingRule": { | ||
"Attributes": {}, | ||
"FixedRate": 0.2, | ||
"HTTPMethod": "GET", | ||
"Host": "*", | ||
"Priority": 1, | ||
"ReservoirSize": 10, | ||
"ResourceARN": "*", | ||
"RuleARN": "arn:aws:xray:us-west-2:123456789000:sampling-rule/Rule2", | ||
"RuleName": "Rule2", | ||
"ServiceName": "FooBar", | ||
"ServiceType": "*", | ||
"URLPath": "/foo/bar", | ||
"Version": 1 | ||
} | ||
} | ||
] | ||
} |
45 changes: 45 additions & 0 deletions
45
...elemetry-distro/tests/amazon/opentelemetry/distro/sampler/test_aws_xray_remote_sampler.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
from logging import DEBUG | ||
from unittest import TestCase | ||
|
||
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler | ||
from opentelemetry.sdk.resources import Resource | ||
|
||
|
||
class TestAwsXRayRemoteSampler(TestCase): | ||
def test_create_remote_sampler_with_empty_resource(self): | ||
rs = AwsXRayRemoteSampler(resource=Resource.get_empty()) | ||
self.assertIsNotNone(rs._timer) | ||
self.assertEqual(rs._AwsXRayRemoteSampler__polling_interval, 300) | ||
self.assertIsNotNone(rs._AwsXRayRemoteSampler__xray_client) | ||
self.assertIsNotNone(rs._AwsXRayRemoteSampler__resource) | ||
|
||
def test_create_remote_sampler_with_populated_resource(self): | ||
rs = AwsXRayRemoteSampler( | ||
resource=Resource.create({"service.name": "test-service-name", "cloud.platform": "test-cloud-platform"}) | ||
) | ||
self.assertIsNotNone(rs._timer) | ||
self.assertEqual(rs._AwsXRayRemoteSampler__polling_interval, 300) | ||
self.assertIsNotNone(rs._AwsXRayRemoteSampler__xray_client) | ||
self.assertIsNotNone(rs._AwsXRayRemoteSampler__resource) | ||
self.assertEqual(rs._AwsXRayRemoteSampler__resource.attributes["service.name"], "test-service-name") | ||
self.assertEqual(rs._AwsXRayRemoteSampler__resource.attributes["cloud.platform"], "test-cloud-platform") | ||
|
||
def test_create_remote_sampler_with_all_fields_populated(self): | ||
rs = AwsXRayRemoteSampler( | ||
resource=Resource.create({"service.name": "test-service-name", "cloud.platform": "test-cloud-platform"}), | ||
endpoint="http://abc.com", | ||
polling_interval=120, | ||
log_level=DEBUG, | ||
) | ||
self.assertIsNotNone(rs._timer) | ||
self.assertEqual(rs._AwsXRayRemoteSampler__polling_interval, 120) | ||
self.assertIsNotNone(rs._AwsXRayRemoteSampler__xray_client) | ||
self.assertIsNotNone(rs._AwsXRayRemoteSampler__resource) | ||
self.assertEqual( | ||
rs._AwsXRayRemoteSampler__xray_client._AwsXRaySamplingClient__get_sampling_rules_endpoint, | ||
"http://abc.com/GetSamplingRules", | ||
) | ||
self.assertEqual(rs._AwsXRayRemoteSampler__resource.attributes["service.name"], "test-service-name") | ||
self.assertEqual(rs._AwsXRayRemoteSampler__resource.attributes["cloud.platform"], "test-cloud-platform") |
Oops, something went wrong.