Skip to content

Commit

Permalink
Merge pull request #7 from vinted/fix/always-deregister-on-failure
Browse files Browse the repository at this point in the history
fix: always deregister on failure
  • Loading branch information
gintarasm authored Nov 28, 2023
2 parents 4e704ec + 61bde83 commit 4cbde99
Showing 1 changed file with 1 addition and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public AppendCallBack(BigQueryDefaultSinkWriter<A, ?> parent, String traceId, Ro
@Override
public void onFailure(Throwable t) {
logger.info("Trace-id {} Received error {}", t.getMessage(), traceId);
this.parent.inflightRequestCount.arriveAndDeregister();
var status = Status.fromThrowable(t);
if (status.getCode() == Status.Code.INVALID_ARGUMENT && t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
Expand All @@ -120,11 +121,9 @@ public void onFailure(Throwable t) {
this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount);
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, t);
this.parent.inflightRequestCount.arriveAndDeregister();
}
} else {
this.parent.appendAsyncException = new AppendException(traceId, rows, t);
this.parent.inflightRequestCount.arriveAndDeregister();
}
}

Expand Down

0 comments on commit 4cbde99

Please sign in to comment.