From 8689b91efc02d0d220bc19ba54ca77a4d8a28718 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Tue, 20 Feb 2024 09:57:18 +0200 Subject: [PATCH] feat: handle callback timeouts --- build.gradle | 2 +- .../BigQueryDefaultSinkWriter.java | 14 +++++++++++- .../bigquery/BigQueryDefaultSinkTest.java | 22 +++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 9b84f88..ce615cf 100755 --- a/build.gradle +++ b/build.gradle @@ -30,7 +30,7 @@ gitVersioning.apply { ext { flinkVersion = '1.18.1' bigqueryVersion = '2.36.0' - bigqueryStorageVersion = '3.0.0' + bigqueryStorageVersion = '3.2.0' json4sVersion = '4.0.3' } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java index 45899b3..b1a3d51 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java @@ -3,6 +3,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.Exceptions; import com.vinted.flink.bigquery.client.ClientProvider; import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics; import com.vinted.flink.bigquery.model.Rows; @@ -50,7 +51,7 @@ private void checkAsyncException() { var errorRows = e.getRows(); var errorTraceId = e.getTraceId(); var status = Status.fromThrowable(error); - logger.error(this.createLogMessage("Non recoverable BigQuery stream AppendException for:", errorTraceId, status, error, errorRows, 0), error); + logger.error(this.createLogMessage("Non recoverable async BigQuery stream AppendException for:", errorTraceId, status, error, errorRows, 0), error); throw e; } } @@ -106,6 +107,7 @@ static class AppendCallBack implements ApiFutureCallback private final BigQueryDefaultSinkWriter parent; private final Rows rows; private final String traceId; + private final int retryCount; public AppendCallBack(BigQueryDefaultSinkWriter parent, String traceId, Rows rows, int retryCount) { @@ -160,6 +162,16 @@ public void onFailure(Throwable t) { this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); } break; + case UNKNOWN: + if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { + logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); + this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + retryWrite(t, retryCount - 1); + } else { + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); + this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); + } + break; default: logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java index 2166d3c..fba9609 100644 --- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java @@ -1,6 +1,7 @@ package com.vinted.flink.bigquery; import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.storage.v1.Exceptions; import com.vinted.flink.bigquery.model.Rows; import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer; import com.vinted.flink.bigquery.util.FlinkTest; @@ -15,6 +16,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -56,6 +58,26 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any()); } + @Test + public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTimeExceededException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { + var cause = new Exceptions.MaximumRequestCallbackWaitTimeExceededException(Duration.ofMinutes(6), "id", Duration.ofMinutes(5)); + mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN.withCause(cause)); + mockClientProvider.givenRetryCount(2); + + + assertThatThrownBy(() -> { + runner + .withRetryCount(0) + .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( + givenRow(1) + )))); + }).isInstanceOf(JobExecutionException.class); + + + verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any()); + assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3); + } + @Test @Disabled("Retry causes out of order exception in committer and later in writer") public void shouldRetryOnTimeoutException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {