Use explicitly provided storage instance
pmm-sumo committed Jun 9, 2022
1 parent f3cfeff commit ce5871e
Showing 6 changed files with 103 additions and 71 deletions.
9 changes: 5 additions & 4 deletions exporter/exporterhelper/README.md
@@ -35,11 +35,12 @@ The following configuration options can be modified:
To use the persistent queue, the following settings need to be configured:

- `sending_queue`
  - `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
  - `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue
  - (deprecated) `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension

The maximum number of batches stored to disk can be controlled using the `sending_queue.queue_size` parameter (which, as with in-memory buffering, defaults to 5000 batches).

When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using [filestorage] extension. If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and the exporting is continued.
When the persistent queue is enabled, batches are buffered to disk using the [filestorage] extension. If the collector instance is killed while items remain in the persistent queue, they are picked up on restart and exporting continues.

```
┌─Consumer #1─┐
@@ -87,9 +88,9 @@ exporters:
  otlp:
    endpoint: <ENDPOINT>
    sending_queue:
      persistent_storage_enabled: true
      storage: file_storage/otc
extensions:
  file_storage:
  file_storage/otc:
    directory: /var/lib/storage/otc
    timeout: 10s
service:
23 changes: 15 additions & 8 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -35,18 +35,19 @@ type persistentQueue struct {
storage persistentStorage
}

// buildPersistentStorageName returns a name that is constructed out of queue name and signal type. This is done
// to avoid conflicts between different signals, which require unique persistent storage name
func buildPersistentStorageName(name string, signal config.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}
// PersistentQueueStartFunc defines a function that can be used to start the persistent queue
type PersistentQueueStartFunc func(ctx context.Context, client storage.Client)

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(ctx context.Context, name string, signal config.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
return &persistentQueue{
func NewPersistentQueue(name string, signal config.DataType, capacity int, logger *zap.Logger, unmarshaler RequestUnmarshaler) (ProducerConsumerQueue, PersistentQueueStartFunc) {
pq := &persistentQueue{
logger: logger,
stopChan: make(chan struct{}),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler),
storage: newPersistentContiguousStorage(buildPersistentStorageName(name, signal), uint64(capacity), logger, unmarshaler),
}

return pq, func(ctx context.Context, client storage.Client) {
pq.storage.start(ctx, client)
}
}

@@ -94,3 +95,9 @@ func (pq *persistentQueue) Stop() {
func (pq *persistentQueue) Size() int {
return int(pq.storage.size())
}

// buildPersistentStorageName returns a name constructed from the queue name and signal type. This is done
// to avoid conflicts between different signals, which require a unique persistent storage name.
func buildPersistentStorageName(name string, signal config.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}
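
With this change, `NewPersistentQueue` no longer takes a context and a storage client at construction time; instead it returns a `PersistentQueueStartFunc` that attaches the client later. Below is a minimal sketch (not part of the commit) of how the two phases could be wired together from inside the `exporterhelper/internal` package; the helper name, the `otlp` component ID, `config.NewComponentID`, and the import paths are assumptions for illustration.

```go
package internal

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config"
	"go.opentelemetry.io/collector/extension/experimental/storage"
	"go.uber.org/zap"
)

// startPersistentQueueSketch is a hypothetical helper (not part of this commit)
// showing the new two-phase flow: the queue is built without a storage client,
// and the returned start function attaches the client later.
func startPersistentQueueSketch(ctx context.Context, ext storage.Extension, logger *zap.Logger, unmarshaler RequestUnmarshaler) (ProducerConsumerQueue, error) {
	// Phase 1: construct the queue; no storage client is needed yet.
	queue, start := NewPersistentQueue("otlp", config.TracesDataType, 5000, logger, unmarshaler)

	// Phase 2: resolve the client from the storage extension, as an exporter's
	// Start() would, and attach it so items persisted by a previous run are loaded.
	client, err := ext.GetClient(ctx, component.KindExporter, config.NewComponentID("otlp"), string(config.TracesDataType))
	if err != nil {
		return nil, err
	}
	start(ctx, client)

	return queue, nil
}
```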
3 changes: 2 additions & 1 deletion exporter/exporterhelper/internal/persistent_queue_test.go
@@ -39,7 +39,8 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue
panic(err)
}

wq := NewPersistentQueue(context.Background(), "foo", config.TracesDataType, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
wq, start := NewPersistentQueue("foo", config.TracesDataType, capacity, logger, newFakeTracesRequestUnmarshalerFunc())
start(context.Background(), client)
return wq.(*persistentQueue)
}

20 changes: 13 additions & 7 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -34,6 +34,8 @@ type persistentStorage interface {
get() <-chan PersistentRequest
// size returns the current size of the persistent storage with items waiting for processing
size() uint64
// start attaches the storage client and initializes the storage
start(ctx context.Context, client storage.Client)
// stop gracefully stops the storage
stop()
}
@@ -104,11 +106,10 @@ var (

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
// The queue needs to be initialized separately using initPersistentContiguousStorage.
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
// The queue needs to be started separately using start().
func newPersistentContiguousStorage(queueName string, capacity uint64, logger *zap.Logger, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
logger: logger,
client: client,
queueName: queueName,
unmarshaler: unmarshaler,
capacity: capacity,
@@ -118,7 +119,14 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, capac
itemsCount: atomic.NewUint64(0),
}

initPersistentContiguousStorage(ctx, pcs)
return pcs
}

// start attaches the storage client and initializes the queue
func (pcs *persistentContiguousStorage) start(ctx context.Context, client storage.Client) {
pcs.client = client

pcs.initPersistentContiguousStorage(ctx)
notDispatchedReqs := pcs.retrieveNotDispatchedReqs(context.Background())

// We start the loop first so in case there are more elements in the persistent storage than the capacity,
@@ -132,11 +140,9 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, capac
for i := uint64(0); i < pcs.size(); i++ {
pcs.putChan <- struct{}{}
}

return pcs
}

func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContiguousStorage) {
func (pcs *persistentContiguousStorage) initPersistentContiguousStorage(ctx context.Context) {
var writeIndex itemIndex
var readIndex itemIndex
batch, err := newBatch(pcs).get(readIndexKey, writeIndexKey).execute(ctx)
4 changes: 3 additions & 1 deletion exporter/exporterhelper/internal/persistent_storage_test.go
@@ -46,7 +46,9 @@ func createTestClient(extension storage.Extension) storage.Client {
}

func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage {
return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
pcs := newPersistentContiguousStorage("foo", capacity, logger, newFakeTracesRequestUnmarshalerFunc())
pcs.start(context.Background(), client)
return pcs
}

func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage {
115 changes: 65 additions & 50 deletions exporter/exporterhelper/queued_retry_inmemory.go
@@ -40,8 +40,13 @@ type QueueSettings struct {
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
QueueSize int `mapstructure:"queue_size"`
// PersistentStorageEnabled describes whether persistence via a file storage extension is enabled
PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"`
// StorageID, if not empty, enables persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *config.ComponentID `mapstructure:"storage"`
// StorageEnabled describes whether persistence via a file storage extension is enabled, using the single
// default storage extension.
// Deprecated: this does not allow specifying which extension should be used when several are available
StorageEnabled bool `mapstructure:"persistent_storage_enabled"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
@@ -53,8 +58,8 @@ func NewDefaultQueueSettings() QueueSettings {
// This is a pretty decent value for production.
// User should calculate this from the perspective of how many seconds to buffer in case of a backend outage,
// multiply that by the number of requests per seconds.
QueueSize: 5000,
PersistentStorageEnabled: false,
QueueSize: 5000,
StorageEnabled: false,
}
}

@@ -71,9 +76,52 @@ func (qCfg *QueueSettings) Validate() error {
return nil
}

func (qCfg *QueueSettings) getStorageExtension(logger *zap.Logger, extensions map[config.ComponentID]component.Extension) (storage.Extension, error) {
if qCfg.StorageID != nil {
if ext, found := extensions[*qCfg.StorageID]; found {
if storageExt, ok := ext.(storage.Extension); ok {
return storageExt, nil
}
return nil, errWrongExtensionType
}
} else if qCfg.StorageEnabled {
logger.Warn("enabling persistent storage via deprecated `persistent_storage_enabled` setting; please change to `storage` and specify extension ID")
var storageExtension storage.Extension
for _, ext := range extensions {
if se, ok := ext.(storage.Extension); ok {
if storageExtension != nil {
return nil, errMultipleStorageClients
}
storageExtension = se
}
}

if storageExtension == nil {
return nil, errNoStorageClient
}
return storageExtension, nil
}

return nil, nil
}

func (qCfg *QueueSettings) toStorageClient(ctx context.Context, logger *zap.Logger, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) {
extension, err := qCfg.getStorageExtension(logger, host.GetExtensions())
if extension == nil || err != nil {
return nil, err
}

client, err := extension.GetClient(ctx, component.KindExporter, id, string(signal))
if err != nil {
return nil, err
}

return &client, err
}

var (
errNoStorageClient = errors.New("no storage client extension found")
errMultipleStorageClients = errors.New("multiple storage extensions found")
errWrongExtensionType = errors.New("requested extension is not a storage extension")
errMultipleStorageClients = errors.New("multiple storage extensions found while default extension expected")
)

type queuedRetrySender struct {
@@ -83,6 +131,7 @@ type queuedRetrySender struct {
cfg QueueSettings
consumerSender requestSender
queue internal.ProducerConsumerQueue
queueStartFunc internal.PersistentQueueStartFunc
retryStopCh chan struct{}
traceAttributes []attribute.KeyValue
logger *zap.Logger
@@ -116,54 +165,17 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu
onTemporaryFailure: qrs.onTemporaryFailure,
}

if !qCfg.PersistentStorageEnabled {
if qCfg.StorageEnabled || qCfg.StorageID != nil {
qrs.queue, qrs.queueStartFunc = internal.NewPersistentQueue(qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, qrs.requestUnmarshaler)
// TODO: following can be further exposed as a config param rather than relying on a type of queue
qrs.requeuingEnabled = true
} else {
qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {})
}
// The persistent queue is started separately, as it needs extra information about the component host

return qrs
}

func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) {
var storageExtension storage.Extension
for _, ext := range host.GetExtensions() {
if se, ok := ext.(storage.Extension); ok {
if storageExtension != nil {
return nil, errMultipleStorageClients
}
storageExtension = se
}
}

if storageExtension == nil {
return nil, errNoStorageClient
}

client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal))
if err != nil {
return nil, err
}

return &client, err
}

// initializePersistentQueue uses extra information for initialization available from component.Host
func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error {
if qrs.cfg.PersistentStorageEnabled {
storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal)
if err != nil {
return err
}

qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler)

// TODO: this can be further exposed as a config param rather than relying on a type of queue
qrs.requeuingEnabled = true
}

return nil
}

func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req request, err error) error {
if !qrs.requeuingEnabled || qrs.queue == nil {
logger.Error(
@@ -191,9 +203,12 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req request

// start is invoked during service startup.
func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) error {
err := qrs.initializePersistentQueue(ctx, host)
if err != nil {
return err
if qrs.queueStartFunc != nil {
storageClient, err := qrs.cfg.toStorageClient(ctx, qrs.logger, host, qrs.id, qrs.signal)
if err != nil {
return err
}
qrs.queueStartFunc(ctx, *storageClient)
}

qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
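
For completeness, here is a hedged sketch (not part of the commit) of how the new `StorageID` field can be populated programmatically; it mirrors the `storage: file_storage/otc` setting from the README example above, and assumes `config.NewComponentIDWithName` as the constructor for a type/name component ID.

```go
package exporterhelper

import "go.opentelemetry.io/collector/config"

// persistentQueueSettingsSketch is a hypothetical helper that selects an explicit
// storage extension for the persistent queue, equivalent to configuring
// `storage: file_storage/otc` under `sending_queue` in YAML.
func persistentQueueSettingsSketch() QueueSettings {
	storageID := config.NewComponentIDWithName("file_storage", "otc")
	qCfg := NewDefaultQueueSettings()
	qCfg.StorageID = &storageID
	return qCfg
}
```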
