-
Notifications
You must be signed in to change notification settings - Fork 167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-249 Added properties for handling backpressure in KplMessageHandler #250
Closed
Closed
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
377dbe6
GH-249 Added properties for handling backpressure in KplMessageHandle…
siddharthjain210 3d7de0f
GH-249 Addressing code review comments related to code style.
siddharthjain210 4e3b571
GH-249 Addressed smart handling when KPL Buffer is at max capacity. I…
siddharthjain210 99e4d2d
GH-249 Code Review comments address. Updated Tests, javadocs. Added l…
siddharthjain210 512fa18
GH-249 Addressing consistency related comments in the javadocs.
siddharthjain210 af06a15
GH-249 Removing KPL Native Mentions in the java docs for KPL Message …
siddharthjain210 7daa438
GH-249 Handling the sleep duration inside the while loop.
siddharthjain210 35f0d9c
GH-249 Addressing code review comments, handling checkstyle failures.
siddharthjain210 9fb17f3
GH-249 Renamed maxInFlightRecordsInitBackoffDuration to maxInFlightRe…
siddharthjain210 fc96e7e
GH-249 Removing Blank lines from Java doc in KPL Message Handler.
siddharthjain210 3e6d178
GH-249 Using mockito mock method that doesnt require args,
siddharthjain210 33657f7
GH-249 Simplication of backpressure handling, introduced KPLBackpress…
siddharthjain210 4d4fcf8
GH-249 Javadoc corrections.
siddharthjain210 186314e
GH-249 Code Review comments addressed. Added Test cases with RetryAdv…
siddharthjain210 3c442c7
GH-249 Added Javadoc for KplBackpressureException in the KplMessageHa…
siddharthjain210 b7ae296
GH-249 Corrected all java docs in KplMessageHandler
siddharthjain210 25f98e6
Revert "GH-249 Corrected all java docs in KplMessageHandler"
siddharthjain210 974d0fc
GH-249 Javadoc related code review actions fixed.
siddharthjain210 51027aa
GH-249 Updated Copyright and Javadoc related comments.
siddharthjain210 12f7572
GH-249 Updated Copyright and Javadoc related comments. Revert in Abst…
siddharthjain210 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
src/main/java/org/springframework/integration/aws/support/KPLBackpressureException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Copyright 2017-2024 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.aws.support; | ||
|
||
import org.springframework.messaging.MessagingException; | ||
|
||
/** | ||
* An exception triggered from {@link org.springframework.integration.aws.outbound.KplMessageHandler} while sending | ||
* records to kinesis when maximum number of records in flight exceeds the backpressure threshold. | ||
* @author Siddharth Jain | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* @since 3.0.9 | ||
*/ | ||
public class KPLBackpressureException extends MessagingException { | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private static final long serialVersionUID = 1L; | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
public KPLBackpressureException(String message) { | ||
super(message); | ||
} | ||
} |
170 changes: 170 additions & 0 deletions
170
src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
/* | ||
* Copyright 2019-2024 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.aws.outbound; | ||
|
||
import com.amazonaws.services.kinesis.producer.KinesisProducer; | ||
import com.amazonaws.services.kinesis.producer.UserRecord; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.mockito.ArgumentCaptor; | ||
import org.mockito.Mockito; | ||
|
||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.integration.annotation.ServiceActivator; | ||
import org.springframework.integration.aws.support.AwsHeaders; | ||
import org.springframework.integration.aws.support.KPLBackpressureException; | ||
import org.springframework.integration.config.EnableIntegration; | ||
import org.springframework.messaging.Message; | ||
import org.springframework.messaging.MessageChannel; | ||
import org.springframework.messaging.MessageHandler; | ||
import org.springframework.messaging.support.MessageBuilder; | ||
import org.springframework.test.annotation.DirtiesContext; | ||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.BDDMockito.given; | ||
import static org.mockito.Mockito.clearInvocations; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.verify; | ||
|
||
/** | ||
* @author Siddharth Jain | ||
* @since 3.0.9 | ||
*/ | ||
@SpringJUnitConfig | ||
@DirtiesContext | ||
public class KplMessageHandlerTests { | ||
|
||
@Autowired | ||
protected KinesisProducer kinesisProducer; | ||
|
||
@Autowired | ||
protected MessageChannel kinesisSendChannel; | ||
|
||
@Autowired | ||
protected KplMessageHandler kplMessageHandler; | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
void testKPLMessageHandler_raw_payload_success() { | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) | ||
.willReturn(mock()); | ||
final Message<?> message = MessageBuilder | ||
.withPayload("message1") | ||
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey") | ||
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") | ||
.setHeader("foo", "bar") | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.build(); | ||
|
||
|
||
ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor | ||
.forClass(UserRecord.class); | ||
|
||
this.kinesisSendChannel.send(message); | ||
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); | ||
verify(this.kinesisProducer, Mockito.times(0)).getOutstandingRecordsCount(); | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); | ||
assertThat(userRecord.getStreamName()).isEqualTo("foo"); | ||
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); | ||
assertThat(userRecord.getExplicitHashKey()).isNull(); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
void testKPLMessageHandler_raw_payload_backpressure_capacity_available_test() { | ||
given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) | ||
.willReturn(mock()); | ||
this.kplMessageHandler.setBackPressureThreshold(2); | ||
given(this.kinesisProducer.getOutstandingRecordsCount()) | ||
.willReturn(1); | ||
final Message<?> message = MessageBuilder | ||
.withPayload("message1") | ||
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey") | ||
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") | ||
.setHeader("foo", "bar") | ||
.build(); | ||
|
||
|
||
ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor | ||
.forClass(UserRecord.class); | ||
|
||
this.kinesisSendChannel.send(message); | ||
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture()); | ||
verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount(); | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue(); | ||
assertThat(userRecord.getStreamName()).isEqualTo("foo"); | ||
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey"); | ||
assertThat(userRecord.getExplicitHashKey()).isNull(); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
void testKPLMessageHandler_raw_payload_backpressure_insufficient_capacity_test() { | ||
given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) | ||
.willReturn(mock()); | ||
this.kplMessageHandler.setBackPressureThreshold(2); | ||
given(this.kinesisProducer.getOutstandingRecordsCount()) | ||
.willReturn(5); | ||
final Message<?> message = MessageBuilder | ||
.withPayload("message1") | ||
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey") | ||
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10") | ||
.setHeader("foo", "bar") | ||
.build(); | ||
|
||
try { | ||
this.kinesisSendChannel.send(message); | ||
} | ||
catch (Exception ex) { | ||
assertThat(ex).isNotNull(); | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assertThat(ex.getCause()).isNotNull(); | ||
assertThat(ex.getCause().getClass()).isEqualTo(KPLBackpressureException.class); | ||
assertThat(ex.getCause().getMessage()).isEqualTo("Buffer already at max capacity."); | ||
} | ||
|
||
verify(this.kinesisProducer, Mockito.times(0)).addUserRecord(any(UserRecord.class)); | ||
verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount(); | ||
} | ||
|
||
@AfterEach | ||
public void tearDown() { | ||
clearInvocations(this.kinesisProducer); | ||
} | ||
|
||
@Configuration | ||
@EnableIntegration | ||
public static class ContextConfiguration { | ||
|
||
@Bean | ||
public KinesisProducer kinesisProducer() { | ||
return mock(); | ||
} | ||
|
||
@Bean | ||
@ServiceActivator(inputChannel = "kinesisSendChannel") | ||
siddharthjain210 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) { | ||
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer); | ||
kplMessageHandler.setAsync(true); | ||
kplMessageHandler.setStream("foo"); | ||
return kplMessageHandler; | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that we need the first part since you just nailed it with the second sentence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant remove
Defaulted to 0
.Doesn't sounds good and covered with the next sentence.