[Optimization] Offset storage + saving processed shards (#8)
Tang8330 authored Aug 24, 2023
1 parent 8b320ce commit 423bf3f
Showing 6 changed files with 329 additions and 98 deletions.
153 changes: 153 additions & 0 deletions lib/ttlmap/ttlmap.go
@@ -0,0 +1,153 @@
package ttlmap

import (
    "context"
    "fmt"
    "gopkg.in/yaml.v2"
    "io"
    "os"
    "sync"
    "time"

    "github.com/artie-labs/reader/lib/logger"
)

const (
    DefaultCleanUpInterval = 5 * time.Minute
    DefaultFlushInterval   = 30 * time.Second
)

type ItemWrapper struct {
    Value      interface{} `yaml:"value"`
    Expiration int64       `yaml:"expiration"`
}

type TTLMap struct {
    shouldSave    bool
    ctx           context.Context
    mu            sync.RWMutex
    data          map[string]*ItemWrapper `yaml:"data"`
    filePath      string
    closeChan     chan struct{}
    cleanupTicker *time.Ticker
    flushTicker   *time.Ticker
}

func NewMap(ctx context.Context, filePath string, cleanupInterval, flushInterval time.Duration) *TTLMap {
    t := &TTLMap{
        ctx:       ctx,
        data:      make(map[string]*ItemWrapper),
        filePath:  filePath,
        closeChan: make(chan struct{}),
    }

    if err := t.loadFromFile(); err != nil {
        logger.FromContext(ctx).WithError(err).Warn("failed to load ttlmap from memory, starting a new one...")
    }

    t.cleanupTicker = time.NewTicker(cleanupInterval)
    t.flushTicker = time.NewTicker(flushInterval)

    go t.cleanUpAndFlushRoutine()

    return t
}

func (t *TTLMap) Set(key string, value interface{}, ttl time.Duration) {
    t.mu.Lock()
    defer t.mu.Unlock()

    expiration := time.Now().Add(ttl).UnixNano()
    t.data[key] = &ItemWrapper{
        Value:      value,
        Expiration: expiration,
    }

    t.shouldSave = true
}

func (t *TTLMap) Get(key string) (interface{}, bool) {
    t.mu.RLock()
    defer t.mu.RUnlock()

    item, exists := t.data[key]
    if !exists || time.Now().UnixNano() > item.Expiration {
        return nil, false
    }

    return item.Value, true
}

func (t *TTLMap) cleanUpAndFlushRoutine() {
    for {
        select {
        case <-t.cleanupTicker.C:
            t.cleanup()
        case <-t.flushTicker.C:
            if err := t.flush(); err != nil {
                logger.FromContext(t.ctx).WithError(err).Fatal("failed to flush")
            }
        case <-t.closeChan:
            return
        }
    }
}

func (t *TTLMap) cleanup() {
    t.mu.Lock()
    defer t.mu.Unlock()

    now := time.Now().UnixNano()
    for k, v := range t.data {
        if now > v.Expiration {
            delete(t.data, k)
            t.shouldSave = true
        }
    }
}

func (t *TTLMap) flush() error {
    if !t.shouldSave {
        return nil
    }

    file, err := os.Create(t.filePath)
    if err != nil {
        return fmt.Errorf("failed to create file, err: %v", err)
    }

    yamlBytes, err := yaml.Marshal(t.data)
    if err != nil {
        return fmt.Errorf("failed to marshal data, err: %v", err)
    }

    if _, err = file.Write(yamlBytes); err != nil {
        return fmt.Errorf("failed to write to file, err: %v", err)
    }

    defer file.Close()
    t.shouldSave = false
    return nil
}

func (t *TTLMap) loadFromFile() error {
    file, err := os.Open(t.filePath)
    if err != nil {
        return err
    }

    defer file.Close()

    readBytes, err := io.ReadAll(file)
    if err != nil {
        return fmt.Errorf("failed to read file, err: %v", err)
    }

    var data map[string]*ItemWrapper
    if err = yaml.Unmarshal(readBytes, &data); err != nil {
        return fmt.Errorf("failed to unmarshal data, err: %v", err)
    }

    t.data = data
    return nil
}
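
The new package gives the reader a small file-backed TTL cache: Set stores a value with an expiry, Get hides expired entries, and a background goroutine prunes expired keys and flushes the map to a YAML file so it survives restarts. Below is a minimal usage sketch, not part of the commit: the file path, key, and TTL are illustrative, and the context is assumed to already carry the reader's logger/config settings the way the test suite injects them.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/artie-labs/reader/lib/ttlmap"
)

func main() {
    // Assumption: in the real service this context carries the reader's
    // logger/config settings (see the test suite below for how they are injected).
    ctx := context.Background()

    // YAML-backed map: prune expired keys every 5 minutes, flush to disk every 30 seconds.
    store := ttlmap.NewMap(ctx, "/tmp/ttlmap-demo.yaml", ttlmap.DefaultCleanUpInterval, ttlmap.DefaultFlushInterval)

    // Remember something for an hour; after that, Get reports it as missing.
    store.Set("shardId-00000001", "seq-12345", time.Hour)

    if seq, ok := store.Get("shardId-00000001"); ok {
        fmt.Println("last processed sequence number:", seq)
    }
}
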
28 changes: 28 additions & 0 deletions lib/ttlmap/ttlmap_suite_test.go
@@ -0,0 +1,28 @@
package ttlmap

import (
    "context"
    "github.com/artie-labs/transfer/lib/config"
    "github.com/stretchr/testify/suite"
    "testing"
)

type TTLMapTestSuite struct {
    suite.Suite
    ctx context.Context
}

func (t *TTLMapTestSuite) SetupTest() {
    ctx := config.InjectSettingsIntoContext(context.Background(), &config.Settings{
        VerboseLogging: true,
        Config: &config.Config{
            Redshift: &config.Redshift{},
        },
    })

    t.ctx = ctx
}

func TestTTLMapTestSuite(t *testing.T) {
    suite.Run(t, new(TTLMapTestSuite))
}
66 changes: 66 additions & 0 deletions lib/ttlmap/ttlmap_test.go
@@ -0,0 +1,66 @@
package ttlmap

import (
    "fmt"
    "github.com/stretchr/testify/assert"
    "os"
    "time"
)

func (t *TTLMapTestSuite) TestTTLMap_Complete() {
    fp := "/tmp/test.yaml"
    assert.NoError(t.T(), os.RemoveAll(fp))
    defer os.RemoveAll(fp)

    store := NewMap(t.ctx, fp, 100*time.Millisecond, 120*time.Millisecond)
    keyToDuration := map[string]time.Duration{
        "foo": 50 * time.Millisecond,
        "bar": 100 * time.Millisecond,
        "baz": 150 * time.Millisecond,
        "xyz": 2 * time.Second,
        "123": 5 * time.Second,
    }

    for key := range keyToDuration {
        _, isOk := store.Get(key)
        assert.False(t.T(), isOk, fmt.Sprintf("key %s should not exist", key))
    }

    // Now, insert all of this and then wait 100 ms.
    for key, duration := range keyToDuration {
        store.Set(key, key, duration)
    }

    for key := range keyToDuration {
        val, isOk := store.Get(key)
        assert.True(t.T(), isOk, fmt.Sprintf("key %s should exist", key))
        assert.Equal(t.T(), val, key)
    }

    // Now wait 50 ms.
    time.Sleep(50 * time.Millisecond)

    // foo shouldn't exist from GET, but will be still stored since GC didn't run yet.
    _, isOk := store.Get("foo")
    assert.False(t.T(), isOk, "foo")

    store.mu.Lock()
    _, isOk = store.data["foo"]
    assert.True(t.T(), isOk)
    store.mu.Unlock()

    time.Sleep(60 * time.Millisecond)

    _, isOk = store.Get("bar")
    assert.False(t.T(), isOk, "bar")
    store.mu.Lock()
    // Did the data get erased?
    for _, key := range []string{"foo", "bar"} {
        _, isOk = store.data[key]
        assert.False(t.T(), isOk, key)
    }
    store.mu.Unlock()

    _, isOk = store.Get("xyz")
    assert.True(t.T(), isOk, "xyz")
}
31 changes: 14 additions & 17 deletions sources/dynamodb/dynamodb.go
@@ -24,11 +24,8 @@ type Store struct {
     storage *offsets.OffsetStorage
 }

-const (
-    flushOffsetInterval = 30 * time.Second
-    // jitterSleepBaseMs - sleep for 50 ms as the base.
-    jitterSleepBaseMs = 50
-)
+// jitterSleepBaseMs - sleep for 50 ms as the base.
+const jitterSleepBaseMs = 50

 func Load(ctx context.Context) *Store {
     cfg := config.FromContext(ctx)
@@ -44,24 +41,14 @@ func Load(ctx context.Context) *Store {
         tableName: cfg.DynamoDB.TableName,
         streamArn: cfg.DynamoDB.StreamArn,
         batchSize: cfg.Kafka.PublishSize,
-        storage: offsets.NewStorage(ctx, cfg.DynamoDB.OffsetFile),
+        storage: offsets.NewStorage(ctx, cfg.DynamoDB.OffsetFile, nil, nil),
         streams: dynamodbstreams.New(sess),
     }

     return store
 }

 func (s *Store) Run(ctx context.Context) {
-    ticker := time.NewTicker(flushOffsetInterval)
-    go func() {
-        for {
-            select {
-            case <-ticker.C:
-                s.storage.Save(ctx)
-            }
-        }
-    }()
-
     log := logger.FromContext(ctx)
     var attempts int
     for {
Expand All @@ -72,9 +59,14 @@ func (s *Store) Run(ctx context.Context) {
}

for _, shard := range result.StreamDescription.Shards {
if s.storage.GetShardProcessed(*shard.ShardId) {
logger.FromContext(ctx).WithField("shardId", *shard.ShardId).Info("shard has been processed, skipping...")
continue
}

iteratorType := "TRIM_HORIZON"
var startingSequenceNumber string
if seqNumber, exists := s.storage.ReadOnlyLastProcessedSequenceNumbers(*shard.ShardId); exists {
if seqNumber, exists := s.storage.LastProcessedSequenceNumber(*shard.ShardId); exists {
iteratorType = "AFTER_SEQUENCE_NUMBER"
startingSequenceNumber = seqNumber
}
@@ -153,6 +145,11 @@
                 }

                 shardIterator = getRecordsOutput.NextShardIterator
+                if shardIterator == nil {
+                    // This means this shard has been fully processed, let's add it to our processed list.
+                    logger.FromContext(ctx).WithField("shardId", *shard.ShardId).Info("shard has been fully processed, adding it to the processed list...")
+                    s.storage.SetShardProcessed(*shard.ShardId)
+                }
             }
         }
     }
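
With this change the Run loop no longer schedules its own 30-second save ticker: persistence is handled inside the storage that now backs the offsets. The offsets package itself is among the changed files whose diff is not shown above, so the sketch below is only an illustration of how GetShardProcessed, SetShardProcessed, and LastProcessedSequenceNumber could be layered on the new TTLMap; the key prefixes, TTL, and exact signature are assumptions, not the committed code.

package offsets

import (
    "context"
    "fmt"
    "time"

    "github.com/artie-labs/reader/lib/ttlmap"
)

// OffsetStorage wraps the file-backed TTLMap, so callers no longer need a
// separate save loop. (Illustrative sketch, not the committed implementation.)
type OffsetStorage struct {
    ttlMap *ttlmap.TTLMap
}

// Assumption: processed shards only need to be remembered a bit longer than the
// ~24h DynamoDB Streams retention window.
const processedShardTTL = 26 * time.Hour

func NewStorage(ctx context.Context, filePath string, cleanupInterval, flushInterval *time.Duration) *OffsetStorage {
    cleanup, flush := ttlmap.DefaultCleanUpInterval, ttlmap.DefaultFlushInterval
    if cleanupInterval != nil {
        cleanup = *cleanupInterval
    }
    if flushInterval != nil {
        flush = *flushInterval
    }

    return &OffsetStorage{ttlMap: ttlmap.NewMap(ctx, filePath, cleanup, flush)}
}

func (o *OffsetStorage) SetShardProcessed(shardID string) {
    o.ttlMap.Set(fmt.Sprintf("processed#%s", shardID), true, processedShardTTL)
}

func (o *OffsetStorage) GetShardProcessed(shardID string) bool {
    _, exists := o.ttlMap.Get(fmt.Sprintf("processed#%s", shardID))
    return exists
}

func (o *OffsetStorage) LastProcessedSequenceNumber(shardID string) (string, bool) {
    val, exists := o.ttlMap.Get(fmt.Sprintf("seq#%s", shardID))
    if !exists {
        return "", false
    }

    seq, ok := val.(string)
    return seq, ok
}
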