feat: add retry for recoverable errors #9

Closed · wants to merge 1 commit
@@ -20,7 +20,6 @@ public void open(Configuration parameters) throws Exception {
     @Override
     public void process(K k, ProcessWindowFunction<A, Rows<A>, K, W>.Context context, Iterable<A> batch, Collector<Rows<A>> out) throws Exception {
         var table = getTable(batch.iterator().next());
-
         var data = StreamSupport.stream(batch.spliterator(), false).collect(Collectors.toList());
         var result = Rows.defaultStream(data, table);
         out.collect(result);
@@ -9,9 +9,12 @@ public class AppendException extends RuntimeException {

     private final Throwable error;

-    public AppendException(String traceId, Rows<?> rows, Throwable error) {
+    private final int retryCount;
+
+    public AppendException(String traceId, Rows<?> rows, int retryCount, Throwable error) {
         this.traceId = traceId;
         this.rows = rows;
+        this.retryCount = retryCount;
         this.error = error;
     }

@@ -23,6 +26,10 @@ public <A> Rows<A> getRows() {
         return (Rows<A>) rows;
     }

+    public int getRetryCount() {
+        return retryCount;
+    }
+
     public Throwable getError() {
         return error;
     }
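Side note on the new field: downstream code can now tell how much retry budget remained when an append finally failed. A minimal sketch of consuming the enriched exception — the `handle` method and the zero-means-exhausted convention are illustrative assumptions, not part of this diff:

```java
// Illustrative sketch: reacting to an AppendException that carries its retry count.
void handle(AppendException e) {
    if (e.getRetryCount() <= 0) {
        // Retry budget exhausted: fail loudly with the root cause.
        throw new RuntimeException("Append failed, trace-id " + e.getTraceId(), e.getError());
    }
    // Budget remains: the sink's callback is expected to have scheduled a retry.
}
```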
@@ -29,6 +29,7 @@ public abstract class BigQueryBufferedSinkWriter<A, StreamT extends AutoCloseabl
         implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Rows<A>, BigQueryCommittable> {
     private static final Logger logger = LoggerFactory.getLogger(BigQueryBufferedSinkWriter.class);
     private Map<String, Long> streamOffsets = new ConcurrentHashMap<>();
+
     public BigQueryBufferedSinkWriter(
             Sink.InitContext sinkInitContext,
             RowValueSerializer<A> rowSerializer,
@@ -200,7 +201,7 @@ public AppendCallBack(BigQueryBufferedSinkWriter<?, ?> parent, Rows<A> rows, int
         @Override
         public void onFailure(Throwable t) {
             logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
-            future.completeExceptionally(new AppendException(traceId, rows, t));
+            future.completeExceptionally(new AppendException(traceId, rows, retryCount, t));
         }

         @Override
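Because the buffered writer reports failures through a future rather than a stored field, the retry count travels inside the completed exception. A hedged sketch of unwrapping it on whatever side awaits the future (the method and variable names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Illustrative sketch: await the append future and unwrap the AppendException
// that now carries the remaining retry count.
static void awaitAppend(CompletableFuture<?> future) throws InterruptedException {
    try {
        future.get();
    } catch (ExecutionException e) {
        if (e.getCause() instanceof AppendException) {
            var ae = (AppendException) e.getCause();
            System.err.printf("Trace-id %s failed with %d retries left%n",
                    ae.getTraceId(), ae.getRetryCount());
        }
    }
}
```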
@@ -6,6 +6,9 @@
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.StreamWriter;
 import com.google.protobuf.ByteString;
+import com.vinted.flink.bigquery.sink.AppendException;
+import io.grpc.Status;
+import io.grpc.StatusException;
 import org.apache.flink.api.connector.sink2.Sink;
 import com.vinted.flink.bigquery.client.ClientProvider;
 import com.vinted.flink.bigquery.model.Rows;
@@ -43,7 +46,6 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
         if (writer.isClosed() || writer.isUserClosed()) {
             logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
         }
-
         logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
         try {
             return writer.append(prows);
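The new `io.grpc` imports exist so append failures can be classified by gRPC status before deciding whether to retry. `Status.fromThrowable` walks the cause chain and falls back to `UNKNOWN` when no status is attached, which is what routes unexpected errors into the non-recoverable branch. A small self-contained illustration (the demo class is mine, not from the PR):

```java
import io.grpc.Status;
import io.grpc.StatusException;

public class StatusDemo {
    public static void main(String[] args) {
        // A wrapped gRPC failure keeps its code...
        var wrapped = new RuntimeException(new StatusException(Status.UNAVAILABLE));
        System.out.println(Status.fromThrowable(wrapped).getCode()); // UNAVAILABLE

        // ...while a plain exception maps to UNKNOWN and will not be retried.
        System.out.println(Status.fromThrowable(new IllegalStateException()).getCode()); // UNKNOWN
    }
}
```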
@@ -3,21 +3,20 @@
 import com.google.api.core.ApiFutureCallback;
 import com.google.api.core.ApiFutures;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
-import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
-import io.grpc.Status;
-import org.apache.flink.api.connector.sink2.Sink;
 import com.vinted.flink.bigquery.client.ClientProvider;
+import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
 import com.vinted.flink.bigquery.model.Rows;
 import com.vinted.flink.bigquery.serializer.RowValueSerializer;
 import com.vinted.flink.bigquery.sink.AppendException;
 import com.vinted.flink.bigquery.sink.BigQuerySinkWriter;
 import com.vinted.flink.bigquery.sink.ExecutorProvider;
+import io.grpc.Status;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.util.Optional;
 import java.util.concurrent.Phaser;
-import java.util.function.Function;

 public abstract class BigQueryDefaultSinkWriter<A, StreamT extends AutoCloseable>
         extends BigQuerySinkWriter<A, StreamT> {
Expand Down Expand Up @@ -56,15 +55,6 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
ApiFutures.addCallback(response, callback, appendExecutor);
inflightRequestCount.register();
} catch (AppendException exception) {
var error = exception.getError();
var errorRows = exception.<A>getRows();
var errorTraceId = exception.getTraceId();
var status = Status.fromThrowable(error);
Function<String, String> createLogMessage = (title) ->
this.createLogMessage(title, errorTraceId, status, error, errorRows);
logger.error(createLogMessage.apply("Non recoverable BigQuery stream error for:"), error);
throw error;
} catch (Throwable t) {
logger.error("Non recoverable BigQuery stream error for:", t);
throw t;
Expand Down Expand Up @@ -107,32 +97,70 @@ public AppendCallBack(BigQueryDefaultSinkWriter<A, ?> parent, String traceId, Ro
}

@Override
public void onFailure(Throwable t) {
logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
public void onSuccess(AppendRowsResponse result) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(m -> {
m.setBatchCount(rows.getData().size());
m.setOffset(result.getAppendResult().getOffset().getValue());
});
this.parent.inflightRequestCount.arriveAndDeregister();
}

private void retryWrite(Throwable t, int newRetryCount) {
var status = Status.fromThrowable(t);
if (status.getCode() == Status.Code.INVALID_ARGUMENT && t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
var first = rows.getData().subList(0, rows.getData().size() / 2);
var second = rows.getData().subList(rows.getData().size() / 2, rows.getData().size());
try {
this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount);
this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount);
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, t);
try {
if (newRetryCount > 0) {
logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
this.parent.writeWithRetry(traceId, rows, newRetryCount);
} else {
logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
}
} else {
this.parent.appendAsyncException = new AppendException(traceId, rows, t);
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
}
}

private void doPauseBeforeRetry() {
try {
Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void onSuccess(AppendRowsResponse result) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(m -> {
m.setBatchCount(rows.getData().size());
m.setOffset(result.getAppendResult().getOffset().getValue());
});
public void onFailure(Throwable t) {
logger.error("Trace-id {} Received error {}", t.getMessage(), traceId);
var status = Status.fromThrowable(t);
switch (status.getCode()) {
case INTERNAL:
case ABORTED:
case CANCELLED:
case FAILED_PRECONDITION:
case DEADLINE_EXCEEDED:
case UNAVAILABLE:
doPauseBeforeRetry();
retryWrite(t, retryCount - 1);
break;
case INVALID_ARGUMENT:
if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
var first = rows.getData().subList(0, rows.getData().size() / 2);
var second = rows.getData().subList(rows.getData().size() / 2, rows.getData().size());
try {
this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
} else {
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
default:
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
this.parent.inflightRequestCount.arriveAndDeregister();
}
}
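Stepping back, the reworked callback amounts to a bounded, status-classified retry loop: recoverable codes pause and retry with a decremented budget, oversized batches split, and everything else fails fast. A compact standalone sketch of the same control flow — the class, method, and constant names here are illustrative, not from the PR:

```java
import io.grpc.Status;
import java.util.EnumSet;
import java.util.Set;

public final class RetrySketch {
    // The status codes this PR treats as recoverable.
    private static final Set<Status.Code> RETRYABLE = EnumSet.of(
            Status.Code.INTERNAL, Status.Code.ABORTED, Status.Code.CANCELLED,
            Status.Code.FAILED_PRECONDITION, Status.Code.DEADLINE_EXCEEDED,
            Status.Code.UNAVAILABLE);

    // Run `action`, retrying recoverable failures up to `retries` times
    // with a fixed pause between attempts (mirroring doPauseBeforeRetry).
    static void runWithRetry(Runnable action, int retries, long pauseMillis)
            throws InterruptedException {
        while (true) {
            try {
                action.run();
                return;
            } catch (RuntimeException e) {
                var code = Status.fromThrowable(e).getCode();
                if (!RETRYABLE.contains(code) || retries-- <= 0) {
                    throw e; // non-recoverable, or retry budget exhausted
                }
                Thread.sleep(pauseMillis);
            }
        }
    }
}
```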
@@ -4,10 +4,10 @@
 import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
 import com.google.protobuf.Int64Value;
 import com.vinted.flink.bigquery.model.Rows;
+import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
 import com.vinted.flink.bigquery.util.FlinkTest;
 import com.vinted.flink.bigquery.util.MockJsonClientProvider;
 import io.grpc.Status;
-import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;