From 57c736774b33d0321a510fcc3cb5370e6384afbc Mon Sep 17 00:00:00 2001 From: Adam Charrett <73886859+adcharre@users.noreply.github.com> Date: Fri, 4 Oct 2024 14:26:08 +0100 Subject: [PATCH] [receiver/awss3receiver]: Add ingest progress notifications via OpAMP (#33980) --- .chloggen/awss3receiver_notifications.yaml | 27 ++ receiver/awss3receiver/README.md | 25 +- receiver/awss3receiver/config.go | 13 +- receiver/awss3receiver/config_test.go | 5 +- receiver/awss3receiver/go.mod | 4 + receiver/awss3receiver/go.sum | 2 + receiver/awss3receiver/notifications.go | 121 ++++++++ receiver/awss3receiver/notifications_test.go | 288 +++++++++++++++++++ receiver/awss3receiver/receiver.go | 26 +- receiver/awss3receiver/s3reader.go | 42 ++- receiver/awss3receiver/s3reader_test.go | 103 +++++++ receiver/awss3receiver/testdata/config.yaml | 3 + 12 files changed, 645 insertions(+), 14 deletions(-) create mode 100644 .chloggen/awss3receiver_notifications.yaml create mode 100644 receiver/awss3receiver/notifications.go create mode 100644 receiver/awss3receiver/notifications_test.go diff --git a/.chloggen/awss3receiver_notifications.yaml b/.chloggen/awss3receiver_notifications.yaml new file mode 100644 index 000000000000..bab4d623b6b3 --- /dev/null +++ b/.chloggen/awss3receiver_notifications.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awss3receiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: 'Add support for monitoring the progress of ingesting data from an S3 bucket via OpAMP custom messages.' + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30750] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/awss3receiver/README.md b/receiver/awss3receiver/README.md index a7ec61a84b47..73fcb74048b8 100644 --- a/receiver/awss3receiver/README.md +++ b/receiver/awss3receiver/README.md @@ -32,6 +32,8 @@ The following exporter configuration parameters are supported. | `encodings:` | An array of entries with the following properties: | | Optional | | `extension` | Extension to use for decoding a key with a matching suffix. | | Required | | `suffix` | Key suffix to match against. | | Required | +| `notifications:` | | | | +| `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | | ### Time format for `starttime` and `endtime` The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data. @@ -67,4 +69,25 @@ receivers: encodings: - extension: text_encoding suffix: ".txt" -``` \ No newline at end of file +``` + +## Notifications +The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of +"org.opentelemetry.collector.receiver.awss3" and message type "TimeBasedIngestStatus". +The format of the notifications is a ProtoBuf formatted OLTP logs message with a single Log Record. The `body` of the +record is set to `status` and the timestamp of the record is used to hold the ingest time. The record also has the +following attributes: + +| Attribute | Description | +|:------------------|:--------------------------------------------------------------------------------| +| `telemetry_type` | The type of telemetry being ingested. One of "traces", "metrics", or "logs". | +| `ingest_status` | The status of the data ingestion. One of "ingesting", "failed", or "completed". | +| `start_time` | The time to start retrieving data as an Int64, nanoseconds since Unix epoch. | +| `end_time` | The time to stop retrieving data as an Int64, nanoseconds since Unix epoch. | +| `failure_message` | Error message if `ingest_status` is "failed". | + +The "ingesting" status is sent at the beginning of the ingest process before data has been retrieved for the specified time. +If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with +the time of the data being ingested when the failure occurred. +If the ingest process completes successfully a status message with `ingest_status` set to "completed" is sent. + diff --git a/receiver/awss3receiver/config.go b/receiver/awss3receiver/config.go index fc8cdce0d7bb..f50e05097bb3 100644 --- a/receiver/awss3receiver/config.go +++ b/receiver/awss3receiver/config.go @@ -26,6 +26,10 @@ type S3DownloaderConfig struct { S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` } +type Notifications struct { + OpAMP *component.ID `mapstructure:"opampextension"` +} + type Encoding struct { Extension component.ID `mapstructure:"extension"` Suffix string `mapstructure:"suffix"` @@ -33,10 +37,11 @@ type Encoding struct { // Config defines the configuration for the file receiver. type Config struct { - S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"` - StartTime string `mapstructure:"starttime"` - EndTime string `mapstructure:"endtime"` - Encodings []Encoding `mapstructure:"encodings"` + S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"` + StartTime string `mapstructure:"starttime"` + EndTime string `mapstructure:"endtime"` + Encodings []Encoding `mapstructure:"encodings"` + Notifications Notifications `mapstructure:"notifications"` } const ( diff --git a/receiver/awss3receiver/config_test.go b/receiver/awss3receiver/config_test.go index acf0a9d1fab1..0516760fdbb3 100644 --- a/receiver/awss3receiver/config_test.go +++ b/receiver/awss3receiver/config_test.go @@ -41,7 +41,7 @@ func TestConfig_Validate_Valid(t *testing.T) { func TestLoadConfig(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) require.NoError(t, err) - + opampExtension := component.NewIDWithName(component.MustNewType("opamp"), "bar") tests := []struct { id component.ID expected component.Config @@ -89,6 +89,9 @@ func TestLoadConfig(t *testing.T) { Suffix: "nop", }, }, + Notifications: Notifications{ + OpAMP: &opampExtension, + }, }, }, } diff --git a/receiver/awss3receiver/go.mod b/receiver/awss3receiver/go.mod index 99952b2d24cc..19eea798077b 100644 --- a/receiver/awss3receiver/go.mod +++ b/receiver/awss3receiver/go.mod @@ -7,6 +7,8 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.27.39 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.25 github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3 + github.com/open-telemetry/opamp-go v0.15.0 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.110.1-0.20241004063257-d6cd5935eefc github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.111.0 go.opentelemetry.io/collector/confmap v1.17.0 @@ -71,3 +73,5 @@ require ( google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../../extension/opampcustommessages diff --git a/receiver/awss3receiver/go.sum b/receiver/awss3receiver/go.sum index b980798da171..b467efc0a61e 100644 --- a/receiver/awss3receiver/go.sum +++ b/receiver/awss3receiver/go.sum @@ -77,6 +77,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opamp-go v0.15.0 h1:X2TWhEsGQ8GP7Uos3Ic9v/1aFUqoECZXKS7xAF5HqsA= +github.com/open-telemetry/opamp-go v0.15.0/go.mod h1:QyPeN56JXlcZt5yG5RMdZ50Ju+zMFs1Ihy/hwHyF8Oo= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/receiver/awss3receiver/notifications.go b/receiver/awss3receiver/notifications.go new file mode 100644 index 000000000000..d30d0572baab --- /dev/null +++ b/receiver/awss3receiver/notifications.go @@ -0,0 +1,121 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/open-telemetry/opamp-go/client/types" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" +) + +const ( + IngestStatusCompleted = "completed" + IngestStatusFailed = "failed" + IngestStatusIngesting = "ingesting" + CustomCapability = "org.opentelemetry.collector.receiver.awss3" + maxNotificationAttempts = 3 +) + +type statusNotification struct { + TelemetryType string + IngestStatus string + StartTime time.Time + EndTime time.Time + IngestTime time.Time + FailureMessage string +} + +type statusNotifier interface { + Start(ctx context.Context, host component.Host) error + Shutdown(ctx context.Context) error + SendStatus(ctx context.Context, message statusNotification) +} + +type opampNotifier struct { + logger *zap.Logger + opampExtensionID component.ID + handler opampcustommessages.CustomCapabilityHandler +} + +func newNotifier(config *Config, logger *zap.Logger) statusNotifier { + if config.Notifications.OpAMP != nil { + return &opampNotifier{opampExtensionID: *config.Notifications.OpAMP, logger: logger} + } + return nil +} + +func (n *opampNotifier) Start(_ context.Context, host component.Host) error { + ext, ok := host.GetExtensions()[n.opampExtensionID] + if !ok { + return fmt.Errorf("extension %q does not exist", n.opampExtensionID) + } + + registry, ok := ext.(opampcustommessages.CustomCapabilityRegistry) + if !ok { + return fmt.Errorf("extension %q is not a custom message registry", n.opampExtensionID) + } + + handler, err := registry.Register(CustomCapability) + if err != nil { + return fmt.Errorf("failed to register custom capability: %w", err) + } + if handler == nil { + return errors.New("custom capability handler is nil") + } + n.handler = handler + return nil +} + +func (n *opampNotifier) Shutdown(_ context.Context) error { + if n.handler != nil { + n.handler.Unregister() + } + return nil +} + +func (n *opampNotifier) SendStatus(_ context.Context, message statusNotification) { + logs := plog.NewLogs() + log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + log.Body().SetStr("status") + attributes := log.Attributes() + attributes.PutStr("telemetry_type", message.TelemetryType) + attributes.PutStr("ingest_status", message.IngestStatus) + attributes.PutInt("start_time", int64(pcommon.NewTimestampFromTime(message.StartTime))) + attributes.PutInt("end_time", int64(pcommon.NewTimestampFromTime(message.EndTime))) + log.SetTimestamp(pcommon.NewTimestampFromTime(message.IngestTime)) + + if message.FailureMessage != "" { + attributes.PutStr("failure_message", message.FailureMessage) + } + + marshaler := plog.ProtoMarshaler{} + bytes, err := marshaler.MarshalLogs(logs) + if err != nil { + return + } + for attempt := 0; attempt < maxNotificationAttempts; attempt++ { + sendingChan, sendingErr := n.handler.SendMessage("TimeBasedIngestStatus", bytes) + switch { + case sendingErr == nil: + return + case errors.Is(sendingErr, types.ErrCustomMessagePending): + <-sendingChan + default: + // The only other errors returned by the OpAmp extension are unrecoverable, ie ErrCustomCapabilityNotSupported + // so just log an error and return. + n.logger.Error("Failed to send notification", zap.Error(sendingErr), zap.Int("attempt", attempt)) + return + } + } + n.logger.Error("Failed to send notification after multiple attempts", zap.Int("max_attempts", maxNotificationAttempts)) +} diff --git a/receiver/awss3receiver/notifications_test.go b/receiver/awss3receiver/notifications_test.go new file mode 100644 index 000000000000..36e0acba561c --- /dev/null +++ b/receiver/awss3receiver/notifications_test.go @@ -0,0 +1,288 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver" + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/open-telemetry/opamp-go/client/types" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" +) + +type mockCustomCapabilityRegistry struct { + component.Component + + shouldFailRegister bool + shouldRegisterReturnNilHandler bool + shouldReturnPending func() bool + shouldFailSend bool + + sendMessageCalls int + + pendingChannel chan struct{} + unregisterCalled bool + sentMessages []customMessage +} + +type customMessage struct { + messageType string + message []byte +} + +type hostWithCustomCapabilityRegistry struct { + extension *mockCustomCapabilityRegistry +} + +func (h hostWithCustomCapabilityRegistry) Start(context.Context, component.Host) error { + panic("unsupported") +} + +func (h hostWithCustomCapabilityRegistry) Shutdown(context.Context) error { + panic("unsupported") +} + +func (h hostWithCustomCapabilityRegistry) GetFactory(_ component.Kind, _ component.Type) component.Factory { + panic("unsupported") +} + +func (h hostWithCustomCapabilityRegistry) GetExtensions() map[component.ID]component.Component { + return map[component.ID]component.Component{ + component.MustNewID("foo"): h.extension, + } +} + +func (m *mockCustomCapabilityRegistry) Register(_ string, _ ...opampcustommessages.CustomCapabilityRegisterOption) (handler opampcustommessages.CustomCapabilityHandler, err error) { + if m.shouldFailRegister { + return nil, fmt.Errorf("register failed") + } + if m.shouldRegisterReturnNilHandler { + return nil, nil + } + return m, nil +} + +func (m *mockCustomCapabilityRegistry) Message() <-chan *protobufs.CustomMessage { + panic("unsupported") +} + +func (m *mockCustomCapabilityRegistry) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { + m.sendMessageCalls++ + if m.unregisterCalled { + return nil, fmt.Errorf("unregister called") + } + if m.shouldReturnPending != nil && m.shouldReturnPending() { + return m.pendingChannel, types.ErrCustomMessagePending + } + if m.shouldFailSend { + return nil, fmt.Errorf("send failed") + } + m.sentMessages = append(m.sentMessages, customMessage{messageType: messageType, message: message}) + return nil, nil +} + +func (m *mockCustomCapabilityRegistry) Unregister() { + m.unregisterCalled = true +} + +func Test_opampNotifier_Start(t *testing.T) { + id := component.MustNewID("foo") + + tests := []struct { + name string + host component.Host + wantErr bool + }{ + { + name: "success", + host: hostWithCustomCapabilityRegistry{ + extension: &mockCustomCapabilityRegistry{}, + }, + wantErr: false, + }, + { + name: "extension not found", + host: componenttest.NewNopHost(), + wantErr: true, + }, + { + name: "register failed", + host: hostWithCustomCapabilityRegistry{ + extension: &mockCustomCapabilityRegistry{ + shouldFailRegister: true, + }, + }, + wantErr: true, + }, + { + name: "register returns nil handler", + host: hostWithCustomCapabilityRegistry{ + extension: &mockCustomCapabilityRegistry{ + shouldRegisterReturnNilHandler: true, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + notifier := &opampNotifier{opampExtensionID: id} + err := notifier.Start(context.Background(), tt.host) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func Test_opampNotifier_Shutdown(t *testing.T) { + registry := mockCustomCapabilityRegistry{} + notifier := &opampNotifier{handler: ®istry, logger: zap.NewNop()} + err := notifier.Shutdown(context.Background()) + require.NoError(t, err) + require.True(t, registry.unregisterCalled) +} + +func Test_opampNotifier_SendStatus(t *testing.T) { + registry := mockCustomCapabilityRegistry{} + notifier := &opampNotifier{handler: ®istry, logger: zap.NewNop()} + ingestTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + toSend := statusNotification{ + TelemetryType: "telemetry", + IngestStatus: IngestStatusIngesting, + IngestTime: ingestTime, + StartTime: ingestTime, + EndTime: ingestTime, + } + notifier.SendStatus(context.Background(), toSend) + require.Len(t, registry.sentMessages, 1) + require.Equal(t, "TimeBasedIngestStatus", registry.sentMessages[0].messageType) + + unmarshaler := plog.ProtoUnmarshaler{} + logs, err := unmarshaler.UnmarshalLogs(registry.sentMessages[0].message) + require.NoError(t, err) + require.Equal(t, 1, logs.ResourceLogs().Len()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().Len()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + log := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + require.Equal(t, "status", log.Body().Str()) + attr := log.Attributes() + v, b := attr.Get("telemetry_type") + require.True(t, b) + require.Equal(t, "telemetry", v.Str()) + + v, b = attr.Get("ingest_status") + require.True(t, b) + require.Equal(t, IngestStatusIngesting, v.Str()) + + require.Equal(t, pcommon.NewTimestampFromTime(ingestTime), log.Timestamp()) + + expectedTimestamp := int64(pcommon.NewTimestampFromTime(ingestTime)) + v, b = attr.Get("start_time") + require.True(t, b) + require.Equal(t, expectedTimestamp, v.Int()) + + v, b = attr.Get("end_time") + require.True(t, b) + require.Equal(t, expectedTimestamp, v.Int()) + + _, b = attr.Get("failure_message") + require.False(t, b) +} + +func Test_opampNotifier_SendStatus_MessagePending(t *testing.T) { + tryCount := 0 + registry := mockCustomCapabilityRegistry{ + shouldReturnPending: func() bool { + pending := tryCount < 1 + tryCount++ + return pending + }, + pendingChannel: make(chan struct{}), + } + notifier := &opampNotifier{handler: ®istry, logger: zap.NewNop()} + toSend := statusNotification{ + TelemetryType: "telemetry", + IngestStatus: IngestStatusIngesting, + IngestTime: time.Time{}, + } + + var completionTime time.Time + + now := time.Now() + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + notifier.SendStatus(context.Background(), toSend) + completionTime = time.Now() + wg.Done() + }() + require.Empty(t, registry.sentMessages) + registry.pendingChannel <- struct{}{} + wg.Wait() + require.True(t, completionTime.After(now)) + require.Len(t, registry.sentMessages, 1) + require.Equal(t, "TimeBasedIngestStatus", registry.sentMessages[0].messageType) +} + +func Test_opampNotifier_SendStatus_Error(t *testing.T) { + registry := mockCustomCapabilityRegistry{ + shouldFailSend: true, + } + notifier := &opampNotifier{handler: ®istry, logger: zap.NewNop()} + toSend := statusNotification{ + TelemetryType: "telemetry", + IngestStatus: IngestStatusIngesting, + IngestTime: time.Time{}, + } + + notifier.SendStatus(context.Background(), toSend) + require.Empty(t, registry.sentMessages) + require.Equal(t, 1, registry.sendMessageCalls) +} + +func Test_opampNotifier_SendStatus_MaxRetries(t *testing.T) { + registry := mockCustomCapabilityRegistry{ + shouldReturnPending: func() bool { return true }, + pendingChannel: make(chan struct{}), + } + notifier := &opampNotifier{handler: ®istry, logger: zap.NewNop()} + toSend := statusNotification{ + TelemetryType: "telemetry", + IngestStatus: IngestStatusIngesting, + IngestTime: time.Time{}, + } + var completionTime time.Time + now := time.Now() + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + notifier.SendStatus(context.Background(), toSend) + completionTime = time.Now() + wg.Done() + }() + + for attempt := 0; attempt < maxNotificationAttempts; attempt++ { + registry.pendingChannel <- struct{}{} + } + wg.Wait() + + require.True(t, completionTime.After(now)) + require.Empty(t, registry.sentMessages) + require.Equal(t, maxNotificationAttempts, registry.sendMessageCalls) +} diff --git a/receiver/awss3receiver/receiver.go b/receiver/awss3receiver/receiver.go index cd44a8b55a94..229dee0a88a8 100644 --- a/receiver/awss3receiver/receiver.go +++ b/receiver/awss3receiver/receiver.go @@ -41,10 +41,12 @@ type awss3Receiver struct { telemetryType string dataProcessor receiverProcessor extensions encodingExtensions + notifier statusNotifier } func newAWSS3Receiver(ctx context.Context, cfg *Config, telemetryType string, settings receiver.Settings, processor receiverProcessor) (*awss3Receiver, error) { - reader, err := newS3Reader(ctx, settings.Logger, cfg) + notifier := newNotifier(cfg, settings.Logger) + reader, err := newS3Reader(ctx, notifier, settings.Logger, cfg) if err != nil { return nil, err } @@ -65,25 +67,35 @@ func newAWSS3Receiver(ctx context.Context, cfg *Config, telemetryType string, se obsrecv: obsrecv, dataProcessor: processor, encodingsConfig: cfg.Encodings, + notifier: notifier, }, nil } -func (r *awss3Receiver) Start(_ context.Context, host component.Host) error { +func (r *awss3Receiver) Start(ctx context.Context, host component.Host) error { var err error + if r.notifier != nil { + if err = r.notifier.Start(ctx, host); err != nil { + return err + } + } r.extensions, err = newEncodingExtensions(r.encodingsConfig, host) if err != nil { return err } - var ctx context.Context - ctx, r.cancel = context.WithCancel(context.Background()) + var cancelCtx context.Context + cancelCtx, r.cancel = context.WithCancel(context.Background()) go func() { - _ = r.s3Reader.readAll(ctx, r.telemetryType, r.receiveBytes) + _ = r.s3Reader.readAll(cancelCtx, r.telemetryType, r.receiveBytes) }() return nil } - -func (r *awss3Receiver) Shutdown(_ context.Context) error { +func (r *awss3Receiver) Shutdown(ctx context.Context) error { + if r.notifier != nil { + if err := r.notifier.Shutdown(ctx); err != nil { + return err + } + } if r.cancel != nil { r.cancel() } diff --git a/receiver/awss3receiver/s3reader.go b/receiver/awss3receiver/s3reader.go index 2c56e133518a..ddf44c5b9fc1 100644 --- a/receiver/awss3receiver/s3reader.go +++ b/receiver/awss3receiver/s3reader.go @@ -25,11 +25,12 @@ type s3Reader struct { filePrefix string startTime time.Time endTime time.Time + notifier statusNotifier } type s3ReaderDataCallback func(context.Context, string, []byte) error -func newS3Reader(ctx context.Context, logger *zap.Logger, cfg *Config) (*s3Reader, error) { +func newS3Reader(ctx context.Context, notifier statusNotifier, logger *zap.Logger, cfg *Config) (*s3Reader, error) { listObjectsClient, getObjectClient, err := newS3Client(ctx, cfg.S3Downloader) if err != nil { return nil, err @@ -56,9 +57,11 @@ func newS3Reader(ctx context.Context, logger *zap.Logger, cfg *Config) (*s3Reade s3Partition: cfg.S3Downloader.S3Partition, startTime: startTime, endTime: endTime, + notifier: notifier, }, nil } +//nolint:golint,unparam func (s3Reader *s3Reader) readAll(ctx context.Context, telemetryType string, dataCallback s3ReaderDataCallback) error { var timeStep time.Duration if s3Reader.s3Partition == "hour" { @@ -68,18 +71,49 @@ func (s3Reader *s3Reader) readAll(ctx context.Context, telemetryType string, dat } s3Reader.logger.Info("Start reading telemetry", zap.Time("start_time", s3Reader.startTime), zap.Time("end_time", s3Reader.endTime)) for currentTime := s3Reader.startTime; currentTime.Before(s3Reader.endTime); currentTime = currentTime.Add(timeStep) { + s3Reader.sendStatus(ctx, statusNotification{ + TelemetryType: telemetryType, + IngestStatus: IngestStatusIngesting, + StartTime: s3Reader.startTime, + EndTime: s3Reader.endTime, + IngestTime: currentTime, + }) + select { case <-ctx.Done(): + s3Reader.sendStatus(ctx, statusNotification{ + TelemetryType: telemetryType, + IngestStatus: IngestStatusFailed, + StartTime: s3Reader.startTime, + EndTime: s3Reader.endTime, + IngestTime: currentTime, + FailureMessage: ctx.Err().Error(), + }) s3Reader.logger.Error("Context cancelled, stopping reading telemetry", zap.Time("time", currentTime)) return ctx.Err() default: s3Reader.logger.Info("Reading telemetry", zap.Time("time", currentTime)) if err := s3Reader.readTelemetryForTime(ctx, currentTime, telemetryType, dataCallback); err != nil { + s3Reader.sendStatus(ctx, statusNotification{ + TelemetryType: telemetryType, + IngestStatus: IngestStatusFailed, + StartTime: s3Reader.startTime, + EndTime: s3Reader.endTime, + IngestTime: currentTime, + FailureMessage: err.Error(), + }) s3Reader.logger.Error("Error reading telemetry", zap.Error(err), zap.Time("time", currentTime)) return err } } } + s3Reader.sendStatus(ctx, statusNotification{ + TelemetryType: telemetryType, + IngestStatus: IngestStatusCompleted, + StartTime: s3Reader.startTime, + EndTime: s3Reader.endTime, + IngestTime: s3Reader.endTime, + }) s3Reader.logger.Info("Finished reading telemetry", zap.Time("start_time", s3Reader.startTime), zap.Time("end_time", s3Reader.endTime)) return nil } @@ -149,6 +183,12 @@ func (s3Reader *s3Reader) retrieveObject(ctx context.Context, key string) ([]byt return contents, nil } +func (s3Reader *s3Reader) sendStatus(ctx context.Context, status statusNotification) { + if s3Reader.notifier != nil { + s3Reader.notifier.SendStatus(ctx, status) + } +} + func getTimeKeyPartitionHour(t time.Time) string { year, month, day := t.Date() hour := t.Hour() diff --git a/receiver/awss3receiver/s3reader_test.go b/receiver/awss3receiver/s3reader_test.go index c9e6190ef2f8..7dfe9ba4c92f 100644 --- a/receiver/awss3receiver/s3reader_test.go +++ b/receiver/awss3receiver/s3reader_test.go @@ -15,6 +15,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.uber.org/zap" ) @@ -316,6 +317,20 @@ func Test_readTelemetryForTime_NextPageError(t *testing.T) { require.Error(t, err) } +type mockNotifier struct { + messages []statusNotification +} + +func (m *mockNotifier) Start(_ context.Context, _ component.Host) error { + return nil +} +func (m *mockNotifier) Shutdown(_ context.Context) error { + return nil +} +func (m *mockNotifier) SendStatus(_ context.Context, notification statusNotification) { + m.messages = append(m.messages, notification) +} + func Test_readAll(t *testing.T) { reader := s3Reader{ listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { @@ -363,7 +378,78 @@ func Test_readAll(t *testing.T) { require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=33/traces_1") } +func Test_readAll_StatusMessages(t *testing.T) { + notifier := mockNotifier{} + reader := s3Reader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + key := fmt.Sprintf("%s%s", *params.Prefix, "1") + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + { + Key: &key, + }, + }, + }, + }, + } + }), + getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }), + logger: zap.NewNop(), + s3Bucket: "bucket", + s3Prefix: "", + s3Partition: "minute", + filePrefix: "", + startTime: testTime, + endTime: testTime.Add(time.Minute * 2), + notifier: ¬ifier, + } + + dataCallbackKeys := make([]string, 0) + + err := reader.readAll(context.Background(), "traces", func(_ context.Context, key string, data []byte) error { + t.Helper() + require.Equal(t, "this is the body of the object", string(data)) + dataCallbackKeys = append(dataCallbackKeys, key) + return nil + }) + require.NoError(t, err) + require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=32/traces_1") + require.Contains(t, dataCallbackKeys, "year=2021/month=02/day=01/hour=17/minute=33/traces_1") + require.Equal(t, []statusNotification{ + { + TelemetryType: "traces", + IngestStatus: IngestStatusIngesting, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime, + }, { + TelemetryType: "traces", + IngestStatus: IngestStatusIngesting, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime.Add(time.Minute), + }, { + TelemetryType: "traces", + IngestStatus: IngestStatusCompleted, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime.Add(time.Minute * 2), + }, + }, notifier.messages) +} + func Test_readAll_ContextDone(t *testing.T) { + notifier := mockNotifier{} reader := s3Reader{ listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { t.Helper() @@ -395,6 +481,7 @@ func Test_readAll_ContextDone(t *testing.T) { filePrefix: "", startTime: testTime, endTime: testTime.Add(time.Minute * 2), + notifier: ¬ifier, } dataCallbackKeys := make([]string, 0) @@ -407,4 +494,20 @@ func Test_readAll_ContextDone(t *testing.T) { }) require.Error(t, err) require.Empty(t, dataCallbackKeys) + require.Equal(t, []statusNotification{ + { + TelemetryType: "traces", + IngestStatus: IngestStatusIngesting, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime, + }, { + TelemetryType: "traces", + IngestStatus: IngestStatusFailed, + StartTime: testTime, + EndTime: testTime.Add(time.Minute * 2), + IngestTime: testTime, + FailureMessage: "context canceled", + }, + }, notifier.messages) } diff --git a/receiver/awss3receiver/testdata/config.yaml b/receiver/awss3receiver/testdata/config.yaml index c386dc58f54d..b1e4823c6105 100644 --- a/receiver/awss3receiver/testdata/config.yaml +++ b/receiver/awss3receiver/testdata/config.yaml @@ -20,3 +20,6 @@ awss3/3: suffix: "baz" - extension: "nop/nop" suffix: "nop" + notifications: + opampextension: "opamp/bar" +