Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-249 Added properties for handling backpressure in KplMessageHandler #250

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
377dbe6
GH-249 Added properties for handling backpressure in KplMessageHandle…
siddharthjain210 Dec 8, 2024
3d7de0f
GH-249 Addressing code review comments related to code style.
siddharthjain210 Dec 14, 2024
4e3b571
GH-249 Addressed smart handling when KPL Buffer is at max capacity. I…
siddharthjain210 Dec 14, 2024
99e4d2d
GH-249 Code Review comments address. Updated Tests, javadocs. Added l…
siddharthjain210 Dec 21, 2024
512fa18
GH-249 Addressing consistency related comments in the javadocs.
siddharthjain210 Dec 21, 2024
af06a15
GH-249 Removing KPL Native Mentions in the java docs for KPL Message …
siddharthjain210 Dec 22, 2024
7daa438
GH-249 Handling the sleep duration inside the while loop.
siddharthjain210 Dec 22, 2024
35f0d9c
GH-249 Addressing code review comments, handling checkstyle failures.
siddharthjain210 Dec 22, 2024
9fb17f3
GH-249 Renamed maxInFlightRecordsInitBackoffDuration to maxInFlightRe…
siddharthjain210 Dec 22, 2024
fc96e7e
GH-249 Removing Blank lines from Java doc in KPL Message Handler.
siddharthjain210 Dec 24, 2024
3e6d178
GH-249 Using mockito mock method that doesnt require args,
siddharthjain210 Dec 27, 2024
33657f7
GH-249 Simplication of backpressure handling, introduced KPLBackpress…
siddharthjain210 Dec 29, 2024
4d4fcf8
GH-249 Javadoc corrections.
siddharthjain210 Dec 29, 2024
186314e
GH-249 Code Review comments addressed. Added Test cases with RetryAdv…
siddharthjain210 Dec 29, 2024
3c442c7
GH-249 Added Javadoc for KplBackpressureException in the KplMessageHa…
siddharthjain210 Dec 29, 2024
b7ae296
GH-249 Corrected all java docs in KplMessageHandler
siddharthjain210 Dec 29, 2024
25f98e6
Revert "GH-249 Corrected all java docs in KplMessageHandler"
siddharthjain210 Dec 31, 2024
974d0fc
GH-249 Javadoc related code review actions fixed.
siddharthjain210 Dec 31, 2024
51027aa
GH-249 Updated Copyright and Javadoc related comments.
siddharthjain210 Jan 2, 2025
12f7572
GH-249 Updated Copyright and Javadoc related comments. Revert in Abst…
siddharthjain210 Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@
<module name="TypecastParenPad"/>
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>

</module>

<!-- Enable this in the future to restrict a Line Length Check.-->
<!-- <module name="LineLength">-->
<!-- <property name="max" value="120"/>-->
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
<!-- </module>-->
</module>
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
*
* @author Arnaud Lecollaire
* @author Artem Bilan
* @author Siddharth Jain
*
* @since 2.2
*
Expand Down Expand Up @@ -99,6 +100,14 @@ public class KplMessageHandler extends AbstractAwsMessageHandler<Void> implement

private volatile ScheduledFuture<?> flushFuture;

private long maxInFlightRecords = 0;

private int maxInFlightRecordsBackoffDuration = 100;

private int maxInFlightRecordsBackoffRate = 2;

private int maxInFlightRecordsBackoffMaxAttempts = 3;

