From d762d9f6626385fae607268a33dd3d9c9b439951 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Tue, 2 Apr 2024 15:15:36 +0300 Subject: [PATCH 1/3] feat: only close writers on flush --- .../sink/async/AsyncBigQuerySinkWriter.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java index 96ec2ec..a116b83 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -38,6 +38,7 @@ public class AsyncBigQuerySinkWriter extends AsyncSinkWriter, StreamR private final Executor appendExecutor; + protected transient Queue writersToClose = new ArrayDeque<>(); protected transient Map streamMap = new ConcurrentHashMap<>(); public AsyncBigQuerySinkWriter(ExecutorProvider executorProvider, AsyncClientProvider clientProvider, ElementConverter, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection> bufferedRequestStates) { @@ -94,11 +95,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str streamMap.replaceAll((key, writer) -> { var newWriter = writer; if (writer.getWriterId().equals(writerId)) { - try { - writer.close(); - } catch (Exception e) { - logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName); - } + writersToClose.add(writer); newWriter = this.clientProvider.getWriter(streamName, table); registerInflightMetric(newWriter); } @@ -229,6 +226,20 @@ protected long getSizeInBytes(StreamRequest StreamRequest) { return StreamRequest.getData().getSerializedSize(); } + @Override + public void flush(boolean flush) throws InterruptedException { + super.flush(flush); + + while (writersToClose.peek() != null) { + var writer = writersToClose.poll(); + try { + writer.close(); + } catch (Exception e) { + logger.error("Could not close writer", e); + } + } + } + @Override public void close() { logger.info("Closing BigQuery write stream"); From 59d966279d803659b9a529d2745029a8da2cc614 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Wed, 3 Apr 2024 14:40:10 +0300 Subject: [PATCH 2/3] fix: do not flush on close --- .../flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java index a116b83..33c5faa 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -244,7 +244,7 @@ public void flush(boolean flush) throws InterruptedException { public void close() { logger.info("Closing BigQuery write stream"); try { - flush(true); + flush(false); streamMap.values().forEach(stream -> { try { stream.close(); From 837ce1abb9ba22b3184b1b7d72241d5532305369 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Wed, 3 Apr 2024 15:45:26 +0300 Subject: [PATCH 3/3] refactor: do not flush on close --- .../sink/async/AsyncBigQuerySinkWriter.java | 16 ++-------------- .../flink/bigquery/AsyncBigQuerySinkTest.java | 4 ++-- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java index 33c5faa..52c6604 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -235,7 +235,7 @@ public void flush(boolean flush) throws InterruptedException { try { writer.close(); } catch (Exception e) { - logger.error("Could not close writer", e); + logger.error("Could not close unused writer", e); } } } @@ -243,19 +243,7 @@ public void flush(boolean flush) throws InterruptedException { @Override public void close() { logger.info("Closing BigQuery write stream"); - try { - flush(false); - streamMap.values().forEach(stream -> { - try { - stream.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - + streamMap.values().forEach(StreamWriter::close); } diff --git a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java index ffc84de..f25d724 100644 --- a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java @@ -65,7 +65,7 @@ public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTim @Test public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenStreamIsFinalized(stream); + mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN); assertThatThrownBy(() -> { runner @@ -76,7 +76,7 @@ public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any()); + verify(mockClientProvider.getMockProtoWriter(), times(1)).append(any()); } @Test