Skip to content

Commit

Permalink
feat: handle callback timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
gintarasm committed Feb 20, 2024
1 parent 7a8d911 commit 8689b91
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 2 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ gitVersioning.apply {
ext {
flinkVersion = '1.18.1'
bigqueryVersion = '2.36.0'
bigqueryStorageVersion = '3.0.0'
bigqueryStorageVersion = '3.2.0'
json4sVersion = '4.0.3'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.vinted.flink.bigquery.client.ClientProvider;
import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
import com.vinted.flink.bigquery.model.Rows;
Expand Down Expand Up @@ -50,7 +51,7 @@ private void checkAsyncException() {
var errorRows = e.<A>getRows();
var errorTraceId = e.getTraceId();
var status = Status.fromThrowable(error);
logger.error(this.createLogMessage("Non recoverable BigQuery stream AppendException for:", errorTraceId, status, error, errorRows, 0), error);
logger.error(this.createLogMessage("Non recoverable async BigQuery stream AppendException for:", errorTraceId, status, error, errorRows, 0), error);
throw e;
}
}
Expand Down Expand Up @@ -106,6 +107,7 @@ static class AppendCallBack<A> implements ApiFutureCallback<AppendRowsResponse>
private final BigQueryDefaultSinkWriter<A, ?> parent;
private final Rows<A> rows;
private final String traceId;

private final int retryCount;

public AppendCallBack(BigQueryDefaultSinkWriter<A, ?> parent, String traceId, Rows<A> rows, int retryCount) {
Expand Down Expand Up @@ -160,6 +162,16 @@ public void onFailure(Throwable t) {
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
// UNKNOWN status: only a MaximumRequestCallbackWaitTimeExceededException (either the
// throwable itself or its direct cause) is retried; any other UNKNOWN failure is stored
// on the parent writer as an AppendException.
case UNKNOWN:
if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
// Callback wait timeout: logged at info level, then every stream writer is recreated
// for this stream/table before the write is retried with retryCount - 1.
logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
retryWrite(t, retryCount - 1);
} else {
// Same handling as the default branch: log the error and store it in
// parent.appendAsyncException.
// NOTE(review): presumably rethrown later by checkAsyncException — confirm.
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
default:
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery;

import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
import com.vinted.flink.bigquery.util.FlinkTest;
Expand All @@ -15,6 +16,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
Expand Down Expand Up @@ -56,6 +58,26 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam
verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any());
}

/**
 * Appends are mocked to fail with status UNKNOWN whose cause is a
 * MaximumRequestCallbackWaitTimeExceededException, with the mock provider's retry count set to 2.
 * Running a single-row pipeline must then fail with a JobExecutionException, after exactly two
 * append calls on the mock writer and with the provider reporting three created writers.
 */
@Test
public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTimeExceededException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
    // Arrange: every append fails with UNKNOWN wrapping the callback wait timeout.
    var callbackTimeout = new Exceptions.MaximumRequestCallbackWaitTimeExceededException(Duration.ofMinutes(6), "id", Duration.ofMinutes(5));
    var failingStatus = Status.UNKNOWN.withCause(callbackTimeout);
    mockClientProvider.givenFailingAppendWithStatus(failingStatus);
    mockClientProvider.givenRetryCount(2);

    // Act + assert: sink and pipeline are built inside the lambda so that any failure
    // raised while constructing them is captured by assertThatThrownBy as well.
    assertThatThrownBy(() -> runner
            .withRetryCount(0)
            .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(givenRow(1))))))
            .isInstanceOf(JobExecutionException.class);

    // Verify the number of append attempts and of writers created along the way.
    var jsonWriter = mockClientProvider.getMockJsonWriter();
    verify(jsonWriter, times(2)).append(any());

    var createdWriters = mockClientProvider.getNumOfCreatedWriter();
    assertThat(createdWriters).isEqualTo(3);
}

@Test
@Disabled("Retry causes out of order exception in committer and later in writer")
public void shouldRetryOnTimeoutException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
Expand Down

0 comments on commit 8689b91

Please sign in to comment.