Skip to content

Commit

Permalink
Adding tests and CI (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Aug 21, 2023
1 parent 42b2482 commit 4e43558
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 58 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/gha-go-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: Go tests
run-name: Running tests 🚀
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3 # Checks out the code
- name: Setting Go up
uses: actions/setup-go@v3
with:
go-version: 1.19
- name: test
run: make test
- name: test-race
run: make race
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
.PHONY: test
test:
go test ./...

.PHONY: race
race:
go test -race ./...
8 changes: 6 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *Settings) Validate() error {
return nil
}

func InjectIntoContext(ctx context.Context, fp string) context.Context {
func ReadConfig(fp string) (*Settings, error) {
bytes, err := os.ReadFile(fp)
if err != nil {
log.Fatalf("failed to read config file, err: %v", err)
Expand All @@ -104,7 +104,11 @@ func InjectIntoContext(ctx context.Context, fp string) context.Context {
log.Fatalf("failed to unmarshal config file, err: %v", err)
}

return context.WithValue(ctx, ctxKey, &settings)
return &settings, nil
}

func InjectIntoContext(ctx context.Context, settings *Settings) context.Context {
return context.WithValue(ctx, ctxKey, settings)
}

func FromContext(ctx context.Context) *Settings {
Expand Down
35 changes: 34 additions & 1 deletion lib/dynamo/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
"github.com/segmentio/kafka-go"
"strconv"
"time"
)

Expand All @@ -22,12 +23,22 @@ type Message struct {
executionTime time.Time
}

func stringToFloat64(s string) (float64, error) {
return strconv.ParseFloat(s, 64)
}

func transformAttributeValue(attr *dynamodb.AttributeValue) interface{} {
switch {
case attr.S != nil:
return *attr.S
case attr.N != nil:
return *attr.N
number, err := stringToFloat64(*attr.N)
if err == nil {
return number
} else {
// TODO - should we throw an error here?
return nil
}
case attr.BOOL != nil:
return *attr.BOOL
case attr.M != nil:
Expand All @@ -42,7 +53,29 @@ func transformAttributeValue(attr *dynamodb.AttributeValue) interface{} {
list[i] = transformAttributeValue(item)
}
return list
case attr.SS != nil:
// Convert the string set to a slice of strings.
strSet := make([]string, len(attr.SS))
for i, s := range attr.SS {
strSet[i] = *s
}
return strSet
case attr.NS != nil:
// Convert the number set to a slice of strings (since the numbers are stored as strings).
numSet := make([]float64, len(attr.NS))
for i, n := range attr.NS {
number, err := stringToFloat64(*n)
if err != nil {
// TODO - should we throw an error here?
return nil
}

numSet[i] = number
}

return numSet
}

return nil
}

Expand Down
22 changes: 22 additions & 0 deletions lib/dynamo/message_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package dynamo

import (
"context"
"github.com/artie-labs/reader/config"
"github.com/stretchr/testify/suite"
"testing"
)

type DynamoDBTestSuite struct {
suite.Suite
ctx context.Context
}

func (d *DynamoDBTestSuite) SetupTest() {
d.ctx = context.Background()
d.ctx = config.InjectIntoContext(d.ctx, &config.Settings{})
}

func TestDynamoDBTestSuite(t *testing.T) {
suite.Run(t, new(DynamoDBTestSuite))
}
124 changes: 124 additions & 0 deletions lib/dynamo/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package dynamo

import (
"github.com/artie-labs/transfer/lib/ptr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/stretchr/testify/assert"
)

func (d *DynamoDBTestSuite) TestTransformAttributeValue() {
type _tc struct {
name string
attr *dynamodb.AttributeValue
expectedValue interface{}
}

tcs := []_tc{
{
name: "string",
attr: &dynamodb.AttributeValue{
S: ptr.ToString("hello"),
},
expectedValue: "hello",
},
{
name: "number",
attr: &dynamodb.AttributeValue{
N: ptr.ToString("123"),
},
expectedValue: float64(123),
},
{
name: "boolean",
attr: &dynamodb.AttributeValue{
BOOL: ptr.ToBool(true),
},
expectedValue: true,
},
{
name: "map",
attr: &dynamodb.AttributeValue{
M: map[string]*dynamodb.AttributeValue{
"foo": {
S: ptr.ToString("bar"),
},
"bar": {
N: ptr.ToString("123"),
},
"nested_map": {
M: map[string]*dynamodb.AttributeValue{
"foo": {
S: ptr.ToString("bar"),
},
},
},
},
},
expectedValue: map[string]interface{}{
"foo": "bar",
"bar": float64(123),
"nested_map": map[string]interface{}{
"foo": "bar",
},
},
},
{
name: "list",
attr: &dynamodb.AttributeValue{
L: []*dynamodb.AttributeValue{
{
S: ptr.ToString("foo"),
},
{
N: ptr.ToString("123"),
},
{
M: map[string]*dynamodb.AttributeValue{
"foo": {
S: ptr.ToString("bar"),
},
},
},
},
},
expectedValue: []interface{}{
"foo",
float64(123),
map[string]interface{}{
"foo": "bar",
},
},
},
{
name: "string set",
attr: &dynamodb.AttributeValue{
SS: []*string{
ptr.ToString("foo"),
ptr.ToString("bar"),
},
},
expectedValue: []string{
"foo",
"bar",
},
},
{
name: "number set",
attr: &dynamodb.AttributeValue{
NS: []*string{
ptr.ToString("123"),
ptr.ToString("456"),
},
},
expectedValue: []float64{
123,
456,
},
},
}

for _, tc := range tcs {
actualValue := transformAttributeValue(tc.attr)
assert.Equal(d.T(), tc.expectedValue, actualValue, tc.name)
}
}
5 changes: 2 additions & 3 deletions lib/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
const loggerKey = "_log"

func InjectLoggerIntoCtx(ctx context.Context) context.Context {
return context.WithValue(ctx, loggerKey, new(config.FromContext(ctx)))
return context.WithValue(ctx, loggerKey, initLogger(config.FromContext(ctx)))
}

func FromContext(ctx context.Context) *logrus.Logger {
logVal := ctx.Value(loggerKey)
if logVal == nil {
// Inject this back into context, so we don't need to initialize this again
return FromContext(InjectLoggerIntoCtx(ctx))
}

Expand All @@ -29,7 +28,7 @@ func FromContext(ctx context.Context) *logrus.Logger {
return log
}

func new(settings *config.Settings) *logrus.Logger {
func initLogger(settings *config.Settings) *logrus.Logger {
log := logrus.New()
log.SetOutput(os.Stdout)

Expand Down
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ import (
"github.com/artie-labs/reader/lib/kafka"
"github.com/artie-labs/reader/lib/logger"
"github.com/artie-labs/reader/sources/dynamodb"
"log"
)

func main() {
var configFilePath string
flag.StringVar(&configFilePath, "config", "", "path to config file")
flag.Parse()

// Logger as well
ctx := config.InjectIntoContext(context.Background(), configFilePath)
cfg, err := config.ReadConfig(configFilePath)
if err != nil {
log.Fatalf("failed to read config file, err: %v", err)
}

ctx := config.InjectIntoContext(context.Background(), cfg)
ctx = logger.InjectLoggerIntoCtx(ctx)
ctx = kafka.InjectIntoContext(ctx)

Expand Down
54 changes: 4 additions & 50 deletions sources/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package dynamodb

import (
"bufio"
"context"
"fmt"
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib/dynamo"
"github.com/artie-labs/reader/lib/kafka"
Expand All @@ -13,8 +11,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
"os"
"strings"
"time"
)

Expand Down Expand Up @@ -42,17 +38,19 @@ func Load(ctx context.Context) *Store {
logger.FromContext(ctx).Fatalf("Failed to create session: %v", err)
}

return &Store{
store := &Store{
tableName: cfg.DynamoDB.TableName,
streamArn: cfg.DynamoDB.StreamArn,
offsetFilePath: cfg.DynamoDB.OffsetFile,
lastProcessedSeqNumbers: make(map[string]string),
streams: dynamodbstreams.New(sess),
}

store.loadOffsets(ctx)
return store
}

func (s *Store) Run(ctx context.Context) {
s.loadOffsets(ctx)
ticker := time.NewTicker(flushOffsetInterval)
go func() {
for {
Expand Down Expand Up @@ -177,47 +175,3 @@ func (s *Store) Run(ctx context.Context) {
}
}
}

func (s *Store) loadOffsets(ctx context.Context) {
log := logger.FromContext(ctx)
log.Infof("loading DynamoDB offsets from file: %s", s.offsetFilePath)
file, err := os.Open(s.offsetFilePath)
if err != nil {
log.WithError(err).Warn("failed to open DynamoDB offset file, so not using previously stored offsets...")
return
}

defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
parts := strings.Split(scanner.Text(), ":")
if len(parts) == 2 {
shardID := parts[0]
sequenceNumber := parts[1]
s.lastProcessedSeqNumbers[shardID] = sequenceNumber
}
}
if err := scanner.Err(); err != nil {
log.Printf("Error reading offset file: %v", err)
}
}

func (s *Store) saveOffsets(ctx context.Context) {
file, err := os.Create(s.offsetFilePath)
if err != nil {
logger.FromContext(ctx).WithError(err).Fatal("failed to create DynamoDB offset file")
}

defer file.Close()

writer := bufio.NewWriter(file)
for shardID, sequenceNumber := range s.lastProcessedSeqNumbers {
_, err = writer.WriteString(fmt.Sprintf("%s:%s\n", shardID, sequenceNumber))
if err != nil {
logger.FromContext(ctx).WithError(err).Fatal("failed to write to DynamoDB offset file")
continue
}
}

_ = writer.Flush()
}
Loading

0 comments on commit 4e43558

Please sign in to comment.