diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
index 96ec2ec..52c6604 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
@@ -38,6 +38,7 @@ public class AsyncBigQuerySinkWriter extends AsyncSinkWriter, StreamR
 
     private final Executor appendExecutor;
 
+    protected transient Queue writersToClose = new ArrayDeque<>();
     protected transient Map streamMap = new ConcurrentHashMap<>();
 
     public AsyncBigQuerySinkWriter(ExecutorProvider executorProvider, AsyncClientProvider clientProvider, ElementConverter, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection> bufferedRequestStates) {
@@ -94,11 +95,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str
         streamMap.replaceAll((key, writer) -> {
             var newWriter = writer;
             if (writer.getWriterId().equals(writerId)) {
-                try {
-                    writer.close();
-                } catch (Exception e) {
-                    logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
-                }
+                writersToClose.add(writer);
                 newWriter = this.clientProvider.getWriter(streamName, table);
                 registerInflightMetric(newWriter);
             }
@@ -230,21 +227,23 @@ protected long getSizeInBytes(StreamRequest StreamRequest) {
     }
 
     @Override
-    public void close() {
-        logger.info("Closing BigQuery write stream");
-        try {
-            flush(true);
-            streamMap.values().forEach(stream -> {
-                try {
-                    stream.close();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            });
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+    public void flush(boolean flush) throws InterruptedException {
+        super.flush(flush);
+
+        while (writersToClose.peek() != null) {
+            var writer = writersToClose.poll();
+            try {
+                writer.close();
+            } catch (Exception e) {
+                logger.error("Could not close unused writer", e);
+            }
         }
+    }
 
+    @Override
+    public void close() {
+        logger.info("Closing BigQuery write stream");
+        streamMap.values().forEach(StreamWriter::close);
     }
diff --git a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java
index ffc84de..f25d724 100644
--- a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java
+++ b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java
@@ -65,7 +65,7 @@ public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTim
 
     @Test
     public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenStreamIsFinalized(stream);
+        mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN);
 
         assertThatThrownBy(() -> {
             runner
@@ -76,7 +76,7 @@ public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam
         }).isInstanceOf(JobExecutionException.class);
 
-        verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any());
+        verify(mockClientProvider.getMockProtoWriter(), times(1)).append(any());
     }
 
     @Test
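
A minimal sketch of the deferred-close pattern this patch introduces, reduced to plain Java: instead of closing a stale writer inside the replaceAll callback, recreation parks it in a queue, and the writers are only closed after the next flush completes. The class, the Writer interface, and the method names below are hypothetical stand-ins, not the connector's actual API.

// Illustrative sketch only (not part of the patch). All names here are
// hypothetical stand-ins for the connector's StreamWriter plumbing.
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

class DeferredCloseSketch {
    interface Writer extends AutoCloseable {
    }

    private final Map<String, Writer> streamMap = new ConcurrentHashMap<>();
    private final Queue<Writer> writersToClose = new ArrayDeque<>();

    // Recreation no longer closes the stale writer inline; it parks it and swaps in
    // a fresh writer immediately, so the replace callback stays non-blocking.
    void recreateWriter(String streamName, Writer replacement) {
        streamMap.computeIfPresent(streamName, (name, stale) -> {
            writersToClose.add(stale);
            return replacement;
        });
    }

    // Stale writers are drained and closed only at flush time, after buffered
    // requests have been handed off, mirroring the flush(boolean) override above.
    void flush() {
        while (writersToClose.peek() != null) {
            var writer = writersToClose.poll();
            try {
                writer.close();
            } catch (Exception e) {
                // Best effort: a close failure is logged, not rethrown.
                System.err.println("Could not close unused writer: " + e.getMessage());
            }
        }
    }
}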