public KplMessageHandler(KinesisProducer kinesisProducer) {
Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null.");
this.kinesisProducer = kinesisProducer;
Expand All @@ -115,8 +124,60 @@ public void setConverter(Converter<Object, byte[]> converter) {
setMessageConverter(new ConvertingFromMessageConverter(converter));
}

/**
* When in KPL mode, the setting allows handling backpressure on the KPL native process.
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
* Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in
* maxRecordInFlightsSleepDurationInMillis.
*
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
* @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled.
* Specify a positive value to enable back pressure.
* @since 3.0.9
*/
public void setMaxRecordsInFlight(long maxRecordsInFlight) {
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0.");
this.maxInFlightRecords = maxRecordsInFlight;
}

/**
* The setting allows handling backpressure on the KPL native process.
* Enabled when maxOutstandingRecordsCount is greater than 0.
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
* The configurations puts the KPL Thread to sleep for the specified number of milliseconds.
*
* @param maxInFlightRecordsBackoffDuration Default is 100ms.
* @since 3.0.9
*/
public void setMaxInFlightRecordsBackoffDuration(int maxInFlightRecordsBackoffDuration) {
Assert.isTrue(maxInFlightRecordsBackoffDuration > 0,
"'maxRecordInFlightsSleepDurationInMillis must be greater than 0.");
this.maxInFlightRecordsBackoffDuration = maxInFlightRecordsBackoffDuration;
}

/**
* The setting allows handling backpressure on the KPL native process using exponential retry.
*
* @param maxInFlightRecordsBackoffRate The property enables a back off rate to
* * define the exponential retry duration defined in setMaxInFlightRecordsBackoffDuration. Default is 2
* @since 3.0.9
*/
public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) {
this.maxInFlightRecordsBackoffRate = maxInFlightRecordsBackoffRate;
}

/**
* The setting allows handling backpressure on the KPL native process using exponential retry. On attempts
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
* exhausted, RunTimeException is thrown.
*
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
* @param maxInFlightRecordsBackoffMaxAttempts When specified, maxInFlightRecordsBackoffMaxAttempts defines the
* maximum of exponential retry attempts to wait until the KPL Buffer clears out.
* @since 3.0.9
*/
public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) {
this.maxInFlightRecordsBackoffMaxAttempts = maxInFlightRecordsBackoffMaxAttempts;
}

/**
* Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record.
*
* @param messageConverter the {@link MessageConverter} to use.
* @since 2.3
*/
Expand Down Expand Up @@ -368,11 +429,50 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?>
}

private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
if (this.maxInFlightRecords != -1) {
waitForCapacityInUserRecordsBuffer();
}

ListenableFuture<UserRecordResult> recordResult = this.kinesisProducer.addUserRecord(userRecord);
return listenableFutureToCompletableFuture(recordResult)
.thenApply(UserRecordResponse::new);
}

private void waitForCapacityInUserRecordsBuffer() {
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
var kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount();
var attempts = 1;
var sleepDuration = this.maxInFlightRecordsBackoffDuration;
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
while (kplOutstandingRecordsCount >= this.maxInFlightRecords &&
attempts <= this.maxInFlightRecordsBackoffMaxAttempts) {
try {
logger.info("Buffer size: {} has reached the max records limit of {}. Attempt# {}".formatted(
kplOutstandingRecordsCount, this.maxInFlightRecords));
logger.info("Buffer sleeping for {} ms".formatted(
this.maxInFlightRecordsBackoffDuration));
Thread.sleep(this.maxInFlightRecordsBackoffDuration);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
finally {
sleepDuration = sleepDuration * this.maxInFlightRecordsBackoffRate;
attempts++;
kplOutstandingRecordsCount = this.kinesisProducer.getOutstandingRecordsCount();
}
}

if (kplOutstandingRecordsCount < this.maxInFlightRecords) {
logger.info("Buffer cleared on number of attempts: {}".formatted(attempts));
return;
}

if (attempts > this.maxInFlightRecordsBackoffMaxAttempts) {
logger.error("Buffer not cleared after maximum {} number of attempts & {} sleepDuration".formatted(attempts,
sleepDuration));
throw new RuntimeException("KPL Buffer already at max capacity.");
}
}

private PutRecordRequest buildPutRecordRequest(Message<?> message) {
Object payload = message.getPayload();

Expand Down Expand Up @@ -403,7 +503,8 @@ private PutRecordRequest buildPutRecordRequest(Message<?> message) {
if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class);
}
Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. "
Assert.state(partitionKey != null,
"'partitionKey' must not be null for sending a Kinesis record. "
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
+ "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " +
"or supply an 'aws_partitionKey' message header.");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.aws.outbound;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecord;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;


/**
* @author Siddharth Jain
* @since 3.0.9
*/
@SpringJUnitConfig
@DirtiesContext
public class KplMessageHandlerTests {

@Autowired
protected KinesisProducer kinesisProducer;

@Autowired
protected MessageChannel kinesisSendChannel;

@Autowired
protected KplMessageHandler kplMessageHandler;

@Test
@SuppressWarnings("unchecked")
void testKPLMessageHandler_raw_payload_success() {
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock(ListenableFuture.class));
final Message<?> message = MessageBuilder
.withPayload("message1")
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("foo", "bar")
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
.build();


ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
.forClass(UserRecord.class);

this.kinesisSendChannel.send(message);
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());

UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
assertThat(userRecord.getStreamName()).isEqualTo("foo");
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey");
assertThat(userRecord.getExplicitHashKey()).isNull();
}

