From 423bf3ffc96ccc10141ccf5ddd97857b92efaff8 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Thu, 24 Aug 2023 13:59:44 -0700
Subject: [PATCH] [Optimization] Offset storage + saving processed shards (#8)

---
 lib/ttlmap/ttlmap.go                     | 153 +++++++++++++++++++++++
 lib/ttlmap/ttlmap_suite_test.go          |  28 +++++
 lib/ttlmap/ttlmap_test.go                |  66 ++++++++++
 sources/dynamodb/dynamodb.go             |  31 +++--
 sources/dynamodb/offsets/offsets.go      | 100 +++++----------
 sources/dynamodb/offsets/offsets_test.go |  49 +++++---
 6 files changed, 329 insertions(+), 98 deletions(-)
 create mode 100644 lib/ttlmap/ttlmap.go
 create mode 100644 lib/ttlmap/ttlmap_suite_test.go
 create mode 100644 lib/ttlmap/ttlmap_test.go

diff --git a/lib/ttlmap/ttlmap.go b/lib/ttlmap/ttlmap.go
new file mode 100644
index 00000000..a3e59cf7
--- /dev/null
+++ b/lib/ttlmap/ttlmap.go
@@ -0,0 +1,153 @@
+package ttlmap
+
+import (
+	"context"
+	"fmt"
+	"gopkg.in/yaml.v2"
+	"io"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/artie-labs/reader/lib/logger"
+)
+
+const (
+	DefaultCleanUpInterval = 5 * time.Minute
+	DefaultFlushInterval   = 30 * time.Second
+)
+
+type ItemWrapper struct {
+	Value      interface{} `yaml:"value"`
+	Expiration int64       `yaml:"expiration"`
+}
+
+type TTLMap struct {
+	shouldSave    bool
+	ctx           context.Context
+	mu            sync.RWMutex
+	data          map[string]*ItemWrapper `yaml:"data"`
+	filePath      string
+	closeChan     chan struct{}
+	cleanupTicker *time.Ticker
+	flushTicker   *time.Ticker
+}
+
+func NewMap(ctx context.Context, filePath string, cleanupInterval, flushInterval time.Duration) *TTLMap {
+	t := &TTLMap{
+		ctx:       ctx,
+		data:      make(map[string]*ItemWrapper),
+		filePath:  filePath,
+		closeChan: make(chan struct{}),
+	}
+
+	if err := t.loadFromFile(); err != nil {
+		logger.FromContext(ctx).WithError(err).Warn("failed to load ttlmap from file, starting a new one...")
+	}
+
+	t.cleanupTicker = time.NewTicker(cleanupInterval)
+	t.flushTicker = time.NewTicker(flushInterval)
+
+	go t.cleanUpAndFlushRoutine()
+
+	return t
+}
+
+func (t *TTLMap) Set(key string, value interface{}, ttl time.Duration) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	expiration := time.Now().Add(ttl).UnixNano()
+	t.data[key] = &ItemWrapper{
+		Value:      value,
+		Expiration: expiration,
+	}
+
+	t.shouldSave = true
+}
+
+func (t *TTLMap) Get(key string) (interface{}, bool) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	item, exists := t.data[key]
+	if !exists || time.Now().UnixNano() > item.Expiration {
+		return nil, false
+	}
+
+	return item.Value, true
+}
+
+func (t *TTLMap) cleanUpAndFlushRoutine() {
+	for {
+		select {
+		case <-t.cleanupTicker.C:
+			t.cleanup()
+		case <-t.flushTicker.C:
+			if err := t.flush(); err != nil {
+				logger.FromContext(t.ctx).WithError(err).Fatal("failed to flush")
+			}
+		case <-t.closeChan:
+			return
+		}
+	}
+}
+
+func (t *TTLMap) cleanup() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	now := time.Now().UnixNano()
+	for k, v := range t.data {
+		if now > v.Expiration {
+			delete(t.data, k)
+			t.shouldSave = true
+		}
+	}
+}
+
+func (t *TTLMap) flush() error {
+	if !t.shouldSave {
+		return nil
+	}
+
+	file, err := os.Create(t.filePath)
+	if err != nil {
+		return fmt.Errorf("failed to create file, err: %v", err)
+	}
+
+	yamlBytes, err := yaml.Marshal(t.data)
+	if err != nil {
+		return fmt.Errorf("failed to marshal data, err: %v", err)
+	}
+
+	if _, err = file.Write(yamlBytes); err != nil {
+		return fmt.Errorf("failed to write to file, err: %v", err)
+	}
+
+	defer file.Close()
+	t.shouldSave = false
+	return nil
+}
+
+func (t *TTLMap) loadFromFile() error {
+	file, err := os.Open(t.filePath)
+	if err != nil {
+		return err
+	}
+
+	defer file.Close()
+
+	readBytes, err := io.ReadAll(file)
+	if err != nil {
+		return fmt.Errorf("failed to read file, err: %v", err)
+	}
+
+	var data map[string]*ItemWrapper
+	if err = yaml.Unmarshal(readBytes, &data); err != nil {
+		return fmt.Errorf("failed to unmarshal data, err: %v", err)
+	}
+
+	t.data = data
+	return nil
+}
diff --git a/lib/ttlmap/ttlmap_suite_test.go b/lib/ttlmap/ttlmap_suite_test.go
new file mode 100644
index 00000000..76c6ec5e
--- /dev/null
+++ b/lib/ttlmap/ttlmap_suite_test.go
@@ -0,0 +1,28 @@
+package ttlmap
+
+import (
+	"context"
+	"github.com/artie-labs/transfer/lib/config"
+	"github.com/stretchr/testify/suite"
+	"testing"
+)
+
+type TTLMapTestSuite struct {
+	suite.Suite
+	ctx context.Context
+}
+
+func (t *TTLMapTestSuite) SetupTest() {
+	ctx := config.InjectSettingsIntoContext(context.Background(), &config.Settings{
+		VerboseLogging: true,
+		Config: &config.Config{
+			Redshift: &config.Redshift{},
+		},
+	})
+
+	t.ctx = ctx
+}
+
+func TestTTLMapTestSuite(t *testing.T) {
+	suite.Run(t, new(TTLMapTestSuite))
+}
diff --git a/lib/ttlmap/ttlmap_test.go b/lib/ttlmap/ttlmap_test.go
new file mode 100644
index 00000000..8cc64021
--- /dev/null
+++ b/lib/ttlmap/ttlmap_test.go
@@ -0,0 +1,66 @@
+package ttlmap
+
+import (
+	"fmt"
+	"github.com/stretchr/testify/assert"
+	"os"
+	"time"
+)
+
+func (t *TTLMapTestSuite) TestTTLMap_Complete() {
+	fp := "/tmp/test.yaml"
+	assert.NoError(t.T(), os.RemoveAll(fp))
+	defer os.RemoveAll(fp)
+
+	store := NewMap(t.ctx, fp, 100*time.Millisecond, 120*time.Millisecond)
+	keyToDuration := map[string]time.Duration{
+		"foo": 50 * time.Millisecond,
+		"bar": 100 * time.Millisecond,
+		"baz": 150 * time.Millisecond,
+		"xyz": 2 * time.Second,
+		"123": 5 * time.Second,
+	}
+
+	for key := range keyToDuration {
+		_, isOk := store.Get(key)
+		assert.False(t.T(), isOk, fmt.Sprintf("key %s should not exist", key))
+	}
+
+	// Now, insert all of the keys and start waiting.
+	for key, duration := range keyToDuration {
+		store.Set(key, key, duration)
+	}
+
+	for key := range keyToDuration {
+		val, isOk := store.Get(key)
+		assert.True(t.T(), isOk, fmt.Sprintf("key %s should exist", key))
+		assert.Equal(t.T(), val, key)
+	}
+
+	// Now wait 50 ms.
+	time.Sleep(50 * time.Millisecond)
+
+	// foo shouldn't be returned by Get, but it will still be stored since GC didn't run yet.
+	_, isOk := store.Get("foo")
+	assert.False(t.T(), isOk, "foo")
+
+	store.mu.Lock()
+	_, isOk = store.data["foo"]
+	assert.True(t.T(), isOk)
+	store.mu.Unlock()
+
+	time.Sleep(60 * time.Millisecond)
+
+	_, isOk = store.Get("bar")
+	assert.False(t.T(), isOk, "bar")
+	store.mu.Lock()
+	// Did the data get erased?
+	for _, key := range []string{"foo", "bar"} {
+		_, isOk = store.data[key]
+		assert.False(t.T(), isOk, key)
+	}
+	store.mu.Unlock()
+
+	_, isOk = store.Get("xyz")
+	assert.True(t.T(), isOk, "xyz")
+}
diff --git a/sources/dynamodb/dynamodb.go b/sources/dynamodb/dynamodb.go
index b787c183..fa9a0fba 100644
--- a/sources/dynamodb/dynamodb.go
+++ b/sources/dynamodb/dynamodb.go
@@ -24,11 +24,8 @@ type Store struct {
 	storage *offsets.OffsetStorage
 }
 
-const (
-	flushOffsetInterval = 30 * time.Second
-	// jitterSleepBaseMs - sleep for 50 ms as the base.
-	jitterSleepBaseMs = 50
-)
+// jitterSleepBaseMs - sleep for 50 ms as the base.
+const jitterSleepBaseMs = 50
 
 func Load(ctx context.Context) *Store {
 	cfg := config.FromContext(ctx)
@@ -44,7 +41,7 @@ func Load(ctx context.Context) *Store {
 		tableName: cfg.DynamoDB.TableName,
 		streamArn: cfg.DynamoDB.StreamArn,
 		batchSize: cfg.Kafka.PublishSize,
-		storage:   offsets.NewStorage(ctx, cfg.DynamoDB.OffsetFile),
+		storage:   offsets.NewStorage(ctx, cfg.DynamoDB.OffsetFile, nil, nil),
 		streams:   dynamodbstreams.New(sess),
 	}
 
@@ -52,16 +49,6 @@ func Load(ctx context.Context) *Store {
 }
 
 func (s *Store) Run(ctx context.Context) {
-	ticker := time.NewTicker(flushOffsetInterval)
-	go func() {
-		for {
-			select {
-			case <-ticker.C:
-				s.storage.Save(ctx)
-			}
-		}
-	}()
-
 	log := logger.FromContext(ctx)
 	var attempts int
 	for {
@@ -72,9 +59,14 @@ func (s *Store) Run(ctx context.Context) {
 		}
 
 		for _, shard := range result.StreamDescription.Shards {
+			if s.storage.GetShardProcessed(*shard.ShardId) {
+				logger.FromContext(ctx).WithField("shardId", *shard.ShardId).Info("shard has been processed, skipping...")
+				continue
+			}
+
 			iteratorType := "TRIM_HORIZON"
 			var startingSequenceNumber string
-			if seqNumber, exists := s.storage.ReadOnlyLastProcessedSequenceNumbers(*shard.ShardId); exists {
+			if seqNumber, exists := s.storage.LastProcessedSequenceNumber(*shard.ShardId); exists {
 				iteratorType = "AFTER_SEQUENCE_NUMBER"
 				startingSequenceNumber = seqNumber
 			}
@@ -153,6 +145,11 @@ func (s *Store) Run(ctx context.Context) {
 				}
 
 				shardIterator = getRecordsOutput.NextShardIterator
+				if shardIterator == nil {
+					// This means the shard has been fully processed; add it to our processed list.
+					logger.FromContext(ctx).WithField("shardId", *shard.ShardId).Info("shard has been fully processed, adding it to the processed list...")
+					s.storage.SetShardProcessed(*shard.ShardId)
+				}
 			}
 		}
 	}
diff --git a/sources/dynamodb/offsets/offsets.go b/sources/dynamodb/offsets/offsets.go
index 633636db..c7ae12b4 100644
--- a/sources/dynamodb/offsets/offsets.go
+++ b/sources/dynamodb/offsets/offsets.go
@@ -1,95 +1,61 @@
 package offsets
 
 import (
-	"bufio"
 	"context"
 	"fmt"
-	"github.com/artie-labs/reader/lib/logger"
-	"os"
-	"strings"
-	"sync"
+	"github.com/artie-labs/reader/lib/ttlmap"
+	"time"
 )
 
+const ShardExpirationAndBuffer = 26 * time.Hour
+
 type OffsetStorage struct {
-	lastProcessedSeqNumbers map[string]string
-	shouldSave              bool
-	fp                      string
-	sync.Mutex
+	ttlMap *ttlmap.TTLMap
 }
 
-func (o *OffsetStorage) SetLastProcessedSequenceNumber(shardID string, sequenceNumber string) {
-	o.Lock()
-	defer o.Unlock()
-	o.lastProcessedSeqNumbers[shardID] = sequenceNumber
-	o.shouldSave = true
+func shardProcessKey(shardId string) string {
+	return fmt.Sprintf("processed#shardId#%s", shardId)
 }
 
-func (o *OffsetStorage) ReadOnlyLastProcessedSequenceNumbers(shardID string) (string, bool) {
-	o.Lock()
-	defer o.Unlock()
+func shardSeqNumberKey(shardId string) string {
+	return fmt.Sprintf("seqNumber#shardId#%s", shardId)
+}
 
-	val, isOk := o.lastProcessedSeqNumbers[shardID]
-	return val, isOk
+func (o *OffsetStorage) SetShardProcessed(shardID string) {
+	o.ttlMap.Set(shardProcessKey(shardID), true, ShardExpirationAndBuffer)
 }
 
-func NewStorage(ctx context.Context, fp string) *OffsetStorage {
-	offset := &OffsetStorage{
-		lastProcessedSeqNumbers: make(map[string]string),
-		fp:                      fp,
-	}
+func (o *OffsetStorage) GetShardProcessed(shardID string) bool {
+	_, isOk := o.ttlMap.Get(shardProcessKey(shardID))
+	return isOk
+}
 
-	offset.load(ctx)
-	return offset
+func (o *OffsetStorage) SetLastProcessedSequenceNumber(shardID string, sequenceNumber string) {
+	o.ttlMap.Set(shardSeqNumberKey(shardID), sequenceNumber, ShardExpirationAndBuffer)
 }
 
-func (o *OffsetStorage) load(ctx context.Context) {
-	log := logger.FromContext(ctx)
-	log.Infof("loading DynamoDB offsets from file: %s", o.fp)
-	file, err := os.Open(o.fp)
-	if err != nil {
-		log.WithError(err).Warn("failed to open DynamoDB offset file, so not using previously stored offsets...")
-		return
+func (o *OffsetStorage) LastProcessedSequenceNumber(shardID string) (string, bool) {
+	sequenceNumber, isOk := o.ttlMap.Get(shardSeqNumberKey(shardID))
+	if !isOk {
+		return "", false
 	}
 
-	defer file.Close()
-	scanner := bufio.NewScanner(file)
-	for scanner.Scan() {
-		parts := strings.Split(scanner.Text(), ":")
-		if len(parts) == 2 {
-			shardID := parts[0]
-			sequenceNumber := parts[1]
-			o.lastProcessedSeqNumbers[shardID] = sequenceNumber
-		}
-	}
-	if err := scanner.Err(); err != nil {
-		log.Printf("Error reading offset file: %v", err)
-	}
+	return fmt.Sprint(sequenceNumber), true
 }
 
-func (o *OffsetStorage) Save(ctx context.Context) {
-	o.Lock()
-	defer o.Unlock()
-
-	if !o.shouldSave {
-		return
+func NewStorage(ctx context.Context, fp string, cleanUpIntervalOverride, flushIntervalOverride *time.Duration) *OffsetStorage {
+	cleanUpInterval := ttlmap.DefaultCleanUpInterval
+	if cleanUpIntervalOverride != nil {
+		cleanUpInterval = *cleanUpIntervalOverride
 	}
 
-	file, err := os.Create(o.fp)
-	if err != nil {
-		logger.FromContext(ctx).WithError(err).Fatal("failed to create DynamoDB offset file")
+	flushInterval := ttlmap.DefaultFlushInterval
+	if flushIntervalOverride != nil {
+		flushInterval = *flushIntervalOverride
 	}
 
-	defer file.Close()
-
-	writer := bufio.NewWriter(file)
-	for shardID, sequenceNumber := range o.lastProcessedSeqNumbers {
-		_, err = writer.WriteString(fmt.Sprintf("%s:%s\n", shardID, sequenceNumber))
-		if err != nil {
-			logger.FromContext(ctx).WithError(err).Fatal("failed to write to DynamoDB offset file")
-			continue
-		}
+	offset := &OffsetStorage{
+		ttlMap: ttlmap.NewMap(ctx, fp, cleanUpInterval, flushInterval),
 	}
-
-	_ = writer.Flush()
-	o.shouldSave = false
+	return offset
 }
diff --git a/sources/dynamodb/offsets/offsets_test.go b/sources/dynamodb/offsets/offsets_test.go
index b454ec84..d915c534 100644
--- a/sources/dynamodb/offsets/offsets_test.go
+++ b/sources/dynamodb/offsets/offsets_test.go
@@ -1,34 +1,55 @@
 package offsets
 
 import (
+	"fmt"
 	"github.com/stretchr/testify/assert"
 	"os"
+	"time"
 )
 
+func ptrDuration(d time.Duration) *time.Duration {
+	return &d
+}
+
 func (o *OffsetsTestSuite) TestOffsets_Complete() {
 	offsetsFilePath := "/tmp/offsets-test"
-	err := os.RemoveAll(offsetsFilePath)
-	assert.NoError(o.T(), err)
+	assert.NoError(o.T(), os.RemoveAll(offsetsFilePath)) // Delete if prev run wasn't clean.
+
+	defer assert.NoError(o.T(), os.RemoveAll(offsetsFilePath)) // Delete the file we do create during the test.
+
+	storage := NewStorage(o.ctx, offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond))
+	processedShards := []string{"foo", "bar", "xyz"}
 
-	storage := NewStorage(o.ctx, offsetsFilePath)
-	originalLastProcessedSeqNumbers := map[string]string{
+	// It should all return `false` because the file doesn't exist and we didn't load anything yet.
+	for _, processedShard := range processedShards {
+		assert.False(o.T(), storage.GetShardProcessed(processedShard), processedShard)
+		storage.SetShardProcessed(processedShard)
+	}
+
+	shardToSequenceNumber := map[string]string{
 		"shard-1": "123",
 		"shard-2": "456",
 		"shard-3": "789",
 	}
 
-	// Try to save a bunch of times, file will not exist since shouldSave = false
-	_, err = os.Open(offsetsFilePath)
-	assert.Error(o.T(), err)
+	for shard, sequenceNumber := range shardToSequenceNumber {
+		_, isOk := storage.LastProcessedSequenceNumber(shard)
+		assert.False(o.T(), isOk, shard)
 
-	for shard, lastProcessedSequenceNumber := range originalLastProcessedSeqNumbers {
-		storage.SetLastProcessedSequenceNumber(shard, lastProcessedSequenceNumber)
+		storage.SetLastProcessedSequenceNumber(shard, sequenceNumber)
 	}
 
-	storage.Save(o.ctx)
-	storage.lastProcessedSeqNumbers = map[string]string{}
-	storage.load(o.ctx)
+	// Sleep, wait for the file to be committed to disk and then reload the storage.
+	time.Sleep(75 * time.Millisecond) // Wait for the file to be written.
+	storage = NewStorage(o.ctx, offsetsFilePath, ptrDuration(50*time.Millisecond), ptrDuration(50*time.Millisecond))
+	for _, processedShard := range processedShards {
+		assert.True(o.T(), storage.GetShardProcessed(processedShard),
+			fmt.Sprintf("shard: %s, value: %v", processedShard, storage.GetShardProcessed(processedShard)))
+	}
 
-	assert.False(o.T(), storage.shouldSave)
-	assert.Equal(o.T(), originalLastProcessedSeqNumbers, storage.lastProcessedSeqNumbers)
+	for shard, sequenceNumber := range shardToSequenceNumber {
+		retrievedSeqNumber, isOk := storage.LastProcessedSequenceNumber(shard)
+		assert.True(o.T(), isOk, shard)
+		assert.Equal(o.T(), sequenceNumber, retrievedSeqNumber, shard)
+	}
 }
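
Reviewer note, not part of the patch: below is a minimal sketch of how the new lib/ttlmap package is meant to be used on its own. The NewMap/Set/Get signatures, the exported default intervals, and the 26-hour TTL (mirroring offsets.ShardExpirationAndBuffer) come from the diff above; the file path and key are made-up examples, and depending on how logger.FromContext is wired up, the context may first need settings injected (the test suite does this via config.InjectSettingsIntoContext).

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/artie-labs/reader/lib/ttlmap"
)

func main() {
	// Assumption: a bare context is enough for the logger here; the ttlmap
	// test suite injects settings into the context first.
	ctx := context.Background()

	// Entries are persisted to the YAML file on every flush interval and
	// expired entries are dropped on every cleanup interval by the
	// background goroutine started in NewMap.
	store := ttlmap.NewMap(ctx, "/tmp/example-offsets.yaml", ttlmap.DefaultCleanUpInterval, ttlmap.DefaultFlushInterval)

	// Remember a shard for 26 hours - the same TTL the offsets package uses,
	// since DynamoDB Streams only retains records for 24 hours.
	store.Set("processed#shardId#shard-0001", true, 26*time.Hour)

	if val, ok := store.Get("processed#shardId#shard-0001"); ok {
		fmt.Println("found:", val)
	}
}

OffsetStorage is a thin wrapper over exactly this: NewStorage builds a TTLMap keyed by processed#shardId#<id> and seqNumber#shardId#<id>, so entries for shards that age past the stream's 24-hour retention (plus buffer) simply expire out of the offset file rather than accumulating forever.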