diff --git a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java index 20ad901..8d1ab7b 100644 --- a/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java @@ -713,7 +713,7 @@ else if (KclMessageDrivenChannelAdapter.this.converter != null) { partitionKeys.add(r.partitionKey()); sequenceNumbers.add(r.sequenceNumber()); - return KclMessageDrivenChannelAdapter.this.converter.convert(r.data().array()); + return KclMessageDrivenChannelAdapter.this.converter.convert(BinaryUtils.copyAllBytesFrom(r.data())); }) .toList(); diff --git a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java index 35198ae..d6f4ce6 100644 --- a/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java +++ b/src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java @@ -16,28 +16,17 @@ package org.springframework.integration.aws.kinesis; -import java.util.List; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.services.kinesis.model.Consumer; -import software.amazon.kinesis.common.InitialPositionInStream; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.metrics.MetricsLevel; -import software.amazon.kinesis.metrics.NullMetricsFactory; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.aws.LocalstackContainerTest; +import org.springframework.integration.aws.inbound.kinesis.CheckpointMode; import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter; +import org.springframework.integration.aws.inbound.kinesis.ListenerMode; import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; @@ -46,6 +35,32 @@ import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.waiters.WaiterResponse; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemResponse; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Consumer; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.NullMetricsFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; @@ -53,156 +68,223 @@ * @author Artem Bilan * @author Siddharth Jain * @author Minkyu Moon - * * @since 3.0 */ @SpringJUnitConfig @DirtiesContext public class KclMessageDrivenChannelAdapterTests implements LocalstackContainerTest { - private static final String TEST_STREAM = "TestStreamKcl"; - - private static KinesisAsyncClient AMAZON_KINESIS; - - private static DynamoDbAsyncClient DYNAMO_DB; - - private static CloudWatchAsyncClient CLOUD_WATCH; - - @Autowired - private PollableChannel kinesisReceiveChannel; - - @Autowired - private KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter; - - @BeforeAll - static void setup() { - AMAZON_KINESIS = LocalstackContainerTest.kinesisClient(); - DYNAMO_DB = LocalstackContainerTest.dynamoDbClient(); - CLOUD_WATCH = LocalstackContainerTest.cloudWatchClient(); - - AMAZON_KINESIS.createStream(request -> request.streamName(TEST_STREAM).shardCount(1)) - .thenCompose(result -> - AMAZON_KINESIS.waiter().waitUntilStreamExists(request -> request.streamName(TEST_STREAM))) - .join(); - } - - @AfterAll - static void tearDown() { - AMAZON_KINESIS - .deleteStream(request -> request.streamName(TEST_STREAM).enforceConsumerDeletion(true)) - .thenCompose(result -> AMAZON_KINESIS.waiter() - .waitUntilStreamNotExists(request -> request.streamName(TEST_STREAM))) - .join(); - } - - @Test - void kclChannelAdapterReceivesRecords() { - String testData = "test data"; - - AMAZON_KINESIS.putRecord(request -> - request.streamName(TEST_STREAM) - .data(SdkBytes.fromUtf8String(testData)) - .partitionKey("test")); - - // We need so long delay because KCL has a more than a minute setup phase. - Message receive = this.kinesisReceiveChannel.receive(300_000); - assertThat(receive).isNotNull(); - assertThat(receive.getPayload()).isEqualTo(testData); - assertThat(receive.getHeaders()).containsKey(IntegrationMessageHeaderAccessor.SOURCE_DATA); - assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, String.class)).isNotEmpty(); - - List streamConsumers = - AMAZON_KINESIS.describeStream(r -> r.streamName(TEST_STREAM)) - .thenCompose(describeStreamResponse -> - AMAZON_KINESIS.listStreamConsumers(r -> - r.streamARN(describeStreamResponse.streamDescription().streamARN()))) - .join() - .consumers(); - - // Because FanOut is false, there would be no Stream Consumers. - assertThat(streamConsumers).hasSize(0); - - List tableNames = DYNAMO_DB.listTables().join().tableNames(); - assertThat(tableNames).contains("test_table"); - } - - @Test - public void metricsLevelOfMetricsConfigShouldBeSetToMetricsLevelOfAdapter() { - MetricsLevel metricsLevel = - TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, - "scheduler.metricsConfig.metricsLevel", - MetricsLevel.class); - assertThat(metricsLevel).isEqualTo(MetricsLevel.NONE); - } - - @Test - public void metricsFactoryOfSchedulerShouldBeSetNullMetricsFactoryIfMetricsLevelIsNone() { - MetricsFactory metricsFactory = - TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, - "scheduler.metricsFactory", - MetricsFactory.class); - assertThat(metricsFactory).isInstanceOf(NullMetricsFactory.class); - } - - @Test - public void maxLeasesForWorkerOverriddenByCustomizer() { - Integer maxLeasesForWorker = - TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, - "scheduler.leaseCoordinator.leaseTaker.maxLeasesForWorker", - Integer.class); - assertThat(maxLeasesForWorker).isEqualTo(10); - } - - @Test - public void shardConsumerDispatchPollIntervalMillisOverriddenByCustomizer() { - Long shardConsumerDispatchPollIntervalMillis = - TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, - "scheduler.shardConsumerDispatchPollIntervalMillis", - Long.class); - assertThat(shardConsumerDispatchPollIntervalMillis).isEqualTo(500L); - } - - @Test - public void pollingMaxRecordsIsPropagated() { - Integer maxRecords = - TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter, - "scheduler.retrievalConfig.retrievalSpecificConfig.maxRecords", - Integer.class); - assertThat(maxRecords).isEqualTo(99); - } - - @Configuration - @EnableIntegration - public static class TestConfiguration { - - @Bean - public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() { - KclMessageDrivenChannelAdapter adapter = - new KclMessageDrivenChannelAdapter(AMAZON_KINESIS, CLOUD_WATCH, DYNAMO_DB, TEST_STREAM); - adapter.setOutputChannel(kinesisReceiveChannel()); - adapter.setStreamInitialSequence( - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); - adapter.setConverter(String::new); - adapter.setConsumerGroup("single_stream_group"); - adapter.setLeaseTableName("test_table"); - adapter.setFanOut(false); - adapter.setMetricsLevel(MetricsLevel.NONE); - adapter.setLeaseManagementConfigCustomizer(leaseManagementConfig -> - leaseManagementConfig.maxLeasesForWorker(10)); - adapter.setCoordinatorConfigCustomizer(coordinatorConfig -> - coordinatorConfig.shardConsumerDispatchPollIntervalMillis(500L)); - adapter.setBindSourceRecord(true); - adapter.setEmptyRecordList(true); - adapter.setPollingMaxRecords(99); - adapter.setGracefulShutdownTimeout(100); - return adapter; - } - - @Bean - public PollableChannel kinesisReceiveChannel() { - return new QueueChannel(); - } - - } + private static final String TEST_STREAM = "TestStreamKcl"; + public static final String LEASE_TABLE_NAME = "test_table"; + public static final String TEST_DATA = "test data"; + + private static KinesisAsyncClient AMAZON_KINESIS; + + private static DynamoDbAsyncClient DYNAMO_DB; + + private static CloudWatchAsyncClient CLOUD_WATCH; + + @Autowired + private PollableChannel kinesisReceiveChannel; + + @Autowired + private KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter; + + @BeforeAll + static void setup() { + AMAZON_KINESIS = LocalstackContainerTest.kinesisClient(); + DYNAMO_DB = LocalstackContainerTest.dynamoDbClient(); + CLOUD_WATCH = LocalstackContainerTest.cloudWatchClient(); + + CompletableFuture.allOf( + initialiseStream(TEST_STREAM), + initialiseLeaseTableFor(LEASE_TABLE_NAME) + ).join(); + } + + private static CompletableFuture> initialiseStream(String streamName) { + return AMAZON_KINESIS.createStream(request -> request.streamName(streamName).shardCount(1)) + .thenCompose(result -> + AMAZON_KINESIS.waiter().waitUntilStreamExists(request -> request.streamName(streamName))); + } + + @AfterAll + static void tearDown() { + AMAZON_KINESIS + .deleteStream(request -> request.streamName(TEST_STREAM).enforceConsumerDeletion(true)) + .thenCompose(result -> AMAZON_KINESIS.waiter() + .waitUntilStreamNotExists(request -> request.streamName(TEST_STREAM))) + .join(); + } + + @Test + void kclChannelAdapterReceivesBatchedRecords() { + this.kclMessageDrivenChannelAdapter.setListenerMode(ListenerMode.batch); + this.kclMessageDrivenChannelAdapter.setCheckpointMode(CheckpointMode.batch); + + Message received = verifyRecordReceived(TEST_DATA); + assertThat(received.getPayload()).isEqualTo(Collections.singletonList(TEST_DATA)); + List receivedSequences = received.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, List.class); + assertThat(receivedSequences).isNotEmpty(); + } + + @Test + void kclChannelAdapterReceivesSingleRecord() { + + this.kclMessageDrivenChannelAdapter.setListenerMode(ListenerMode.record); + this.kclMessageDrivenChannelAdapter.setCheckpointMode(CheckpointMode.record); + + Message receive = verifyRecordReceived(TEST_DATA); + assertThat(receive.getPayload()).isEqualTo(TEST_DATA); + assertThat(receive.getHeaders()).containsKey(IntegrationMessageHeaderAccessor.SOURCE_DATA); + assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, String.class)).isNotEmpty(); + } + + private Message verifyRecordReceived(String testData) { + AMAZON_KINESIS.putRecord(request -> + request.streamName(TEST_STREAM) + .data(SdkBytes.fromUtf8String(testData)) + .partitionKey("test")); + + Message receive = this.kinesisReceiveChannel.receive(5000); + assertThat(receive).isNotNull(); + + List streamConsumers = + AMAZON_KINESIS.describeStream(r -> r.streamName(TEST_STREAM)) + .thenCompose(describeStreamResponse -> + AMAZON_KINESIS.listStreamConsumers(r -> + r.streamARN(describeStreamResponse.streamDescription().streamARN()))) + .join() + .consumers(); + + // Because FanOut is false, there would be no Stream Consumers. + assertThat(streamConsumers).isEmpty(); + + List tableNames = DYNAMO_DB.listTables().join().tableNames(); + assertThat(tableNames).contains(LEASE_TABLE_NAME); + return receive; + } + + @Test + public void metricsLevelOfMetricsConfigShouldBeSetToMetricsLevelOfAdapter() { + MetricsLevel metricsLevel = + TestUtils.getPropertyValue( + this.kclMessageDrivenChannelAdapter, + "scheduler.metricsConfig.metricsLevel", + MetricsLevel.class + ); + assertThat(metricsLevel).isEqualTo(MetricsLevel.NONE); + } + + @Test + public void metricsFactoryOfSchedulerShouldBeSetNullMetricsFactoryIfMetricsLevelIsNone() { + MetricsFactory metricsFactory = + TestUtils.getPropertyValue( + this.kclMessageDrivenChannelAdapter, + "scheduler.metricsFactory", + MetricsFactory.class + ); + assertThat(metricsFactory).isInstanceOf(NullMetricsFactory.class); + } + + @Test + public void maxLeasesForWorkerOverriddenByCustomizer() { + Integer maxLeasesForWorker = + TestUtils.getPropertyValue( + this.kclMessageDrivenChannelAdapter, + "scheduler.leaseCoordinator.leaseTaker.maxLeasesForWorker", + Integer.class + ); + assertThat(maxLeasesForWorker).isEqualTo(10); + } + + @Test + public void shardConsumerDispatchPollIntervalMillisOverriddenByCustomizer() { + Long shardConsumerDispatchPollIntervalMillis = + TestUtils.getPropertyValue( + this.kclMessageDrivenChannelAdapter, + "scheduler.shardConsumerDispatchPollIntervalMillis", + Long.class + ); + assertThat(shardConsumerDispatchPollIntervalMillis).isEqualTo(500L); + } + + @Test + public void pollingMaxRecordsIsPropagated() { + Integer maxRecords = + TestUtils.getPropertyValue( + this.kclMessageDrivenChannelAdapter, + "scheduler.retrievalConfig.retrievalSpecificConfig.maxRecords", + Integer.class + ); + assertThat(maxRecords).isEqualTo(99); + } + + @Configuration(proxyBeanMethods = false) + @EnableIntegration + public static class TestConfiguration { + + @Bean + public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter(PollableChannel kinesisReceiveChannel) { + KclMessageDrivenChannelAdapter adapter = + new KclMessageDrivenChannelAdapter(AMAZON_KINESIS, CLOUD_WATCH, DYNAMO_DB, TEST_STREAM); + adapter.setOutputChannel(kinesisReceiveChannel); + adapter.setStreamInitialSequence( + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); + adapter.setConverter(String::new); + adapter.setConsumerGroup("single_stream_group"); + adapter.setLeaseTableName(LEASE_TABLE_NAME); + adapter.setFanOut(false); + adapter.setMetricsLevel(MetricsLevel.NONE); + adapter.setLeaseManagementConfigCustomizer(leaseManagementConfig -> + leaseManagementConfig.maxLeasesForWorker(10)); + adapter.setCoordinatorConfigCustomizer(coordinatorConfig -> + coordinatorConfig.shardConsumerDispatchPollIntervalMillis(500L)); + adapter.setBindSourceRecord(true); + adapter.setEmptyRecordList(true); + adapter.setPollingMaxRecords(99); + adapter.setGracefulShutdownTimeout(100); + return adapter; + } + + @Bean + public PollableChannel kinesisReceiveChannel() { + return new QueueChannel(); + } + } + + /** + * Initialises the lease table to improve KCL initialisation time + */ + private static CompletableFuture initialiseLeaseTableFor(String leaseTableName) { + return DYNAMO_DB.createTable(CreateTableRequest.builder() + .tableName(leaseTableName) + .attributeDefinitions(AttributeDefinition.builder() + .attributeName("leaseKey") + .attributeType(ScalarAttributeType.S) + .build()) + .keySchema(KeySchemaElement.builder() + .attributeName("leaseKey") + .keyType(KeyType.HASH) + .build()) + .provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits(1L) + .writeCapacityUnits(1L) + .build()) + .build()) + .thenCompose( + result -> DYNAMO_DB.waiter().waitUntilTableExists(request -> request.tableName(leaseTableName))) + .thenCompose(describeTableResponseWaiterResponse -> DYNAMO_DB.putItem(PutItemRequest.builder() + .tableName(leaseTableName) + .item(Map.of( + "leaseKey", AttributeValue.fromS("shardId-000000000000"), + "checkpoint", AttributeValue.fromS("TRIM_HORIZON"), + "leaseCounter", AttributeValue.fromN("1"), + "startingHashKey", AttributeValue.fromS("0"), + "ownerSwitchesSinceCheckpoint", AttributeValue.fromN("0"), + "checkpointSubSequenceNumber", AttributeValue.fromN("0") + )) + .build())); + } }