Skip to content

Commit

Permalink
fix: correctly resolve nested exception
Browse files Browse the repository at this point in the history
  • Loading branch information
gintarasm committed Mar 19, 2024
1 parent 8065b79 commit 9f6f8b8
Showing 1 changed file with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.common.base.Preconditions;
import com.vinted.flink.bigquery.metric.AsyncBigQueryStreamMetrics;
import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.sink.ExecutorProvider;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
Expand Down Expand Up @@ -152,7 +155,7 @@ protected void submitRequestEntries(List<StreamRequest> list, Consumer<List<Stre
throw new AsyncWriterException(traceId, status.getCode(), t);
}
case UNKNOWN:
if (status.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
if (isMaximumRequestCallbackWaitTimeExceededException(t)) {
logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
Optional.ofNullable(this.metrics.get(request.getStream()))
.ifPresent(AsyncBigQueryStreamMetrics::incrementTimeoutCount);
Expand Down Expand Up @@ -194,6 +197,20 @@ protected void submitRequestEntries(List<StreamRequest> list, Consumer<List<Stre

}

private boolean isMaximumRequestCallbackWaitTimeExceededException(Throwable t) {
if (t == null) {
return false;
}

for (Throwable cause = t; cause != null; cause = cause.getCause()) {
if (cause instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
return true;
}
}

return false;
}

private List<StreamRequest> retry(Throwable t, String traceId, StreamRequest request) {
var status = Status.fromThrowable(t);
request.setRetries(request.getRetries() - 1);
Expand Down

0 comments on commit 9f6f8b8

Please sign in to comment.