Skip to content

Commit

Permalink
statistics lock
Browse files Browse the repository at this point in the history
  • Loading branch information
jj22ee committed Feb 9, 2024
1 parent 4de549a commit 5e4e778
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ def __init__(self, sampling_rule: _SamplingRule, client_id: str, clock: _Clock):
self.fixed_rate_sampler = TraceIdRatioBased(self.sampling_rule.FixedRate)

# TODO rename to rate limiter
# Initialize as borrowing if there will be a quota > 0
# Initialize with borrowing allowed if there will be a quota > 0
if self.sampling_rule.ReservoirSize > 0:
self.reservoir_sampler = _ReservoirSampler(1, self.clock)
self.borrowing = True
self.can_borrow = True
else:
self.reservoir_sampler = _ReservoirSampler(0, self.clock)
self.borrowing = False
self.can_borrow = False

self.reservoir_expiry = self.clock.now()
self.polling_interval = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS
Expand All @@ -51,12 +51,15 @@ def should_sample(
links: Sequence[Link] = None,
trace_state: TraceState = None,
) -> SamplingResult:
has_borrowed = False
has_sampled = False

with self.statistics_lock: self.statistics.RequestCount += 1

reservoir_expired: bool = self.clock.now() > self.reservoir_expiry
sampling_result = SamplingResult(decision=Decision.DROP, attributes=attributes, trace_state=trace_state)
if reservoir_expired:
self.borrowing = True
self.can_borrow = True

sampling_result = self.reservoir_sampler.should_sample(
parent_context,
Expand All @@ -71,23 +74,26 @@ def should_sample(

if sampling_result.decision is not Decision.DROP:
with self.statistics_lock:
if self.borrowing:
self.statistics.BorrowCount += 1
self.statistics.SampleCount += 1
return sampling_result
has_borrowed = self.can_borrow
has_sampled = True
else:
sampling_result = self.fixed_rate_sampler.should_sample(
parent_context,
trace_id,
name,
kind=kind,
attributes=attributes,
links=links,
trace_state=trace_state
)
if sampling_result.decision is not Decision.DROP:
has_sampled = True

with self.statistics_lock:
self.statistics.RequestCount += 1
self.statistics.BorrowCount += (1 if has_borrowed else 0)
self.statistics.SampleCount += (1 if has_sampled else 0)

sampling_result = self.fixed_rate_sampler.should_sample(
parent_context,
trace_id,
name,
kind=kind,
attributes=attributes,
links=links,
trace_state=trace_state
)
if sampling_result.decision is not Decision.DROP:
with self.statistics_lock:
self.statistics.SampleCount += 1
return sampling_result

def update_target(self, target):
Expand All @@ -104,7 +110,7 @@ def update_target(self, target):
self.reservoir_expiry = self.clock.now()

self.polling_interval = target["Interval"]
self.borrowing = False
self.can_borrow = False

def get_then_reset_statistics(self) -> _SamplingStatisticsDocument:
self.statistics_lock.acquire()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ def __init__(

# Schedule the next rule poll now
# Python Timers only run once, so they need to be recreated for every poll
self._timer = Timer(0, self.__start_sampling_rule_poller)
self._timer.daemon = True # Ensures that when the main thread exits, the Timer threads are killed
self._timer.start()
self._rules_timer = Timer(0, self.__start_sampling_rule_poller)
self._rules_timer.daemon = True # Ensures that when the main thread exits, the Timer threads are killed
self._rules_timer.start()

# set up the target poller to go off once after the default interval. Subsequent polls may use new intervals.
self._timer2 = Timer(DEFAULT_TARGET_POLLING_INTERVAL_SECONDS, self.__start_sampling_target_poller)
self._timer2.daemon = True # Ensures that when the main thread exits, the Timer threads are killed
self._timer2.start()
self._targets_timer = Timer(DEFAULT_TARGET_POLLING_INTERVAL_SECONDS, self.__start_sampling_target_poller)
self._targets_timer.daemon = True # Ensures that when the main thread exits, the Timer threads are killed
self._targets_timer.start()

# pylint: disable=no-self-use
@override
Expand Down Expand Up @@ -115,9 +115,9 @@ def __get_and_update_sampling_rules(self) -> None:
def __start_sampling_rule_poller(self) -> None:
self.__get_and_update_sampling_rules()
# Schedule the next sampling rule poll
self._timer = Timer(self.__polling_interval + self.__rule_polling_jitter, self.__start_sampling_rule_poller)
self._timer.daemon = True
self._timer.start()
self._rules_timer = Timer(self.__polling_interval + self.__rule_polling_jitter, self.__start_sampling_rule_poller)
self._rules_timer.daemon = True
self._rules_timer.start()

def __get_and_update_sampling_targets(self) -> None:
all_statistics = self.__rule_cache.get_all_statistics()
Expand All @@ -131,9 +131,9 @@ def __get_and_update_sampling_targets(self) -> None:
def __start_sampling_target_poller(self) -> None:
self.__get_and_update_sampling_targets()
# Schedule the next sampling rule poll
self._timer2 = Timer(self.__target_polling_interval + self.__target_polling_jitter, self.__start_sampling_target_poller)
self._timer2.daemon = True
self._timer2.start()
self._targets_timer = Timer(self.__target_polling_interval + self.__target_polling_jitter, self.__start_sampling_target_poller)
self._targets_timer.daemon = True
self._targets_timer.start()

def __generate_client_id(self) -> str:
hex = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f']
Expand Down

0 comments on commit 5e4e778

Please sign in to comment.