Skip to content

Commit

Permalink
Fix an issue in configuring idleTimeBetweenReadsInMillis in MultiLang…
Browse files Browse the repository at this point in the history
…Daemon

Fix an issue where the idleTimeBetweenReadInMillis configured
via MultiLangDaemon was not taking effect because it used
the auto-generated setter from Lombok to set the configured value,
while there is a custom setter that must be invoked to set the
value correctly.

There is also a general confusion between using Lombok's setter vs
custom setter in java.

Unifying the approach to use the custom Lombok-fluent-style setter
and deprecating the previously added custom setIdleTimeBetweenReadsInMillis

Correct way to configure idleTimeBetweenReadsInMillis for MultiLang is
to add this in the properties file:
idleTimeBetweenReadsInMillis = 10000 # 10 seconds

Correct way to configure for java:
configsBuilder.retrievalConfig().retrievalSpecificConfig(
    new PollingConfig(streamName, kinesisClient)
        .idleTimeBetweenReadsInMillis(Duration.ofSeconds(10).toMillis())

Issues: #999, #950, #515
  • Loading branch information
akidambisrinivasan committed Nov 22, 2023
1 parent a48f543 commit d39bde3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtilsBean;
Expand Down Expand Up @@ -103,12 +105,20 @@ public void testDefaultRetrievalConfig() {
public void testDefaultRetrievalConfigWithPollingConfigSet() {
MultiLangDaemonConfiguration configuration = baseConfiguration();
configuration.setMaxRecords(10);
configuration.setIdleTimeBetweenReadsInMillis(60000);

MultiLangDaemonConfiguration.ResolvedConfiguration resolvedConfiguration = configuration
.resolvedConfiguration(shardRecordProcessorFactory);

assertThat(resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig(),
instanceOf(PollingConfig.class));
assertEquals(10,
((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig()).maxRecords());
assertEquals(60000,
((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig())
.idleTimeBetweenReadsInMillis());
assertTrue(((PollingConfig) resolvedConfiguration.getRetrievalConfig().retrievalSpecificConfig())
.usePollingConfigIdleTimeValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -86,11 +88,15 @@ public PollingConfig(String streamName, KinesisAsyncClient kinesisClient) {
* The value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}.
*
* If this is not set using {@link PollingConfig#idleTimeBetweenReadsInMillis},
* it defaults to 1500 ms.
*
* <p>
* Default value: 1000L
* Default value: 1500L
* </p>
*/
private long idleTimeBetweenReadsInMillis = 1000L;
@Setter(AccessLevel.NONE)
private long idleTimeBetweenReadsInMillis = 1500L;

/**
* Time to wait in seconds before the worker retries to get a record.
Expand Down Expand Up @@ -119,14 +125,23 @@ public PollingConfig(String streamName, KinesisAsyncClient kinesisClient) {
*/
private RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory();

/**
* @Deprecated Use {@link PollingConfig#idleTimeBetweenReadsInMillis} instead
*/
@Deprecated
public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
}

/**
* Set the value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in
* {@link RecordsFetcherFactory} will be used.
*/
public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
public PollingConfig idleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
usePollingConfigIdleTimeValue = true;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
return this;
}

/**
Expand Down

0 comments on commit d39bde3

Please sign in to comment.