From ca3b1c9f1c3e699665b5ed76f1f1e541e84d385d Mon Sep 17 00:00:00 2001
From: Simonas Gelazevicius
Date: Wed, 20 Dec 2023 16:21:47 +0200
Subject: [PATCH 1/2] feat: add retry for recoverable errors

---
 .../flink/bigquery/process/RowBatcher.java    |  1 -
 .../flink/bigquery/sink/AppendException.java  |  9 +-
 .../buffered/BigQueryBufferedSinkWriter.java  |  3 +-
 .../BigQueryDefaultProtoSinkWriter.java       |  1 -
 .../BigQueryDefaultSinkWriter.java            | 92 ++++++++++++-------
 .../bigquery/BigQueryBufferedSinkTest.java    | 20 +++-
 .../bigquery/BigQueryDefaultSinkTest.java     |  2 +-
 7 files changed, 89 insertions(+), 39 deletions(-)

diff --git a/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java b/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java
index 65789c8..72c59a4 100644
--- a/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java
+++ b/src/main/java/com/vinted/flink/bigquery/process/RowBatcher.java
@@ -20,7 +20,6 @@ public void open(Configuration parameters) throws Exception {
     @Override
     public void process(K k, ProcessWindowFunction<Rows<A>, Rows<A>, K, W>.Context context, Iterable<Rows<A>> batch, Collector<Rows<A>> out) throws Exception {
         var table = getTable(batch.iterator().next());
-
         var data = StreamSupport.stream(batch.spliterator(), false).collect(Collectors.toList());
         var result = Rows.defaultStream(data, table);
         out.collect(result);
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/AppendException.java b/src/main/java/com/vinted/flink/bigquery/sink/AppendException.java
index 11fad7a..daa18b9 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/AppendException.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/AppendException.java
@@ -9,9 +9,12 @@ public class AppendException extends RuntimeException {
 
     private final Throwable error;
 
-    public AppendException(String traceId, Rows<?> rows, Throwable error) {
+    private final int retryCount;
+
+    public AppendException(String traceId, Rows<?> rows, int retryCount, Throwable error) {
         this.traceId = traceId;
         this.rows = rows;
+        this.retryCount = retryCount;
         this.error = error;
     }
 
@@ -23,6 +26,10 @@ public <A> Rows<A> getRows() {
         return (Rows<A>) rows;
     }
 
+    public int getRetryCount() {
+        return retryCount;
+    }
+
     public Throwable getError() {
         return error;
     }
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
index bc006ee..573e81d 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
@@ -29,6 +29,7 @@ public abstract class BigQueryBufferedSinkWriter<A>
         extends BigQuerySinkWriter<Rows<A>, BigQueryCommittable> {
     private static final Logger logger = LoggerFactory.getLogger(BigQueryBufferedSinkWriter.class);
     private Map<String, Long> streamOffsets = new ConcurrentHashMap<>();
+
     public BigQueryBufferedSinkWriter(
             Sink.InitContext sinkInitContext,
             RowValueSerializer<A> rowSerializer,
@@ -200,7 +201,7 @@ public AppendCallBack(BigQueryBufferedSinkWriter<A> parent, Rows<A> rows, int
         @Override
         public void onFailure(Throwable t) {
             logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
-            future.completeExceptionally(new AppendException(traceId, rows, t));
+            future.completeExceptionally(new AppendException(traceId, rows, retryCount, t));
         }
 
         @Override
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
index 9351637..6786546 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
@@ -43,7 +43,6 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
         if (writer.isClosed() || writer.isUserClosed()) {
             logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
         }
-
         logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
         try {
             return writer.append(prows);
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
index fa2b1f9..6ac549a 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
@@ -3,21 +3,20 @@
 import com.google.api.core.ApiFutureCallback;
 import com.google.api.core.ApiFutures;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
-import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
-import io.grpc.Status;
-import org.apache.flink.api.connector.sink2.Sink;
 import com.vinted.flink.bigquery.client.ClientProvider;
+import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
 import com.vinted.flink.bigquery.model.Rows;
 import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 import com.vinted.flink.bigquery.sink.AppendException;
 import com.vinted.flink.bigquery.sink.BigQuerySinkWriter;
 import com.vinted.flink.bigquery.sink.ExecutorProvider;
+import io.grpc.Status;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 import java.util.concurrent.Phaser;
-import java.util.function.Function;
 
 public abstract class BigQueryDefaultSinkWriter<A>
         extends BigQuerySinkWriter<A, AppendRowsResponse> {
@@ -56,15 +55,6 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
             var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
             ApiFutures.addCallback(response, callback, appendExecutor);
             inflightRequestCount.register();
-        } catch (AppendException exception) {
-            var error = exception.getError();
-            var errorRows = exception.getRows();
-            var errorTraceId = exception.getTraceId();
-            var status = Status.fromThrowable(error);
-            Function<String, String> createLogMessage = (title) ->
-                    this.createLogMessage(title, errorTraceId, status, error, errorRows);
-            logger.error(createLogMessage.apply("Non recoverable BigQuery stream error for:"), error);
-            throw error;
         } catch (Throwable t) {
             logger.error("Non recoverable BigQuery stream error for:", t);
             throw t;
@@ -107,32 +97,70 @@ public AppendCallBack(BigQueryDefaultSinkWriter<A> parent, String traceId, Ro
         }
 
         @Override
-        public void onFailure(Throwable t) {
-            logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
+        public void onSuccess(AppendRowsResponse result) {
+            Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(m -> {
+                m.setBatchCount(rows.getData().size());
+                m.setOffset(result.getAppendResult().getOffset().getValue());
+            });
             this.parent.inflightRequestCount.arriveAndDeregister();
+        }
+
+        private void retryWrite(Throwable t, int newRetryCount) {
             var status = Status.fromThrowable(t);
-            if (status.getCode() == Status.Code.INVALID_ARGUMENT && t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
-                Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
-                logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
-                var first = rows.getData().subList(0, rows.getData().size() / 2);
-                var second = rows.getData().subList(rows.getData().size() / 2, rows.getData().size());
-                try {
-                    this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount);
-                    this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount);
-                } catch (Throwable e) {
-                    this.parent.appendAsyncException = new AppendException(traceId, rows, t);
+            try {
+                if (newRetryCount > 0) {
+                    logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
+                    this.parent.writeWithRetry(traceId, rows, newRetryCount);
+                } else {
+                    logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
+                    this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
                 }
-            } else {
-                this.parent.appendAsyncException = new AppendException(traceId, rows, t);
+            } catch (Throwable e) {
+                this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
+            }
+        }
+
+        private void doPauseBeforeRetry() {
+            try {
+                Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
             }
         }
 
         @Override
-        public void onSuccess(AppendRowsResponse result) {
-            Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(m -> {
-                m.setBatchCount(rows.getData().size());
-                m.setOffset(result.getAppendResult().getOffset().getValue());
-            });
+        public void onFailure(Throwable t) {
+            logger.error("Trace-id {} Received error {}", t.getMessage(), traceId);
+            var status = Status.fromThrowable(t);
+            switch (status.getCode()) {
+                case INTERNAL:
+                case ABORTED:
+                case CANCELLED:
+                case FAILED_PRECONDITION:
+                case DEADLINE_EXCEEDED:
+                case UNAVAILABLE:
+                    doPauseBeforeRetry();
+                    retryWrite(t, retryCount - 1);
+                    break;
+                case INVALID_ARGUMENT:
+                    if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
+                        Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
+                        logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
+                        var first = rows.getData().subList(0, rows.getData().size() / 2);
+                        var second = rows.getData().subList(rows.getData().size() / 2, rows.getData().size());
+                        try {
+                            this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
+                            this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
+                        } catch (Throwable e) {
+                            this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+                        }
+                    } else {
+                        this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+                    }
+                    break;
+                default:
+                    this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+            }
             this.parent.inflightRequestCount.arriveAndDeregister();
         }
     }
diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
index 1d3d33c..1c1986c 100644
--- a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
+++ b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
@@ -56,8 +56,24 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkParam PipelineRu
     }
 
     @Test
-    public void shouldNotRetryOnException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenFailingAppendWithStatus(Status.INTERNAL);
+    public void shouldRetryOnRecoverableException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+        mockClientProvider.givenFailingAppendWithStatus(Status.FAILED_PRECONDITION);
+
+        assertThatThrownBy(() -> {
+            runner
+                    .withRetryCount(0)
+                    .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
+                            givenRows(1)
+                    ))));
+        }).isInstanceOf(JobExecutionException.class);
+
+
+        verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any());
+    }
+
+    @Test
+    public void shouldNotRetryOnNonRecoverableException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+        mockClientProvider.givenFailingAppendWithStatus(Status.PERMISSION_DENIED);
 
         assertThatThrownBy(() -> {
             runner
diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
index e7ee66e..96a4dd6 100644
--- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
+++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
@@ -4,10 +4,10 @@
 import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
 import com.google.protobuf.Int64Value;
 import com.vinted.flink.bigquery.model.Rows;
+import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
 import com.vinted.flink.bigquery.util.FlinkTest;
 import com.vinted.flink.bigquery.util.MockJsonClientProvider;
 import io.grpc.Status;
-import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;

From c88cbcfeccfae2ae8890dd0e8fc82d53f7780cd0 Mon Sep 17 00:00:00 2001
From: Simonas Gelazevicius
Date: Thu, 21 Dec 2023 11:24:13 +0200
Subject: [PATCH 2/2] chore: Address review comments

---
 .../BigQueryDefaultSinkWriter.java | 50 ++++++++++---------
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
index 6ac549a..195ebae 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
@@ -105,28 +105,6 @@ public void onSuccess(AppendRowsResponse result) {
             this.parent.inflightRequestCount.arriveAndDeregister();
         }
 
-        private void retryWrite(Throwable t, int newRetryCount) {
-            var status = Status.fromThrowable(t);
-            try {
-                if (newRetryCount > 0) {
-                    logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
-                    this.parent.writeWithRetry(traceId, rows, newRetryCount);
-                } else {
-                    logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
-                    this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
-                }
-            } catch (Throwable e) {
-                this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
-            }
-        }
-
-        private void doPauseBeforeRetry() {
-            try {
-                Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
 
         @Override
         public void onFailure(Throwable t) {
@@ -146,8 +124,9 @@ public void onFailure(Throwable t) {
                     if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
                         Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
                         logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
-                        var first = rows.getData().subList(0, rows.getData().size() / 2);
-                        var second = rows.getData().subList(rows.getData().size() / 2, rows.getData().size());
+                        var data = rows.getData();
+                        var first = data.subList(0, data.size() / 2);
+                        var second = data.subList(data.size() / 2, data.size());
                         try {
                             this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
                             this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
@@ -163,5 +142,28 @@ public void onFailure(Throwable t) {
             }
             this.parent.inflightRequestCount.arriveAndDeregister();
         }
+
+        private void retryWrite(Throwable t, int newRetryCount) {
+            var status = Status.fromThrowable(t);
+            try {
+                if (newRetryCount > 0) {
+                    logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
+                    this.parent.writeWithRetry(traceId, rows, newRetryCount);
+                } else {
+                    logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
+                    this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
+                }
+            } catch (Throwable e) {
+                this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
+            }
+        }
+
+        private void doPauseBeforeRetry() {
+            try {
+                Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 }
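
For reviewers, a minimal standalone sketch of the error classification this
series introduces in onFailure(): recoverable gRPC statuses get a pause and a
retry with a decremented budget, the oversized-batch case is split in half,
and everything else fails fast. The class and helper names below are
hypothetical and not part of the patches; only io.grpc.Status is assumed.

    // Sketch only -- mirrors the classification in the new onFailure() callback.
    import io.grpc.Status;

    public final class RetryClassificationSketch {

        // Status codes the sink now treats as recoverable: it pauses for the
        // configured retry pause, then re-appends with retryCount - 1.
        static boolean isRecoverable(Status.Code code) {
            switch (code) {
                case INTERNAL:
                case ABORTED:
                case CANCELLED:
                case FAILED_PRECONDITION:
                case DEADLINE_EXCEEDED:
                case UNAVAILABLE:
                    return true;
                default:
                    // INVALID_ARGUMENT with "MessageSize is too large." is handled
                    // separately by splitting the batch in half; other statuses
                    // (e.g. PERMISSION_DENIED) surface as AppendException.
                    return false;
            }
        }

        public static void main(String[] args) {
            // Status.fromThrowable is the same call the writer uses on the error.
            Throwable t = Status.UNAVAILABLE.asRuntimeException();
            System.out.println(isRecoverable(Status.fromThrowable(t).getCode())); // true
            System.out.println(isRecoverable(Status.Code.PERMISSION_DENIED));     // false
        }
    }

This matches the two new tests: FAILED_PRECONDITION is retried until the
budget is exhausted, while PERMISSION_DENIED fails on the first append.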