Skip to content

Commit

Permalink
Merge branch 'main' into enhance_kinesis
Browse files Browse the repository at this point in the history
  • Loading branch information
zzhlogin authored Jun 5, 2024
2 parents d670649 + 166c4cb commit 920ce00
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,80 +49,93 @@ def json(self):


class TestAwsXRayRemoteSampler(TestCase):
def setUp(self):
self.rs = None

def tearDown(self):
# Clean up timers
if self.rs is not None:
self.rs._rules_timer.cancel()
self.rs._targets_timer.cancel()

def test_create_remote_sampler_with_empty_resource(self):
rs = AwsXRayRemoteSampler(resource=Resource.get_empty())
self.assertIsNotNone(rs._rules_timer)
self.assertEqual(rs._AwsXRayRemoteSampler__polling_interval, 300)
self.assertIsNotNone(rs._AwsXRayRemoteSampler__xray_client)
self.assertIsNotNone(rs._AwsXRayRemoteSampler__resource)
self.assertTrue(len(rs._AwsXRayRemoteSampler__client_id), 24)
self.rs = AwsXRayRemoteSampler(resource=Resource.get_empty())
self.assertIsNotNone(self.rs._rules_timer)
self.assertEqual(self.rs._AwsXRayRemoteSampler__polling_interval, 300)
self.assertIsNotNone(self.rs._AwsXRayRemoteSampler__xray_client)
self.assertIsNotNone(self.rs._AwsXRayRemoteSampler__resource)
self.assertTrue(len(self.rs._AwsXRayRemoteSampler__client_id), 24)

