diff --git a/pom.xml b/pom.xml index 9dba418..a6c8e07 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.blueapron kafka-connect-protobuf-converter jar - 2.0.0 + 2.0.1 3.4.0 diff --git a/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java b/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java index 7312ad1..d537aca 100644 --- a/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java +++ b/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java @@ -3,6 +3,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.OneofDescriptor; import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; @@ -21,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import com.google.protobuf.util.Timestamps; @@ -81,12 +83,6 @@ private String getProtoFieldName(String descriptorForTypeName, String connectFie return connectProtoNameMap.get(getProtoMapKey(descriptorForTypeName, connectFieldName)); } - private boolean isUnsetOneof(Descriptors.FieldDescriptor fieldDescriptor, Object value) { - return fieldDescriptor.getContainingOneof() != null && - fieldDescriptor.getType() != MESSAGE && - fieldDescriptor.getDefaultValue().equals(value); - } - ProtobufData(Class clazz, String legacyName) { this.legacyName = legacyName; @@ -225,6 +221,13 @@ private boolean isProtobufDate(Schema schema) { return Date.SCHEMA.name().equals(schema.name()); } + private void setStructField(Schema schema, Message message, Struct result, Descriptors.FieldDescriptor fieldDescriptor) { + final String fieldName = getConnectFieldName(fieldDescriptor); + final Field field = schema.field(fieldName); + Object obj = message.getField(fieldDescriptor); + result.put(fieldName, toConnectData(field.schema(), obj)); + } + Object toConnectData(Schema schema, Object value) { try { if (isProtobufTimestamp(schema)) { @@ -329,15 +332,20 @@ Object toConnectData(Schema schema, Object value) { } final Struct result = new Struct(schema.schema()); + final Descriptors.Descriptor descriptor = message.getDescriptorForType(); - for (Descriptors.FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) { - final String fieldName = getConnectFieldName(fieldDescriptor); - final Field field = schema.field(fieldName); + for (OneofDescriptor oneOfDescriptor : descriptor.getOneofs()) { + final Descriptors.FieldDescriptor fieldDescriptor = message.getOneofFieldDescriptor(oneOfDescriptor); + setStructField(schema, message, result, fieldDescriptor); + } - Object obj = message.getField(fieldDescriptor); - if (!isUnsetOneof(fieldDescriptor, obj)) { - result.put(fieldName, toConnectData(field.schema(), obj)); + for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) { + Descriptors.OneofDescriptor oneOfDescriptor = fieldDescriptor.getContainingOneof(); + if (oneOfDescriptor != null) { + continue; } + + setStructField(schema, message, result, fieldDescriptor); } converted = result; diff --git a/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java b/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java index df947a5..3632bdc 100644 --- a/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java +++ b/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java @@ -86,6 +86,14 @@ private Schema getExpectedNestedTestProtoSchemaIntUserId() { return getExpectedNestedTestProtoSchema(); } + private SchemaBuilder getComplexTypeSchemaBuilder() { + final SchemaBuilder complexTypeBuilder = SchemaBuilder.struct(); + complexTypeBuilder.field("one_id", SchemaBuilder.string().optional().build()); + complexTypeBuilder.field("other_id", SchemaBuilder.int32().optional().build()); + complexTypeBuilder.field("is_active", SchemaBuilder.bool().optional().build()); + return complexTypeBuilder; + } + private Schema getExpectedNestedTestProtoSchema() { final SchemaBuilder builder = SchemaBuilder.struct(); final SchemaBuilder userIdBuilder = SchemaBuilder.struct(); @@ -99,11 +107,7 @@ private Schema getExpectedNestedTestProtoSchema() { builder.field("experiments_active", SchemaBuilder.array(SchemaBuilder.string().optional().build()).optional().build()); builder.field("updated_at", org.apache.kafka.connect.data.Timestamp.builder().optional().build()); builder.field("status", SchemaBuilder.string().optional().build()); - final SchemaBuilder complexTypeBuilder = SchemaBuilder.struct(); - complexTypeBuilder.field("one_id", SchemaBuilder.string().optional().build()); - complexTypeBuilder.field("other_id", SchemaBuilder.int32().optional().build()); - complexTypeBuilder.field("is_active", SchemaBuilder.bool().optional().build()); - builder.field("complex_type", complexTypeBuilder.optional().build()); + builder.field("complex_type", getComplexTypeSchemaBuilder().optional().build()); builder.field("map_type", SchemaBuilder.array(SchemaBuilder.struct().field("key", Schema.OPTIONAL_STRING_SCHEMA).field("value", Schema.OPTIONAL_STRING_SCHEMA).optional().build()).optional().build()); return builder.build(); } @@ -161,6 +165,27 @@ private Struct getExpectedNestedTestProtoResultIntUserId() throws ParseException return result; } + private NestedTestProtoOuterClass.ComplexType createProtoDefaultOneOf() throws ParseException { + NestedTestProtoOuterClass.ComplexType.Builder complexTypeBuilder = NestedTestProtoOuterClass.ComplexType.newBuilder(); + complexTypeBuilder.setOtherId(0); + return complexTypeBuilder.build(); + } + + private NestedTestProtoOuterClass.ComplexType createProtoMultipleSetOneOf() throws ParseException { + NestedTestProtoOuterClass.ComplexType.Builder complexTypeBuilder = NestedTestProtoOuterClass.ComplexType.newBuilder(); + complexTypeBuilder.setOneId("asdf"); + complexTypeBuilder.setOtherId(0); + return complexTypeBuilder.build(); + } + + private Struct getExpectedComplexTypeProtoWithDefaultOneOf() { + Schema schema = getComplexTypeSchemaBuilder().build(); + Struct result = new Struct(schema.schema()); + result.put("other_id", 0); + result.put("is_active", false); + return result; + } + private void assertSchemasEqual(Schema expectedSchema, Schema actualSchema) { assertEquals(expectedSchema.type(), actualSchema.type()); assertEquals(expectedSchema.isOptional(), actualSchema.isOptional()); @@ -196,6 +221,26 @@ public void testToConnectDataWithNestedProtobufMessageAndIntUserId() throws Pars assertEquals(new SchemaAndValue(getExpectedNestedTestProtoSchemaIntUserId(), getExpectedNestedTestProtoResultIntUserId()), result); } + @Test + public void testToConnectDataDefaultOneOf() throws ParseException { + Schema schema = getComplexTypeSchemaBuilder().build(); + NestedTestProtoOuterClass.ComplexType message = createProtoDefaultOneOf(); + ProtobufData protobufData = new ProtobufData(NestedTestProtoOuterClass.ComplexType.class, LEGACY_NAME); + SchemaAndValue result = protobufData.toConnectData(message.toByteArray()); + assertSchemasEqual(schema, result.schema()); + assertEquals(new SchemaAndValue(schema, getExpectedComplexTypeProtoWithDefaultOneOf()), result); + } + + @Test + public void testToConnectDataDefaultOneOfCannotHaveTwoOneOfsSet() throws ParseException { + Schema schema = getComplexTypeSchemaBuilder().build(); + NestedTestProtoOuterClass.ComplexType message = createProtoMultipleSetOneOf(); + ProtobufData protobufData = new ProtobufData(NestedTestProtoOuterClass.ComplexType.class, LEGACY_NAME); + SchemaAndValue result = protobufData.toConnectData(message.toByteArray()); + assertSchemasEqual(schema, result.schema()); + assertEquals(new SchemaAndValue(schema, getExpectedComplexTypeProtoWithDefaultOneOf()), result); + } + // Data Conversion tests @Test public void testToConnectSupportsOptionalValues() {