Skip to content

Commit

Permalink
[mongodb] Fix issue with payload not being written to transfer (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Apr 12, 2024
1 parent a0b8473 commit 0d773ea
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 11 deletions.
5 changes: 3 additions & 2 deletions sources/mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mongo
import (
"context"
"fmt"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -56,12 +57,12 @@ func (c *collectionScanner) Next() ([]lib.RawMessage, error) {
return nil, fmt.Errorf("failed to decode document: %w", err)
}

mgoMsg, err := parseMessage(result)
mgoMsg, err := ParseMessage(result)
if err != nil {
return nil, fmt.Errorf("failed to parse message: %w", err)
}

rawMsg, err := mgoMsg.toRawMessage(c.collection, c.cfg.Database)
rawMsg, err := mgoMsg.ToRawMessage(c.collection, c.cfg.Database)
if err != nil {
return nil, fmt.Errorf("failed to create raw message: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions sources/mongo/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type mgoMessage struct {
pkMap map[string]any
}

func (m *mgoMessage) toRawMessage(collection config.Collection, database string) (lib.RawMessage, error) {
func (m *mgoMessage) ToRawMessage(collection config.Collection, database string) (lib.RawMessage, error) {
evt := &mongo.SchemaEventPayload{
Schema: debezium.Schema{},
Payload: mongo.Payload{
Expand All @@ -38,7 +38,7 @@ func (m *mgoMessage) toRawMessage(collection config.Collection, database string)
return lib.NewRawMessage(collection.TopicSuffix(database), pkMap, evt), nil
}

func parseMessage(result bson.M) (*mgoMessage, error) {
func ParseMessage(result bson.M) (*mgoMessage, error) {
jsonExtendedBytes, err := bson.MarshalExtJSON(result, false, false)
if err != nil {
return nil, fmt.Errorf("failed to marshal document to JSON extended: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions sources/mongo/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
func TestParseMessagePartitionKey(t *testing.T) {
objId, err := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011")
assert.NoError(t, err)
msg, err := parseMessage(bson.M{"_id": objId})
msg, err := ParseMessage(bson.M{"_id": objId})
assert.NoError(t, err)
assert.Equal(t, `{"$oid":"507f1f77bcf86cd799439011"}`, msg.pkMap["id"])

rawMsg, err := msg.toRawMessage(config.Collection{}, "database")
rawMsg, err := msg.ToRawMessage(config.Collection{}, "database")
assert.NoError(t, err)

rawMsgBytes, err := json.Marshal(rawMsg.PartitionKey())
Expand All @@ -41,7 +41,7 @@ func TestParseMessage(t *testing.T) {
decimal, err := primitive.ParseDecimal128("1234.5")
assert.NoError(t, err)

msg, err := parseMessage(
msg, err := ParseMessage(
bson.M{
"_id": objId,
"string": "Hello, world!",
Expand All @@ -64,7 +64,7 @@ func TestParseMessage(t *testing.T) {
})
assert.NoError(t, err)

rawMsg, err := msg.toRawMessage(config.Collection{}, "database")
rawMsg, err := msg.ToRawMessage(config.Collection{}, "database")
assert.NoError(t, err)

rawPkBytes, err := json.Marshal(rawMsg.PartitionKey())
Expand Down
32 changes: 29 additions & 3 deletions writers/transfer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ package transfer

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/transfer/lib/artie"
"github.com/artie-labs/transfer/lib/cdc/mongo"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/utils"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/models"
"github.com/artie-labs/transfer/models/event"

"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/mtr"
)

type Writer struct {
Expand Down Expand Up @@ -43,14 +46,37 @@ func NewWriter(cfg config.Config, statsD mtr.Client) (*Writer, error) {
}, nil
}

// messageToEvent converts a raw reader message into a transfer event.Event.
//
// When the message carries a *mongo.SchemaEventPayload, the payload is first
// serialized to JSON and parsed back with mongo.Debezium.GetEventFromBytes
// (using the writer's typing settings); the parsed event is what gets handed
// to event.ToMemoryEvent. Every other event type is passed through unchanged.
//
// It returns a zero event.Event and a wrapped error if the MongoDB payload
// cannot be marshaled or re-parsed.
func (w *Writer) messageToEvent(message lib.RawMessage) (event.Event, error) {
	evt := message.Event()

	if mongoEvt, ok := evt.(*mongo.SchemaEventPayload); ok {
		// Named payload (not bytes) so the local does not shadow the stdlib package name.
		payload, err := json.Marshal(mongoEvt)
		if err != nil {
			return event.Event{}, fmt.Errorf("failed to marshal mongo event: %w", err)
		}

		var dbz mongo.Debezium
		evt, err = dbz.GetEventFromBytes(w.cfg.SharedTransferConfig.TypingSettings, payload)
		if err != nil {
			return event.Event{}, fmt.Errorf("failed to parse mongo event: %w", err)
		}
	}

	return event.ToMemoryEvent(evt, message.PartitionKey(), w.tc, config.Replication), nil
}

func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error {
if len(messages) == 0 {
return nil
}

var events []event.Event
for _, message := range messages {
events = append(events, event.ToMemoryEvent(message.Event(), message.PartitionKey(), w.tc, config.Replication))
evt, err := w.messageToEvent(message)
if err != nil {
return err
}
events = append(events, evt)
}

tags := map[string]string{
Expand Down
47 changes: 47 additions & 0 deletions writers/transfer/writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package transfer

import (
"testing"

transferCfg "github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/sources/mongo"
)

// TestWriter_MessageToEvent builds a MongoDB raw message from a small BSON
// document and verifies the Data map of the event produced by
// Writer.messageToEvent.
func TestWriter_MessageToEvent(t *testing.T) {
	oid, err := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011")
	assert.NoError(t, err)

	// Source document fed through the MongoDB message parser.
	doc := bson.M{
		"_id":    oid,
		"string": "Hello, world!",
		"int64":  int64(1234567890),
		"double": 3.14159,
	}

	parsed, err := mongo.ParseMessage(doc)
	assert.NoError(t, err)

	rawMsg, err := parsed.ToRawMessage(config.Collection{Name: "collection"}, "database")
	assert.NoError(t, err)

	// Note that the int64 field is expected back as a float64 value.
	expected := map[string]any{
		"__artie_delete": false,
		"_id":            "507f1f77bcf86cd799439011",
		"double":         3.14159,
		"int64":          1.23456789e+09,
		"payload":        map[string]any{"id": "{\"$oid\":\"507f1f77bcf86cd799439011\"}"},
		"string":         "Hello, world!",
	}

	w := Writer{
		cfg: transferCfg.Config{
			SharedTransferConfig: transferCfg.SharedTransferConfig{},
		},
		tc: &kafkalib.TopicConfig{},
	}

	actual, err := w.messageToEvent(rawMsg)
	assert.NoError(t, err)
	assert.Equal(t, expected, actual.Data)
}

0 comments on commit 0d773ea

Please sign in to comment.