diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java index 1a7badd..c233837 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryBufferedSinkWriter.java @@ -65,7 +65,6 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro switch (status.getCode()) { case INTERNAL: case CANCELLED: - case ABORTED: { logger.warn(createLogMessage.apply("Recoverable error. Retrying.., "), error); try { Thread.sleep(clientProvider.writeSettings().getRetryPause().toMillis()); @@ -73,6 +72,14 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro throw new RuntimeException(e); } + if (retryCount > 0) { + writeWithRetry(errorTraceId, errorRows, retryCount - 1); + } else { + throw error; + } + break; + case UNAVAILABLE: + case ABORTED: { if (retryCount > 0) { writeWithRetry(errorTraceId, errorRows, retryCount - 1); } else { diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java index 781bd2f..929d384 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryJsonBufferedSinkWriter.java @@ -24,9 +24,15 @@ public BigQueryJsonBufferedSinkWriter(Sink.InitContext sinkInitContext, RowValue protected ApiFuture append(String traceId, Rows rows) { var rowArray = new JSONArray(); rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row))))); + var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + + if (writer.isClosed() || writer.isUserClosed()) { + recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + } try { - return streamWriter(traceId, rows.getStream(), rows.getTable()).append(rowArray, rows.getOffset()); + return writer.append(rowArray, rows.getOffset()); } catch (Throwable t) { return ApiFutures.immediateFailedFuture(t); } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java index a45ec4b..db41a64 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java @@ -34,6 +34,12 @@ protected ApiFuture append(String traceId, Rows rows) { Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size)); var writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + if (writer.isClosed() || writer.isUserClosed()) { + logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream()); + recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable()); + writer = streamWriter(traceId, rows.getStream(), rows.getTable()); + } + logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId()); try {