Skip to content
This repository has been archived by the owner on Jun 30, 2023. It is now read-only.

Commit

Permalink
Merge pull request #11 from blueapron/fix-oneof-handling
Browse files Browse the repository at this point in the history
Fix oneof handling
  • Loading branch information
makearl authored Jun 4, 2018
2 parents 4281d94 + 60a0fcb commit f1e6cec
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.blueapron</groupId>
<artifactId>kafka-connect-protobuf-converter</artifactId>
<packaging>jar</packaging>
<version>2.0.0</version>
<version>2.0.1</version>

<properties>
<protobuf.version>3.4.0</protobuf.version>
Expand Down
32 changes: 20 additions & 12 deletions src/main/java/com/blueapron/connect/protobuf/ProtobufData.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.OneofDescriptor;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
Expand All @@ -21,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import com.google.protobuf.util.Timestamps;
Expand Down Expand Up @@ -81,12 +83,6 @@ private String getProtoFieldName(String descriptorForTypeName, String connectFie
return connectProtoNameMap.get(getProtoMapKey(descriptorForTypeName, connectFieldName));
}

private boolean isUnsetOneof(Descriptors.FieldDescriptor fieldDescriptor, Object value) {
return fieldDescriptor.getContainingOneof() != null &&
fieldDescriptor.getType() != MESSAGE &&
fieldDescriptor.getDefaultValue().equals(value);
}

