diff --git a/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java b/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java
index 65789c8..72c59a4 100644
--- a/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java
+++ b/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java
@@ -20,7 +20,6 @@ public void open(Configuration parameters) throws Exception {
@Override
public void process(K k, ProcessWindowFunction<A, Rows<A>, K, W>.Context context, Iterable<A> batch, Collector<Rows<A>> out) throws Exception {
var table = getTable(batch.iterator().next());
-
var data = StreamSupport.stream(batch.spliterator(), false).collect(Collectors.toList());
var result = Rows.defaultStream(data, table);
out.collect(result);
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/AppendException.java b/src/main/java/com/vinted/flink/bigquery/sink/AppendException.java
index 11fad7a..daa18b9 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/AppendException.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/AppendException.java
@@ -9,9 +9,12 @@ public class AppendException extends RuntimeException {
private final Throwable error;
- public AppendException(String traceId, Rows<?> rows, Throwable error) {
+ private final int retryCount;
+
+ public AppendException(String traceId, Rows<?> rows, int retryCount, Throwable error) {
this.traceId = traceId;
this.rows = rows;
+ this.retryCount = retryCount;
this.error = error;
}
@@ -23,6 +26,10 @@ public <A> Rows<A> getRows() {
return (Rows<A>) rows;
}
+ public int getRetryCount() {
+ return retryCount;
+ }
+
public Throwable getError() {
return error;
}
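
Note: carrying retryCount on AppendException lets downstream error handling distinguish an exhausted retry budget from a first-time failure. A minimal sketch of a consumer, assuming the writer rethrows the pending async exception on flush (the handler wiring here is illustrative, not part of this patch):

    // Illustration only: inspect the remaining retry budget on a failed append.
    try {
        writer.flush(false); // rethrows a pending async AppendException, if any
    } catch (AppendException e) {
        logger.error("Append for trace {} gave up with {} retries left",
                e.getTraceId(), e.getRetryCount(), e.getError());
        throw e;
    }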
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
index bc006ee..573e81d 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
@@ -29,6 +29,7 @@ public abstract class BigQueryBufferedSinkWriter<A, StreamT> extends BigQuerySinkWriter<A, StreamT> implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Rows<A>, BigQueryCommittable> {
private static final Logger logger = LoggerFactory.getLogger(BigQueryBufferedSinkWriter.class);
private Map<String, Long> streamOffsets = new ConcurrentHashMap<>();
+
public BigQueryBufferedSinkWriter(
Sink.InitContext sinkInitContext,
RowValueSerializer rowSerializer,
@@ -200,7 +201,7 @@ public AppendCallBack(BigQueryBufferedSinkWriter<A, ?> parent, Rows<A> rows, int
@Override
public void onFailure(Throwable t) {
logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
- future.completeExceptionally(new AppendException(traceId, rows, t));
+ future.completeExceptionally(new AppendException(traceId, rows, retryCount, t));
}
@Override
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
index 9351637..6786546 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
@@ -43,7 +43,6 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
if (writer.isClosed() || writer.isUserClosed()) {
logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
}
-
logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
try {
return writer.append(prows);
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
index fa2b1f9..195ebae 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
@@ -3,21 +3,20 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
-import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
-import io.grpc.Status;
-import org.apache.flink.api.connector.sink2.Sink;
import com.vinted.flink.bigquery.client.ClientProvider;
+import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import com.vinted.flink.bigquery.sink.AppendException;
import com.vinted.flink.bigquery.sink.BigQuerySinkWriter;
import com.vinted.flink.bigquery.sink.ExecutorProvider;
+import io.grpc.Status;
+import org.apache.flink.api.connector.sink2.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.Phaser;
-import java.util.function.Function;
public abstract class BigQueryDefaultSinkWriter<A, StreamT>
extends BigQuerySinkWriter<A, StreamT> {
@@ -56,15 +55,6 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
ApiFutures.addCallback(response, callback, appendExecutor);
inflightRequestCount.register();
- } catch (AppendException exception) {
- var error = exception.getError();
- var errorRows = exception.getRows();
- var errorTraceId = exception.getTraceId();
- var status = Status.fromThrowable(error);
- Function<String, String> createLogMessage = (title) ->
- this.createLogMessage(title, errorTraceId, status, error, errorRows);
- logger.error(createLogMessage.apply("Non recoverable BigQuery stream error for:"), error);
- throw error;
} catch (Throwable t) {
logger.error("Non recoverable BigQuery stream error for:", t);
throw t;
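
The try block above registers one Phaser party per asynchronous append (inflightRequestCount.register()), and each callback arrives and deregisters when its append settles. A condensed, standalone sketch of that pattern, using CompletableFuture in place of ApiFuture for brevity (class and method names here are illustrative):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Phaser;

    class InflightTracker {
        // One party is pre-registered so the phaser stays alive between batches.
        private final Phaser inflight = new Phaser(1);

        <T> CompletableFuture<T> track(CompletableFuture<T> request) {
            inflight.register(); // one extra party per in-flight append
            return request.whenComplete((result, error) -> inflight.arriveAndDeregister());
        }

        void awaitAll() {
            // Arrive with the pre-registered party and block until every
            // tracked append has arrived, e.g. before completing a checkpoint.
            inflight.arriveAndAwaitAdvance();
        }
    }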
@@ -106,34 +96,74 @@ public AppendCallBack(BigQueryDefaultSinkWriter<A, ?> parent, String traceId, Ro
this.retryCount = retryCount;
}
+ @Override
+ public void onSuccess(AppendRowsResponse result) {
+ Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(m -> {
+ m.setBatchCount(rows.getData().size());
+ m.setOffset(result.getAppendResult().getOffset().getValue());
+ });
+ this.parent.inflightRequestCount.arriveAndDeregister();
+ }
+
+
@Override
public void onFailure(Throwable t) {
- logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
+ logger.error("Trace-id {} Received error {}", t.getMessage(), traceId);
+ var status = Status.fromThrowable(t);
+ switch (status.getCode()) {
+ case INTERNAL:
+ case ABORTED:
+ case CANCELLED:
+ case FAILED_PRECONDITION:
+ case DEADLINE_EXCEEDED:
+ case UNAVAILABLE:
+ doPauseBeforeRetry();
+ retryWrite(t, retryCount - 1);
+ break;
+ case INVALID_ARGUMENT:
+ if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
+ Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
+ logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
+ var data = rows.getData();
+ var first = data.subList(0, data.size() / 2);
+ var second = data.subList(data.size() / 2, data.size());
+ try {
+ this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
+ this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
+ } catch (Throwable e) {
+ this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+ }
+ } else {
+ this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+ }
+ break;
+ default:
+ this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+ }
this.parent.inflightRequestCount.arriveAndDeregister();
+ }
+
+ private void retryWrite(Throwable t, int newRetryCount) {
var status = Status.fromThrowable(t);
- if (status.getCode() == Status.Code.INVALID_ARGUMENT && t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
- Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
- logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
- var first = rows.getData().subList(0, rows.getData().size() / 2);
- var second = rows.getData().subList(rows.getData().size() / 2, rows.getData().size());
- try {
- this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount);
- this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount);
- } catch (Throwable e) {
- this.parent.appendAsyncException = new AppendException(traceId, rows, t);
+ try {
+ if (newRetryCount > 0) {
+ logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
+ this.parent.writeWithRetry(traceId, rows, newRetryCount);
+ } else {
+ logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
+ this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
}
- } else {
- this.parent.appendAsyncException = new AppendException(traceId, rows, t);
+ } catch (Throwable e) {
+ this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
}
}
- @Override
- public void onSuccess(AppendRowsResponse result) {
- Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(m -> {
- m.setBatchCount(rows.getData().size());
- m.setOffset(result.getAppendResult().getOffset().getValue());
- });
- this.parent.inflightRequestCount.arriveAndDeregister();
+ private void doPauseBeforeRetry() {
+ try {
+ Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
}
}
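
The rewritten onFailure boils down to a status-code classification: a fixed set of gRPC codes is treated as transient (pause, then retry with a decremented budget), INVALID_ARGUMENT for oversized messages triggers a batch split, and everything else fails fast by recording an AppendException. A condensed sketch of just the classification (illustration only; the real handler also splits batches, updates metrics, and manages the Phaser):

    import io.grpc.Status;

    // Mirrors the switch in AppendCallBack#onFailure above.
    final class RetryClassifier {
        static boolean isTransient(Status.Code code) {
            switch (code) {
                case INTERNAL:
                case ABORTED:
                case CANCELLED:
                case FAILED_PRECONDITION:
                case DEADLINE_EXCEEDED:
                case UNAVAILABLE:
                    return true;  // pause, then writeWithRetry with retryCount - 1
                default:
                    return false; // e.g. PERMISSION_DENIED: surface the AppendException
            }
        }
    }

FAILED_PRECONDITION landing in the transient set is what the new shouldRetryOnRecoverableException test exercises below, while PERMISSION_DENIED falls through to the fail-fast default branch.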
diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
index 1d3d33c..1c1986c 100644
--- a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
+++ b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
@@ -56,8 +56,24 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkParam PipelineRu
}
@Test
- public void shouldNotRetryOnException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
- mockClientProvider.givenFailingAppendWithStatus(Status.INTERNAL);
+ public void shouldRetryOnRecoverableException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+ mockClientProvider.givenFailingAppendWithStatus(Status.FAILED_PRECONDITION);
+
+ assertThatThrownBy(() -> {
+ runner
+ .withRetryCount(0)
+ .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
+ givenRows(1)
+ ))));
+ }).isInstanceOf(JobExecutionException.class);
+
+
+ verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any());
+ }
+
+ @Test
+ public void shouldNotRetryOnNonRecoverableException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+ mockClientProvider.givenFailingAppendWithStatus(Status.PERMISSION_DENIED);
assertThatThrownBy(() -> {
runner
diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
index e7ee66e..96a4dd6 100644
--- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
+++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
@@ -4,10 +4,10 @@
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.protobuf.Int64Value;
import com.vinted.flink.bigquery.model.Rows;
+import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
import com.vinted.flink.bigquery.util.FlinkTest;
import com.vinted.flink.bigquery.util.MockJsonClientProvider;
import io.grpc.Status;
-import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;