Skip to content

Commit

Permalink
feat: add closed stream handling for buffered stream
Browse files Browse the repository at this point in the history
  • Loading branch information
gintarasm committed Feb 8, 2024
1 parent 670b286 commit 3443e8d
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,21 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
switch (status.getCode()) {
case INTERNAL:
case CANCELLED:
case ABORTED: {
logger.warn(createLogMessage.apply("Recoverable error. Retrying.., "), error);
try {
Thread.sleep(clientProvider.writeSettings().getRetryPause().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (retryCount > 0) {
writeWithRetry(errorTraceId, errorRows, retryCount - 1);
} else {
throw error;
}
break;
case UNAVAILABLE:
case ABORTED: {
if (retryCount > 0) {
writeWithRetry(errorTraceId, errorRows, retryCount - 1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ public BigQueryJsonBufferedSinkWriter(Sink.InitContext sinkInitContext, RowValue
protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
var rowArray = new JSONArray();
rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
writer = streamWriter(traceId, rows.getStream(), rows.getTable());
}

try {
return streamWriter(traceId, rows.getStream(), rows.getTable()).append(rowArray, rows.getOffset());
return writer.append(rowArray, rows.getOffset());
} catch (Throwable t) {
return ApiFutures.immediateFailedFuture(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
writer = streamWriter(traceId, rows.getStream(), rows.getTable());
}

logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());

try {
Expand Down

0 comments on commit 3443e8d

Please sign in to comment.