Skip to content

Commit

Permalink
Pull context out of dynamodb.Load (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Jan 24, 2024
1 parent 6d931a4 commit 435ffac
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 24 deletions.
7 changes: 4 additions & 3 deletions lib/mtr/mtr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ func InjectDatadogIntoCtx(ctx context.Context, namespace string, tags []string,
slog.Info("Overriding telemetry address with env vars", slog.String("address", address))
}

datadogClient, err := statsd.New(address)
datadogClient, err := statsd.New(address,
statsd.WithNamespace(stringutil.Override(DefaultNamespace, namespace)),
statsd.WithTags(tags),
)
if err != nil {
logger.Fatal("Failed to create datadog client", slog.Any("err", err))
}

datadogClient.Tags = tags
datadogClient.Namespace = stringutil.Override(DefaultNamespace, namespace)
return context.WithValue(ctx, constants.MtrKey, &statsClient{
client: datadogClient,
rate: samplingRate,
Expand Down
5 changes: 1 addition & 4 deletions lib/ttlmap/ttlmap.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ttlmap

import (
"context"
"fmt"
"io"
"log/slog"
Expand All @@ -27,7 +26,6 @@ type ItemWrapper struct {

type TTLMap struct {
shouldSave bool
ctx context.Context
mu sync.RWMutex
data map[string]*ItemWrapper `yaml:"data"`
filePath string
Expand All @@ -36,9 +34,8 @@ type TTLMap struct {
flushTicker *time.Ticker
}

func NewMap(ctx context.Context, filePath string, cleanupInterval, flushInterval time.Duration) *TTLMap {
func NewMap(filePath string, cleanupInterval, flushInterval time.Duration) *TTLMap {
t := &TTLMap{
ctx: ctx,
data: make(map[string]*ItemWrapper),
filePath: filePath,
closeChan: make(chan struct{}),
Expand Down
11 changes: 5 additions & 6 deletions lib/ttlmap/ttlmap_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package ttlmap

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
"os"
"time"

"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
)

func (t *TTLMapTestSuite) TestTTLMap_Complete() {
fp := "/tmp/test.yaml"
assert.NoError(t.T(), os.RemoveAll(fp))
defer os.RemoveAll(fp)

store := NewMap(t.ctx, fp, 100*time.Millisecond, 120*time.Millisecond)
store := NewMap(fp, 100*time.Millisecond, 120*time.Millisecond)
keyToDuration := map[string]time.Duration{
"foo": 50 * time.Millisecond,
"bar": 100 * time.Millisecond,
Expand Down Expand Up @@ -76,8 +76,7 @@ func (t *TTLMapTestSuite) TestFlushing() {
assert.NoError(t.T(), os.RemoveAll(fp))
defer os.RemoveAll(fp)

ctx := context.Background()
ttlMap := NewMap(ctx, fp, DefaultCleanUpInterval, DefaultFlushInterval)
ttlMap := NewMap(fp, DefaultCleanUpInterval, DefaultFlushInterval)

// Step 2: Add items to the map with varying DoNotFlushToDisk values
ttlMap.Set(SetArgs{Key: "key1", Value: "value1", DoNotFlushToDisk: true}, 1*time.Hour)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ func main() {
ctx = mtr.InjectDatadogIntoCtx(ctx, cfg.Metrics.Namespace, cfg.Metrics.Tags, 0.5)
}

ddb := dynamodb.Load(ctx)
ddb := dynamodb.Load(*cfg)
ddb.Run(ctx)
}
5 changes: 2 additions & 3 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ type Store struct {
const jitterSleepBaseMs = 50
const shardScannerInterval = 5 * time.Minute

func Load(ctx context.Context) *Store {
cfg := config.FromContext(ctx)
func Load(cfg config.Settings) *Store {
sess, err := session.NewSession(&aws.Config{
Region: ptr.ToString(cfg.DynamoDB.AwsRegion),
Credentials: credentials.NewStaticCredentials(cfg.DynamoDB.AwsAccessKeyID, cfg.DynamoDB.AwsSecretAccessKey, ""),
Expand All @@ -59,7 +58,7 @@ func Load(ctx context.Context) *Store {
store.s3Client = s3lib.NewClient(sess)
} else {
// If it's not snapshotting, then we'll need to create offset storage, streams client and a channel.
store.storage = offsets.NewStorage(ctx, cfg.DynamoDB.OffsetFile, nil, nil)
store.storage = offsets.NewStorage(cfg.DynamoDB.OffsetFile, nil, nil)
store.streams = dynamodbstreams.New(sess)
store.shardChan = make(chan *dynamodbstreams.Shard)
}
Expand Down
8 changes: 4 additions & 4 deletions sources/dynamodb/offsets/offsets.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package offsets

import (
"context"
"fmt"
"github.com/artie-labs/reader/lib/ttlmap"
"time"

"github.com/artie-labs/reader/lib/ttlmap"
)

const ShardExpirationAndBuffer = 26 * time.Hour
Expand Down Expand Up @@ -70,7 +70,7 @@ func (o *OffsetStorage) LastProcessedSequenceNumber(shardID string) (string, boo
return fmt.Sprint(sequenceNumber), true
}

func NewStorage(ctx context.Context, fp string, cleanUpIntervalOverride, flushIntervalOverride *time.Duration) *OffsetStorage {
func NewStorage(fp string, cleanUpIntervalOverride, flushIntervalOverride *time.Duration) *OffsetStorage {
cleanUpInterval := ttlmap.DefaultCleanUpInterval
if cleanUpIntervalOverride != nil {
cleanUpInterval = *cleanUpIntervalOverride
Expand All @@ -82,7 +82,7 @@ func NewStorage(ctx context.Context, fp string, cleanUpIntervalOverride, flushIn
}

offset := &OffsetStorage{
ttlMap: ttlmap.NewMap(ctx, fp, cleanUpInterval, flushInterval),
ttlMap: ttlmap.NewMap(fp, cleanUpInterval, flushInterval),
}
return offset
}
7 changes: 4 additions & 3 deletions sources/dynamodb/offsets/offsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package offsets

import (
"fmt"
"github.com/stretchr/testify/assert"
"os"
"time"

"github.com/stretchr/testify/assert"
)

func ptrDuration(d time.Duration) *time.Duration {
Expand All @@ -17,7 +18,7 @@ func (o *OffsetsTestSuite) TestOffsets_Complete() {

defer assert.NoError(o.T(), os.RemoveAll(offsetsFilePath)) // Delete the file we do create during the test.

storage := NewStorage(o.ctx, offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond))
storage := NewStorage(offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond))
processedShards := []string{"foo", "bar", "xyz"}

// It should all return `False` because the file doesn't exist and we didn't load anything yet.
Expand All @@ -41,7 +42,7 @@ func (o *OffsetsTestSuite) TestOffsets_Complete() {

// Sleep, wait for the file to be committed to disk and then reload the storage.
time.Sleep(75 * time.Millisecond) // Wait for the file to be written.
storage = NewStorage(o.ctx, offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond))
storage = NewStorage(offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond))
for _, processedShard := range processedShards {
assert.True(o.T(), storage.GetShardProcessed(processedShard),
fmt.Sprintf("shard: %s, value: %v", processedShard, storage.GetShardProcessed(processedShard)))
Expand Down

0 comments on commit 435ffac

Please sign in to comment.