Skip to content

Commit

Permalink
storage (#1016)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Mar 4, 2021
1 parent f993072 commit a82fa83
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 39 deletions.
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Fix `logger.exception` with no exception info throwing error
([#1006](https://github.com/census-instrumentation/opencensus-python/pull/1006))
- Add `enable_local_storage` to turn on/off local storage + retry + flushing logic
([#1006](https://github.com/census-instrumentation/opencensus-python/pull/1006))

## 1.0.7
Released 2021-01-25
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __init__(self, *args, **kwargs):

_default = BaseObject(
connection_string=None,
enable_local_storage=True,
enable_standard_metrics=True,
endpoint='https://dc.services.visualstudio.com/v2/track',
export_interval=15.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@

class TransportMixin(object):
def _transmit_from_storage(self):
for blob in self.storage.gets():
# give a few more seconds for blob lease operation
# to reduce the chance of race (for perf consideration)
if blob.lease(self.options.timeout + 5):
envelopes = blob.get()
result = self._transmit(envelopes)
if result > 0:
blob.lease(result)
else:
blob.delete()
if self.storage:
for blob in self.storage.gets():
# give a few more seconds for blob lease operation
# to reduce the chance of race (for perf consideration)
if blob.lease(self.options.timeout + 5):
envelopes = blob.get()
result = self._transmit(envelopes)
if result > 0:
blob.lease(result)
else:
blob.delete()

def _transmit(self, envelopes):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ def __init__(self, **options):
raise ValueError('Sampling must be in the range: [0,1]')
self.export_interval = self.options.export_interval
self.max_batch_size = self.options.max_batch_size
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
source=self.__class__.__name__,
)
self.storage = None
if self.options.enable_local_storage:
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
source=self.__class__.__name__,
)
self._telemetry_processors = []
self.addFilter(SamplingFilter(self.options.logging_sampling_rate))
self._queue = Queue(capacity=self.options.queue_capacity)
Expand All @@ -66,7 +68,8 @@ def _export(self, batch, event=None): # pragma: NO COVER
envelopes = [self.log_record_to_envelope(x) for x in batch]
envelopes = self.apply_telemetry_processors(envelopes)
result = self._transmit(envelopes)
if result > 0:
# Only store files if local storage enabled
if self.storage and result > 0:
self.storage.put(envelopes, result)
if event:
if isinstance(event, QueueExitEvent):
Expand All @@ -79,8 +82,10 @@ def _export(self, batch, event=None): # pragma: NO COVER
event.set()

def close(self):
self.storage.close()
self._worker.stop()
if self.storage:
self.storage.close()
if self._worker:
self._worker.stop()

def createLock(self):
self.lock = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ def __init__(self, **options):
self.export_interval = self.options.export_interval
self.max_batch_size = self.options.max_batch_size
self._telemetry_processors = []
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
source=self.__class__.__name__,
)
self.storage = None
if self.options.enable_local_storage:
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
source=self.__class__.__name__,
)
self._atexit_handler = atexit.register(self.shutdown)
self.exporter_thread = None
super(MetricsExporter, self).__init__()
Expand All @@ -68,7 +70,8 @@ def export_metrics(self, metrics):
for batch in batched_envelopes:
batch = self.apply_telemetry_processors(batch)
result = self._transmit(batch)
if result > 0:
# Only store files if local storage enabled
if self.storage and result > 0:
self.storage.put(batch, result)

# If there is still room to transmit envelopes, transmit from storage
Expand Down Expand Up @@ -141,7 +144,8 @@ def shutdown(self):
if self.exporter_thread:
self.exporter_thread.close()
# Shutsdown storage worker
self.storage.close()
if self.storage:
self.storage.close()


def new_metrics_exporter(**options):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ class AzureExporter(BaseExporter, ProcessorMixin, TransportMixin):
def __init__(self, **options):
self.options = Options(**options)
utils.validate_instrumentation_key(self.options.instrumentation_key)
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
source=self.__class__.__name__,
)
self.storage = None
if self.options.enable_local_storage:
self.storage = LocalFileStorage(
path=self.options.storage_path,
max_size=self.options.storage_max_size,
maintenance_period=self.options.storage_maintenance_period,
retention_period=self.options.storage_retention_period,
source=self.__class__.__name__,
)
self._telemetry_processors = []
super(AzureExporter, self).__init__(**options)
atexit.register(self._stop, self.options.grace_period)
Expand Down Expand Up @@ -166,7 +168,8 @@ def emit(self, batch, event=None):
envelopes = [self.span_data_to_envelope(sd) for sd in batch]
envelopes = self.apply_telemetry_processors(envelopes)
result = self._transmit(envelopes)
if result > 0:
# Only store files if local storage enabled
if self.storage and result > 0:
self.storage.put(envelopes, result)
if event:
if isinstance(event, QueueExitEvent):
Expand All @@ -179,5 +182,7 @@ def emit(self, batch, event=None):
logger.exception('Exception occurred while exporting the data.')

def _stop(self, timeout=None):
self.storage.close()
self._worker.stop(timeout)
if self.storage:
self.storage.close()
if self._worker:
self._worker.stop(timeout)

0 comments on commit a82fa83

Please sign in to comment.