From 435ffac8e4880d2a2586d066acdd67cf3e750e52 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Wed, 24 Jan 2024 10:40:56 -0800 Subject: [PATCH] Pull context out of dynamodb.Load (#30) --- lib/mtr/mtr.go | 7 ++++--- lib/ttlmap/ttlmap.go | 5 +---- lib/ttlmap/ttlmap_test.go | 11 +++++------ main.go | 2 +- sources/dynamodb/dynamodb.go | 5 ++--- sources/dynamodb/offsets/offsets.go | 8 ++++---- sources/dynamodb/offsets/offsets_test.go | 7 ++++--- 7 files changed, 21 insertions(+), 24 deletions(-) diff --git a/lib/mtr/mtr.go b/lib/mtr/mtr.go index d2ea1adc..b7ed9a8e 100644 --- a/lib/mtr/mtr.go +++ b/lib/mtr/mtr.go @@ -22,13 +22,14 @@ func InjectDatadogIntoCtx(ctx context.Context, namespace string, tags []string, slog.Info("Overriding telemetry address with env vars", slog.String("address", address)) } - datadogClient, err := statsd.New(address) + datadogClient, err := statsd.New(address, + statsd.WithNamespace(stringutil.Override(DefaultNamespace, namespace)), + statsd.WithTags(tags), + ) if err != nil { logger.Fatal("Failed to create datadog client", slog.Any("err", err)) } - datadogClient.Tags = tags - datadogClient.Namespace = stringutil.Override(DefaultNamespace, namespace) return context.WithValue(ctx, constants.MtrKey, &statsClient{ client: datadogClient, rate: samplingRate, diff --git a/lib/ttlmap/ttlmap.go b/lib/ttlmap/ttlmap.go index f4cd2106..5315bda8 100644 --- a/lib/ttlmap/ttlmap.go +++ b/lib/ttlmap/ttlmap.go @@ -1,7 +1,6 @@ package ttlmap import ( - "context" "fmt" "io" "log/slog" @@ -27,7 +26,6 @@ type ItemWrapper struct { type TTLMap struct { shouldSave bool - ctx context.Context mu sync.RWMutex data map[string]*ItemWrapper `yaml:"data"` filePath string @@ -36,9 +34,8 @@ type TTLMap struct { flushTicker *time.Ticker } -func NewMap(ctx context.Context, filePath string, cleanupInterval, flushInterval time.Duration) *TTLMap { +func NewMap(filePath string, cleanupInterval, flushInterval time.Duration) *TTLMap { t := &TTLMap{ - ctx: ctx, data: make(map[string]*ItemWrapper), filePath: filePath, closeChan: make(chan struct{}), diff --git a/lib/ttlmap/ttlmap_test.go b/lib/ttlmap/ttlmap_test.go index 30230753..d632b500 100644 --- a/lib/ttlmap/ttlmap_test.go +++ b/lib/ttlmap/ttlmap_test.go @@ -1,12 +1,12 @@ package ttlmap import ( - "context" "fmt" - "github.com/stretchr/testify/assert" - "gopkg.in/yaml.v2" "os" "time" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" ) func (t *TTLMapTestSuite) TestTTLMap_Complete() { @@ -14,7 +14,7 @@ func (t *TTLMapTestSuite) TestTTLMap_Complete() { assert.NoError(t.T(), os.RemoveAll(fp)) defer os.RemoveAll(fp) - store := NewMap(t.ctx, fp, 100*time.Millisecond, 120*time.Millisecond) + store := NewMap(fp, 100*time.Millisecond, 120*time.Millisecond) keyToDuration := map[string]time.Duration{ "foo": 50 * time.Millisecond, "bar": 100 * time.Millisecond, @@ -76,8 +76,7 @@ func (t *TTLMapTestSuite) TestFlushing() { assert.NoError(t.T(), os.RemoveAll(fp)) defer os.RemoveAll(fp) - ctx := context.Background() - ttlMap := NewMap(ctx, fp, DefaultCleanUpInterval, DefaultFlushInterval) + ttlMap := NewMap(fp, DefaultCleanUpInterval, DefaultFlushInterval) // Step 2: Add items to the map with varying DoNotFlushToDisk values ttlMap.Set(SetArgs{Key: "key1", Value: "value1", DoNotFlushToDisk: true}, 1*time.Hour) diff --git a/main.go b/main.go index 52888745..40089336 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,6 @@ func main() { ctx = mtr.InjectDatadogIntoCtx(ctx, cfg.Metrics.Namespace, cfg.Metrics.Tags, 0.5) } - ddb := dynamodb.Load(ctx) + ddb := dynamodb.Load(*cfg) ddb.Run(ctx) } diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go index 2ab73257..9f4c007f 100644 --- a/sources/dynamodb/dynamodb.go +++ b/sources/dynamodb/dynamodb.go @@ -35,8 +35,7 @@ type Store struct { const jitterSleepBaseMs = 50 const shardScannerInterval = 5 * time.Minute -func Load(ctx context.Context) *Store { - cfg := config.FromContext(ctx) +func Load(cfg config.Settings) *Store { sess, err := session.NewSession(&aws.Config{ Region: ptr.ToString(cfg.DynamoDB.AwsRegion), Credentials: credentials.NewStaticCredentials(cfg.DynamoDB.AwsAccessKeyID, cfg.DynamoDB.AwsSecretAccessKey, ""), @@ -59,7 +58,7 @@ func Load(ctx context.Context) *Store { store.s3Client = s3lib.NewClient(sess) } else { // If it's not snapshotting, then we'll need to create offset storage, streams client and a channel. - store.storage = offsets.NewStorage(ctx, cfg.DynamoDB.OffsetFile, nil, nil) + store.storage = offsets.NewStorage(cfg.DynamoDB.OffsetFile, nil, nil) store.streams = dynamodbstreams.New(sess) store.shardChan = make(chan *dynamodbstreams.Shard) } diff --git a/sources/dynamodb/offsets/offsets.go b/sources/dynamodb/offsets/offsets.go index 04dd9146..4d7f5c17 100644 --- a/sources/dynamodb/offsets/offsets.go +++ b/sources/dynamodb/offsets/offsets.go @@ -1,10 +1,10 @@ package offsets import ( - "context" "fmt" - "github.com/artie-labs/reader/lib/ttlmap" "time" + + "github.com/artie-labs/reader/lib/ttlmap" ) const ShardExpirationAndBuffer = 26 * time.Hour @@ -70,7 +70,7 @@ func (o *OffsetStorage) LastProcessedSequenceNumber(shardID string) (string, boo return fmt.Sprint(sequenceNumber), true } -func NewStorage(ctx context.Context, fp string, cleanUpIntervalOverride, flushIntervalOverride *time.Duration) *OffsetStorage { +func NewStorage(fp string, cleanUpIntervalOverride, flushIntervalOverride *time.Duration) *OffsetStorage { cleanUpInterval := ttlmap.DefaultCleanUpInterval if cleanUpIntervalOverride != nil { cleanUpInterval = *cleanUpIntervalOverride @@ -82,7 +82,7 @@ func NewStorage(ctx context.Context, fp string, cleanUpIntervalOverride, flushIn } offset := &OffsetStorage{ - ttlMap: ttlmap.NewMap(ctx, fp, cleanUpInterval, flushInterval), + ttlMap: ttlmap.NewMap(fp, cleanUpInterval, flushInterval), } return offset } diff --git a/sources/dynamodb/offsets/offsets_test.go b/sources/dynamodb/offsets/offsets_test.go index d915c534..30ef0897 100644 --- a/sources/dynamodb/offsets/offsets_test.go +++ b/sources/dynamodb/offsets/offsets_test.go @@ -2,9 +2,10 @@ package offsets import ( "fmt" - "github.com/stretchr/testify/assert" "os" "time" + + "github.com/stretchr/testify/assert" ) func ptrDuration(d time.Duration) *time.Duration { @@ -17,7 +18,7 @@ func (o *OffsetsTestSuite) TestOffsets_Complete() { defer assert.NoError(o.T(), os.RemoveAll(offsetsFilePath)) // Delete the file we do create during the test. - storage := NewStorage(o.ctx, offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond)) + storage := NewStorage(offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond)) processedShards := []string{"foo", "bar", "xyz"} // It should all return `False` because the file doesn't exist and we didn't load anything yet. @@ -41,7 +42,7 @@ func (o *OffsetsTestSuite) TestOffsets_Complete() { // Sleep, wait for the file to be committed to disk and then reload the storage. time.Sleep(75 * time.Millisecond) // Wait for the file to be written. - storage = NewStorage(o.ctx, offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond)) + storage = NewStorage(offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond)) for _, processedShard := range processedShards { assert.True(o.T(), storage.GetShardProcessed(processedShard), fmt.Sprintf("shard: %s, value: %v", processedShard, storage.GetShardProcessed(processedShard)))