ProtobufData(Class<? extends com.google.protobuf.GeneratedMessageV3> clazz, String legacyName) {
this.legacyName = legacyName;

Expand Down Expand Up @@ -225,6 +221,13 @@ private boolean isProtobufDate(Schema schema) {
return Date.SCHEMA.name().equals(schema.name());
}

private void setStructField(Schema schema, Message message, Struct result, Descriptors.FieldDescriptor fieldDescriptor) {
final String fieldName = getConnectFieldName(fieldDescriptor);
final Field field = schema.field(fieldName);
Object obj = message.getField(fieldDescriptor);
result.put(fieldName, toConnectData(field.schema(), obj));
}

Object toConnectData(Schema schema, Object value) {
try {
if (isProtobufTimestamp(schema)) {
Expand Down Expand Up @@ -329,15 +332,20 @@ Object toConnectData(Schema schema, Object value) {
}

final Struct result = new Struct(schema.schema());
final Descriptors.Descriptor descriptor = message.getDescriptorForType();

for (Descriptors.FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) {
final String fieldName = getConnectFieldName(fieldDescriptor);
final Field field = schema.field(fieldName);
for (OneofDescriptor oneOfDescriptor : descriptor.getOneofs()) {
final Descriptors.FieldDescriptor fieldDescriptor = message.getOneofFieldDescriptor(oneOfDescriptor);
setStructField(schema, message, result, fieldDescriptor);
}

Object obj = message.getField(fieldDescriptor);
if (!isUnsetOneof(fieldDescriptor, obj)) {
result.put(fieldName, toConnectData(field.schema(), obj));
for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) {
Descriptors.OneofDescriptor oneOfDescriptor = fieldDescriptor.getContainingOneof();
if (oneOfDescriptor != null) {
continue;
}

setStructField(schema, message, result, fieldDescriptor);
}

converted = result;
Expand Down
55 changes: 50 additions & 5 deletions src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ private Schema getExpectedNestedTestProtoSchemaIntUserId() {
return getExpectedNestedTestProtoSchema();
}

private SchemaBuilder getComplexTypeSchemaBuilder() {
final SchemaBuilder complexTypeBuilder = SchemaBuilder.struct();
complexTypeBuilder.field("one_id", SchemaBuilder.string().optional().build());
complexTypeBuilder.field("other_id", SchemaBuilder.int32().optional().build());
complexTypeBuilder.field("is_active", SchemaBuilder.bool().optional().build());
return complexTypeBuilder;
}

private Schema getExpectedNestedTestProtoSchema() {
final SchemaBuilder builder = SchemaBuilder.struct();
final SchemaBuilder userIdBuilder = SchemaBuilder.struct();
Expand All @@ -99,11 +107,7 @@ private Schema getExpectedNestedTestProtoSchema() {
builder.field("experiments_active", SchemaBuilder.array(SchemaBuilder.string().optional().build()).optional().build());
builder.field("updated_at", org.apache.kafka.connect.data.Timestamp.builder().optional().build());
builder.field("status", SchemaBuilder.string().optional().build());
final SchemaBuilder complexTypeBuilder = SchemaBuilder.struct();
complexTypeBuilder.field("one_id", SchemaBuilder.string().optional().build());
complexTypeBuilder.field("other_id", SchemaBuilder.int32().optional().build());
complexTypeBuilder.field("is_active", SchemaBuilder.bool().optional().build());
builder.field("complex_type", complexTypeBuilder.optional().build());
builder.field("complex_type", getComplexTypeSchemaBuilder().optional().build());
builder.field("map_type", SchemaBuilder.array(SchemaBuilder.struct().field("key", Schema.OPTIONAL_STRING_SCHEMA).field("value", Schema.OPTIONAL_STRING_SCHEMA).optional().build()).optional().build());
return builder.build();
}
Expand Down Expand Up @@ -161,6 +165,27 @@ private Struct getExpectedNestedTestProtoResultIntUserId() throws ParseException
return result;
}

private NestedTestProtoOuterClass.ComplexType createProtoDefaultOneOf() throws ParseException {
NestedTestProtoOuterClass.ComplexType.Builder complexTypeBuilder = NestedTestProtoOuterClass.ComplexType.newBuilder();
complexTypeBuilder.setOtherId(0);
return complexTypeBuilder.build();
}

private NestedTestProtoOuterClass.ComplexType createProtoMultipleSetOneOf() throws ParseException {
NestedTestProtoOuterClass.ComplexType.Builder complexTypeBuilder = NestedTestProtoOuterClass.ComplexType.newBuilder();
complexTypeBuilder.setOneId("asdf");
complexTypeBuilder.setOtherId(0);
return complexTypeBuilder.build();
}

private Struct getExpectedComplexTypeProtoWithDefaultOneOf() {
Schema schema = getComplexTypeSchemaBuilder().build();
Struct result = new Struct(schema.schema());
result.put("other_id", 0);
result.put("is_active", false);
return result;
}

private void assertSchemasEqual(Schema expectedSchema, Schema actualSchema) {
assertEquals(expectedSchema.type(), actualSchema.type());
assertEquals(expectedSchema.isOptional(), actualSchema.isOptional());
Expand Down Expand Up @@ -196,6 +221,26 @@ public void testToConnectDataWithNestedProtobufMessageAndIntUserId() throws Pars
assertEquals(new SchemaAndValue(getExpectedNestedTestProtoSchemaIntUserId(), getExpectedNestedTestProtoResultIntUserId()), result);
}

@Test
public void testToConnectDataDefaultOneOf() throws ParseException {
Schema schema = getComplexTypeSchemaBuilder().build();
NestedTestProtoOuterClass.ComplexType message = createProtoDefaultOneOf();
ProtobufData protobufData = new ProtobufData(NestedTestProtoOuterClass.ComplexType.class, LEGACY_NAME);
SchemaAndValue result = protobufData.toConnectData(message.toByteArray());
assertSchemasEqual(schema, result.schema());
assertEquals(new SchemaAndValue(schema, getExpectedComplexTypeProtoWithDefaultOneOf()), result);
}

@Test
public void testToConnectDataDefaultOneOfCannotHaveTwoOneOfsSet() throws ParseException {
Schema schema = getComplexTypeSchemaBuilder().build();
NestedTestProtoOuterClass.ComplexType message = createProtoMultipleSetOneOf();
ProtobufData protobufData = new ProtobufData(NestedTestProtoOuterClass.ComplexType.class, LEGACY_NAME);
SchemaAndValue result = protobufData.toConnectData(message.toByteArray());
assertSchemasEqual(schema, result.schema());
assertEquals(new SchemaAndValue(schema, getExpectedComplexTypeProtoWithDefaultOneOf()), result);
}

// Data Conversion tests
@Test
public void testToConnectSupportsOptionalValues() {
Expand Down

0 comments on commit f1e6cec

Please sign in to comment.