Skip to content

Commit

Permalink
GH-247: Fix java.nio.ReadOnlyBufferException in KclMessageDrivenChann…
Browse files Browse the repository at this point in the history
…elAdapter

Fixes: #247
Issue link: #247


* Use `BinaryUtils.copyAllBytes` to fix `java.nio.ReadOnlyBufferException` thrown when processing batch records in `KclMessageDrivenAdapter`
* Improve test time by more than a couple minutes by initializing the lease table
  • Loading branch information
kelseyh authored Nov 8, 2024
1 parent 93fb5eb commit adb66e4
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ else if (KclMessageDrivenChannelAdapter.this.converter != null) {
partitionKeys.add(r.partitionKey());
sequenceNumbers.add(r.sequenceNumber());

return KclMessageDrivenChannelAdapter.this.converter.convert(r.data().array());
return KclMessageDrivenChannelAdapter.this.converter.convert(BinaryUtils.copyAllBytesFrom(r.data()));
})
.toList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,30 @@

package org.springframework.integration.aws.kinesis;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Consumer;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.metrics.MetricsFactory;
Expand All @@ -37,7 +51,9 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aws.LocalstackContainerTest;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
Expand All @@ -53,14 +69,15 @@
* @author Artem Bilan
* @author Siddharth Jain
* @author Minkyu Moon
*
* @since 3.0
*/
@SpringJUnitConfig
@DirtiesContext
public class KclMessageDrivenChannelAdapterTests implements LocalstackContainerTest {

private static final String TEST_STREAM = "TestStreamKcl";
public static final String LEASE_TABLE_NAME = "test_table";
public static final String TEST_DATA = "test data";

private static KinesisAsyncClient AMAZON_KINESIS;

Expand All @@ -80,10 +97,10 @@ static void setup() {
DYNAMO_DB = LocalstackContainerTest.dynamoDbClient();
CLOUD_WATCH = LocalstackContainerTest.cloudWatchClient();

AMAZON_KINESIS.createStream(request -> request.streamName(TEST_STREAM).shardCount(1))
.thenCompose(result ->
AMAZON_KINESIS.waiter().waitUntilStreamExists(request -> request.streamName(TEST_STREAM)))
.join();
CompletableFuture.allOf(
initialiseStream(TEST_STREAM),
initialiseLeaseTableFor(LEASE_TABLE_NAME)
).join();
}

@AfterAll
Expand All @@ -96,20 +113,36 @@ static void tearDown() {
}

@Test
void kclChannelAdapterReceivesRecords() {
String testData = "test data";
void kclChannelAdapterReceivesBatchedRecords() {
this.kclMessageDrivenChannelAdapter.setListenerMode(ListenerMode.batch);
this.kclMessageDrivenChannelAdapter.setCheckpointMode(CheckpointMode.batch);

Message<?> received = verifyRecordReceived(TEST_DATA);
assertThat(received.getPayload()).isEqualTo(Collections.singletonList(TEST_DATA));
List<?> receivedSequences = received.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, List.class);
assertThat(receivedSequences).isNotEmpty();
}

@Test
void kclChannelAdapterReceivesSingleRecord() {

this.kclMessageDrivenChannelAdapter.setListenerMode(ListenerMode.record);
this.kclMessageDrivenChannelAdapter.setCheckpointMode(CheckpointMode.record);

Message<?> receive = verifyRecordReceived(TEST_DATA);
assertThat(receive.getPayload()).isEqualTo(TEST_DATA);
assertThat(receive.getHeaders()).containsKey(IntegrationMessageHeaderAccessor.SOURCE_DATA);
assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, String.class)).isNotEmpty();
}

private Message<?> verifyRecordReceived(String testData) {
AMAZON_KINESIS.putRecord(request ->
request.streamName(TEST_STREAM)
.data(SdkBytes.fromUtf8String(testData))
.partitionKey("test"));

// We need so long delay because KCL has a more than a minute setup phase.
Message<?> receive = this.kinesisReceiveChannel.receive(300_000);
Message<?> receive = this.kinesisReceiveChannel.receive(5000);
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(testData);
assertThat(receive.getHeaders()).containsKey(IntegrationMessageHeaderAccessor.SOURCE_DATA);
assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, String.class)).isNotEmpty();

List<Consumer> streamConsumers =
AMAZON_KINESIS.describeStream(r -> r.streamName(TEST_STREAM))
Expand All @@ -120,10 +153,11 @@ void kclChannelAdapterReceivesRecords() {
.consumers();

// Because FanOut is false, there would be no Stream Consumers.
assertThat(streamConsumers).hasSize(0);
assertThat(streamConsumers).isEmpty();

List<String> tableNames = DYNAMO_DB.listTables().join().tableNames();
assertThat(tableNames).contains("test_table");
assertThat(tableNames).contains(LEASE_TABLE_NAME);
return receive;
}

@Test
Expand Down Expand Up @@ -176,10 +210,10 @@ public void pollingMaxRecordsIsPropagated() {
public static class TestConfiguration {

@Bean
public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter(PollableChannel kinesisReceiveChannel) {
KclMessageDrivenChannelAdapter adapter =
new KclMessageDrivenChannelAdapter(AMAZON_KINESIS, CLOUD_WATCH, DYNAMO_DB, TEST_STREAM);
adapter.setOutputChannel(kinesisReceiveChannel());
adapter.setOutputChannel(kinesisReceiveChannel);
adapter.setStreamInitialSequence(
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
adapter.setConverter(String::new);
Expand All @@ -202,7 +236,45 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
public PollableChannel kinesisReceiveChannel() {
return new QueueChannel();
}
}

private static CompletableFuture<WaiterResponse<DescribeStreamResponse>> initialiseStream(String streamName) {
return AMAZON_KINESIS.createStream(request -> request.streamName(streamName).shardCount(1))
.thenCompose(
result -> AMAZON_KINESIS.waiter().waitUntilStreamExists(request -> request.streamName(streamName)));
}

/**
* Initialises the lease table to improve KCL initialisation time
*/
private static CompletableFuture<PutItemResponse> initialiseLeaseTableFor(String leaseTableName) {
return DYNAMO_DB.createTable(CreateTableRequest.builder()
.tableName(leaseTableName)
.attributeDefinitions(AttributeDefinition.builder()
.attributeName("leaseKey")
.attributeType(ScalarAttributeType.S)
.build())
.keySchema(KeySchemaElement.builder()
.attributeName("leaseKey")
.keyType(KeyType.HASH)
.build())
.provisionedThroughput(ProvisionedThroughput.builder()
.readCapacityUnits(1L)
.writeCapacityUnits(1L)
.build())
.build())
.thenCompose(
result -> DYNAMO_DB.waiter().waitUntilTableExists(request -> request.tableName(leaseTableName)))
.thenCompose(describeTableResponseWaiterResponse -> DYNAMO_DB.putItem(PutItemRequest.builder()
.tableName(leaseTableName)
.item(Map.of(
"leaseKey", AttributeValue.fromS("shardId-000000000000"),
"checkpoint", AttributeValue.fromS("TRIM_HORIZON"),
"leaseCounter", AttributeValue.fromN("1"),
"startingHashKey", AttributeValue.fromS("0"),
"ownerSwitchesSinceCheckpoint", AttributeValue.fromN("0"),
"checkpointSubSequenceNumber", AttributeValue.fromN("0")
))
.build()));
}
}

0 comments on commit adb66e4

Please sign in to comment.