Skip to content

Commit

Permalink
[receiver/awss3receiver]: Add ingest progress notifications via OpAMP (
Browse files Browse the repository at this point in the history
  • Loading branch information
adcharre authored and AkhigbeEromo committed Oct 9, 2024
1 parent 0261bad commit 57c7367
Show file tree
Hide file tree
Showing 12 changed files with 645 additions and 14 deletions.
27 changes: 27 additions & 0 deletions .chloggen/awss3receiver_notifications.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: 'Add support for monitoring the progress of ingesting data from an S3 bucket via OpAMP custom messages.'

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30750]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
25 changes: 24 additions & 1 deletion receiver/awss3receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ The following exporter configuration parameters are supported.
| `encodings:` | An array of entries with the following properties: | | Optional |
| `extension` | Extension to use for decoding a key with a matching suffix. | | Required |
| `suffix` | Key suffix to match against. | | Required |
| `notifications:` | | | |
| `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | |

### Time format for `starttime` and `endtime`
The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data.
Expand Down Expand Up @@ -67,4 +69,25 @@ receivers:
encodings:
- extension: text_encoding
suffix: ".txt"
```
```
## Notifications
The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of
"org.opentelemetry.collector.receiver.awss3" and message type "TimeBasedIngestStatus".
The format of the notifications is a ProtoBuf formatted OLTP logs message with a single Log Record. The `body` of the
record is set to `status` and the timestamp of the record is used to hold the ingest time. The record also has the
following attributes:

| Attribute | Description |
|:------------------|:--------------------------------------------------------------------------------|
| `telemetry_type` | The type of telemetry being ingested. One of "traces", "metrics", or "logs". |
| `ingest_status` | The status of the data ingestion. One of "ingesting", "failed", or "completed". |
| `start_time` | The time to start retrieving data as an Int64, nanoseconds since Unix epoch. |
| `end_time` | The time to stop retrieving data as an Int64, nanoseconds since Unix epoch. |
| `failure_message` | Error message if `ingest_status` is "failed". |

The "ingesting" status is sent at the beginning of the ingest process before data has been retrieved for the specified time.
If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with
the time of the data being ingested when the failure occurred.
If the ingest process completes successfully a status message with `ingest_status` set to "completed" is sent.

13 changes: 9 additions & 4 deletions receiver/awss3receiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@ type S3DownloaderConfig struct {
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
}

type Notifications struct {
OpAMP *component.ID `mapstructure:"opampextension"`
}

type Encoding struct {
Extension component.ID `mapstructure:"extension"`
Suffix string `mapstructure:"suffix"`
}

// Config defines the configuration for the file receiver.
type Config struct {
S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"`
StartTime string `mapstructure:"starttime"`
EndTime string `mapstructure:"endtime"`
Encodings []Encoding `mapstructure:"encodings"`
S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"`
StartTime string `mapstructure:"starttime"`
EndTime string `mapstructure:"endtime"`
Encodings []Encoding `mapstructure:"encodings"`
Notifications Notifications `mapstructure:"notifications"`
}

const (
Expand Down
5 changes: 4 additions & 1 deletion receiver/awss3receiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestConfig_Validate_Valid(t *testing.T) {
func TestLoadConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

opampExtension := component.NewIDWithName(component.MustNewType("opamp"), "bar")
tests := []struct {
id component.ID
expected component.Config
Expand Down Expand Up @@ -89,6 +89,9 @@ func TestLoadConfig(t *testing.T) {
Suffix: "nop",
},
},
Notifications: Notifications{
OpAMP: &opampExtension,
},
},
},
}
Expand Down
4 changes: 4 additions & 0 deletions receiver/awss3receiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.39
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.25
github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3
github.com/open-telemetry/opamp-go v0.15.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.110.1-0.20241004063257-d6cd5935eefc
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.111.0
go.opentelemetry.io/collector/confmap v1.17.0
Expand Down Expand Up @@ -71,3 +73,5 @@ require (
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../../extension/opampcustommessages
2 changes: 2 additions & 0 deletions receiver/awss3receiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 121 additions & 0 deletions receiver/awss3receiver/notifications.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver"

import (
"context"
"errors"
"fmt"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages"
)

const (
IngestStatusCompleted = "completed"
IngestStatusFailed = "failed"
IngestStatusIngesting = "ingesting"
CustomCapability = "org.opentelemetry.collector.receiver.awss3"
maxNotificationAttempts = 3
)

type statusNotification struct {
TelemetryType string
IngestStatus string
StartTime time.Time
EndTime time.Time
IngestTime time.Time
FailureMessage string
}

type statusNotifier interface {
Start(ctx context.Context, host component.Host) error
Shutdown(ctx context.Context) error
SendStatus(ctx context.Context, message statusNotification)
}

type opampNotifier struct {
logger *zap.Logger
opampExtensionID component.ID
handler opampcustommessages.CustomCapabilityHandler
}

func newNotifier(config *Config, logger *zap.Logger) statusNotifier {
if config.Notifications.OpAMP != nil {
return &opampNotifier{opampExtensionID: *config.Notifications.OpAMP, logger: logger}
}
return nil
}

func (n *opampNotifier) Start(_ context.Context, host component.Host) error {
ext, ok := host.GetExtensions()[n.opampExtensionID]
if !ok {
return fmt.Errorf("extension %q does not exist", n.opampExtensionID)
}

registry, ok := ext.(opampcustommessages.CustomCapabilityRegistry)
if !ok {
return fmt.Errorf("extension %q is not a custom message registry", n.opampExtensionID)
}

handler, err := registry.Register(CustomCapability)
if err != nil {
return fmt.Errorf("failed to register custom capability: %w", err)
}
if handler == nil {
return errors.New("custom capability handler is nil")
}
n.handler = handler
return nil
}

func (n *opampNotifier) Shutdown(_ context.Context) error {
if n.handler != nil {
n.handler.Unregister()
}
return nil
}

func (n *opampNotifier) SendStatus(_ context.Context, message statusNotification) {
logs := plog.NewLogs()
log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
log.Body().SetStr("status")
attributes := log.Attributes()
attributes.PutStr("telemetry_type", message.TelemetryType)
attributes.PutStr("ingest_status", message.IngestStatus)
attributes.PutInt("start_time", int64(pcommon.NewTimestampFromTime(message.StartTime)))
attributes.PutInt("end_time", int64(pcommon.NewTimestampFromTime(message.EndTime)))
log.SetTimestamp(pcommon.NewTimestampFromTime(message.IngestTime))

if message.FailureMessage != "" {
attributes.PutStr("failure_message", message.FailureMessage)
}

marshaler := plog.ProtoMarshaler{}
bytes, err := marshaler.MarshalLogs(logs)
if err != nil {
return
}
for attempt := 0; attempt < maxNotificationAttempts; attempt++ {
sendingChan, sendingErr := n.handler.SendMessage("TimeBasedIngestStatus", bytes)
switch {
case sendingErr == nil:
return
case errors.Is(sendingErr, types.ErrCustomMessagePending):
<-sendingChan
default:
// The only other errors returned by the OpAmp extension are unrecoverable, ie ErrCustomCapabilityNotSupported
// so just log an error and return.
n.logger.Error("Failed to send notification", zap.Error(sendingErr), zap.Int("attempt", attempt))
return
}
}
n.logger.Error("Failed to send notification after multiple attempts", zap.Int("max_attempts", maxNotificationAttempts))
}
Loading

0 comments on commit 57c7367

Please sign in to comment.