From b879fe8a3a69ee0e184ece8043442c474a653e0c Mon Sep 17 00:00:00 2001 From: Simonas Gelazevicius Date: Wed, 27 Dec 2023 10:58:05 +0200 Subject: [PATCH] chore: Improve logging --- .../bigquery/sink/BigQuerySinkWriter.java | 7 ++++--- .../buffered/BigQueryBufferedSinkWriter.java | 2 +- .../BigQueryDefaultSinkWriter.java | 20 +++++++++++++++---- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java index d9a28cb..7a97df5 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java @@ -100,8 +100,8 @@ public void write(Rows rows, Context context) { protected abstract void writeWithRetry(String traceId, Rows rows, int retryCount) throws Throwable; - protected String createLogMessage(String title, String errorTraceId, Status status, Throwable error, Rows errorRows) { - return String.format("Trace-id: %s %s \nstatus: %s\nerror: %s\nstream: %s\ntable: %s\nactual offset: %s\nsize: %s", + protected String createLogMessage(String title, String errorTraceId, Status status, Throwable error, Rows errorRows, int retryCount) { + return String.format("Trace-id: %s %s \nstatus: %s\nerror: %s\nstream: %s\ntable: %s\nactual offset: %s\nsize: %s\n retryCount:%s", errorTraceId, title, status.getCode(), @@ -109,7 +109,8 @@ protected String createLogMessage(String title, String errorTraceId, Status stat errorRows.getStream(), errorRows.getTable(), errorRows.getOffset(), - errorRows.getData().size() + errorRows.getData().size(), + retryCount ); } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java index 573e81d..1a7badd 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java @@ -61,7 +61,7 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro var errorTraceId = exception.getTraceId(); var status = Status.fromThrowable(error); Function createLogMessage = (title) -> - this.createLogMessage(title, errorTraceId, status, error, errorRows); + this.createLogMessage(title, errorTraceId, status, error, errorRows, retryCount); switch (status.getCode()) { case INTERNAL: case CANCELLED: diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java index 195ebae..19b3357 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java @@ -17,6 +17,7 @@ import java.util.Optional; import java.util.concurrent.Phaser; +import java.util.function.Function; public abstract class BigQueryDefaultSinkWriter extends BigQuerySinkWriter { @@ -36,9 +37,10 @@ public BigQueryDefaultSinkWriter( private void checkAsyncException() { // reset this exception since we could close the writer later on - RuntimeException e = appendAsyncException; + AppendException e = appendAsyncException; if (e != null) { appendAsyncException = null; + logger.error("Throwing non recoverable exception", e); throw e; } } @@ -55,8 +57,17 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro var callback = new AppendCallBack<>(this, traceId, rows, retryCount); ApiFutures.addCallback(response, callback, appendExecutor); inflightRequestCount.register(); + } catch (AppendException exception) { + var error = exception.getError(); + var errorRows = exception.getRows(); + var errorTraceId = exception.getTraceId(); + var status = Status.fromThrowable(error); + Function createLogMessage = (title) -> + this.createLogMessage(title, errorTraceId, status, error, errorRows, retryCount); + logger.error(createLogMessage.apply("Non recoverable BigQuery stream AppendException for:"), error); + throw error; } catch (Throwable t) { - logger.error("Non recoverable BigQuery stream error for:", t); + logger.error("Trace-id: {} Non recoverable BigQuery stream error for: {}. Retry count: {}", traceId, t.getMessage(), retryCount); throw t; } } @@ -108,7 +119,6 @@ public void onSuccess(AppendRowsResponse result) { @Override public void onFailure(Throwable t) { - logger.error("Trace-id {} Received error {}", t.getMessage(), traceId); var status = Status.fromThrowable(t); switch (status.getCode()) { case INTERNAL: @@ -134,10 +144,12 @@ public void onFailure(Throwable t) { this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); } } else { + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); } break; default: + logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); } this.parent.inflightRequestCount.arriveAndDeregister(); @@ -147,7 +159,7 @@ private void retryWrite(Throwable t, int newRetryCount) { var status = Status.fromThrowable(t); try { if (newRetryCount > 0) { - logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t); + logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), retryCount); this.parent.writeWithRetry(traceId, rows, newRetryCount); } else { logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);