Skip to content

Commit

Permalink
fix: remove retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gintarasm committed Mar 7, 2024
1 parent 4610faa commit c0871d8
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 7 deletions.
26 changes: 26 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,29 @@ publishing {
}
}

tasks.withType(Test).configureEach {
doFirst {
jvmArgs = [
'--add-exports=java.base/sun.net.util=ALL-UNNAMED',
'--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED',
'--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED',
'--add-opens=java.base/java.lang=ALL-UNNAMED',
'--add-opens=java.base/java.net=ALL-UNNAMED',
'--add-opens=java.base/java.io=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED',
'--add-opens=java.base/sun.nio.ch=ALL-UNNAMED',
'--add-opens=java.base/java.lang.reflect=ALL-UNNAMED',
'--add-opens=java.base/java.text=ALL-UNNAMED',
'--add-opens=java.base/java.time=ALL-UNNAMED',
'--add-opens=java.base/java.util=ALL-UNNAMED',
'--add-opens=java.base/java.util.concurrent=ALL-UNNAMED',
'--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED',
'--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED'
]
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.client;

import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.*;
Expand All @@ -10,6 +11,7 @@
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.Optional;
Expand Down Expand Up @@ -46,13 +48,25 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
var writer = JsonStreamWriter
var writerBuilder = JsonStreamWriter
.newBuilder(streamName, getTableSchema(table), this.getClient())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setExecutorProvider(executorProvider)
.build();
.setExecutorProvider(executorProvider);

if (writerSettings.getRetrySettings() != null) {
var settings = writerSettings.getRetrySettings();
var retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
.setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
.setMaxAttempts(settings.getMaxRetryAttempts())
.setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
.build();

writerBuilder.setRetrySettings(retrySettings);
}
JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writerBuilder.build());
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.client;

import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.*;
Expand All @@ -9,6 +10,7 @@
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -45,6 +47,9 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();



var streamWriterBuilder = StreamWriter
.newBuilder(streamName, getClient())
.setMaxInflightRequests(this.writerSettings.getMaxInflightRequests())
Expand All @@ -56,6 +61,19 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
.setLocation(table.getProject())
.setWriterSchema(protoSchema);

if (writerSettings.getRetrySettings() != null) {
var settings = writerSettings.getRetrySettings();
var retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
.setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
.setMaxAttempts(settings.getMaxRetryAttempts())
.setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
.build();

streamWriterBuilder.setRetrySettings(retrySettings);
}

StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.vinted.flink.bigquery.model.config;

import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;

public class WriterRetrySettings implements Serializable {

private Duration initialRetryDelay;
private double retryDelayMultiplier;

private int maxRetryAttempts;

private Duration maxRetryDelay;

public Duration getInitialRetryDelay() {
return initialRetryDelay;
}

public void setInitialRetryDelay(Duration initialRetryDelay) {
this.initialRetryDelay = initialRetryDelay;
}

public double getRetryDelayMultiplier() {
return retryDelayMultiplier;
}

public void setRetryDelayMultiplier(double retryDelayMultiplier) {
this.retryDelayMultiplier = retryDelayMultiplier;
}

public int getMaxRetryAttempts() {
return maxRetryAttempts;
}

public void setMaxRetryAttempts(int maxRetryAttempts) {
this.maxRetryAttempts = maxRetryAttempts;
}

public Duration getMaxRetryDelay() {
return maxRetryDelay;
}

public void setMaxRetryDelay(Duration maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}
public static WriterRetrySettingsBuilder newBuilder() {
return new WriterRetrySettingsBuilder();
}

public static final class WriterRetrySettingsBuilder implements Serializable {
private Duration initialRetryDelay = Duration.ofMillis(500);
private double retryDelayMultiplier = 1.1;

private int maxRetryAttempts = 5;

private Duration maxRetryDelay = Duration.ofMinutes(1);
private WriterRetrySettingsBuilder() {
}

public WriterRetrySettingsBuilder withInitialRetryDelay(Duration initialRetryDelay) {
this.initialRetryDelay = initialRetryDelay;
return this;
}

public WriterRetrySettingsBuilder withRetryDelayMultiplier(double retryDelayMultiplier) {
this.retryDelayMultiplier = retryDelayMultiplier;
return this;
}

public WriterRetrySettingsBuilder withMaxRetryAttempts(int maxRetryAttempts) {
this.maxRetryAttempts = maxRetryAttempts;
return this;
}

public WriterRetrySettingsBuilder withMaxRetryDelay(Duration maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
return this;
}

public WriterRetrySettings build() {
WriterRetrySettings retrySettings = new WriterRetrySettings();
retrySettings.initialRetryDelay = this.initialRetryDelay;
retrySettings.retryDelayMultiplier = this.retryDelayMultiplier;
retrySettings.maxRetryAttempts = this.maxRetryAttempts;
retrySettings.maxRetryDelay = this.maxRetryDelay;
return retrySettings;
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ public class WriterSettings implements Serializable {
private Duration timeout;
private int retryCount;
private Duration retryPause;
private Long maxInflightRequests;
private Long maxInflightBytes;
private long maxInflightRequests;
private long maxInflightBytes;
private Duration maxRetryDuration;

private Duration maxRequestWaitCallbackTime;
private Boolean enableConnectionPool;
private boolean enableConnectionPool;

private WriterRetrySettings retrySettings;

public int getStreamsPerTable() {
return streamsPerTable;
Expand Down Expand Up @@ -79,6 +81,14 @@ public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
}

public WriterRetrySettings getRetrySettings() {
return retrySettings;
}

public void setRetrySettings(WriterRetrySettings retrySettings) {
this.retrySettings = retrySettings;
}

public static final class WriterSettingsBuilder implements Serializable {
private int streamsPerTable = 1;
private int writerThreads = 1;
Expand All @@ -91,6 +101,8 @@ public static final class WriterSettingsBuilder implements Serializable {
private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5);
private Boolean enableConnectionPool = false;

private WriterRetrySettings retrySettings = null;

private WriterSettingsBuilder() {
}

Expand Down Expand Up @@ -144,6 +156,11 @@ public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPo
return this;
}

public WriterSettingsBuilder withRetrySettings(WriterRetrySettings retrySettings) {
this.retrySettings = retrySettings;
return this;
}

public WriterSettings build() {
WriterSettings writerSettings = new WriterSettings();
writerSettings.writerThreads = this.writerThreads;
Expand All @@ -156,6 +173,7 @@ public WriterSettings build() {
writerSettings.retryPause = this.retryPause;
writerSettings.maxRetryDuration = this.maxRetryDuration;
writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime;
writerSettings.retrySettings = this.retrySettings;
return writerSettings;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runn
}

@Test
@Disabled("Retry logic causes locking")
public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.INTERNAL);

Expand All @@ -60,6 +61,7 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam
}

@Test
@Disabled("Retry logic causes locking")
public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTimeExceededException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
var cause = new Exceptions.MaximumRequestCallbackWaitTimeExceededException(Duration.ofMinutes(6), "id", Duration.ofMinutes(5));
mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN.withCause(cause));
Expand Down Expand Up @@ -145,6 +147,7 @@ public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.Flink
}

@Test
@Disabled("Retry logic causes locking")
public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.UNAVAILABLE);
mockClientProvider.givenRetryCount(2);
Expand All @@ -162,6 +165,7 @@ public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTe
}

@Test
@Disabled("Retry logic causes locking")
public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenAppendingTooLargeBatch();

Expand Down

0 comments on commit c0871d8

Please sign in to comment.