From 2b07eafc56564f9c3603b626a9409253f9bd4915 Mon Sep 17 00:00:00 2001 From: makearl Date: Thu, 31 May 2018 16:56:17 -0400 Subject: [PATCH 1/3] Add tests and suggested fix for oneof handling --- .../connect/protobuf/ProtobufData.java | 18 ++++-- .../connect/protobuf/ProtobufDataTest.java | 55 +++++++++++++++++-- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java b/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java index 7312ad1..249c592 100644 --- a/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java +++ b/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java @@ -331,11 +331,19 @@ Object toConnectData(Schema schema, Object value) { final Struct result = new Struct(schema.schema()); for (Descriptors.FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) { - final String fieldName = getConnectFieldName(fieldDescriptor); - final Field field = schema.field(fieldName); - - Object obj = message.getField(fieldDescriptor); - if (!isUnsetOneof(fieldDescriptor, obj)) { + Descriptors.OneofDescriptor oneOfDescriptor = fieldDescriptor.getContainingOneof(); + if (oneOfDescriptor != null) { + Descriptors.FieldDescriptor oneFieldDescriptor = message.getOneofFieldDescriptor(oneOfDescriptor); + + String oneFieldName = getConnectFieldName(oneFieldDescriptor); + Field oneField = schema.field(oneFieldName); + Object oneValue = message.getField(oneFieldDescriptor); + + result.put(oneFieldName, toConnectData(oneField.schema(), oneValue)); + } else { + final String fieldName = getConnectFieldName(fieldDescriptor); + final Field field = schema.field(fieldName); + Object obj = message.getField(fieldDescriptor); result.put(fieldName, toConnectData(field.schema(), obj)); } } diff --git a/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java b/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java index df947a5..3632bdc 100644 --- a/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java +++ b/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java @@ -86,6 +86,14 @@ private Schema getExpectedNestedTestProtoSchemaIntUserId() { return getExpectedNestedTestProtoSchema(); } + private SchemaBuilder getComplexTypeSchemaBuilder() { + final SchemaBuilder complexTypeBuilder = SchemaBuilder.struct(); + complexTypeBuilder.field("one_id", SchemaBuilder.string().optional().build()); + complexTypeBuilder.field("other_id", SchemaBuilder.int32().optional().build()); + complexTypeBuilder.field("is_active", SchemaBuilder.bool().optional().build()); + return complexTypeBuilder; + } + private Schema getExpectedNestedTestProtoSchema() { final SchemaBuilder builder = SchemaBuilder.struct(); final SchemaBuilder userIdBuilder = SchemaBuilder.struct(); @@ -99,11 +107,7 @@ private Schema getExpectedNestedTestProtoSchema() { builder.field("experiments_active", SchemaBuilder.array(SchemaBuilder.string().optional().build()).optional().build()); builder.field("updated_at", org.apache.kafka.connect.data.Timestamp.builder().optional().build()); builder.field("status", SchemaBuilder.string().optional().build()); - final SchemaBuilder complexTypeBuilder = SchemaBuilder.struct(); - complexTypeBuilder.field("one_id", SchemaBuilder.string().optional().build()); - complexTypeBuilder.field("other_id", SchemaBuilder.int32().optional().build()); - complexTypeBuilder.field("is_active", SchemaBuilder.bool().optional().build()); - builder.field("complex_type", complexTypeBuilder.optional().build()); + builder.field("complex_type", getComplexTypeSchemaBuilder().optional().build()); builder.field("map_type", SchemaBuilder.array(SchemaBuilder.struct().field("key", Schema.OPTIONAL_STRING_SCHEMA).field("value", Schema.OPTIONAL_STRING_SCHEMA).optional().build()).optional().build()); return builder.build(); } @@ -161,6 +165,27 @@ private Struct getExpectedNestedTestProtoResultIntUserId() throws ParseException return result; } + private NestedTestProtoOuterClass.ComplexType createProtoDefaultOneOf() throws ParseException { + NestedTestProtoOuterClass.ComplexType.Builder complexTypeBuilder = NestedTestProtoOuterClass.ComplexType.newBuilder(); + complexTypeBuilder.setOtherId(0); + return complexTypeBuilder.build(); + } + + private NestedTestProtoOuterClass.ComplexType createProtoMultipleSetOneOf() throws ParseException { + NestedTestProtoOuterClass.ComplexType.Builder complexTypeBuilder = NestedTestProtoOuterClass.ComplexType.newBuilder(); + complexTypeBuilder.setOneId("asdf"); + complexTypeBuilder.setOtherId(0); + return complexTypeBuilder.build(); + } + + private Struct getExpectedComplexTypeProtoWithDefaultOneOf() { + Schema schema = getComplexTypeSchemaBuilder().build(); + Struct result = new Struct(schema.schema()); + result.put("other_id", 0); + result.put("is_active", false); + return result; + } + private void assertSchemasEqual(Schema expectedSchema, Schema actualSchema) { assertEquals(expectedSchema.type(), actualSchema.type()); assertEquals(expectedSchema.isOptional(), actualSchema.isOptional()); @@ -196,6 +221,26 @@ public void testToConnectDataWithNestedProtobufMessageAndIntUserId() throws Pars assertEquals(new SchemaAndValue(getExpectedNestedTestProtoSchemaIntUserId(), getExpectedNestedTestProtoResultIntUserId()), result); } + @Test + public void testToConnectDataDefaultOneOf() throws ParseException { + Schema schema = getComplexTypeSchemaBuilder().build(); + NestedTestProtoOuterClass.ComplexType message = createProtoDefaultOneOf(); + ProtobufData protobufData = new ProtobufData(NestedTestProtoOuterClass.ComplexType.class, LEGACY_NAME); + SchemaAndValue result = protobufData.toConnectData(message.toByteArray()); + assertSchemasEqual(schema, result.schema()); + assertEquals(new SchemaAndValue(schema, getExpectedComplexTypeProtoWithDefaultOneOf()), result); + } + + @Test + public void testToConnectDataDefaultOneOfCannotHaveTwoOneOfsSet() throws ParseException { + Schema schema = getComplexTypeSchemaBuilder().build(); + NestedTestProtoOuterClass.ComplexType message = createProtoMultipleSetOneOf(); + ProtobufData protobufData = new ProtobufData(NestedTestProtoOuterClass.ComplexType.class, LEGACY_NAME); + SchemaAndValue result = protobufData.toConnectData(message.toByteArray()); + assertSchemasEqual(schema, result.schema()); + assertEquals(new SchemaAndValue(schema, getExpectedComplexTypeProtoWithDefaultOneOf()), result); + } + // Data Conversion tests @Test public void testToConnectSupportsOptionalValues() { From bbac3a5b5aaf20ca812282f73bbae6d31e3c0fae Mon Sep 17 00:00:00 2001 From: makearl Date: Thu, 31 May 2018 16:56:38 -0400 Subject: [PATCH 2/3] Bump version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 9dba418..a6c8e07 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.blueapron kafka-connect-protobuf-converter jar - 2.0.0 + 2.0.1 3.4.0 From 60a0fcb5f6735adb21903eb1487834e8c22ff7bb Mon Sep 17 00:00:00 2001 From: makearl Date: Fri, 1 Jun 2018 09:43:36 -0400 Subject: [PATCH 3/3] Split out oneof field handling --- .../connect/protobuf/ProtobufData.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java b/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java index 249c592..d537aca 100644 --- a/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java +++ b/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java @@ -3,6 +3,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.OneofDescriptor; import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; @@ -21,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import com.google.protobuf.util.Timestamps; @@ -81,12 +83,6 @@ private String getProtoFieldName(String descriptorForTypeName, String connectFie return connectProtoNameMap.get(getProtoMapKey(descriptorForTypeName, connectFieldName)); } - private boolean isUnsetOneof(Descriptors.FieldDescriptor fieldDescriptor, Object value) { - return fieldDescriptor.getContainingOneof() != null && - fieldDescriptor.getType() != MESSAGE && - fieldDescriptor.getDefaultValue().equals(value); - } - ProtobufData(Class clazz, String legacyName) { this.legacyName = legacyName; @@ -225,6 +221,13 @@ private boolean isProtobufDate(Schema schema) { return Date.SCHEMA.name().equals(schema.name()); } + private void setStructField(Schema schema, Message message, Struct result, Descriptors.FieldDescriptor fieldDescriptor) { + final String fieldName = getConnectFieldName(fieldDescriptor); + final Field field = schema.field(fieldName); + Object obj = message.getField(fieldDescriptor); + result.put(fieldName, toConnectData(field.schema(), obj)); + } + Object toConnectData(Schema schema, Object value) { try { if (isProtobufTimestamp(schema)) { @@ -329,23 +332,20 @@ Object toConnectData(Schema schema, Object value) { } final Struct result = new Struct(schema.schema()); + final Descriptors.Descriptor descriptor = message.getDescriptorForType(); - for (Descriptors.FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) { + for (OneofDescriptor oneOfDescriptor : descriptor.getOneofs()) { + final Descriptors.FieldDescriptor fieldDescriptor = message.getOneofFieldDescriptor(oneOfDescriptor); + setStructField(schema, message, result, fieldDescriptor); + } + + for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) { Descriptors.OneofDescriptor oneOfDescriptor = fieldDescriptor.getContainingOneof(); if (oneOfDescriptor != null) { - Descriptors.FieldDescriptor oneFieldDescriptor = message.getOneofFieldDescriptor(oneOfDescriptor); - - String oneFieldName = getConnectFieldName(oneFieldDescriptor); - Field oneField = schema.field(oneFieldName); - Object oneValue = message.getField(oneFieldDescriptor); - - result.put(oneFieldName, toConnectData(oneField.schema(), oneValue)); - } else { - final String fieldName = getConnectFieldName(fieldDescriptor); - final Field field = schema.field(fieldName); - Object obj = message.getField(fieldDescriptor); - result.put(fieldName, toConnectData(field.schema(), obj)); + continue; } + + setStructField(schema, message, result, fieldDescriptor); } converted = result;