diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java index 20ad901..8d1ab7b 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java @@ -713,7 +713,7 @@ else if (KclMessageDrivenChannelAdapter.this.converter != null) { partitionKeys.add(r.partitionKey()); sequenceNumbers.add(r.sequenceNumber()); - return KclMessageDrivenChannelAdapter.this.converter.convert(r.data().array()); + return KclMessageDrivenChannelAdapter.this.converter.convert(BinaryUtils.copyAllBytesFrom(r.data())); }) .toList(); diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java index 35198ae..e4c4830 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java @@ -16,16 +16,30 @@ package org.springframework.integration.aws.kinesis; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.Consumer; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.metrics.MetricsFactory; @@ -37,7 +51,9 @@ import org.springframework.context.annotation.Configuration; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.aws.LocalstackContainerTest; +import org.springframework.integration.aws.inbound.kinesis.CheckpointMode; import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter; +import org.springframework.integration.aws.inbound.kinesis.ListenerMode; import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; @@ -53,7 +69,6 @@ * @author Artem Bilan * @author Siddharth Jain * @author Minkyu Moon - * * @since 3.0 */ @SpringJUnitConfig @@ -61,6 +76,8 @@ public class KclMessageDrivenChannelAdapterTests implements LocalstackContainerTest { private static final String TEST_STREAM = "TestStreamKcl"; + public static final String LEASE_TABLE_NAME = "test_table"; + public static final String TEST_DATA = "test data"; private static KinesisAsyncClient AMAZON_KINESIS; @@ -80,10 +97,10 @@ static void setup() { DYNAMO_DB = LocalstackContainerTest.dynamoDbClient(); CLOUD_WATCH = LocalstackContainerTest.cloudWatchClient(); - AMAZON_KINESIS.createStream(request -> request.streamName(TEST_STREAM).shardCount(1)) - .thenCompose(result -> - AMAZON_KINESIS.waiter().waitUntilStreamExists(request -> request.streamName(TEST_STREAM))) - .join(); + CompletableFuture.allOf( + initialiseStream(TEST_STREAM), + initialiseLeaseTableFor(LEASE_TABLE_NAME) + ).join(); } @AfterAll @@ -96,20 +113,36 @@ static void tearDown() { } @Test - void kclChannelAdapterReceivesRecords() { - String testData = "test data"; + void kclChannelAdapterReceivesBatchedRecords() { + this.kclMessageDrivenChannelAdapter.setListenerMode(ListenerMode.batch); + this.kclMessageDrivenChannelAdapter.setCheckpointMode(CheckpointMode.batch); + + Message received = verifyRecordReceived(TEST_DATA); + assertThat(received.getPayload()).isEqualTo(Collections.singletonList(TEST_DATA)); + List receivedSequences = received.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, List.class); + assertThat(receivedSequences).isNotEmpty(); + } + + @Test + void kclChannelAdapterReceivesSingleRecord() { + this.kclMessageDrivenChannelAdapter.setListenerMode(ListenerMode.record); + this.kclMessageDrivenChannelAdapter.setCheckpointMode(CheckpointMode.record); + + Message receive = verifyRecordReceived(TEST_DATA); + assertThat(receive.getPayload()).isEqualTo(TEST_DATA); + assertThat(receive.getHeaders()).containsKey(IntegrationMessageHeaderAccessor.SOURCE_DATA); + assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, String.class)).isNotEmpty(); + } + + private Message verifyRecordReceived(String testData) { AMAZON_KINESIS.putRecord(request -> request.streamName(TEST_STREAM) .data(SdkBytes.fromUtf8String(testData)) .partitionKey("test")); - // We need so long delay because KCL has a more than a minute setup phase. - Message receive = this.kinesisReceiveChannel.receive(300_000); + Message receive = this.kinesisReceiveChannel.receive(5000); assertThat(receive).isNotNull(); - assertThat(receive.getPayload()).isEqualTo(testData); - assertThat(receive.getHeaders()).containsKey(IntegrationMessageHeaderAccessor.SOURCE_DATA); - assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, String.class)).isNotEmpty(); List streamConsumers = AMAZON_KINESIS.describeStream(r -> r.streamName(TEST_STREAM)) @@ -120,10 +153,11 @@ void kclChannelAdapterReceivesRecords() { .consumers(); // Because FanOut is false, there would be no Stream Consumers. - assertThat(streamConsumers).hasSize(0); + assertThat(streamConsumers).isEmpty(); List tableNames = DYNAMO_DB.listTables().join().tableNames(); - assertThat(tableNames).contains("test_table"); + assertThat(tableNames).contains(LEASE_TABLE_NAME); + return receive; } @Test @@ -176,10 +210,10 @@ public void pollingMaxRecordsIsPropagated() { public static class TestConfiguration { @Bean - public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() { + public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter(PollableChannel kinesisReceiveChannel) { KclMessageDrivenChannelAdapter adapter = new KclMessageDrivenChannelAdapter(AMAZON_KINESIS, CLOUD_WATCH, DYNAMO_DB, TEST_STREAM); - adapter.setOutputChannel(kinesisReceiveChannel()); + adapter.setOutputChannel(kinesisReceiveChannel); adapter.setStreamInitialSequence( InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); adapter.setConverter(String::new); @@ -202,7 +236,45 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() { public PollableChannel kinesisReceiveChannel() { return new QueueChannel(); } + } + private static CompletableFuture> initialiseStream(String streamName) { + return AMAZON_KINESIS.createStream(request -> request.streamName(streamName).shardCount(1)) + .thenCompose( + result -> AMAZON_KINESIS.waiter().waitUntilStreamExists(request -> request.streamName(streamName))); } + /** + * Initialises the lease table to improve KCL initialisation time + */ + private static CompletableFuture initialiseLeaseTableFor(String leaseTableName) { + return DYNAMO_DB.createTable(CreateTableRequest.builder() + .tableName(leaseTableName) + .attributeDefinitions(AttributeDefinition.builder() + .attributeName("leaseKey") + .attributeType(ScalarAttributeType.S) + .build()) + .keySchema(KeySchemaElement.builder() + .attributeName("leaseKey") + .keyType(KeyType.HASH) + .build()) + .provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits(1L) + .writeCapacityUnits(1L) + .build()) + .build()) + .thenCompose( + result -> DYNAMO_DB.waiter().waitUntilTableExists(request -> request.tableName(leaseTableName))) + .thenCompose(describeTableResponseWaiterResponse -> DYNAMO_DB.putItem(PutItemRequest.builder() + .tableName(leaseTableName) + .item(Map.of( + "leaseKey", AttributeValue.fromS("shardId-000000000000"), + "checkpoint", AttributeValue.fromS("TRIM_HORIZON"), + "leaseCounter", AttributeValue.fromN("1"), + "startingHashKey", AttributeValue.fromS("0"), + "ownerSwitchesSinceCheckpoint", AttributeValue.fromN("0"), + "checkpointSubSequenceNumber", AttributeValue.fromN("0") + )) + .build())); + } }