Merge pull request #10 from vinted/feat/add-retry-for-default-stream
feat: add retry for recoverable errors
s-gelazevicius authored Dec 21, 2023
2 parents 014a8f1 + c88cbcf commit 8256fc6
Showing 7 changed files with 92 additions and 40 deletions.
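
In short: AppendException now carries a retry count, and the default-stream sink writer retries appends that fail with recoverable gRPC statuses (INTERNAL, ABORTED, CANCELLED, FAILED_PRECONDITION, DEADLINE_EXCEEDED, UNAVAILABLE), pausing between attempts. Oversized batches are still split in half and re-appended, now with a decremented retry budget; any other status is surfaced as an asynchronous append failure.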
----------------------------------------------------------------------
@@ -20,7 +20,6 @@ public void open(Configuration parameters) throws Exception {
     @Override
     public void process(K k, ProcessWindowFunction<A, Rows<A>, K, W>.Context context, Iterable<A> batch, Collector<Rows<A>> out) throws Exception {
         var table = getTable(batch.iterator().next());
-
         var data = StreamSupport.stream(batch.spliterator(), false).collect(Collectors.toList());
         var result = Rows.defaultStream(data, table);
         out.collect(result);
----------------------------------------------------------------------
AppendException.java
@@ -9,9 +9,12 @@ public class AppendException extends RuntimeException {
 
     private final Throwable error;
 
-    public AppendException(String traceId, Rows<?> rows, Throwable error) {
+    private final int retryCount;
+
+    public AppendException(String traceId, Rows<?> rows, int retryCount, Throwable error) {
         this.traceId = traceId;
         this.rows = rows;
+        this.retryCount = retryCount;
         this.error = error;
     }
 
@@ -23,6 +26,10 @@ public <A> Rows<A> getRows() {
         return (Rows<A>) rows;
     }
 
+    public int getRetryCount() {
+        return retryCount;
+    }
+
     public Throwable getError() {
         return error;
     }
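
For reference, a hedged sketch of consuming the extended exception; the demo class is illustrative, not part of this commit, and assumes the connector classes are on the classpath:

    import com.vinted.flink.bigquery.sink.AppendException;
    import io.grpc.Status;

    class AppendExceptionDemo {
        // The exception now carries the retry budget at the point of failure,
        // alongside the trace id, the failed rows, and the underlying error.
        static void report(AppendException e) {
            var status = Status.fromThrowable(e.getError());
            System.err.printf("trace=%s status=%s retryCount=%d rows=%d%n",
                    e.getTraceId(), status.getCode(),
                    e.getRetryCount(), e.getRows().getData().size());
        }
    }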
----------------------------------------------------------------------
BigQueryBufferedSinkWriter.java
@@ -29,6 +29,7 @@ public abstract class BigQueryBufferedSinkWriter<A, StreamT extends AutoCloseabl
         implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Rows<A>, BigQueryCommittable> {
     private static final Logger logger = LoggerFactory.getLogger(BigQueryBufferedSinkWriter.class);
     private Map<String, Long> streamOffsets = new ConcurrentHashMap<>();
+
     public BigQueryBufferedSinkWriter(
             Sink.InitContext sinkInitContext,
             RowValueSerializer<A> rowSerializer,
@@ -200,7 +201,7 @@ public AppendCallBack(BigQueryBufferedSinkWriter<?, ?> parent, Rows<A> rows, int
         @Override
         public void onFailure(Throwable t) {
             logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
-            future.completeExceptionally(new AppendException(traceId, rows, t));
+            future.completeExceptionally(new AppendException(traceId, rows, retryCount, t));
         }
 
         @Override
----------------------------------------------------------------------
@@ -43,7 +43,6 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
         if (writer.isClosed() || writer.isUserClosed()) {
             logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
         }
-
         logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
         try {
             return writer.append(prows);
----------------------------------------------------------------------
BigQueryDefaultSinkWriter.java
@@ -3,21 +3,20 @@
 import com.google.api.core.ApiFutureCallback;
 import com.google.api.core.ApiFutures;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
-import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
-import io.grpc.Status;
-import org.apache.flink.api.connector.sink2.Sink;
 import com.vinted.flink.bigquery.client.ClientProvider;
+import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
 import com.vinted.flink.bigquery.model.Rows;
 import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 import com.vinted.flink.bigquery.sink.AppendException;
 import com.vinted.flink.bigquery.sink.BigQuerySinkWriter;
 import com.vinted.flink.bigquery.sink.ExecutorProvider;
+import io.grpc.Status;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 import java.util.concurrent.Phaser;
-import java.util.function.Function;
 
 public abstract class BigQueryDefaultSinkWriter<A, StreamT extends AutoCloseable>
         extends BigQuerySinkWriter<A, StreamT> {
@@ -56,15 +55,6 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
             var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
             ApiFutures.addCallback(response, callback, appendExecutor);
             inflightRequestCount.register();
-        } catch (AppendException exception) {
-            var error = exception.getError();
-            var errorRows = exception.<A>getRows();
-            var errorTraceId = exception.getTraceId();
-            var status = Status.fromThrowable(error);
-            Function<String, String> createLogMessage = (title) ->
-                    this.createLogMessage(title, errorTraceId, status, error, errorRows);
-            logger.error(createLogMessage.apply("Non recoverable BigQuery stream error for:"), error);
-            throw error;
         } catch (Throwable t) {
             logger.error("Non recoverable BigQuery stream error for:", t);
             throw t;
@@ -106,34 +96,74 @@ public AppendCallBack(BigQueryDefaultSinkWriter<A, ?> parent, String traceId, Ro
             this.retryCount = retryCount;
         }
 
+        @Override
+        public void onSuccess(AppendRowsResponse result) {
+            Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(m -> {
+                m.setBatchCount(rows.getData().size());
+                m.setOffset(result.getAppendResult().getOffset().getValue());
+            });
+            this.parent.inflightRequestCount.arriveAndDeregister();
+        }
+
+
         @Override
         public void onFailure(Throwable t) {
-            logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
+            logger.error("Trace-id {} Received error {}", t.getMessage(), traceId);
             var status = Status.fromThrowable(t);
-            if (status.getCode() == Status.Code.INVALID_ARGUMENT && t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
-                Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
-                logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
-                var first = rows.getData().subList(0, rows.getData().size() / 2);
-                var second = rows.getData().subList(rows.getData().size() / 2, rows.getData().size());
-                try {
-                    this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount);
-                    this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount);
-                } catch (Throwable e) {
-                    this.parent.appendAsyncException = new AppendException(traceId, rows, t);
-                }
-            } else {
-                this.parent.appendAsyncException = new AppendException(traceId, rows, t);
-            }
+            switch (status.getCode()) {
+                case INTERNAL:
+                case ABORTED:
+                case CANCELLED:
+                case FAILED_PRECONDITION:
+                case DEADLINE_EXCEEDED:
+                case UNAVAILABLE:
+                    doPauseBeforeRetry();
+                    retryWrite(t, retryCount - 1);
+                    break;
+                case INVALID_ARGUMENT:
+                    if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
+                        Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
+                        logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
+                        var data = rows.getData();
+                        var first = data.subList(0, data.size() / 2);
+                        var second = data.subList(data.size() / 2, data.size());
+                        try {
+                            this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
+                            this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
+                        } catch (Throwable e) {
+                            this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+                        }
+                    } else {
+                        this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+                    }
+                    break;
+                default:
+                    this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
+            }
             this.parent.inflightRequestCount.arriveAndDeregister();
         }
 
-        @Override
-        public void onSuccess(AppendRowsResponse result) {
-            Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(m -> {
-                m.setBatchCount(rows.getData().size());
-                m.setOffset(result.getAppendResult().getOffset().getValue());
-            });
-            this.parent.inflightRequestCount.arriveAndDeregister();
-        }
+        private void retryWrite(Throwable t, int newRetryCount) {
+            var status = Status.fromThrowable(t);
+            try {
+                if (newRetryCount > 0) {
+                    logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
+                    this.parent.writeWithRetry(traceId, rows, newRetryCount);
+                } else {
+                    logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
+                    this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
+                }
+            } catch (Throwable e) {
+                this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
+            }
+        }
+
+        private void doPauseBeforeRetry() {
+            try {
+                Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 }
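
Distilled from the callback above, a minimal synchronous sketch of the retry decision; the real code is asynchronous via ApiFutureCallback, and the class and method names here are hypothetical, not part of this commit:

    import io.grpc.Status;
    import java.time.Duration;
    import java.util.Set;

    final class RetryPolicy {
        // Statuses the sink now treats as transient, mirroring the switch above.
        private static final Set<Status.Code> RECOVERABLE = Set.of(
                Status.Code.INTERNAL, Status.Code.ABORTED, Status.Code.CANCELLED,
                Status.Code.FAILED_PRECONDITION, Status.Code.DEADLINE_EXCEEDED,
                Status.Code.UNAVAILABLE);

        static boolean isRecoverable(Throwable t) {
            return RECOVERABLE.contains(Status.fromThrowable(t).getCode());
        }

        // Pause-then-retry loop equivalent to doPauseBeforeRetry() + retryWrite():
        // each attempt decrements the budget; the fixed pause stands in for the
        // connector's writeSettings().getRetryPause().
        static void runWithRetry(Runnable append, int retryCount, Duration pause) throws InterruptedException {
            while (true) {
                try {
                    append.run();
                    return;
                } catch (RuntimeException e) {
                    if (!isRecoverable(e) || --retryCount <= 0) throw e;
                    Thread.sleep(pause.toMillis());
                }
            }
        }
    }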
----------------------------------------------------------------------
@@ -56,8 +56,24 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkParam PipelineRu
     }
 
     @Test
-    public void shouldNotRetryOnException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenFailingAppendWithStatus(Status.INTERNAL);
+    public void shouldRetryOnRecoverableException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+        mockClientProvider.givenFailingAppendWithStatus(Status.FAILED_PRECONDITION);
+
+        assertThatThrownBy(() -> {
+            runner
+                    .withRetryCount(0)
+                    .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
+                            givenRows(1)
+                    ))));
+        }).isInstanceOf(JobExecutionException.class);
+
+
+        verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any());
+    }
+
+    @Test
+    public void shouldNotRetryOnNonRecoverableException(@FlinkParam PipelineRunner runner, @FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+        mockClientProvider.givenFailingAppendWithStatus(Status.PERMISSION_DENIED);
 
         assertThatThrownBy(() -> {
             runner
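
Note on the new expectation: withRetryCount(0) presumably pins Flink job restarts to zero, so the five invocations asserted by verify(..., times(5)) come from the writer's own append-level retries added in this commit, not from job recovery.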
----------------------------------------------------------------------
@@ -4,10 +4,10 @@
 import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
 import com.google.protobuf.Int64Value;
 import com.vinted.flink.bigquery.model.Rows;
+import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
 import com.vinted.flink.bigquery.util.FlinkTest;
 import com.vinted.flink.bigquery.util.MockJsonClientProvider;
 import io.grpc.Status;
-import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
