Skip to content

Commit

Permalink
Merge pull request #11 from vinted/chore/improve-logging
Browse files Browse the repository at this point in the history
chore: Improve logging
  • Loading branch information
s-gelazevicius authored Dec 27, 2023
2 parents 8256fc6 + b879fe8 commit cf3563d
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,17 @@ public void write(Rows<A> rows, Context context) {

protected abstract void writeWithRetry(String traceId, Rows<A> rows, int retryCount) throws Throwable;

protected String createLogMessage(String title, String errorTraceId, Status status, Throwable error, Rows<A> errorRows) {
return String.format("Trace-id: %s %s \nstatus: %s\nerror: %s\nstream: %s\ntable: %s\nactual offset: %s\nsize: %s",
protected String createLogMessage(String title, String errorTraceId, Status status, Throwable error, Rows<A> errorRows, int retryCount) {
return String.format("Trace-id: %s %s \nstatus: %s\nerror: %s\nstream: %s\ntable: %s\nactual offset: %s\nsize: %s\n retryCount:%s",
errorTraceId,
title,
status.getCode(),
error.getMessage(),
errorRows.getStream(),
errorRows.getTable(),
errorRows.getOffset(),
errorRows.getData().size()
errorRows.getData().size(),
retryCount
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
var errorTraceId = exception.getTraceId();
var status = Status.fromThrowable(error);
Function<String, String> createLogMessage = (title) ->
this.createLogMessage(title, errorTraceId, status, error, errorRows);
this.createLogMessage(title, errorTraceId, status, error, errorRows, retryCount);
switch (status.getCode()) {
case INTERNAL:
case CANCELLED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.function.Function;

public abstract class BigQueryDefaultSinkWriter<A, StreamT extends AutoCloseable>
extends BigQuerySinkWriter<A, StreamT> {
Expand All @@ -36,9 +37,10 @@ public BigQueryDefaultSinkWriter(

private void checkAsyncException() {
// reset this exception since we could close the writer later on
RuntimeException e = appendAsyncException;
AppendException e = appendAsyncException;
if (e != null) {
appendAsyncException = null;
logger.error("Throwing non recoverable exception", e);
throw e;
}
}
Expand All @@ -55,8 +57,17 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
ApiFutures.addCallback(response, callback, appendExecutor);
inflightRequestCount.register();
} catch (AppendException exception) {
var error = exception.getError();
var errorRows = exception.<A>getRows();
var errorTraceId = exception.getTraceId();
var status = Status.fromThrowable(error);
Function<String, String> createLogMessage = (title) ->
this.createLogMessage(title, errorTraceId, status, error, errorRows, retryCount);
logger.error(createLogMessage.apply("Non recoverable BigQuery stream AppendException for:"), error);
throw error;
} catch (Throwable t) {
logger.error("Non recoverable BigQuery stream error for:", t);
logger.error("Trace-id: {} Non recoverable BigQuery stream error for: {}. Retry count: {}", traceId, t.getMessage(), retryCount);
throw t;
}
}
Expand Down Expand Up @@ -108,7 +119,6 @@ public void onSuccess(AppendRowsResponse result) {

@Override
public void onFailure(Throwable t) {
logger.error("Trace-id {} Received error {}", t.getMessage(), traceId);
var status = Status.fromThrowable(t);
switch (status.getCode()) {
case INTERNAL:
Expand All @@ -134,10 +144,12 @@ public void onFailure(Throwable t) {
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
} else {
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
default:
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
this.parent.inflightRequestCount.arriveAndDeregister();
Expand All @@ -147,7 +159,7 @@ private void retryWrite(Throwable t, int newRetryCount) {
var status = Status.fromThrowable(t);
try {
if (newRetryCount > 0) {
logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), retryCount);
this.parent.writeWithRetry(traceId, rows, newRetryCount);
} else {
logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
Expand Down

0 comments on commit cf3563d

Please sign in to comment.