Skip to content

Commit

Permalink
Add support for configurable HTTP retryDelayMultiplier to GCP Sink (#60)
Browse files Browse the repository at this point in the history
* Add support for configurable retryDelayMultiplier to GCP Sink

* scalafmt issues

* PR comments pt.1

* PR comments pt.2

* fix remaining tests, pruning non-HTTP retry configs

* fix remaining testcases
  • Loading branch information
GoMati-MU authored Jul 31, 2024
1 parent 9953f00 commit f6f7d54
Show file tree
Hide file tree
Showing 24 changed files with 144 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,12 @@ public class RetryConfig {
* measured in milliseconds.
*/
private long retryIntervalMillis;

/**
* Delay Multiplier when Retrying
* This value controls the change in delay before the next retry or poll.
* The retry delay of the previous call is multiplied by the RetryDelayMultiplier
* to calculate the retry delay for the next call.
*/
private double retryDelayMultiplier;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,29 @@ public interface ConfigSource {
Optional<String> getString(String key);

/**
* Retrieves a String property value associated with the given key.
* Retrieves an Int property value associated with the given key.
*
* @param key the property key
* @return an {@link Optional} containing the property value if present, otherwise empty
*/
Optional<Integer> getInt(String key);

/**
* Retrieves a String property value associated with the given key.
* Retrieves a Long property value associated with the given key.
*
* @param key the property key
* @return an {@link Optional} containing the property value if present, otherwise empty
*/
Optional<Long> getLong(String key);

/**
* Retrieves a Double property value associated with the given key.
*
* @param key the property key
* @return an {@link Optional} containing the property value if present, otherwise empty
*/
Optional<Double> getDouble(String key);

/**
* Retrieves a Password property value associated with the given key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public Optional<Long> getLong(String key) {
return Optional.ofNullable(abstractConfig.getLong(key));
}

@Override
public Optional<Double> getDouble(String key) {
return Optional.ofNullable(abstractConfig.getDouble(key));
}

@Override
public Optional<Password> getPassword(String key) {
return Optional.ofNullable(abstractConfig.getPassword(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public Optional<Long> getLong(String key) {
return Optional.ofNullable((Long) wrapped.get(key));
}

@Override
public Optional<Double> getDouble(String key) {
return Optional.ofNullable((Double) wrapped.get(key));
}

@Override
public Optional<Password> getPassword(String key) {
return Optional.ofNullable((Password) wrapped.get(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.lenses.streamreactor.connect.gcp.common.auth;

import static io.lenses.streamreactor.connect.gcp.common.config.GCPSettings.HTTP_BACKOFF_RETRY_MULTIPLIER_DEFAULT;
import static io.lenses.streamreactor.connect.gcp.common.config.GCPSettings.HTTP_ERROR_RETRY_INTERVAL_DEFAULT;
import static io.lenses.streamreactor.connect.gcp.common.config.GCPSettings.HTTP_NUMBER_OF_RETIRES_DEFAULT;

Expand All @@ -40,6 +41,7 @@ public class GCPConnectionConfig implements ConnectionConfig {
RetryConfig.builder()
.retryLimit(HTTP_NUMBER_OF_RETIRES_DEFAULT)
.retryIntervalMillis(HTTP_ERROR_RETRY_INTERVAL_DEFAULT)
.retryDelayMultiplier(HTTP_BACKOFF_RETRY_MULTIPLIER_DEFAULT)
.build();

@Builder.Default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ private static RetrySettings createRetrySettings(RetryConfig httpRetryConfig) {
return RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(httpRetryConfig.getRetryIntervalMillis()))
.setMaxRetryDelay(Duration.ofMillis(httpRetryConfig.getRetryIntervalMillis() * 5L))
.setRetryDelayMultiplier(httpRetryConfig.getRetryDelayMultiplier())
.setMaxAttempts(httpRetryConfig.getRetryLimit())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.lenses.streamreactor.connect.gcp.common.config;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;

import cyclops.control.Either;
Expand All @@ -41,12 +42,14 @@ public class GCPSettings implements ConfigSettings<GCPConnectionConfig> {
private final String gcpQuotaProjectIdKey;
private final String hostKey;
private final String httpErrorRetryIntervalKey;
private final String httpErrorRetryTimeoutMultiplier;
private final String httpNbrOfRetriesKey;
private final String httpSocketTimeoutKey;
private final String httpConnectionTimeoutKey;

public static final Long HTTP_ERROR_RETRY_INTERVAL_DEFAULT = 50L;
public static final Integer HTTP_NUMBER_OF_RETIRES_DEFAULT = 5;
public static final Double HTTP_BACKOFF_RETRY_MULTIPLIER_DEFAULT = 3.0;
public static final Long HTTP_SOCKET_TIMEOUT_DEFAULT = 60000L;
public static final Long HTTP_CONNECTION_TIMEOUT_DEFAULT = 60000L;

Expand All @@ -62,6 +65,8 @@ public GCPSettings(ConnectorPrefix connectorPrefix) {
gcpQuotaProjectIdKey = connectorPrefix.prefixKey("gcp.quota.project.id");
hostKey = connectorPrefix.prefixKey("endpoint");
httpErrorRetryIntervalKey = connectorPrefix.prefixKey("http.retry.interval");
httpErrorRetryTimeoutMultiplier =
connectorPrefix.prefixKey("http.retry.timeout.multiplier");
httpNbrOfRetriesKey = connectorPrefix.prefixKey("http.max.retries");
httpSocketTimeoutKey = connectorPrefix.prefixKey("http.socket.timeout");
httpConnectionTimeoutKey = connectorPrefix.prefixKey("http.connection.timeout");
Expand Down Expand Up @@ -114,6 +119,16 @@ public ConfigDef withSettings(ConfigDef configDef) {
3,
ConfigDef.Width.LONG,
httpErrorRetryIntervalKey)
.define(
httpErrorRetryTimeoutMultiplier,
Type.DOUBLE,
HTTP_BACKOFF_RETRY_MULTIPLIER_DEFAULT,
ConfigDef.Importance.MEDIUM,
"This controls the change in delay before the next retry or poll",
"Error",
4,
ConfigDef.Width.LONG,
httpErrorRetryTimeoutMultiplier)
.define(
httpSocketTimeoutKey,
ConfigDef.Type.LONG,
Expand Down Expand Up @@ -143,7 +158,10 @@ public Either<ConfigException, GCPConnectionConfig> parseFromConfig(ConfigSource
val retryConfig =
new RetryConfig(
configSource.getInt(httpNbrOfRetriesKey).orElse(HTTP_NUMBER_OF_RETIRES_DEFAULT),
configSource.getLong(httpErrorRetryIntervalKey).orElse(HTTP_ERROR_RETRY_INTERVAL_DEFAULT));
configSource.getLong(httpErrorRetryIntervalKey).orElse(HTTP_ERROR_RETRY_INTERVAL_DEFAULT),
configSource.getDouble(httpErrorRetryTimeoutMultiplier).orElse(
HTTP_BACKOFF_RETRY_MULTIPLIER_DEFAULT)
);

val timeoutConfig =
new HttpTimeoutConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.google.cloud.ServiceOptions.Builder;
import java.io.IOException;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -62,15 +63,26 @@ void testConfigure_withHostAndProjectIdConfigured() throws IOException {

@Test
void testConfigure_withRetrySettingsConfigured() throws IOException {
RetryConfig retryConfig = RetryConfig.builder().retryIntervalMillis(1000).retryLimit(3).build();
int retryIntervalMillis = 1000;
int maxRetryIntervalMillis = 5 * retryIntervalMillis;
int retryLimit = 3;
double retryDelayMultiplier = 1.5;

RetryConfig retryConfig =
RetryConfig.builder()
.retryIntervalMillis(retryIntervalMillis)
.retryLimit(retryLimit)
.retryDelayMultiplier(retryDelayMultiplier)
.build();

val config = configBuilder.httpRetryConfig(retryConfig).build();

val builder = createMockBuilder();

GCPServiceBuilderConfigurer.configure(config, builder);

assertRetrySettingsConfigured(builder, 1000, 5000, 3);
assertRetrySettingsConfigured(builder, retryIntervalMillis,
maxRetryIntervalMillis, retryLimit, retryDelayMultiplier);
}

@Test
Expand Down Expand Up @@ -107,6 +119,7 @@ void testConfigure_withEmptyConfig() throws IOException {
.setInitialRetryDelay(Duration.ofMillis(50))
.setMaxRetryDelay(Duration.ofMillis(250))
.setMaxAttempts(5)
.setRetryDelayMultiplier(3.0)
.build());
verify(builder, times(1)).setCredentials(NoCredentials.getInstance());
verify(builder, never()).setTransportOptions(any());
Expand All @@ -123,19 +136,21 @@ private void assertHostAndProjectIdConfigured(
}

private void assertRetrySettingsConfigured(
ServiceOptions.Builder<?, ?, ?> builder,
Builder<?, ?, ?> builder,
long expectedInitialRetryDelay,
long expectedMaxRetryDelay,
int expectedMaxAttempts) {
int expectedMaxAttempts,
double retryDelayMultiplier) {
ArgumentCaptor<RetrySettings> retrySettingsCaptor =
ArgumentCaptor.forClass(RetrySettings.class);
verify(builder).setRetrySettings(retrySettingsCaptor.capture());

RetrySettings capturedRetrySettings = retrySettingsCaptor.getValue();
assertNotNull(capturedRetrySettings);
assertEquals(1000, capturedRetrySettings.getInitialRetryDelay().toMillis());
assertEquals(5000, capturedRetrySettings.getMaxRetryDelay().toMillis());
assertEquals(3, capturedRetrySettings.getMaxAttempts());
assertEquals(expectedInitialRetryDelay, capturedRetrySettings.getInitialRetryDelay().toMillis());
assertEquals(expectedMaxRetryDelay, capturedRetrySettings.getMaxRetryDelay().toMillis());
assertEquals(expectedMaxAttempts, capturedRetrySettings.getMaxAttempts());
assertEquals(retryDelayMultiplier, capturedRetrySettings.getRetryDelayMultiplier());
}

private void assertTransportOptionsConfigured(
Expand All @@ -148,7 +163,7 @@ private void assertTransportOptionsConfigured(

HttpTransportOptions capturedTransportOptions = transportOptionsCaptor.getValue();
assertNotNull(capturedTransportOptions);
assertEquals(5000, capturedTransportOptions.getReadTimeout());
assertEquals(3000, capturedTransportOptions.getConnectTimeout());
assertEquals(expectedReadTimeout, capturedTransportOptions.getReadTimeout());
assertEquals(expectedConnectTimeout, capturedTransportOptions.getConnectTimeout());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,24 @@ class GCPSettingsTest {
private static final GCPSettings gcpSettings = new GCPSettings(connectorPrefix);

private static Stream<Arguments> provideRetryConfigData() {
double defaultMultiplier = 1.0;
return Stream.of(
Arguments.of("no values", 0, 0L, new RetryConfig(0, 0L)),
Arguments.of("retry limit only", 10, 0L, new RetryConfig(10, 0L)),
Arguments.of("interval only", 0, 20L, new RetryConfig(0, 20L)),
Arguments.of("retry limit and interval", 30, 40L, new RetryConfig(30, 40L)));
Arguments.of("no values", 0, 0L, defaultMultiplier, new RetryConfig(0, 0L, defaultMultiplier)),
Arguments.of("retry limit only", 10, 0L, defaultMultiplier, new RetryConfig(10, 0L, defaultMultiplier)),
Arguments.of("interval only", 0, 20L, defaultMultiplier, new RetryConfig(0, 20L, defaultMultiplier)),
Arguments.of("retry limit and interval", 30, 40L, defaultMultiplier, new RetryConfig(30, 40L,
defaultMultiplier)),
Arguments.of("limit, interval and multiplier", 30, 40L, 10.0, new RetryConfig(30, 40L, 10.0)));
}

@ParameterizedTest(name = "{0}")
@MethodSource("provideRetryConfigData")
void testHttpRetryConfig(String testName, Object retries, Object interval, RetryConfig expected) {
void testHttpRetryConfig(String testName, Object retries, Object interval, Double multiplier, RetryConfig expected) {
val configMap =
new MapConfigSource(
Map.of(
"connect.gcpstorage.http.max.retries", retries,
"connect.gcpstorage.http.retry.timeout.multiplier", multiplier,
"connect.gcpstorage.http.retry.interval", interval));

val optionalRetryConfig = EitherValues.getRight(gcpSettings.parseFromConfig(configMap)).getHttpRetryConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
compressionCodec = compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

Expand Down Expand Up @@ -173,7 +173,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

Expand Down Expand Up @@ -241,7 +241,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

Expand Down Expand Up @@ -315,7 +315,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ object S3ConnectionConfig {
),
getString(props, CUSTOM_ENDPOINT),
getBoolean(props, ENABLE_VIRTUAL_HOST_BUCKETS).getOrElse(false),
new RetryConfig(
getInt(props, HTTP_NBR_OF_RETRIES).getOrElse(HTTP_NBR_OF_RETIRES_DEFAULT),
getLong(props, HTTP_ERROR_RETRY_INTERVAL).getOrElse(HTTP_ERROR_RETRY_INTERVAL_DEFAULT),
),
RetryConfig.builder.retryLimit(getInt(props, HTTP_NBR_OF_RETRIES).getOrElse(HTTP_NBR_OF_RETIRES_DEFAULT))
.retryIntervalMillis(getLong(props, HTTP_ERROR_RETRY_INTERVAL).getOrElse(HTTP_ERROR_RETRY_INTERVAL_DEFAULT))
.build(),
HttpTimeoutConfig(
getInt(props, HTTP_SOCKET_TIMEOUT),
getLong(props, HTTP_CONNECTION_TIMEOUT),
Expand All @@ -79,9 +78,10 @@ case class S3ConnectionConfig(
accessKey: Option[String],
secretKey: Option[String],
authMode: AuthMode,
customEndpoint: Option[String] = None,
enableVirtualHostBuckets: Boolean = false,
httpRetryConfig: RetryConfig = new RetryConfig(HTTP_NBR_OF_RETIRES_DEFAULT, HTTP_ERROR_RETRY_INTERVAL_DEFAULT),
timeouts: HttpTimeoutConfig = HttpTimeoutConfig(None, None),
connectionPoolConfig: Option[ConnectionPoolConfig] = Option.empty,
customEndpoint: Option[String] = None,
enableVirtualHostBuckets: Boolean = false,
httpRetryConfig: RetryConfig =
RetryConfig.builder.retryIntervalMillis(HTTP_ERROR_RETRY_INTERVAL_DEFAULT).retryLimit(HTTP_NBR_OF_RETIRES_DEFAULT).build(),
timeouts: HttpTimeoutConfig = HttpTimeoutConfig(None, None),
connectionPoolConfig: Option[ConnectionPoolConfig] = Option.empty,
) extends ConnectionConfig
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class S3ConfigTest extends AnyFlatSpec with Matchers with LazyLogging {

val retryValuesMap = Table[String, Any, Any, RetryConfig](
("testName", "retries", "interval", "result"),
("noret-noint", 0, 0, new RetryConfig(0, 0)),
("ret-and-int", 1, 2, new RetryConfig(1, 2)),
("noret-noint-strings", "0", "0", new RetryConfig(0, 0)),
("ret-and-int-strings", "1", "2", new RetryConfig(1, 2)),
("noret-noint", 0, 0, new RetryConfig(0, 0, 0.0)),
("ret-and-int", 1, 2, new RetryConfig(1, 2, 0.0)),
("noret-noint-strings", "0", "0", new RetryConfig(0, 0, 0.0)),
("ret-and-int-strings", "1", "2", new RetryConfig(1, 2, 0.0)),
)

"S3Config" should "set http retry config" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers {
AuthMode.Credentials,
Some("endpoint"),
false,
new RetryConfig(5, 50),
new RetryConfig(5, 50, 0.0),
HttpTimeoutConfig(Some(60000), Some(60000)),
None,
),
Expand Down Expand Up @@ -79,7 +79,7 @@ class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers {
AuthMode.Credentials,
Some("endpoint"),
false,
new RetryConfig(5, 50),
new RetryConfig(5, 50, 0.0),
HttpTimeoutConfig(Some(60000), Some(60000)),
None,
),
Expand Down
Loading

0 comments on commit f6f7d54

Please sign in to comment.