Skip to content

Commit

Permalink
chore: Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
s-gelazevicius committed Dec 21, 2023
1 parent ca3b1c9 commit c88cbcf
Showing 1 changed file with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,28 +105,6 @@ public void onSuccess(AppendRowsResponse result) {
this.parent.inflightRequestCount.arriveAndDeregister();
}

private void retryWrite(Throwable t, int newRetryCount) {
var status = Status.fromThrowable(t);
try {
if (newRetryCount > 0) {
logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
this.parent.writeWithRetry(traceId, rows, newRetryCount);
} else {
logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
}
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
}
}

private void doPauseBeforeRetry() {
try {
Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void onFailure(Throwable t) {
Expand All @@ -146,8 +124,9 @@ public void onFailure(Throwable t) {
if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
var first = rows.getData().subList(0, rows.getData().size() / 2);
var second = rows.getData().subList(rows.getData().size() / 2, rows.getData().size());
var data = rows.getData();
var first = data.subList(0, data.size() / 2);
var second = data.subList(data.size() / 2, data.size());
try {
this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
Expand All @@ -163,5 +142,28 @@ public void onFailure(Throwable t) {
}
this.parent.inflightRequestCount.arriveAndDeregister();
}

private void retryWrite(Throwable t, int newRetryCount) {
var status = Status.fromThrowable(t);
try {
if (newRetryCount > 0) {
logger.warn("Trace-id {} Recoverable error {}. Retrying...", traceId, status.getCode(), t);
this.parent.writeWithRetry(traceId, rows, newRetryCount);
} else {
logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
}
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
}
}

private void doPauseBeforeRetry() {
try {
Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit c88cbcf

Please sign in to comment.