diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
index 96ec2ec..a116b83 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
@@ -38,6 +38,7 @@ public class AsyncBigQuerySinkWriter extends AsyncSinkWriter, StreamR
 
     private final Executor appendExecutor;
 
+    protected transient Queue writersToClose = new ArrayDeque<>();
     protected transient Map streamMap = new ConcurrentHashMap<>();
 
     public AsyncBigQuerySinkWriter(ExecutorProvider executorProvider, AsyncClientProvider clientProvider, ElementConverter, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection> bufferedRequestStates) {
@@ -94,11 +95,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str
         streamMap.replaceAll((key, writer) -> {
             var newWriter = writer;
             if (writer.getWriterId().equals(writerId)) {
-                try {
-                    writer.close();
-                } catch (Exception e) {
-                    logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
-                }
+                writersToClose.add(writer);
                 newWriter = this.clientProvider.getWriter(streamName, table);
                 registerInflightMetric(newWriter);
             }
@@ -229,6 +226,20 @@ protected long getSizeInBytes(StreamRequest StreamRequest) {
         return StreamRequest.getData().getSerializedSize();
     }
 
+    @Override
+    public void flush(boolean flush) throws InterruptedException {
+        super.flush(flush);
+
+        while (writersToClose.peek() != null) {
+            var writer = writersToClose.poll();
+            try {
+                writer.close();
+            } catch (Exception e) {
+                logger.error("Could not close writer", e);
+            }
+        }
+    }
+
     @Override
     public void close() {
         logger.info("Closing BigQuery write stream");