From 0b79ed2859c3de3ef09d1c4b4110cb97aaa3b6ff Mon Sep 17 00:00:00 2001 From: gintarasm Date: Mon, 8 Apr 2024 12:30:49 +0300 Subject: [PATCH] fix: retry when stream writer is closed --- .../sink/async/AsyncBigQuerySinkWriter.java | 23 +++++++++++++++++++ .../flink/bigquery/AsyncBigQuerySinkTest.java | 17 ++++++++++++++ .../util/MockAsyncProtoClientProvider.java | 16 +++++++++++++ 3 files changed, 56 insertions(+) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java index 52c6604..0327ce4 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java @@ -128,6 +128,15 @@ protected void submitRequestEntries(List list, Consumer list, Consumer { + runner + .withRetryCount(0) + .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( + givenRow(1) + )))); + }).isInstanceOf(JobExecutionException.class); + + + verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any()); + } + @Test public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception { mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN); diff --git a/src/test/java/com/vinted/flink/bigquery/util/MockAsyncProtoClientProvider.java b/src/test/java/com/vinted/flink/bigquery/util/MockAsyncProtoClientProvider.java index 2619bd6..7a9f723 100644 --- a/src/test/java/com/vinted/flink/bigquery/util/MockAsyncProtoClientProvider.java +++ b/src/test/java/com/vinted/flink/bigquery/util/MockAsyncProtoClientProvider.java @@ -102,6 +102,14 @@ public void givenFailingAppendWithStatus(Status status) throws Descriptors.Descr .thenReturn(createAppendRowsResponseError(new StatusException(status))); } + public void givenStreamWriterClosed() throws Descriptors.DescriptorValidationException, IOException { + var response = createAppendRowsResponseError( + new StatusException(Status.ABORTED.withCause(createStreamWriterClosedException())) + ); + Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any())) + .thenReturn(response); + } + public void givenTimeoutForAppend() throws Descriptors.DescriptorValidationException, IOException { Mockito.when(MockAsyncProtoClientProvider.protoWriter.append(Mockito.any())) .thenReturn(createTimeoutAppendRowsResponse()); @@ -148,6 +156,14 @@ private static Exceptions.OffsetAlreadyExists createOffsetAlreadyExistsException return offsetMock; } + private static Exceptions.StreamWriterClosedException createStreamWriterClosedException() { + var offsetMock = Mockito.mock(Exceptions.StreamWriterClosedException.class); + Mockito.when(offsetMock.getStatus()).thenReturn(Status.ABORTED); + Mockito.when(offsetMock.getStreamName()).thenReturn("stream"); + Mockito.when(offsetMock.getCause()).thenReturn(new RuntimeException()); + return offsetMock; + } + private static SettableApiFuture createAppendRowsResponse() { SettableApiFuture result = SettableApiFuture.create(); result.set(AppendRowsResponse.newBuilder().buildPartial());