Skip to content

Commit

Permalink
GH-249 Code Review comments address. Updated Tests, javadocs. Added l…
Browse files Browse the repository at this point in the history
…inks and references to the properties. Removed commented code from checkstyle.
  • Loading branch information
siddharthjain210 committed Dec 21, 2024
1 parent 4e3b571 commit 99e4d2d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 29 deletions.
5 changes: 0 additions & 5 deletions src/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,4 @@
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>
</module>

<!-- Enable this in the future to restrict a Line Length Check.-->
<!-- <module name="LineLength">-->
<!-- <property name="max" value="120"/>-->
<!-- </module>-->
</module>
Original file line number Diff line number Diff line change
Expand Up @@ -125,51 +125,58 @@ public void setConverter(Converter<Object, byte[]> converter) {
}

/**
* When in KPL mode, the setting allows handling backpressure on the KPL native process.
* Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in
* maxRecordInFlightsSleepDurationInMillis.
*
* Configure maximum records in flight on the KPL Native Process for handling backpressure. Used in conjuction
* with {@link KplMessageHandler#maxInFlightRecordsBackoffDuration}
* @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled.
* Specify a positive value to enable back pressure.
* @since 3.0.9
* @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecordsBackoffRate
* @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts
*/
public void setMaxRecordsInFlight(long maxRecordsInFlight) {
Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0.");
this.maxInFlightRecords = maxRecordsInFlight;
}

/**
* The setting allows handling backpressure on the KPL native process.
* Enabled when maxOutstandingRecordsCount is greater than 0.
* The configurations puts the KPL Thread to sleep for the specified number of milliseconds.
*
* @param maxInFlightRecordsBackoffDuration Default is 100ms.
* Configure a backoff duration period in milliseconds when the number of records in flight in KPL Native Process
* is greater than or equal to {@link KplMessageHandler#maxInFlightRecords}. The configuration helps in handling
* backpressure by sleeping the KPL Thread using exponential backoff. Enabled when
* {@link KplMessageHandler#maxInFlightRecords} is greater than 0.
* @param maxInFlightRecordsBackoffDuration Initial backoff duration in milliseconds. Default is 100ms.
* @since 3.0.9
* @see KplMessageHandler#setMaxRecordsInFlight
* @see KplMessageHandler#setMaxInFlightRecordsBackoffRate
* @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts
*/
public void setMaxInFlightRecordsBackoffDuration(int maxInFlightRecordsBackoffDuration) {
Assert.isTrue(maxInFlightRecordsBackoffDuration > 0,
"'maxRecordInFlightsSleepDurationInMillis must be greater than 0.");
"'maxInFlightRecordsBackoffDuration must be greater than 0.");
this.maxInFlightRecordsBackoffDuration = maxInFlightRecordsBackoffDuration;
}

/**
* The setting allows handling backpressure on the KPL native process using exponential retry.
*
* @param maxInFlightRecordsBackoffRate The property enables a back off rate to
* * define the exponential retry duration defined in setMaxInFlightRecordsBackoffDuration. Default is 2
* Configure exponential back off rate when handling backpressure on the KPL Native process using
* {@link KplMessageHandler#maxInFlightRecords}.
* @param maxInFlightRecordsBackoffRate Exponential back off rate. Default is 2
* @since 3.0.9
* @see KplMessageHandler#setMaxRecordsInFlight
* @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecordsBackoffMaxAttempts
*/
public void setMaxInFlightRecordsBackoffRate(int maxInFlightRecordsBackoffRate) {
this.maxInFlightRecordsBackoffRate = maxInFlightRecordsBackoffRate;
}

/**
* The setting allows handling backpressure on the KPL native process using exponential retry. On attempts
* exhausted, RunTimeException is thrown.
*
* @param maxInFlightRecordsBackoffMaxAttempts When specified, maxInFlightRecordsBackoffMaxAttempts defines the
* maximum of exponential retry attempts to wait until the KPL Buffer clears out.
* Configure maximum number of retry attempts with exponential backoff until there is a capacity in the KPL
* native process using. On maximum attempts exhausted, RunTimeException is thrown.
* @param maxInFlightRecordsBackoffMaxAttempts maximum of exponential retry attempts to waiting for capacity in KPL
* buffer.
* @since 3.0.9
* @see KplMessageHandler#setMaxRecordsInFlight
* @see KplMessageHandler#setMaxInFlightRecordsBackoffDuration
* @see KplMessageHandler#setMaxInFlightRecordsBackoffRate
*/
public void setMaxInFlightRecordsBackoffMaxAttempts(int maxInFlightRecordsBackoffMaxAttempts) {
this.maxInFlightRecordsBackoffMaxAttempts = maxInFlightRecordsBackoffMaxAttempts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
Expand All @@ -45,7 +44,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;


/**
* @author Siddharth Jain
* @since 3.0.9
Expand Down Expand Up @@ -183,7 +181,6 @@ void testKPLMessageHandler_raw_payload_backpressure_failure_test() {
assertThat(ex).isNotNull();
assertThat(ex.getCause()).isNotNull();
assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class);
assertThat(ex.getCause().getClass()).isEqualTo(RuntimeException.class);
assertThat(ex.getCause().getMessage()).isEqualTo("KPL Buffer already at max capacity.");
}

Expand Down Expand Up @@ -211,7 +208,6 @@ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) {
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
kplMessageHandler.setAsync(true);
kplMessageHandler.setStream("foo");
kplMessageHandler.setEmbeddedHeadersMapper(new EmbeddedJsonHeadersMessageMapper("foo"));
return kplMessageHandler;
}
}
Expand Down

0 comments on commit 99e4d2d

Please sign in to comment.