Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/retry logic #22

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,29 @@ publishing {
}
}

// Configure every Test task so the forked test JVM is started with the
// module-system flags below. The flags are applied in doFirst, i.e. right
// before the task executes, and the assignment replaces whatever jvmArgs
// were configured on the task before that point.
// NOTE(review): presumably required by test dependencies that reflect into
// JDK internals on Java 9+ -- confirm which libraries need each entry.
tasks.withType(Test).configureEach {
    doFirst {
        // JDK-internal packages exported to class-path (unnamed module) code.
        def exportFlags = [
            '--add-exports=java.base/sun.net.util=ALL-UNNAMED',
            '--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED',
            '--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED',
            '--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED',
            '--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED',
            '--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED',
            '--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED',
            '--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED'
        ]

        // Packages opened for deep reflection from class-path (unnamed module) code.
        def openFlags = [
            '--add-opens=java.base/java.lang=ALL-UNNAMED',
            '--add-opens=java.base/java.net=ALL-UNNAMED',
            '--add-opens=java.base/java.io=ALL-UNNAMED',
            '--add-opens=java.base/java.nio=ALL-UNNAMED',
            '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED',
            '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED',
            '--add-opens=java.base/java.text=ALL-UNNAMED',
            '--add-opens=java.base/java.time=ALL-UNNAMED',
            '--add-opens=java.base/java.util=ALL-UNNAMED',
            '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED',
            '--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED',
            '--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED'
        ]

        // Same flags, same order as a single flat list: exports first, then opens.
        jvmArgs = exportFlags + openFlags
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.client;

import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.*;
Expand All @@ -10,6 +11,7 @@
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.Optional;
Expand Down Expand Up @@ -46,13 +48,25 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
var writer = JsonStreamWriter
var writerBuilder = JsonStreamWriter
.newBuilder(streamName, getTableSchema(table), this.getClient())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setExecutorProvider(executorProvider)
.build();
.setExecutorProvider(executorProvider);

if (writerSettings.getRetrySettings() != null) {
var settings = writerSettings.getRetrySettings();
var retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
.setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
.setMaxAttempts(settings.getMaxRetryAttempts())
.setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
.build();

writerBuilder.setRetrySettings(retrySettings);
}
JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writerBuilder.build());
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.client;

import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.*;
Expand All @@ -9,6 +10,7 @@
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -45,6 +47,9 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();



var streamWriterBuilder = StreamWriter
.newBuilder(streamName, getClient())
.setMaxInflightRequests(this.writerSettings.getMaxInflightRequests())
Expand All @@ -56,6 +61,19 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
.setLocation(table.getProject())
.setWriterSchema(protoSchema);

if (writerSettings.getRetrySettings() != null) {
var settings = writerSettings.getRetrySettings();
var retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
.setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
.setMaxAttempts(settings.getMaxRetryAttempts())
.setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
.build();

streamWriterBuilder.setRetrySettings(retrySettings);
}

StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.vinted.flink.bigquery.model.config;

import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;

/**
 * Serializable holder for the retry parameters of a BigQuery stream writer:
 * initial delay, delay multiplier, maximum number of attempts and maximum delay.
 *
 * <p>Instances are normally created through {@link #newBuilder()}, which supplies
 * defaults for every value. The implicit no-argument constructor is still available;
 * an instance created that way has {@code null} delays until the setters are called.
 *
 * <p>The delay setters and the builder reject {@code null}, so a missing delay is
 * reported where the configuration is assembled rather than later, when the
 * durations are converted to milliseconds.
 */
public class WriterRetrySettings implements Serializable {

    private Duration initialRetryDelay;
    private double retryDelayMultiplier;
    private int maxRetryAttempts;
    private Duration maxRetryDelay;

    /**
     * @return the delay before the first retry; {@code null} only if the instance was
     *         created with the no-argument constructor and never configured
     */
    public Duration getInitialRetryDelay() {
        return initialRetryDelay;
    }

    /**
     * @param initialRetryDelay delay before the first retry, must not be {@code null}
     * @throws NullPointerException if {@code initialRetryDelay} is {@code null}
     */
    public void setInitialRetryDelay(Duration initialRetryDelay) {
        this.initialRetryDelay = requireNonNull(initialRetryDelay, "initialRetryDelay");
    }

    /**
     * @return the factor applied to the retry delay between attempts
     */
    public double getRetryDelayMultiplier() {
        return retryDelayMultiplier;
    }

    /**
     * @param retryDelayMultiplier factor applied to the retry delay between attempts
     */
    public void setRetryDelayMultiplier(double retryDelayMultiplier) {
        this.retryDelayMultiplier = retryDelayMultiplier;
    }

    /**
     * @return the maximum number of attempts
     */
    public int getMaxRetryAttempts() {
        return maxRetryAttempts;
    }

    /**
     * @param maxRetryAttempts maximum number of attempts
     */
    public void setMaxRetryAttempts(int maxRetryAttempts) {
        this.maxRetryAttempts = maxRetryAttempts;
    }

    /**
     * @return the upper bound for the retry delay; {@code null} only if the instance was
     *         created with the no-argument constructor and never configured
     */
    public Duration getMaxRetryDelay() {
        return maxRetryDelay;
    }

    /**
     * @param maxRetryDelay upper bound for the retry delay, must not be {@code null}
     * @throws NullPointerException if {@code maxRetryDelay} is {@code null}
     */
    public void setMaxRetryDelay(Duration maxRetryDelay) {
        this.maxRetryDelay = requireNonNull(maxRetryDelay, "maxRetryDelay");
    }

    /**
     * @return a builder pre-populated with the default retry parameters
     */
    public static WriterRetrySettingsBuilder newBuilder() {
        return new WriterRetrySettingsBuilder();
    }

    /** Returns {@code value}, or throws a {@link NullPointerException} naming the offending setting. */
    private static Duration requireNonNull(Duration value, String name) {
        if (value == null) {
            throw new NullPointerException(name + " must not be null");
        }
        return value;
    }

    /**
     * Fluent builder for {@link WriterRetrySettings}.
     *
     * <p>Defaults: initial delay 500 ms, multiplier 1.1, 5 attempts, maximum delay 1 minute.
     */
    public static final class WriterRetrySettingsBuilder implements Serializable {
        private Duration initialRetryDelay = Duration.ofMillis(500);
        private double retryDelayMultiplier = 1.1;
        private int maxRetryAttempts = 5;
        private Duration maxRetryDelay = Duration.ofMinutes(1);

        private WriterRetrySettingsBuilder() {
        }

        /**
         * @param initialRetryDelay delay before the first retry, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code initialRetryDelay} is {@code null}
         */
        public WriterRetrySettingsBuilder withInitialRetryDelay(Duration initialRetryDelay) {
            this.initialRetryDelay = requireNonNull(initialRetryDelay, "initialRetryDelay");
            return this;
        }

        /**
         * @param retryDelayMultiplier factor applied to the retry delay between attempts
         * @return this builder
         */
        public WriterRetrySettingsBuilder withRetryDelayMultiplier(double retryDelayMultiplier) {
            this.retryDelayMultiplier = retryDelayMultiplier;
            return this;
        }

        /**
         * @param maxRetryAttempts maximum number of attempts
         * @return this builder
         */
        public WriterRetrySettingsBuilder withMaxRetryAttempts(int maxRetryAttempts) {
            this.maxRetryAttempts = maxRetryAttempts;
            return this;
        }

        /**
         * @param maxRetryDelay upper bound for the retry delay, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code maxRetryDelay} is {@code null}
         */
        public WriterRetrySettingsBuilder withMaxRetryDelay(Duration maxRetryDelay) {
            this.maxRetryDelay = requireNonNull(maxRetryDelay, "maxRetryDelay");
            return this;
        }

        /**
         * @return a new {@link WriterRetrySettings} carrying the values currently held by this builder
         */
        public WriterRetrySettings build() {
            WriterRetrySettings retrySettings = new WriterRetrySettings();
            retrySettings.initialRetryDelay = this.initialRetryDelay;
            retrySettings.retryDelayMultiplier = this.retryDelayMultiplier;
            retrySettings.maxRetryAttempts = this.maxRetryAttempts;
            retrySettings.maxRetryDelay = this.maxRetryDelay;
            return retrySettings;
        }
    }
}


Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ public class WriterSettings implements Serializable {
private Duration timeout;
private int retryCount;
private Duration retryPause;
private Long maxInflightRequests;
private Long maxInflightBytes;
private long maxInflightRequests;
private long maxInflightBytes;
private Duration maxRetryDuration;

private Duration maxRequestWaitCallbackTime;
private Boolean enableConnectionPool;
private boolean enableConnectionPool;

private WriterRetrySettings retrySettings;

public int getStreamsPerTable() {
return streamsPerTable;
Expand Down Expand Up @@ -79,6 +81,14 @@ public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
}

public WriterRetrySettings getRetrySettings() {
return retrySettings;
}

public void setRetrySettings(WriterRetrySettings retrySettings) {
this.retrySettings = retrySettings;
}

public static final class WriterSettingsBuilder implements Serializable {
private int streamsPerTable = 1;
private int writerThreads = 1;
Expand All @@ -91,6 +101,8 @@ public static final class WriterSettingsBuilder implements Serializable {
private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5);
private Boolean enableConnectionPool = false;

private WriterRetrySettings retrySettings = null;

private WriterSettingsBuilder() {
}

Expand Down Expand Up @@ -135,7 +147,7 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) {
}

public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRetryDuration;
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
return this;
}

Expand All @@ -144,6 +156,11 @@ public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPo
return this;
}

