From 99e4d2d2b2340f60f1c5dda80a751de5b149b957 Mon Sep 17 00:00:00 2001 From: Siddharth Jain Date: Sat, 21 Dec 2024 13:30:20 +0530 Subject: [PATCH] GH-249 Code Review comments address. Updated Tests, javadocs. Added links and references to the properties. Removed commented code from checkstyle. --- src/checkstyle/checkstyle.xml | 5 -- .../aws/outbound/KplMessageHandler.java | 47 +++++++++++-------- .../aws/outbound/KplMessageHandlerTests.java | 4 -- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/src/checkstyle/checkstyle.xml b/src/checkstyle/checkstyle.xml index 194131d..afabea2 100644 --- a/src/checkstyle/checkstyle.xml +++ b/src/checkstyle/checkstyle.xml @@ -178,9 +178,4 @@ - - - - - diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 27a319f..fb95cd8 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -125,13 +125,13 @@ public void setConverter(Converter converter) { } /** - * When in KPL mode, the setting allows handling backpressure on the KPL native process. - * Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in - * maxRecordInFlightsSleepDurationInMillis. - * + * Configure maximum records in flight on the KPL Native Process for handling backpressure. Used in conjuction + * with {@link KplMessageHandler#maxInFlightRecordsBackoffDuration} * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. - * Specify a positive value to enable back pressure. * @since 3.0.9 + * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate + * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ public void setMaxRecordsInFlight(long maxRecordsInFlight) { Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0."); @@ -139,37 +139,44 @@ public void setMaxRecordsInFlight(long maxRecordsInFlight) { } /** - * The setting allows handling backpressure on the KPL native process. - * Enabled when maxOutstandingRecordsCount is greater than 0. - * The configurations puts the KPL Thread to sleep for the specified number of milliseconds. - * - * @param maxInFlightRecordsBackoffDuration Default is 100ms. + * Configure a backoff duration period in milliseconds when the number of records in flight in KPL Native Process + * is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. The configuration helps in handling + * backpressure by sleeping the KPL Thread using exponential backoff. Enabled when + * {@link KplMessageHandler#maxInFlightRecords} is greater than 0. + * @param maxInFlightRecordsBackoffDuration Initial backoff duration in milliseconds. Default is 100ms. * @since 3.0.9 + * @see KplMessageHandler#setMaxRecordsInFlight + * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate + * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ public void setMaxInFlightRecordsBackoffDuration(int maxInFlightRecordsBackoffDuration) { Assert.isTrue(maxInFlightRecordsBackoffDuration > 0, - "'maxRecordInFlightsSleepDurationInMillis must be greater than 0."); + "'maxInFlightRecordsBackoffDuration must be greater than 0."); this.maxInFlightRecordsBackoffDuration = maxInFlightRecordsBackoffDuration; } /** - * The setting allows handling backpressure on the KPL native process using exponential retry. - * - * @param maxInFlightRecordsBackoffRate The property enables a back off rate to - * * define the exponential retry duration defined in setMaxInFlightRecordsBackoffDuration. Default is 2 + * Configure exponential back off rate when handling backpressure on the KPL Native process using + * {@link KplMessageHandler#maxInFlightRecords}. + * @param maxInFlightRecordsBackoffRate Exponential back off rate. Default is 2 * @since 3.0.9 + * @see KplMessageHandler#setMaxRecordsInFlight + * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts */ public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) { this.maxInFlightRecordsBackoffRate = maxInFlightRecordsBackoffRate; } /** - * The setting allows handling backpressure on the KPL native process using exponential retry. On attempts - * exhausted, RunTimeException is thrown. - * - * @param maxInFlightRecordsBackoffMaxAttempts When specified, maxInFlightRecordsBackoffMaxAttempts defines the - * maximum of exponential retry attempts to wait until the KPL Buffer clears out. + * Configure maximum number of retry attempts with exponential backoff until there is a capacity in the KPL + * native process using. On maximum attempts exhausted, RunTimeException is thrown. + * @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity in KPL + * buffer. * @since 3.0.9 + * @see KplMessageHandler#setMaxRecordsInFlight + * @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration + * @see KplMessageHandler#setMaxInFlightRecordsBackoffRate */ public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) { this.maxInFlightRecordsBackoffMaxAttempts = maxInFlightRecordsBackoffMaxAttempts; diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index ffc6287..f450c2f 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -30,7 +30,6 @@ import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.aws.support.AwsHeaders; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -45,7 +44,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; - /** * @author Siddharth Jain * @since 3.0.9 @@ -183,7 +181,6 @@ void testKPLMessageHandler_raw_payload_backpressure_failure_test() { assertThat(ex).isNotNull(); assertThat(ex.getCause()).isNotNull(); assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class); - assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class); assertThat(ex.getCause().getMessage()).isEqualTo("KPL Buffer already at max capacity."); } @@ -211,7 +208,6 @@ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) { KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer); kplMessageHandler.setAsync(true); kplMessageHandler.setStream("foo"); - kplMessageHandler.setEmbeddedHeadersMapper(new EmbeddedJsonHeadersMessageMapper("foo")); return kplMessageHandler; } }