From 4552d3be699773bab9f9c11ac6ea7ea89a4607af Mon Sep 17 00:00:00 2001 From: gintarasm Date: Tue, 6 Feb 2024 16:11:08 +0200 Subject: [PATCH] test: update tests for default sink --- .../bigquery/BigQueryDefaultSinkTest.java | 108 +++++------------- .../bigquery/util/MockJsonClientProvider.java | 17 ++- 2 files changed, 46 insertions(+), 79 deletions(-) diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java index 96a4dd6..2166d3c 100644 --- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java @@ -1,8 +1,6 @@ package com.vinted.flink.bigquery; import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.storage.v1.FlushRowsRequest; -import com.google.protobuf.Int64Value; import com.vinted.flink.bigquery.model.Rows; import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer; import com.vinted.flink.bigquery.util.FlinkTest; @@ -22,7 +20,7 @@ import java.util.function.Function; import java.util.stream.IntStream; -import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; @@ -36,26 +34,10 @@ public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runn mockClientProvider.givenSuccessfulAppend(); runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(1, 0) + givenRow(1) )))); - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(0L)); - } - - @Test - public void shouldFlushRowsWhenExactlyOnceDeliveryEnabled(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenSuccessfulAppend(); - - runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(1, 1) - )))); - - verify(mockClientProvider.getClient(), times(1)).flushRows( - FlushRowsRequest.newBuilder() - .setWriteStream(stream) - .setOffset(Int64Value.of(1)) - .build() - ); + verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); } @Test @@ -66,12 +48,12 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam runner .withRetryCount(0) .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(1, 1) + givenRow(1) )))); }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any(), anyLong()); + verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any()); } @Test @@ -83,106 +65,77 @@ public void shouldRetryOnTimeoutException(@FlinkTest.FlinkParam FlinkTest.Pipeli runner .withRetryCount(0) .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(1, 1) + givenRow(1) )))); }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any(), anyLong()); - } - - - @Test - public void shouldDoNothingWhenFullBatchWasAlreadyAppended(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenAppendingExistingOffset(16, 4, stream); - - runner - .withRetryCount(0) - .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(4, 4) - )))); - - - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong()); - } - - @Test - public void shouldSplitBatchWhenAppendingBatchWhereNotAllRowsAreAppended(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenAppendingExistingOffset(4, 2, stream); - - runner - .withRetryCount(0) - .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(6, 2) - )))); - - - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(2L)); - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(4L)); + verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any()); } @Test - public void shouldFailAndNotRetryWhenFailedWithOutOfRange(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenFailingAppendWithStatus(Status.OUT_OF_RANGE); + public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { + mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS); assertThatThrownBy(() -> { runner .withRetryCount(0) .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(1, 0) + givenRow(1) )))); }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong()); + verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); } @Test - public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS); + public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { + mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT); assertThatThrownBy(() -> { runner .withRetryCount(0) .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(1, 0) + givenRow(1) )))); }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong()); + verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); } @Test - public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT); + public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { + mockClientProvider.givenStreamIsFinalized(stream); assertThatThrownBy(() -> { runner .withRetryCount(0) .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(1, 0) + givenRow(1) )))); }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong()); + verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); } @Test - public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { - mockClientProvider.givenStreamIsFinalized(stream); + public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { + mockClientProvider.givenFailingAppendWithStatus(Status.UNAVAILABLE); + mockClientProvider.givenRetryCount(2); assertThatThrownBy(() -> { runner .withRetryCount(0) .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(1, 0) + givenRow(1) )))); }).isInstanceOf(JobExecutionException.class); - - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong()); + verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any()); + assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3); } @Test @@ -192,20 +145,19 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkTest.FlinkParam runner .withRetryCount(0) .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( - givenRowWithOffset(6, 4) + givenRow(6) )))); - verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any(), eq(4L)); - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(7L)); + verify(mockClientProvider.getMockJsonWriter(), times(3)).append(any()); } - private Rows givenRowWithOffset(int count, int offset) { + private Rows givenRow(int count) { var data = new ArrayList(count); IntStream.rangeClosed(1, count) .forEach(i -> data.add("{\"value\": " + i + "}")); - return new Rows<>(data, offset, stream, testTable); + return new Rows<>(data, -1, stream, testTable); } private Function>> pipeline(List> data) { @@ -215,7 +167,7 @@ private Function>> pipeline( private Function>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function>> pipeline) { var sink = BigQueryStreamSink.newJson() .withClientProvider(mockClientProvider) - .withDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) + .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .withRowValueSerializer((JsonRowValueSerializer) String::getBytes) .build(); diff --git a/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java b/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java index 8bddc1b..077270a 100644 --- a/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java +++ b/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java @@ -13,11 +13,20 @@ import java.io.IOException; import java.io.Serializable; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; public class MockJsonClientProvider implements ClientProvider, Serializable { private static BigQueryWriteClient mockClient = Mockito.mock(BigQueryWriteClient.class); private static JsonStreamWriter writer = Mockito.mock(JsonStreamWriter.class); + private static AtomicInteger numOfCreatedWriters = new AtomicInteger(0); + + private int retryCount = 5; + + + public void givenRetryCount(int count) { + this.retryCount = count; + } public void givenStreamDoesNotExist(String streamName) { Mockito.doThrow(new RuntimeException(new StatusException(Status.NOT_FOUND))) .when(MockJsonClientProvider.mockClient).getWriteStream(streamName); @@ -97,9 +106,14 @@ public void givenAppendingTooLargeBatch() throws Descriptors.DescriptorValidatio .thenReturn(createAppendRowsResponse()); } + public int getNumOfCreatedWriter() { + return numOfCreatedWriters.get(); + } + public static void reset() { Mockito.reset(MockJsonClientProvider.mockClient); Mockito.reset(MockJsonClientProvider.writer); + MockJsonClientProvider.numOfCreatedWriters.set(0); } private static Exceptions.StreamFinalizedException createFinalizedStreamException() { @@ -147,12 +161,13 @@ public BigQueryWriteClient getClient() { @Override public JsonStreamWriter getWriter(String streamName, TableId table) { + numOfCreatedWriters.incrementAndGet(); return MockJsonClientProvider.writer; } @Override public WriterSettings writeSettings() { - return WriterSettings.newBuilder().build(); + return WriterSettings.newBuilder().withRetryCount(retryCount).build(); } }