public WriterSettingsBuilder withRetrySettings(WriterRetrySettings retrySettings) {
this.retrySettings = retrySettings;
return this;
}

public WriterSettings build() {
WriterSettings writerSettings = new WriterSettings();
writerSettings.writerThreads = this.writerThreads;
Expand All @@ -156,6 +173,7 @@ public WriterSettings build() {
writerSettings.retryPause = this.retryPause;
writerSettings.maxRetryDuration = this.maxRetryDuration;
writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime;
writerSettings.retrySettings = this.retrySettings;
return writerSettings;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
traceId, rows.getStream(), rows.getTable(), rows.getOffset(), rows.getData().size(), retryCount
);
var result = append(traceId, rows);
var callback = new AppendCallBack<>(this, result.writerId, traceId, rows, retryCount);
var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
ApiFutures.addCallback(result.response, callback, appendExecutor);
inflightRequestCount.register();
} catch (AppendException exception) {
Expand Down Expand Up @@ -130,14 +130,12 @@ static class AppendCallBack<A> implements ApiFutureCallback<AppendRowsResponse>
private final BigQueryDefaultSinkWriter<A> parent;
private final Rows<A> rows;

private final String writerId;
private final String traceId;

private final int retryCount;

public AppendCallBack(BigQueryDefaultSinkWriter<A> parent, String writerId, String traceId, Rows<A> rows, int retryCount) {
public AppendCallBack(BigQueryDefaultSinkWriter<A> parent, String traceId, Rows<A> rows, int retryCount) {
this.parent = parent;
this.writerId = writerId;
this.traceId = traceId;
this.rows = rows;
this.retryCount = retryCount;
Expand All @@ -155,79 +153,8 @@ public void onSuccess(AppendRowsResponse result) {

@Override
public void onFailure(Throwable t) {
var status = Status.fromThrowable(t);
switch (status.getCode()) {
case INTERNAL:
case CANCELLED:
case FAILED_PRECONDITION:
case DEADLINE_EXCEEDED:
doPauseBeforeRetry();
retryWrite(t, retryCount - 1);
break;
case ABORTED:
case UNAVAILABLE: {
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
retryWrite(t, retryCount - 1);
break;
}
case INVALID_ARGUMENT:
if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
var data = rows.getData();
var first = data.subList(0, data.size() / 2);
var second = data.subList(data.size() / 2, data.size());
try {
this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
} else {
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
case UNKNOWN:
if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
Optional.ofNullable(this.parent.metrics.get(rows.getStream()))
.ifPresent(BigQueryStreamMetrics::incrementTimeoutCount);
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
retryWrite(t, retryCount - 1);
} else {
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
default:
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
this.parent.inflightRequestCount.arriveAndDeregister();
}

private void retryWrite(Throwable t, int newRetryCount) {
var status = Status.fromThrowable(t);
try {
if (newRetryCount > 0) {
logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), retryCount);
this.parent.writeWithRetry(traceId, rows, newRetryCount);
} else {
logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
}
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
}
}

private void doPauseBeforeRetry() {
try {
Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Loading
Loading