From 4184dae6c08aaed738c63296fc4a967f6c708a28 Mon Sep 17 00:00:00 2001 From: Bayard Neville Date: Mon, 6 Jan 2020 15:53:55 -0500 Subject: [PATCH] Propagate null timestamps and dates [updated with CI fix] (#34) * Propagate null timestamps and dates * Distinguish between default and null messages * Non-static import for MESSAGE Co-authored-by: Matthew Jones --- .../connect/protobuf/ProtobufData.java | 11 +++---- .../connect/protobuf/ProtobufDataTest.java | 30 +++++++++++++++++++ 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java b/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java index e923497..e4afc53 100644 --- a/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java +++ b/src/main/java/com/blueapron/connect/protobuf/ProtobufData.java @@ -3,6 +3,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Descriptors.OneofDescriptor; import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.InvalidProtocolBufferException; @@ -229,8 +230,11 @@ private boolean isProtobufDate(Schema schema) { private void setStructField(Schema schema, Message message, Struct result, Descriptors.FieldDescriptor fieldDescriptor) { final String fieldName = getConnectFieldName(fieldDescriptor); final Field field = schema.field(fieldName); - Object obj = message.getField(fieldDescriptor); - result.put(fieldName, toConnectData(field.schema(), obj)); + Object obj = null; + if (fieldDescriptor.getType() != FieldDescriptor.Type.MESSAGE || fieldDescriptor.isRepeated() || fieldDescriptor.isMapField() || message.hasField(fieldDescriptor)) { + obj = toConnectData(field.schema(), message.getField(fieldDescriptor)); + } + result.put(fieldName, obj); } Object toConnectData(Schema schema, Object value) { @@ -338,9 +342,6 @@ Object toConnectData(Schema schema, Object value) { case STRUCT: { final Message message = (Message) value; // Validate type - if (message == message.getDefaultInstanceForType()) { - return null; - } final Struct result = new Struct(schema.schema()); final Descriptors.Descriptor descriptor = message.getDescriptorForType(); diff --git a/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java b/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java index 9de09f3..53035da 100644 --- a/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java +++ b/src/test/java/com/blueapron/connect/protobuf/ProtobufDataTest.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -428,6 +429,35 @@ public void testToConnectTimestamp() throws ParseException { assertEquals(getExpectedSchemaAndValue(timestampSchema, expectedValue, expectedName), result); } + @Test + public void testToConnectNullTimestamp() { + String expectedName = "TimestampValue"; + TimestampValueOuterClass.TimestampValue message = TimestampValueOuterClass.TimestampValue.getDefaultInstance(); + + ProtobufData protobufData = new ProtobufData(TimestampValueOuterClass.TimestampValue.class, LEGACY_NAME); + SchemaAndValue result = protobufData.toConnectData(message.toByteArray()); + + Schema timestampSchema = org.apache.kafka.connect.data.Timestamp.builder().optional().build(); + assertEquals(getExpectedSchemaAndValue(timestampSchema, null, expectedName), result); + } + + @Test + public void testToConnectEpochTimestamp() { + java.util.Date expectedValue = java.util.Date.from(Instant.EPOCH); + String expectedName = "TimestampValue"; + + Timestamp timestamp = Timestamp.getDefaultInstance(); + TimestampValueOuterClass.TimestampValue.Builder builder = TimestampValueOuterClass.TimestampValue.newBuilder(); + builder.setValue(timestamp); + TimestampValueOuterClass.TimestampValue message = builder.build(); + + ProtobufData protobufData = new ProtobufData(TimestampValueOuterClass.TimestampValue.class, LEGACY_NAME); + SchemaAndValue result = protobufData.toConnectData(message.toByteArray()); + + Schema timestampSchema = org.apache.kafka.connect.data.Timestamp.builder().optional().build(); + assertEquals(getExpectedSchemaAndValue(timestampSchema, expectedValue, expectedName), result); + } + @Test public void testToConnectDate() throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");