diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
index d9a28cb..7a97df5 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
@@ -100,8 +100,8 @@ public void write(Rows rows, Context context) {
protected abstract void writeWithRetry(String traceId, Rows rows, int retryCount) throws Throwable;
- protected String createLogMessage(String title, String errorTraceId, Status status, Throwable error, Rows errorRows) {
- return String.format("Trace-id: %s %s \nstatus: %s\nerror: %s\nstream: %s\ntable: %s\nactual offset: %s\nsize: %s",
+ protected String createLogMessage(String title, String errorTraceId, Status status, Throwable error, Rows errorRows, int retryCount) {
+ return String.format("Trace-id: %s %s \nstatus: %s\nerror: %s\nstream: %s\ntable: %s\nactual offset: %s\nsize: %s\n retryCount:%s",
errorTraceId,
title,
status.getCode(),
@@ -109,7 +109,8 @@ protected String createLogMessage(String title, String errorTraceId, Status stat
errorRows.getStream(),
errorRows.getTable(),
errorRows.getOffset(),
- errorRows.getData().size()
+ errorRows.getData().size(),
+ retryCount
);
}
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
index 573e81d..1a7badd 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java
@@ -61,7 +61,7 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro
var errorTraceId = exception.getTraceId();
var status = Status.fromThrowable(error);
Function createLogMessage = (title) ->
- this.createLogMessage(title, errorTraceId, status, error, errorRows);
+ this.createLogMessage(title, errorTraceId, status, error, errorRows, retryCount);
switch (status.getCode()) {
case INTERNAL:
case CANCELLED:
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
index 195ebae..19b3357 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
@@ -17,6 +17,7 @@
import java.util.Optional;
import java.util.concurrent.Phaser;
+import java.util.function.Function;
public abstract class BigQueryDefaultSinkWriter
extends BigQuerySinkWriter {
@@ -36,9 +37,10 @@ public BigQueryDefaultSinkWriter(
private void checkAsyncException() {
// reset this exception since we could close the writer later on
- RuntimeException e = appendAsyncException;
+ AppendException e = appendAsyncException;
if (e != null) {
appendAsyncException = null;
+ logger.error("Throwing non recoverable exception", e);
throw e;
}
}
@@ -55,8 +57,17 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro
var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
ApiFutures.addCallback(response, callback, appendExecutor);
inflightRequestCount.register();
+ } catch (AppendException exception) {
+ var error = exception.getError();
+ var errorRows = exception.getRows();
+ var errorTraceId = exception.getTraceId();
+ var status = Status.fromThrowable(error);
+ Function createLogMessage = (title) ->
+ this.createLogMessage(title, errorTraceId, status, error, errorRows, retryCount);
+ logger.error(createLogMessage.apply("Non recoverable BigQuery stream AppendException for:"), error);
+ throw error;
} catch (Throwable t) {
- logger.error("Non recoverable BigQuery stream error for:", t);
+ logger.error("Trace-id: {} Non recoverable BigQuery stream error for: {}. Retry count: {}", traceId, t.getMessage(), retryCount);
throw t;
}
}
@@ -108,7 +119,6 @@ public void onSuccess(AppendRowsResponse result) {
@Override
public void onFailure(Throwable t) {
- logger.error("Trace-id {} Received error {}", t.getMessage(), traceId);
var status = Status.fromThrowable(t);
switch (status.getCode()) {
case INTERNAL:
@@ -134,10 +144,12 @@ public void onFailure(Throwable t) {
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
} else {
+ logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
default:
+ logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
this.parent.inflightRequestCount.arriveAndDeregister();
@@ -147,7 +159,7 @@ private void retryWrite(Throwable t, int newRetryCount) {
var status = Status.fromThrowable(t);
try {
if (newRetryCount > 0) {
- logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
+ logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), retryCount);
this.parent.writeWithRetry(traceId, rows, newRetryCount);
} else {
logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);