From 85a241bbc38521205e31d7e1c9ab5bca30614c3b Mon Sep 17 00:00:00 2001 From: eaddingtonwhite <5491827+ellery44@users.noreply.github.com> Date: Mon, 30 Sep 2024 09:08:25 -0700 Subject: [PATCH] feat: make serializer used configurable --- caching/caching.go | 103 ++++++++------------ caching/caching_test.go | 128 +++++++++++++++--------- go.mod | 2 + go.sum | 6 ++ internal/serializer/bench_test.go | 155 ++++++++++++++++++++++++++++++ internal/serializer/serializer.go | 70 ++++++++++++++ 6 files changed, 356 insertions(+), 108 deletions(-) create mode 100644 internal/serializer/bench_test.go create mode 100644 internal/serializer/serializer.go diff --git a/caching/caching.go b/caching/caching.go index e446201..82ecca5 100644 --- a/caching/caching.go +++ b/caching/caching.go @@ -1,21 +1,20 @@ package caching import ( - "bytes" "context" "crypto/sha256" "encoding/hex" - "encoding/json" "errors" "fmt" "sort" + "github.com/momentohq/go-aws-sdk-middlewares/internal/serializer" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/smithy-go/middleware" - "github.com/momentohq/client-sdk-go/config/logger" "github.com/momentohq/client-sdk-go/momento" "github.com/momentohq/client-sdk-go/responses" ) @@ -28,11 +27,22 @@ const ( DISABLED WritebackType = "DISABLED" ) +type MsgPackSerializer = serializer.MsgPackSerializer +type JSONSerializer = serializer.JSONSerializer + type cachingMiddleware struct { cacheName string momentoClient momento.CacheClient writebackType WritebackType asyncWriteChan chan *momento.SetBatchRequest + serializer Serializer +} + +// Serializer defines the methods for serializing and deserializing data. +type Serializer interface { + Name() string + Serialize(item map[string]types.AttributeValue) ([]byte, error) + Deserialize(data []byte) (map[string]types.AttributeValue, error) } type MiddlewareProps struct { @@ -40,29 +50,37 @@ type MiddlewareProps struct { CacheName string MomentoClient momento.CacheClient WritebackType WritebackType + Serializer Serializer } func AttachNewCachingMiddleware(props MiddlewareProps) { if props.WritebackType == "" { props.WritebackType = SYNCHRONOUS } + + if props.Serializer == nil { + props.Serializer = serializer.JSONSerializer{} + } + props.MomentoClient.Logger().Debug("attaching Momento caching middleware with writeback type " + string(props.WritebackType)) props.AwsConfig.APIOptions = append(props.AwsConfig.APIOptions, func(stack *middleware.Stack) error { return stack.Initialize.Add( - NewCachingMiddleware(&cachingMiddleware{ + newCachingMiddleware(&cachingMiddleware{ cacheName: props.CacheName, momentoClient: props.MomentoClient, writebackType: props.WritebackType, + serializer: props.Serializer, }), middleware.Before, ) }) } -func NewCachingMiddleware(mw *cachingMiddleware) middleware.InitializeMiddleware { +func newCachingMiddleware(mw *cachingMiddleware) middleware.InitializeMiddleware { if mw.writebackType == ASYNCHRONOUS { mw.startAsyncBatchWriter() } + return middleware.InitializeMiddlewareFunc("CachingMiddleware", func( ctx context.Context, in middleware.InitializeInput, next middleware.InitializeHandler, ) (out middleware.InitializeOutput, metadata middleware.Metadata, err error) { @@ -167,7 +185,7 @@ func (d *cachingMiddleware) handleBatchGetItemCommand(ctx context.Context, input } gatherKeys = false } - cacheKey, err := ComputeCacheKey(tableName, key) + cacheKey, err := ComputeCacheKey(tableName, key, d.serializer) if err != nil { return middleware.InitializeOutput{}, fmt.Errorf("error getting key for caching: %w", err) } @@ -195,11 +213,11 @@ func (d *cachingMiddleware) handleBatchGetItemCommand(ctx context.Context, input switch e := element.(type) { case *responses.GetHit: gotHit = true - marshalMap, err := GetMarshalMap(e) + deserializedMap, err := d.serializer.Deserialize(e.ValueByte()) if err != nil { - return middleware.InitializeOutput{}, fmt.Errorf("error with marshal map: %w", err) + return middleware.InitializeOutput{}, fmt.Errorf("error with desrializing map: %w", err) } - responsesToReturn[tableName] = append(responsesToReturn[tableName], marshalMap) + responsesToReturn[tableName] = append(responsesToReturn[tableName], deserializedMap) case *responses.GetMiss: gotMiss = true if _, ok := cacheMissesPerTable[tableName]; !ok { @@ -315,7 +333,7 @@ func (d *cachingMiddleware) handleGetItemCommand(ctx context.Context, input *dyn } // Derive a cache key from DDB request - cacheKey, err := ComputeCacheKey(*input.TableName, input.Key) + cacheKey, err := ComputeCacheKey(*input.TableName, input.Key, d.serializer) if err != nil { return middleware.InitializeOutput{}, fmt.Errorf("error getting key for caching: %w", err) } @@ -329,15 +347,15 @@ func (d *cachingMiddleware) handleGetItemCommand(ctx context.Context, input *dyn if err == nil { switch r := rsp.(type) { case *responses.GetHit: - // On hit decode value from stored json to DDB attribute map - marshalMap, err := GetMarshalMap(r) + // On hit decode value from value stored in cache to a DDB attribute map + deserializedMap, err := d.serializer.Deserialize(r.ValueByte()) if err != nil { return middleware.InitializeOutput{}, fmt.Errorf("error with marshal map: %w", err) } d.momentoClient.Logger().Debug("returning cached item") // Return user spoofed dynamodb.GetItemOutput.Item w/ cached value return struct{ Result interface{} }{Result: &dynamodb.GetItemOutput{ - Item: marshalMap, + Item: deserializedMap, }}, nil case *responses.GetMiss: @@ -368,10 +386,9 @@ func (d *cachingMiddleware) handleGetItemCommand(ctx context.Context, input *dyn } func (d *cachingMiddleware) writeResultToCache(ctx context.Context, ddbOutput *dynamodb.GetItemOutput, cacheKey string) { - // unmarshal raw response object to DDB attribute values map and encode as json - j, err := MarshalToJson(ddbOutput.Item, d.momentoClient.Logger()) + b, err := d.serializer.Serialize(ddbOutput.Item) if err != nil { - d.momentoClient.Logger().Warn(fmt.Sprintf("error marshalling item to json: %+v", err)) + d.momentoClient.Logger().Warn(fmt.Sprintf("error serializing item: %+v", err)) } d.momentoClient.Logger().Debug(fmt.Sprintf("caching item with key: %s", cacheKey)) @@ -379,7 +396,7 @@ func (d *cachingMiddleware) writeResultToCache(ctx context.Context, ddbOutput *d _, err = d.momentoClient.Set(ctx, &momento.SetRequest{ CacheName: d.cacheName, Key: momento.String(cacheKey), - Value: momento.Bytes(j), + Value: momento.Bytes(b), }) if err != nil { d.momentoClient.Logger().Warn( @@ -410,9 +427,9 @@ func (d *cachingMiddleware) prepareMomentoBatchGetRequest(ddbOutput *dynamodb.Ba // compute and gather keys and JSON encoded items to store in Momento cache for tableName, items := range ddbOutput.Responses { for _, item := range items { - j, err := MarshalToJson(item, d.momentoClient.Logger()) + b, err := d.serializer.Serialize(item) if err != nil { - d.momentoClient.Logger().Warn(fmt.Sprintf("error marshalling item to json: %+v", err)) + d.momentoClient.Logger().Warn(fmt.Sprintf("error seralizing item: %+v", err)) continue } @@ -421,7 +438,7 @@ func (d *cachingMiddleware) prepareMomentoBatchGetRequest(ddbOutput *dynamodb.Ba for _, key := range tableToDdbKeys[tableName] { itemForKey[key] = item[key] } - cacheKey, err := ComputeCacheKey(tableName, itemForKey) + cacheKey, err := ComputeCacheKey(tableName, itemForKey, d.serializer) if err != nil { d.momentoClient.Logger().Warn(fmt.Sprintf("error getting key for caching: %+v", err)) continue @@ -430,7 +447,7 @@ func (d *cachingMiddleware) prepareMomentoBatchGetRequest(ddbOutput *dynamodb.Ba itemsToSet = append(itemsToSet, momento.BatchSetItem{ Key: momento.String(cacheKey), - Value: momento.Bytes(j), + Value: momento.Bytes(b), }) } } @@ -440,7 +457,7 @@ func (d *cachingMiddleware) prepareMomentoBatchGetRequest(ddbOutput *dynamodb.Ba } } -func ComputeCacheKey(tableName string, keys map[string]types.AttributeValue) (string, error) { +func ComputeCacheKey(tableName string, keys map[string]types.AttributeValue, serializer Serializer) (string, error) { // Marshal to attribute map var t map[string]interface{} err := attributevalue.UnmarshalMap(keys, &t) @@ -461,50 +478,12 @@ func ComputeCacheKey(tableName string, keys map[string]types.AttributeValue) (st out += fieldToValue[k] } - // prefix key w/ table name and convert to fixed length hash + // prefix key w/ table name + serializer used and convert to fixed length hash hash := sha256.New() - hash.Write([]byte(tableName + out)) + hash.Write([]byte(tableName + serializer.Name() + out)) return hex.EncodeToString(hash.Sum(nil)), nil } -func GetMarshalMap(r *responses.GetHit) (map[string]types.AttributeValue, error) { - // On hit decode value from stored json to DDB attribute map - var t map[string]interface{} - err := json.NewDecoder(bytes.NewReader(r.ValueByte())).Decode(&t) - if err != nil { - return nil, fmt.Errorf("error decoding json item in cache to return: %w", err) - } - - // Marshal from attribute map to dynamodb.GetItemOutput.Item - marshalMap, err := attributevalue.MarshalMap(t) - if err != nil { - return nil, fmt.Errorf("error encoding item in cache to ddbItem to return: %w", err) - } - return marshalMap, nil -} - -func MarshalToJson(item map[string]types.AttributeValue, logger logger.MomentoLogger) ([]byte, error) { - // unmarshal raw response object to DDB attribute values map - var t map[string]interface{} - err := attributevalue.UnmarshalMap(item, &t) - if err != nil { - logger.Warn( - fmt.Sprintf("error decoding output item to store in cache err=%+v", err), - ) - return nil, fmt.Errorf("error decoding output item to store in cache err=%+v", err) - } - - // Marshal to JSON to store in cache - j, err := json.Marshal(t) - if err != nil { - logger.Warn( - fmt.Sprintf("error json encoding new item to store in cache err=%+v", err), - ) - return nil, fmt.Errorf("error json encoding new item to store in cache err=%+v", err) - } - return j, nil -} - // safeGetDDItemFromResponseSlice safely checks the slice of DDB item responses to avoid a panic func safeGetDDItemFromResponseSlice(slice []map[string]types.AttributeValue, index int) (map[string]types.AttributeValue, bool) { if index >= 0 && index < len(slice) { diff --git a/caching/caching_test.go b/caching/caching_test.go index 0a06c22..29e8597 100644 --- a/caching/caching_test.go +++ b/caching/caching_test.go @@ -1,15 +1,15 @@ package caching import ( - "bytes" "context" - "encoding/json" "errors" "fmt" "log" "testing" "time" + "github.com/momentohq/go-aws-sdk-middlewares/internal/serializer" + "github.com/aws/aws-sdk-go-v2/aws" awsConfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" @@ -37,18 +37,27 @@ type Movie struct { } var ( - momentoClient momento.CacheClient - ddbClient *dynamodb.Client - tableInfo TableBasics - tableName string - movies []Movie - movie1 Movie - movie2 Movie - movie1hash string - movie2hash string - movie1json2022 = "{\"info\":null,\"title\":\"A Movie Part 1\",\"year\":2022}" - movie2json2022 = "{\"info\":null,\"title\":\"A Movie Part 2\",\"year\":2022}" - writebackType = SYNCHRONOUS + momentoClient momento.CacheClient + ddbClient *dynamodb.Client + ddbSerializer Serializer + tableInfo TableBasics + tableName string + movies []Movie + movie1 Movie + movie2 Movie + movie1hash string + movie2hash string + movie1Map = map[string]types.AttributeValue{ + "info": &types.AttributeValueMemberNULL{Value: true}, + "title": &types.AttributeValueMemberS{Value: "A Movie Part 1"}, + "year": &types.AttributeValueMemberN{Value: "2022"}, + } + movie2Map = map[string]types.AttributeValue{ + "info": &types.AttributeValueMemberNULL{Value: true}, + "title": &types.AttributeValueMemberS{Value: "A Movie Part 2"}, + "year": &types.AttributeValueMemberN{Value: "2022"}, + } + writebackType = SYNCHRONOUS ) func setupTest() func() { @@ -77,7 +86,9 @@ func setupTest() func() { // writebackType defaults to synchronous but can be modified before calling `setupTest()` // you may also instantiate additional clients to test, passing different values for writebackType // to `getDdbClientWithMiddleware()` - ddbClient = getDdbClientWithMiddleware(momentoClient, &writebackType) + ddbSerializer = serializer.JSONSerializer{} + d := getDdbClientWithMiddleware(momentoClient, &writebackType, ddbSerializer) + ddbClient = d amazonConfig := mustGetAWSConfig() ddbControlClient := dynamodb.NewFromConfig(amazonConfig) @@ -104,11 +115,11 @@ func setupTest() func() { movie1 = movies[0] movie2 = movies[1] - movie1hash, err = ComputeCacheKey(tableName, movie1.getKey()) + movie1hash, err = ComputeCacheKey(tableName, movie1.getKey(), ddbSerializer) if err != nil { panic(err) } - movie2hash, err = ComputeCacheKey(tableName, movie2.getKey()) + movie2hash, err = ComputeCacheKey(tableName, movie2.getKey(), ddbSerializer) if err != nil { panic(err) } @@ -151,6 +162,7 @@ func testGetItemCacheMissCommon(t *testing.T) (Movie, responses.GetResponse) { if err != nil { t.Errorf("error occured calling momento get: %+v", err) } + return movie, getResp } @@ -160,14 +172,14 @@ func TestGetItemCacheMiss(t *testing.T) { movie, getResp := testGetItemCacheMissCommon(t) switch r := getResp.(type) { case *responses.GetHit: - movieInfo, err := getMapFromJsonBytes(r.ValueByte()) + movieInfo, err := getMovieFromBytes(r) if err != nil { t.Errorf("error decoding cache hit: %+v", err) } - if movieInfo["title"] != movie.Title { + if movieInfo.Title != movie.Title { t.Errorf("expected cache hit title to match dynamodb response: %+v != %+v", movieInfo, movie) } - if fmt.Sprint(movieInfo["year"]) != fmt.Sprint(movie.Year) { + if fmt.Sprint(movieInfo.Year) != fmt.Sprint(movie.Year) { t.Errorf("expected cache hit year to match dynamodb response: %+v != %+v", movieInfo, movie) } case *responses.GetMiss: @@ -199,10 +211,14 @@ func TestGetItemCacheHitAsync(t *testing.T) { func TestGetItemCacheHit(t *testing.T) { defer setupTest()() - _, err := momentoClient.Set(context.Background(), &momento.SetRequest{ + itemToCache, err := ddbSerializer.Serialize(movie1Map) + if err != nil { + t.Errorf("error serializing movie 1 map: %+v", err) + } + _, err = momentoClient.Set(context.Background(), &momento.SetRequest{ CacheName: tableName, Key: momento.String(movie1hash), - Value: momento.Bytes(movie1json2022), + Value: momento.Bytes(itemToCache), }) if err != nil { t.Errorf("error occured calling momento set: %+v", err) @@ -231,7 +247,7 @@ func TestGetItemCacheHit(t *testing.T) { func TestGetItemError(t *testing.T) { defer setupTest()() mmc := &mockMomentoClient{} - ddbClient := getDdbClientWithMiddleware(mmc, nil) + ddbClient := getDdbClientWithMiddleware(mmc, nil, ddbSerializer) // Execute GetItem Request as you would normally resp, err := ddbClient.GetItem(context.TODO(), &dynamodb.GetItemInput{ @@ -256,16 +272,24 @@ func TestGetItemError(t *testing.T) { func TestBatchGetItemAllHits(t *testing.T) { defer setupTest()() - _, err := momentoClient.SetBatch(context.Background(), &momento.SetBatchRequest{ + item1ToCache, err := ddbSerializer.Serialize(movie1Map) + if err != nil { + t.Errorf("error serializing movie 1 map: %+v", err) + } + item2ToCache, err := ddbSerializer.Serialize(movie1Map) + if err != nil { + t.Errorf("error serializing movie 2 map: %+v", err) + } + _, err = momentoClient.SetBatch(context.Background(), &momento.SetBatchRequest{ CacheName: tableName, Items: []momento.BatchSetItem{ { Key: momento.String(movie1hash), - Value: momento.Bytes(movie1json2022), + Value: momento.Bytes(item1ToCache), }, { Key: momento.String(movie2hash), - Value: momento.Bytes(movie2json2022), + Value: momento.Bytes(item2ToCache), }, }, }) @@ -359,11 +383,14 @@ func TestBatchGetItemAllMisses(t *testing.T) { for _, element := range r.Results() { switch e := element.(type) { case *responses.GetHit: - movieInfo, err := getMapFromJsonBytes(e.ValueByte()) + movieInfo, err := getMovieFromBytes(e) if err != nil { t.Errorf("error decoding cache hit: %+v", err) } - if fmt.Sprint(movieInfo["year"]) != fmt.Sprint(2021) { + if err != nil { + t.Errorf("error decoding cache hit: %+v", err) + } + if fmt.Sprint(movieInfo.Year) != fmt.Sprint(2021) { t.Errorf("expected cache hit year to match ddb response: %+v", movieInfo) } case *responses.GetMiss: @@ -398,10 +425,14 @@ func TestBatchGetItemAllMissesNoWriteback(t *testing.T) { // batch get tests - mixed hits and misses func testBatchGetItemsMixedCommon(t *testing.T) responses.GetBatchResponse { - _, err := momentoClient.Set(context.Background(), &momento.SetRequest{ + itemToCache, err := ddbSerializer.Serialize(movie1Map) + if err != nil { + t.Errorf("error serializing movie 1 map: %+v", err) + } + _, err = momentoClient.Set(context.Background(), &momento.SetRequest{ CacheName: tableName, Key: momento.String(movie1hash), - Value: momento.Bytes(movie1json2022), + Value: momento.Bytes(itemToCache), }) if err != nil { t.Errorf("error occured calling momento set: %+v", err) @@ -463,14 +494,14 @@ func TestBatchGetItemsMixed(t *testing.T) { for _, element := range r.Results() { switch e := element.(type) { case *responses.GetHit: - movieInfo, err := getMapFromJsonBytes(e.ValueByte()) + movieInfo, err := getMovieFromBytes(e) if err != nil { t.Errorf("error decoding cache hit: %+v", err) } - if movieInfo["title"] == "A Movie Part 1" && fmt.Sprint(movieInfo["year"]) != fmt.Sprint(2022) { + if movieInfo.Title == "A Movie Part 1" && fmt.Sprint(movieInfo.Year) != fmt.Sprint(2022) { t.Errorf("expected cache hit year to match ddb response: %+v", movieInfo) } - if movieInfo["title"] == "A Movie Part 2" && fmt.Sprint(movieInfo["year"]) != fmt.Sprint(2021) { + if movieInfo.Title == "A Movie Part 2" && fmt.Sprint(movieInfo.Year) != fmt.Sprint(2021) { t.Errorf("expected ddb hit year: %+v", movieInfo) } case *responses.GetMiss: @@ -494,11 +525,11 @@ func TestBatchGetItemsMixedNoWriteback(t *testing.T) { for _, element := range r.Results() { switch e := element.(type) { case *responses.GetHit: - movieInfo, err := getMapFromJsonBytes(e.ValueByte()) + movieInfo, err := getMovieFromBytes(e) if err != nil { t.Errorf("error decoding cache hit: %+v", err) } - if movieInfo["title"] != "A Movie Part 1" { + if movieInfo.Title != "A Movie Part 1" { t.Errorf("expected cache miss but got: %+v", movieInfo) } } @@ -510,7 +541,7 @@ func TestBatchGetItemsMixedNoWriteback(t *testing.T) { func TestBatchGetItemsError(t *testing.T) { defer setupTest()() mmc := &mockMomentoClient{} - ddbClient := getDdbClientWithMiddleware(mmc, nil) + ddbClient := getDdbClientWithMiddleware(mmc, nil, ddbSerializer) req := &dynamodb.BatchGetItemInput{ RequestItems: map[string]types.KeysAndAttributes{ @@ -578,13 +609,16 @@ func mustGetAWSConfig() aws.Config { return cfg } -func getMapFromJsonBytes(jsonBytes []byte) (map[string]interface{}, error) { - var myMap map[string]interface{} - err := json.NewDecoder(bytes.NewReader(jsonBytes)).Decode(&myMap) +func getMovieFromBytes(r *responses.GetHit) (Movie, error) { + d, err := ddbSerializer.Deserialize(r.ValueByte()) if err != nil { - return nil, err + return Movie{}, err } - return myMap, nil + movie, err := getMovieFromDdbItem(d) + if err != nil { + return Movie{}, err + } + return movie, nil } func getMovieFromDdbItem(item map[string]types.AttributeValue) (Movie, error) { @@ -596,17 +630,19 @@ func getMovieFromDdbItem(item map[string]types.AttributeValue) (Movie, error) { return movie, nil } -func getDdbClientWithMiddleware(momentoClient momento.CacheClient, writebackType *WritebackType) *dynamodb.Client { +func getDdbClientWithMiddleware(momentoClient momento.CacheClient, writebackType *WritebackType, s Serializer) *dynamodb.Client { amazonConfiguration := mustGetAWSConfig() var wb WritebackType if writebackType != nil { wb = *writebackType } + AttachNewCachingMiddleware(MiddlewareProps{ - &amazonConfiguration, - tableName, - momentoClient, - wb, + AwsConfig: &amazonConfiguration, + CacheName: tableName, + MomentoClient: momentoClient, + WritebackType: wb, + Serializer: s, }) return dynamodb.NewFromConfig(amazonConfiguration) } diff --git a/go.mod b/go.mod index 2e2d6b3..ea29b36 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/aws/smithy-go v1.21.0 github.com/google/uuid v1.6.0 github.com/momentohq/client-sdk-go v1.28.2 + github.com/vmihailenco/msgpack/v5 v5.4.1 ) require ( @@ -29,6 +30,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.31.0 // indirect github.com/golang-jwt/jwt/v4 v4.3.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index 4f54ac6..4520975 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,12 @@ github.com/onsi/gomega v1.26.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdM github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= diff --git a/internal/serializer/bench_test.go b/internal/serializer/bench_test.go new file mode 100644 index 0000000..d480224 --- /dev/null +++ b/internal/serializer/bench_test.go @@ -0,0 +1,155 @@ +package serializer + +import ( + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +type testPayload struct { + name string + item map[string]types.AttributeValue +} + +var testPayloads = []testPayload{ + {name: "SmallItem", item: smallItem}, + {name: "LargeItem", item: largeSizeItem}, + {name: "ComplexItem", item: complexItem}, +} + +type Serializer interface { + Name() string + Serialize(item map[string]types.AttributeValue) ([]byte, error) + Deserialize(data []byte) (map[string]types.AttributeValue, error) +} + +func BenchmarkSerialization(b *testing.B) { + serializers := []struct { + name string + serializer Serializer + }{ + {name: "JSON", serializer: JSONSerializer{}}, + {name: "MsgPack", serializer: MsgPackSerializer{}}, + } + + for _, serializer := range serializers { + for _, payload := range testPayloads { + b.Run(serializer.name+"Marshal/"+payload.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, err := serializer.serializer.Serialize(payload.item) + if err != nil { + b.Error(err) + } + } + }) + + b.Run(serializer.name+"Unmarshal/"+payload.name, func(b *testing.B) { + data, err := serializer.serializer.Serialize(payload.item) + if err != nil { + b.Error(err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := serializer.serializer.Deserialize(data) + if err != nil { + b.Error(err) + } + } + }) + } + } +} + +var smallItem = map[string]types.AttributeValue{ + "info": &types.AttributeValueMemberNULL{Value: true}, + "title": &types.AttributeValueMemberS{Value: "A Movie Part 1"}, + "year": &types.AttributeValueMemberN{Value: "2022"}, +} + +var largeSizeItem = map[string]types.AttributeValue{ + "ID": &types.AttributeValueMemberN{Value: "123456"}, + "Name": &types.AttributeValueMemberS{Value: "Test Item"}, + "LargeText": &types.AttributeValueMemberS{Value: strings.Repeat("a", 10_000)}, + "Category": &types.AttributeValueMemberS{Value: "CategoryA"}, + "Active": &types.AttributeValueMemberBOOL{Value: true}, +} + +var complexItem = map[string]types.AttributeValue{ + "Name": &types.AttributeValueMemberS{Value: "Test Item"}, + "ID": &types.AttributeValueMemberN{Value: "12345"}, + "Active": &types.AttributeValueMemberBOOL{Value: true}, + "Tags": &types.AttributeValueMemberSS{Value: []string{"tag1", "tag2", "tag3"}}, + "Ratings": &types.AttributeValueMemberNS{Value: []string{"5", "4", "3"}}, + "Data": &types.AttributeValueMemberBS{Value: [][]byte{[]byte("binarydata1"), []byte("binarydata2")}}, + "Attributes": &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "Height": &types.AttributeValueMemberN{Value: "180"}, + "Weight": &types.AttributeValueMemberN{Value: "75"}, + "Skills": &types.AttributeValueMemberSS{Value: []string{"skill1", "skill2"}}, + "Scores": &types.AttributeValueMemberNS{Value: []string{"10", "9.5", "8"}}, + "Active": &types.AttributeValueMemberBOOL{Value: false}, + "Deleted": &types.AttributeValueMemberNULL{Value: true}, + }, + }, + "Items": &types.AttributeValueMemberL{ + Value: []types.AttributeValue{ + &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "ItemID": &types.AttributeValueMemberN{Value: "101"}, + "ItemName": &types.AttributeValueMemberS{Value: "Sub Item 1"}, + "Price": &types.AttributeValueMemberN{Value: "25.50"}, + "InStock": &types.AttributeValueMemberBOOL{Value: true}, + }, + }, + &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "ItemID": &types.AttributeValueMemberN{Value: "102"}, + "ItemName": &types.AttributeValueMemberS{Value: "Sub Item 2"}, + "Price": &types.AttributeValueMemberN{Value: "15.75"}, + "InStock": &types.AttributeValueMemberBOOL{Value: false}, + }, + }, + &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "ItemID": &types.AttributeValueMemberN{Value: "103"}, + "ItemName": &types.AttributeValueMemberS{Value: "Sub Item 3"}, + "Price": &types.AttributeValueMemberN{Value: "100.00"}, + "InStock": &types.AttributeValueMemberBOOL{Value: true}, + }, + }, + }, + }, + "History": &types.AttributeValueMemberL{ + Value: []types.AttributeValue{ + &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "Action": &types.AttributeValueMemberS{Value: "Created"}, + "Timestamp": &types.AttributeValueMemberS{Value: "2023-01-01T12:00:00Z"}, + }, + }, + &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "Action": &types.AttributeValueMemberS{Value: "Updated"}, + "Timestamp": &types.AttributeValueMemberS{Value: "2023-03-01T09:30:00Z"}, + }, + }, + &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "Action": &types.AttributeValueMemberS{Value: "Deleted"}, + "Timestamp": &types.AttributeValueMemberS{Value: "2023-04-15T16:45:00Z"}, + }, + }, + }, + }, + "RelatedItems": &types.AttributeValueMemberL{ + Value: []types.AttributeValue{ + &types.AttributeValueMemberSS{Value: []string{"relItem1", "relItem2"}}, + &types.AttributeValueMemberSS{Value: []string{"relItem3"}}, + }, + }, + "Notes": &types.AttributeValueMemberNULL{Value: true}, + "Quantity": &types.AttributeValueMemberN{Value: "50"}, + "Discount": &types.AttributeValueMemberN{Value: "5.00"}, +} diff --git a/internal/serializer/serializer.go b/internal/serializer/serializer.go new file mode 100644 index 0000000..95254b3 --- /dev/null +++ b/internal/serializer/serializer.go @@ -0,0 +1,70 @@ +package serializer + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/vmihailenco/msgpack/v5" +) + +type MsgPackSerializer struct{} + +func (d MsgPackSerializer) Name() string { + return "MsgPack" +} + +func (d MsgPackSerializer) Serialize(item map[string]types.AttributeValue) ([]byte, error) { + var t map[string]interface{} + err := attributevalue.UnmarshalMap(item, &t) + if err != nil { + panic(err) + } + return msgpack.Marshal(t) +} + +func (d MsgPackSerializer) Deserialize(data []byte) (map[string]types.AttributeValue, error) { + var item map[string]interface{} + err := msgpack.Unmarshal(data, &item) + if err != nil { + panic(err) + } + return attributevalue.MarshalMap(item) +} + +type JSONSerializer struct{} + +func (d JSONSerializer) Name() string { + return "JSON" +} + +func (d JSONSerializer) Serialize(item map[string]types.AttributeValue) ([]byte, error) { + // unmarshal raw response object to DDB attribute values map + var t map[string]interface{} + err := attributevalue.UnmarshalMap(item, &t) + if err != nil { + return nil, fmt.Errorf("error decoding output item to store in cache err=%+v", err) + } + + // Marshal to JSON to store in cache + j, err := json.Marshal(t) + if err != nil { + return nil, fmt.Errorf("error json encoding new item to store in cache err=%+v", err) + } + return j, nil +} + +func (d JSONSerializer) Deserialize(data []byte) (map[string]types.AttributeValue, error) { + var t map[string]interface{} + err := json.NewDecoder(bytes.NewReader(data)).Decode(&t) + if err != nil { + return nil, fmt.Errorf("error decoding json item in cache to return: %w", err) + } + + marshalMap, err := attributevalue.MarshalMap(t) + if err != nil { + return nil, fmt.Errorf("error encoding item in cache to ddbItem to return: %w", err) + } + return marshalMap, nil +}