diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapper.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapper.java index 5ff838c6e..2d61cf05d 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapper.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapper.java @@ -34,6 +34,8 @@ import java.util.Map; +import java.util.Optional; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -79,23 +81,39 @@ public static SourceRecord mapSingleServiceBusMessage(ServiceBusReceivedMessage } private static Struct createStructFromServiceBusMessage(final ServiceBusReceivedMessage serviceBusMessage) { - return new Struct(VALUE_SCHEMA) - .put(DELIVERY_COUNT, serviceBusMessage.getDeliveryCount()) - .put(ENQUEUED_TIME_UTC, serviceBusMessage.getEnqueuedTime().toEpochSecond()) - .put(CONTENT_TYPE, serviceBusMessage.getContentType()) - .put(LABEL, AzureServiceBusSourceConnector.class.getSimpleName()) - .put(CORRELATION_ID, serviceBusMessage.getCorrelationId()) - .put(PARTITION_KEY, serviceBusMessage.getPartitionKey()) - .put(REPLY_TO, serviceBusMessage.getReplyTo()) - .put(REPLY_TO_SESSION_ID, serviceBusMessage.getReplyToSessionId()) - .put(DEAD_LETTER_SOURCE, serviceBusMessage.getDeadLetterSource()) - .put(TIME_TO_LIVE, serviceBusMessage.getTimeToLive().toMillis()) - .put(LOCKED_UNTIL_UTC, serviceBusMessage.getLockedUntil().toEpochSecond()) - .put(SEQUENCE_NUMBER, serviceBusMessage.getSequenceNumber()) - .put(SESSION_ID, serviceBusMessage.getSessionId()) - .put(LOCK_TOKEN, serviceBusMessage.getLockToken()) - .put(MESSAGE_BODY, serviceBusMessage.getBody().toBytes()) - .put(GET_TO, serviceBusMessage.getTo()); + Struct struct = + new Struct(VALUE_SCHEMA) + .put(DELIVERY_COUNT, serviceBusMessage.getDeliveryCount()) + .put(ENQUEUED_TIME_UTC, serviceBusMessage.getEnqueuedTime().toEpochSecond()) + .put(LABEL, AzureServiceBusSourceConnector.class.getSimpleName()) + .put(TIME_TO_LIVE, serviceBusMessage.getTimeToLive().toMillis()) + .put(MESSAGE_BODY, serviceBusMessage.getBody().toBytes()) + .put(GET_TO, serviceBusMessage.getTo()); + + addOptionalSchemaValues(struct, serviceBusMessage); + + return struct; + } + + private static void addOptionalSchemaValues(Struct struct, ServiceBusReceivedMessage serviceBusMessage) { + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getContentType(), CONTENT_TYPE); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getCorrelationId(), CORRELATION_ID); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getPartitionKey(), PARTITION_KEY); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getReplyTo(), REPLY_TO); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getReplyToSessionId(), REPLY_TO_SESSION_ID); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getDeadLetterSource(), DEAD_LETTER_SOURCE); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getSequenceNumber(), SEQUENCE_NUMBER); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getSessionId(), SESSION_ID); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getLockToken(), LOCK_TOKEN); + addOptionalSchemaValueIfExists(struct, serviceBusMessage.getTo(), GET_TO); + + Optional.ofNullable(serviceBusMessage.getLockedUntil()) + .ifPresent(lu -> struct.put(LOCKED_UNTIL_UTC, lu.toEpochSecond())); + + } + + private static void addOptionalSchemaValueIfExists(Struct struct, Object value, Field field) { + Optional.ofNullable(value).ifPresent(val -> struct.put(field, val)); } } diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java index 6c9b45527..335ac8e8f 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java @@ -30,9 +30,9 @@ public class ServiceBusValueSchemaField { static final Field ENQUEUED_TIME_UTC = new Field(SchemaFieldConstants.ENQUEUED_TIME_UTC, 1, Schema.INT64_SCHEMA); static final Field CONTENT_TYPE = - new Field(SchemaFieldConstants.CONTENT_TYPE, 2, Schema.STRING_SCHEMA); + new Field(SchemaFieldConstants.CONTENT_TYPE, 2, Schema.OPTIONAL_STRING_SCHEMA); static final Field LABEL = - new Field(SchemaFieldConstants.LABEL, 3, Schema.STRING_SCHEMA); + new Field(SchemaFieldConstants.LABEL, 3, Schema.OPTIONAL_STRING_SCHEMA); static final Field CORRELATION_ID = new Field(SchemaFieldConstants.CORRELATION_ID, 4, Schema.OPTIONAL_STRING_SCHEMA); static final Field MESSAGE_PROPERTIES = @@ -56,7 +56,7 @@ public class ServiceBusValueSchemaField { static final Field LOCK_TOKEN = new Field(SchemaFieldConstants.LOCK_TOKEN, 14, Schema.OPTIONAL_STRING_SCHEMA); static final Field MESSAGE_BODY = - new Field(SchemaFieldConstants.MESSAGE_BODY, 15, Schema.BYTES_SCHEMA); + new Field(SchemaFieldConstants.MESSAGE_BODY, 15, Schema.OPTIONAL_BYTES_SCHEMA); static final Field GET_TO = new Field(SchemaFieldConstants.GET_TO, 16, Schema.OPTIONAL_STRING_SCHEMA); diff --git a/java-connectors/kafka-connect-azure-servicebus/src/test/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapperTest.java b/java-connectors/kafka-connect-azure-servicebus/src/test/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapperTest.java index 094b6ba22..97ce46a33 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/test/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapperTest.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/test/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusToSourceRecordMapperTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import com.azure.core.util.BinaryData; import com.azure.messaging.servicebus.ServiceBusReceivedMessage; @@ -84,6 +83,50 @@ void mapSingleSourceRecordWitAllParameters() { } + @Test + void mapSingleSourceRecordAllowsForOptionalScemaFieldsToBeNull() { + //given + ServiceBusReceivedMessage busMessage = prepareMessageBusWithOnlyRequiredFields(); + AzureServiceBusPartitionKey partitionKey = new AzureServiceBusPartitionKey(OUTPUT_TOPIC, PARTITION_KEY); + AzureServiceBusOffsetMarker busOffsetMarker = new AzureServiceBusOffsetMarker(SEQUENCE_NUMBER); + + //when + SourceRecord sourceRecord = + ServiceBusToSourceRecordMapper.mapSingleServiceBusMessage(busMessage, OUTPUT_TOPIC, partitionKey, + busOffsetMarker); + + //then + assertThat(sourceRecord) + .returns(TIME_NOW.toEpochSecond(), from(SourceRecord::timestamp)) + .returns(partitionKey, from(SourceRecord::sourcePartition)) + .returns(null, from(SourceRecord::kafkaPartition)) + .returns(busOffsetMarker, from(SourceRecord::sourceOffset)) + .returns(OUTPUT_TOPIC, from(SourceRecord::topic)) + .returns(Schema.STRING_SCHEMA, from(SourceRecord::keySchema)) + .returns(ServiceBusToSourceRecordMapper.VALUE_SCHEMA, from(SourceRecord::valueSchema)); + + Struct valueStruct = (Struct) sourceRecord.value(); + + assertThat(valueStruct) + .returns(DELIVERY_COUNT, from(v -> v.get(ServiceBusValueSchemaField.DELIVERY_COUNT))) + .returns(TIME_NOW.toEpochSecond(), from(v -> v.get(ServiceBusValueSchemaField.ENQUEUED_TIME_UTC))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.CONTENT_TYPE))) + .returns(LABEL, from(v -> v.get(ServiceBusValueSchemaField.LABEL))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.CORRELATION_ID))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.PARTITION_KEY))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.REPLY_TO))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.REPLY_TO_SESSION_ID))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.DEAD_LETTER_SOURCE))) + .returns(TIME_TO_LIVE.toMillis(), from(v -> v.get(ServiceBusValueSchemaField.TIME_TO_LIVE))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.LOCKED_UNTIL_UTC))) + .returns(SEQUENCE_NUMBER, from(v -> v.get(ServiceBusValueSchemaField.SEQUENCE_NUMBER))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.SESSION_ID))) + .returns(null, from(v -> v.get(ServiceBusValueSchemaField.LOCK_TOKEN))) + .returns(MESSAGE_BODY, from(v -> v.get(ServiceBusValueSchemaField.MESSAGE_BODY))) + .returns(DELIVERY_COUNT, from(v -> v.get(ServiceBusValueSchemaField.DELIVERY_COUNT))); + + } + private void assertMappedStructValues(Struct valueStruct) { assertThat(valueStruct) .returns(DELIVERY_COUNT, from(v -> v.get(ServiceBusValueSchemaField.DELIVERY_COUNT))) @@ -110,22 +153,48 @@ private ServiceBusReceivedMessage prepareMessageBusWithAllConsumedFields() { BinaryData bodyBinary = mock(BinaryData.class); when(bodyBinary.toBytes()).thenReturn(MESSAGE_BODY); - Mockito.when(busReceivedMessage.getMessageId()).thenReturn(MESSAGE_ID); - Mockito.when(busReceivedMessage.getDeliveryCount()).thenReturn(DELIVERY_COUNT); - Mockito.when(busReceivedMessage.getEnqueuedTime()).thenReturn(TIME_NOW); - Mockito.when(busReceivedMessage.getContentType()).thenReturn(CONTENT_TYPE); - Mockito.when(busReceivedMessage.getCorrelationId()).thenReturn(CORRELATION_ID); - Mockito.when(busReceivedMessage.getPartitionKey()).thenReturn(PARTITION_KEY); - Mockito.when(busReceivedMessage.getReplyTo()).thenReturn(REPLY_TO); - Mockito.when(busReceivedMessage.getReplyToSessionId()).thenReturn(REPLY_TO_SESSION_ID); - Mockito.when(busReceivedMessage.getDeadLetterSource()).thenReturn(DEAD_LETTER_SOURCE); - Mockito.when(busReceivedMessage.getTimeToLive()).thenReturn(TIME_TO_LIVE); - Mockito.when(busReceivedMessage.getLockedUntil()).thenReturn(TIME_NOW); - Mockito.when(busReceivedMessage.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER); - Mockito.when(busReceivedMessage.getSessionId()).thenReturn(SESSION_ID); - Mockito.when(busReceivedMessage.getLockToken()).thenReturn(LOCK_TOKEN); - Mockito.when(busReceivedMessage.getBody()).thenReturn(bodyBinary); - Mockito.when(busReceivedMessage.getTo()).thenReturn(GET_TO); + when(busReceivedMessage.getMessageId()).thenReturn(MESSAGE_ID); + when(busReceivedMessage.getDeliveryCount()).thenReturn(DELIVERY_COUNT); + when(busReceivedMessage.getEnqueuedTime()).thenReturn(TIME_NOW); + when(busReceivedMessage.getContentType()).thenReturn(CONTENT_TYPE); + when(busReceivedMessage.getCorrelationId()).thenReturn(CORRELATION_ID); + when(busReceivedMessage.getPartitionKey()).thenReturn(PARTITION_KEY); + when(busReceivedMessage.getReplyTo()).thenReturn(REPLY_TO); + when(busReceivedMessage.getReplyToSessionId()).thenReturn(REPLY_TO_SESSION_ID); + when(busReceivedMessage.getDeadLetterSource()).thenReturn(DEAD_LETTER_SOURCE); + when(busReceivedMessage.getTimeToLive()).thenReturn(TIME_TO_LIVE); + when(busReceivedMessage.getLockedUntil()).thenReturn(TIME_NOW); + when(busReceivedMessage.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER); + when(busReceivedMessage.getSessionId()).thenReturn(SESSION_ID); + when(busReceivedMessage.getLockToken()).thenReturn(LOCK_TOKEN); + when(busReceivedMessage.getBody()).thenReturn(bodyBinary); + when(busReceivedMessage.getTo()).thenReturn(GET_TO); + + return busReceivedMessage; + } + + private ServiceBusReceivedMessage prepareMessageBusWithOnlyRequiredFields() { + ServiceBusReceivedMessage busReceivedMessage = mock(ServiceBusReceivedMessage.class); + + BinaryData bodyBinary = mock(BinaryData.class); + when(bodyBinary.toBytes()).thenReturn(MESSAGE_BODY); + + when(busReceivedMessage.getMessageId()).thenReturn(MESSAGE_ID); + when(busReceivedMessage.getDeliveryCount()).thenReturn(DELIVERY_COUNT); + when(busReceivedMessage.getEnqueuedTime()).thenReturn(TIME_NOW); + when(busReceivedMessage.getContentType()).thenReturn(null); + when(busReceivedMessage.getCorrelationId()).thenReturn(null); + when(busReceivedMessage.getPartitionKey()).thenReturn(null); + when(busReceivedMessage.getReplyTo()).thenReturn(null); + when(busReceivedMessage.getReplyToSessionId()).thenReturn(null); + when(busReceivedMessage.getDeadLetterSource()).thenReturn(null); + when(busReceivedMessage.getTimeToLive()).thenReturn(TIME_TO_LIVE); + when(busReceivedMessage.getLockedUntil()).thenReturn(null); + when(busReceivedMessage.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER); + when(busReceivedMessage.getSessionId()).thenReturn(null); + when(busReceivedMessage.getLockToken()).thenReturn(null); + when(busReceivedMessage.getBody()).thenReturn(bodyBinary); + when(busReceivedMessage.getTo()).thenReturn(null); return busReceivedMessage; }