def test_create_remote_sampler_with_populated_resource(self):
rs = AwsXRayRemoteSampler(
self.rs = AwsXRayRemoteSampler(
resource=Resource.create({"service.name": "test-service-name", "cloud.platform": "test-cloud-platform"})
)
self.assertIsNotNone(rs._rules_timer)
self.assertEqual(rs._AwsXRayRemoteSampler__polling_interval, 300)
self.assertIsNotNone(rs._AwsXRayRemoteSampler__xray_client)
self.assertIsNotNone(rs._AwsXRayRemoteSampler__resource)
self.assertEqual(rs._AwsXRayRemoteSampler__resource.attributes["service.name"], "test-service-name")
self.assertEqual(rs._AwsXRayRemoteSampler__resource.attributes["cloud.platform"], "test-cloud-platform")
self.assertIsNotNone(self.rs._rules_timer)
self.assertEqual(self.rs._AwsXRayRemoteSampler__polling_interval, 300)
self.assertIsNotNone(self.rs._AwsXRayRemoteSampler__xray_client)
self.assertIsNotNone(self.rs._AwsXRayRemoteSampler__resource)
self.assertEqual(self.rs._AwsXRayRemoteSampler__resource.attributes["service.name"], "test-service-name")
self.assertEqual(self.rs._AwsXRayRemoteSampler__resource.attributes["cloud.platform"], "test-cloud-platform")

def test_create_remote_sampler_with_all_fields_populated(self):
rs = AwsXRayRemoteSampler(
self.rs = AwsXRayRemoteSampler(
resource=Resource.create({"service.name": "test-service-name", "cloud.platform": "test-cloud-platform"}),
endpoint="http://abc.com",
polling_interval=120,
log_level=DEBUG,
)
self.assertIsNotNone(rs._rules_timer)
self.assertEqual(rs._AwsXRayRemoteSampler__polling_interval, 120)
self.assertIsNotNone(rs._AwsXRayRemoteSampler__xray_client)
self.assertIsNotNone(rs._AwsXRayRemoteSampler__resource)
self.assertIsNotNone(self.rs._rules_timer)
self.assertEqual(self.rs._AwsXRayRemoteSampler__polling_interval, 120)
self.assertIsNotNone(self.rs._AwsXRayRemoteSampler__xray_client)
self.assertIsNotNone(self.rs._AwsXRayRemoteSampler__resource)
self.assertEqual(
rs._AwsXRayRemoteSampler__xray_client._AwsXRaySamplingClient__get_sampling_rules_endpoint,
self.rs._AwsXRayRemoteSampler__xray_client._AwsXRaySamplingClient__get_sampling_rules_endpoint,
"http://abc.com/GetSamplingRules",
)
self.assertEqual(rs._AwsXRayRemoteSampler__resource.attributes["service.name"], "test-service-name")
self.assertEqual(rs._AwsXRayRemoteSampler__resource.attributes["cloud.platform"], "test-cloud-platform")
self.assertEqual(self.rs._AwsXRayRemoteSampler__resource.attributes["service.name"], "test-service-name")
self.assertEqual(self.rs._AwsXRayRemoteSampler__resource.attributes["cloud.platform"], "test-cloud-platform")

@patch("requests.Session.post", side_effect=mocked_requests_get)
@patch("amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler.DEFAULT_TARGET_POLLING_INTERVAL_SECONDS", 2)
def test_update_sampling_rules_and_targets_with_pollers_and_should_sample(self, mock_post=None):
rs = AwsXRayRemoteSampler(
self.rs = AwsXRayRemoteSampler(
resource=Resource.create({"service.name": "test-service-name", "cloud.platform": "test-cloud-platform"})
)
self.assertEqual(rs._AwsXRayRemoteSampler__target_polling_interval, 2)
self.assertEqual(self.rs._AwsXRayRemoteSampler__target_polling_interval, 2)

time.sleep(1.0)
self.assertEqual(
rs._AwsXRayRemoteSampler__rule_cache._RuleCache__rule_appliers[0].sampling_rule.RuleName, "test"
self.rs._AwsXRayRemoteSampler__rule_cache._RuleCache__rule_appliers[0].sampling_rule.RuleName,
"test",
)
self.assertEqual(rs.should_sample(None, 0, "name", attributes={"abc": "1234"}).decision, Decision.DROP)
self.assertEqual(self.rs.should_sample(None, 0, "name", attributes={"abc": "1234"}).decision, Decision.DROP)

# wait 2 more seconds since targets polling was patched to 2 seconds (rather than 10s)
time.sleep(2.0)
self.assertEqual(rs._AwsXRayRemoteSampler__target_polling_interval, 1000)
self.assertEqual(self.rs._AwsXRayRemoteSampler__target_polling_interval, 1000)
self.assertEqual(
rs.should_sample(None, 0, "name", attributes={"abc": "1234"}).decision, Decision.RECORD_AND_SAMPLE
self.rs.should_sample(None, 0, "name", attributes={"abc": "1234"}).decision,
Decision.RECORD_AND_SAMPLE,
)
self.assertEqual(
rs.should_sample(None, 0, "name", attributes={"abc": "1234"}).decision, Decision.RECORD_AND_SAMPLE
self.rs.should_sample(None, 0, "name", attributes={"abc": "1234"}).decision,
Decision.RECORD_AND_SAMPLE,
)
self.assertEqual(
rs.should_sample(None, 0, "name", attributes={"abc": "1234"}).decision, Decision.RECORD_AND_SAMPLE
self.rs.should_sample(None, 0, "name", attributes={"abc": "1234"}).decision,
Decision.RECORD_AND_SAMPLE,
)

@patch("requests.Session.post", side_effect=mocked_requests_get)
@patch("amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler.DEFAULT_TARGET_POLLING_INTERVAL_SECONDS", 3)
def test_multithreading_with_large_reservoir_with_otel_sdk(self, mock_post=None):
rs = AwsXRayRemoteSampler(
self.rs = AwsXRayRemoteSampler(
resource=Resource.create({"service.name": "test-service-name", "cloud.platform": "test-cloud-platform"})
)
attributes = {"abc": "1234"}

time.sleep(2.0)
self.assertEqual(rs.should_sample(None, 0, "name", attributes=attributes).decision, Decision.DROP)
self.assertEqual(self.rs.should_sample(None, 0, "name", attributes=attributes).decision, Decision.DROP)

# wait 3 more seconds since targets polling was patched to 2 seconds (rather than 10s)
time.sleep(3.0)
Expand All @@ -139,7 +152,7 @@ def test_multithreading_with_large_reservoir_with_otel_sdk(self, mock_post=None)
target=create_spans,
name="thread_" + str(idx),
daemon=True,
args=(sampled_array, idx, attributes, rs, number_of_spans),
args=(sampled_array, idx, attributes, self.rs, number_of_spans),
)
)
threads[idx].start()
Expand All @@ -149,7 +162,7 @@ def test_multithreading_with_large_reservoir_with_otel_sdk(self, mock_post=None)
threads[idx].join()
sum_sampled += sampled_array[idx]

test_rule_applier = rs._AwsXRayRemoteSampler__rule_cache._RuleCache__rule_appliers[0]
test_rule_applier = self.rs._AwsXRayRemoteSampler__rule_cache._RuleCache__rule_appliers[0]
self.assertEqual(
test_rule_applier._SamplingRuleApplier__reservoir_sampler._root._RateLimitingSampler__reservoir._quota,
100000,
Expand All @@ -161,7 +174,7 @@ def test_multithreading_with_large_reservoir_with_otel_sdk(self, mock_post=None)
@patch("amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler.DEFAULT_TARGET_POLLING_INTERVAL_SECONDS", 2)
@patch("amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler._Clock", MockClock)
def test_multithreading_with_some_reservoir_with_otel_sdk(self, mock_post=None):
rs = AwsXRayRemoteSampler(
self.rs = AwsXRayRemoteSampler(
resource=Resource.create({"service.name": "test-service-name", "cloud.platform": "test-cloud-platform"})
)
attributes = {"abc": "non-matching attribute value, use default rule"}
Expand All @@ -170,17 +183,19 @@ def test_multithreading_with_some_reservoir_with_otel_sdk(self, mock_post=None):
# which will eat up more than 1 second of reservoir. Using MockClock we can freeze time
# and pretend all thread jobs start and end at the exact same time,
# assume and test exactly 1 second of reservoir (100 quota) only
mock_clock: MockClock = rs._clock
mock_clock: MockClock = self.rs._clock

time.sleep(1.0)
mock_clock.add_time(1.0)
self.assertEqual(mock_clock.now(), rs._clock.now())
self.assertEqual(rs.should_sample(None, 0, "name", attributes=attributes).decision, Decision.RECORD_AND_SAMPLE)
self.assertEqual(mock_clock.now(), self.rs._clock.now())
self.assertEqual(
self.rs.should_sample(None, 0, "name", attributes=attributes).decision, Decision.RECORD_AND_SAMPLE
)

# wait 2 more seconds since targets polling was patched to 2 seconds (rather than 10s)
time.sleep(2.0)
mock_clock.add_time(2.0)
self.assertEqual(mock_clock.now(), rs._clock.now())
self.assertEqual(mock_clock.now(), self.rs._clock.now())

number_of_spans = 100
thread_count = 1000
Expand All @@ -194,7 +209,7 @@ def test_multithreading_with_some_reservoir_with_otel_sdk(self, mock_post=None):
target=create_spans,
name="thread_" + str(idx),
daemon=True,
args=(sampled_array, idx, attributes, rs, number_of_spans),
args=(sampled_array, idx, attributes, self.rs, number_of_spans),
)
)
threads[idx].start()
Expand All @@ -204,7 +219,7 @@ def test_multithreading_with_some_reservoir_with_otel_sdk(self, mock_post=None):
threads[idx].join()
sum_sampled += sampled_array[idx]

default_rule_applier = rs._AwsXRayRemoteSampler__rule_cache._RuleCache__rule_appliers[1]
default_rule_applier = self.rs._AwsXRayRemoteSampler__rule_cache._RuleCache__rule_appliers[1]
self.assertEqual(
default_rule_applier._SamplingRuleApplier__reservoir_sampler._root._RateLimitingSampler__reservoir._quota,
100,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import json
import logging
import os
import time
from importlib import reload
from logging import getLogger
from unittest import TestCase
Expand Down Expand Up @@ -187,11 +186,7 @@ def test_urls_excluded_from_sampling(self):
except requests.exceptions.RequestException:
pass

timeout = time.time() + 1
span_list = memory_exporter.get_finished_spans()
while len(span_list) != 1 and timeout > time.time():
span_list = memory_exporter.get_finished_spans()
time.sleep(0.1)
self.assertEqual(1, len(span_list))
span_http_url = span_list[0].attributes.get("http.url")
self.assertEqual(span_http_url, "http://this_is_a_fake_url:3849/GetSamplingRules")
Expand All @@ -201,11 +196,7 @@ def test_urls_excluded_from_sampling(self):
except requests.exceptions.RequestException:
pass

timeout = time.time() + 1
span_list = memory_exporter.get_finished_spans()
while len(span_list) != 2 and timeout > time.time():
span_list = memory_exporter.get_finished_spans()
time.sleep(0.1)
self.assertEqual(2, len(span_list))
span_http_url = span_list[1].attributes.get("http.url")
self.assertEqual(span_http_url, "http://this_is_a_fake_url:3849/SamplingTargets")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def test_import_default_sampler_when_env_var_is_not_set(self):
self.assertEqual(default_sampler.get_description(), DEFAULT_ON.get_description())
# DEFAULT_ON is a ParentBased(ALWAYS_ON) sampler

@patch.object(AwsXRayRemoteSampler, "_AwsXRayRemoteSampler__start_sampling_rule_poller", lambda x: None)
@patch.object(AwsXRayRemoteSampler, "_AwsXRayRemoteSampler__start_sampling_target_poller", lambda x: None)
def test_using_xray_sampler_sets_url_exclusion_env_vars(self):
targets_to_exclude = "SamplingTargets,GetSamplingRules"
os.environ.pop("OTEL_PYTHON_REQUESTS_EXCLUDED_URLS", None)
Expand All @@ -186,6 +188,8 @@ def test_using_xray_sampler_sets_url_exclusion_env_vars(self):
self.assertEqual(os.environ.get("OTEL_PYTHON_REQUESTS_EXCLUDED_URLS", None), targets_to_exclude)
self.assertEqual(os.environ.get("OTEL_PYTHON_URLLIB3_EXCLUDED_URLS", None), targets_to_exclude)

@patch.object(AwsXRayRemoteSampler, "_AwsXRayRemoteSampler__start_sampling_rule_poller", lambda x: None)
@patch.object(AwsXRayRemoteSampler, "_AwsXRayRemoteSampler__start_sampling_target_poller", lambda x: None)
def test_using_xray_sampler_appends_url_exclusion_env_vars(self):
targets_to_exclude = "SamplingTargets,GetSamplingRules"
os.environ.pop("OTEL_PYTHON_REQUESTS_EXCLUDED_URLS", None)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from unittest import TestCase

from amazon.opentelemetry.distro.version import __version__


class TestVersion(TestCase):
def test_version_is_not_empty_and_not_none(self):
self.assertIsNotNone(__version__)
self.assertNotEqual(__version__, "")

0 comments on commit 920ce00

Please sign in to comment.