@Test
@SuppressWarnings("unchecked")
void testKPLMessageHandler_raw_payload_backpressure_capacity_available_Test() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock(ListenableFuture.class));
this.kplMessageHandler.setMaxRecordsInFlight(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
.willReturn(1);
final Message<?> message = MessageBuilder
.withPayload("message1")
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("foo", "bar")
.build();


ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
.forClass(UserRecord.class);

this.kinesisSendChannel.send(message);
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount();
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
assertThat(userRecord.getStreamName()).isEqualTo("foo");
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey");
assertThat(userRecord.getExplicitHashKey()).isNull();
}

@Test
@SuppressWarnings("unchecked")
void testKPLMessageHandler_raw_payload_backpressure_insuff_capacity_test() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock(ListenableFuture.class));
this.kplMessageHandler.setMaxRecordsInFlight(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
.willReturn(3)
.willReturn(2)
.willReturn(1)
.willReturn(0);
final Message<?> message = MessageBuilder
.withPayload("message1")
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("foo", "bar")
.build();


ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
.forClass(UserRecord.class);

this.kinesisSendChannel.send(message);
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
verify(this.kinesisProducer, Mockito.times(3)).getOutstandingRecordsCount();
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
assertThat(userRecord.getStreamName()).isEqualTo("foo");
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey");
assertThat(userRecord.getExplicitHashKey()).isNull();
}

@Test
@SuppressWarnings("unchecked")
void testKPLMessageHandler_raw_payload_backpressure_failure_test() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock(ListenableFuture.class));
this.kplMessageHandler.setMaxRecordsInFlight(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffDuration(100);
this.kplMessageHandler.setMaxInFlightRecordsBackoffMaxAttempts(2);
this.kplMessageHandler.setMaxInFlightRecordsBackoffRate(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
.willReturn(5)
.willReturn(4)
.willReturn(3);
final Message<?> message = MessageBuilder
.withPayload("message1")
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("foo", "bar")
.build();

ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
.forClass(UserRecord.class);

try {
this.kinesisSendChannel.send(message);
}
catch (Exception ex) {
assertThat(ex).isNotNull();
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
assertThat(ex.getCause()).isNotNull();
assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class);
assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class);
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
assertThat(ex.getCause().getMessage()).isEqualTo("KPL Buffer already at max capacity.");
}

verify(this.kinesisProducer, Mockito.times(0)).addUserRecord(any(UserRecord.class));
verify(this.kinesisProducer, Mockito.times(3)).getOutstandingRecordsCount();
}

@AfterEach
public void tearDown() {
clearInvocations(this.kinesisProducer);
}

@Configuration
@EnableIntegration
public static class ContextConfiguration {

@Bean
public KinesisProducer kinesisProducer() {
return mock(KinesisProducer.class);
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
}

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) {
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
kplMessageHandler.setAsync(true);
kplMessageHandler.setStream("foo");
kplMessageHandler.setEmbeddedHeadersMapper(new EmbeddedJsonHeadersMessageMapper("foo"));
siddharthjain210 marked this conversation as resolved.
Show resolved Hide resolved
return kplMessageHandler;
}
}

}
Loading