From ef85a7a4eff11b9f8c92150b1aa12b0a3c16e975 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 20 Dec 2024 14:50:50 -0800 Subject: [PATCH] Fix MongoDB primary key (#629) --- integration_tests/mongo/main.go | 3 ++- lib/mongo/message.go | 7 +------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/integration_tests/mongo/main.go b/integration_tests/mongo/main.go index 1e3650240..3df539ca3 100644 --- a/integration_tests/mongo/main.go +++ b/integration_tests/mongo/main.go @@ -122,7 +122,8 @@ func testTypes(ctx context.Context, db *mongo.Database, mongoCfg config.MongoDB) } row := rows[0] - expectedPartitionKey := map[string]any{"payload": map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`}} + // This should not include the payload field in here. The payload field gets injected in [kafkalib.buildKafkaMessageWrapper] + expectedPartitionKey := map[string]any{"id": `{"$oid":"66a95fae3776c2f21f0ff568"}`} expectedPkBytes, err := json.Marshal(expectedPartitionKey) if err != nil { return fmt.Errorf("failed to marshal expected partition key: %w", err) diff --git a/lib/mongo/message.go b/lib/mongo/message.go index a66dba304..bb1ebff32 100644 --- a/lib/mongo/message.go +++ b/lib/mongo/message.go @@ -36,13 +36,8 @@ func (m *Message) ToRawMessage(collection config.Collection, database string) (l Operation: m.operation, }, } - - pkMap := map[string]any{ - "payload": m.pkMap, - } - // MongoDB wouldn't include the schema. - return lib.NewRawMessage(collection.TopicSuffix(database), debezium.FieldsObject{}, pkMap, evt), nil + return lib.NewRawMessage(collection.TopicSuffix(database), debezium.FieldsObject{}, m.pkMap, evt), nil } func ParseMessage(after bson.M, before *bson.M, op string) (*Message, error) {