diff --git a/sources/mongo/collection.go b/sources/mongo/collection.go index aa9c44be..15eb96e3 100644 --- a/sources/mongo/collection.go +++ b/sources/mongo/collection.go @@ -3,6 +3,7 @@ package mongo import ( "context" "fmt" + "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib" "go.mongodb.org/mongo-driver/bson" @@ -56,12 +57,12 @@ func (c *collectionScanner) Next() ([]lib.RawMessage, error) { return nil, fmt.Errorf("failed to decode document: %w", err) } - mgoMsg, err := parseMessage(result) + mgoMsg, err := ParseMessage(result) if err != nil { return nil, fmt.Errorf("failed to parse message: %w", err) } - rawMsg, err := mgoMsg.toRawMessage(c.collection, c.cfg.Database) + rawMsg, err := mgoMsg.ToRawMessage(c.collection, c.cfg.Database) if err != nil { return nil, fmt.Errorf("failed to create raw message: %w", err) } diff --git a/sources/mongo/message.go b/sources/mongo/message.go index 36576067..33d1cef1 100644 --- a/sources/mongo/message.go +++ b/sources/mongo/message.go @@ -17,7 +17,7 @@ type mgoMessage struct { pkMap map[string]any } -func (m *mgoMessage) toRawMessage(collection config.Collection, database string) (lib.RawMessage, error) { +func (m *mgoMessage) ToRawMessage(collection config.Collection, database string) (lib.RawMessage, error) { evt := &mongo.SchemaEventPayload{ Schema: debezium.Schema{}, Payload: mongo.Payload{ @@ -38,7 +38,7 @@ func (m *mgoMessage) toRawMessage(collection config.Collection, database string) return lib.NewRawMessage(collection.TopicSuffix(database), pkMap, evt), nil } -func parseMessage(result bson.M) (*mgoMessage, error) { +func ParseMessage(result bson.M) (*mgoMessage, error) { jsonExtendedBytes, err := bson.MarshalExtJSON(result, false, false) if err != nil { return nil, fmt.Errorf("failed to marshal document to JSON extended: %w", err) diff --git a/sources/mongo/message_test.go b/sources/mongo/message_test.go index cf563e15..e5ac4941 100644 --- a/sources/mongo/message_test.go +++ b/sources/mongo/message_test.go @@ -18,11 +18,11 @@ import ( func TestParseMessagePartitionKey(t *testing.T) { objId, err := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011") assert.NoError(t, err) - msg, err := parseMessage(bson.M{"_id": objId}) + msg, err := ParseMessage(bson.M{"_id": objId}) assert.NoError(t, err) assert.Equal(t, `{"$oid":"507f1f77bcf86cd799439011"}`, msg.pkMap["id"]) - rawMsg, err := msg.toRawMessage(config.Collection{}, "database") + rawMsg, err := msg.ToRawMessage(config.Collection{}, "database") assert.NoError(t, err) rawMsgBytes, err := json.Marshal(rawMsg.PartitionKey()) @@ -41,7 +41,7 @@ func TestParseMessage(t *testing.T) { decimal, err := primitive.ParseDecimal128("1234.5") assert.NoError(t, err) - msg, err := parseMessage( + msg, err := ParseMessage( bson.M{ "_id": objId, "string": "Hello, world!", @@ -64,7 +64,7 @@ func TestParseMessage(t *testing.T) { }) assert.NoError(t, err) - rawMsg, err := msg.toRawMessage(config.Collection{}, "database") + rawMsg, err := msg.ToRawMessage(config.Collection{}, "database") assert.NoError(t, err) rawPkBytes, err := json.Marshal(rawMsg.PartitionKey()) diff --git a/writers/transfer/writer.go b/writers/transfer/writer.go index e3a1a5cc..b911d760 100644 --- a/writers/transfer/writer.go +++ b/writers/transfer/writer.go @@ -2,12 +2,12 @@ package transfer import ( "context" + "encoding/json" "fmt" "time" - "github.com/artie-labs/reader/lib" - "github.com/artie-labs/reader/lib/mtr" "github.com/artie-labs/transfer/lib/artie" + "github.com/artie-labs/transfer/lib/cdc/mongo" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/destination" @@ -15,6 +15,9 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/models" "github.com/artie-labs/transfer/models/event" + + "github.com/artie-labs/reader/lib" + "github.com/artie-labs/reader/lib/mtr" ) type Writer struct { @@ -43,6 +46,25 @@ func NewWriter(cfg config.Config, statsD mtr.Client) (*Writer, error) { }, nil } +func (w *Writer) messageToEvent(message lib.RawMessage) (event.Event, error) { + evt := message.Event() + + if mongoEvt, ok := evt.(*mongo.SchemaEventPayload); ok { + bytes, err := json.Marshal(mongoEvt) + if err != nil { + return event.Event{}, err + } + + var dbz mongo.Debezium + evt, err = dbz.GetEventFromBytes(w.cfg.SharedTransferConfig.TypingSettings, bytes) + if err != nil { + return event.Event{}, err + } + } + + return event.ToMemoryEvent(evt, message.PartitionKey(), w.tc, config.Replication), nil +} + func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error { if len(messages) == 0 { return nil @@ -50,7 +72,11 @@ func (w *Writer) Write(_ context.Context, messages []lib.RawMessage) error { var events []event.Event for _, message := range messages { - events = append(events, event.ToMemoryEvent(message.Event(), message.PartitionKey(), w.tc, config.Replication)) + evt, err := w.messageToEvent(message) + if err != nil { + return err + } + events = append(events, evt) } tags := map[string]string{ diff --git a/writers/transfer/writer_test.go b/writers/transfer/writer_test.go new file mode 100644 index 00000000..636b49aa --- /dev/null +++ b/writers/transfer/writer_test.go @@ -0,0 +1,47 @@ +package transfer + +import ( + "testing" + + transferCfg "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/kafkalib" + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + + "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/sources/mongo" +) + +func TestWriter_MessageToEvent(t *testing.T) { + objId, err := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011") + assert.NoError(t, err) + + msg, err := mongo.ParseMessage(bson.M{ + "_id": objId, + "string": "Hello, world!", + "int64": int64(1234567890), + "double": 3.14159, + }) + assert.NoError(t, err) + + message, err := msg.ToRawMessage(config.Collection{Name: "collection"}, "database") + assert.NoError(t, err) + + writer := Writer{ + cfg: transferCfg.Config{ + SharedTransferConfig: transferCfg.SharedTransferConfig{}, + }, + tc: &kafkalib.TopicConfig{}, + } + evtOut, err := writer.messageToEvent(message) + assert.NoError(t, err) + assert.Equal(t, map[string]any{ + "__artie_delete": false, + "_id": "507f1f77bcf86cd799439011", + "double": 3.14159, + "int64": 1.23456789e+09, + "payload": map[string]any{"id": "{\"$oid\":\"507f1f77bcf86cd799439011\"}"}, + "string": "Hello, world!", + }, evtOut.Data) +}