Skip to content

Commit

Permalink
test: update tests for default sink
Browse files — Browse the repository at this point in the history
  • Loading branch information
gintarasm committed Feb 6, 2024
1 parent eedfb6f commit 4552d3b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 79 deletions.
108 changes: 30 additions & 78 deletions src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.vinted.flink.bigquery;

import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.protobuf.Int64Value;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
import com.vinted.flink.bigquery.util.FlinkTest;
Expand All @@ -22,7 +20,7 @@
import java.util.function.Function;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;


Expand All @@ -36,26 +34,10 @@ public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runn
mockClientProvider.givenSuccessfulAppend();

runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));

verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(0L));
}

@Test
public void shouldFlushRowsWhenExactlyOnceDeliveryEnabled(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenSuccessfulAppend();

runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 1)
))));

verify(mockClientProvider.getClient(), times(1)).flushRows(
FlushRowsRequest.newBuilder()
.setWriteStream(stream)
.setOffset(Int64Value.of(1))
.build()
);
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
}

@Test
Expand All @@ -66,12 +48,12 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 1)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any());
}

@Test
Expand All @@ -83,106 +65,77 @@ public void shouldRetryOnTimeoutException(@FlinkTest.FlinkParam FlinkTest.Pipeli
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 1)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any(), anyLong());
}


@Test
public void shouldDoNothingWhenFullBatchWasAlreadyAppended(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenAppendingExistingOffset(16, 4, stream);

runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(4, 4)
))));


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
}

@Test
public void shouldSplitBatchWhenAppendingBatchWhereNotAllRowsAreAppended(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenAppendingExistingOffset(4, 2, stream);

runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(6, 2)
))));


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(2L));
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(4L));
verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenFailedWithOutOfRange(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.OUT_OF_RANGE);
public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS);
public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT);
public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenStreamIsFinalized(stream);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenStreamIsFinalized(stream);
public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.UNAVAILABLE);
mockClientProvider.givenRetryCount(2);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any());
assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3);
}

@Test
Expand All @@ -192,20 +145,19 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkTest.FlinkParam
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(6, 4)
givenRow(6)
))));


verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any(), eq(4L));
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(7L));
verify(mockClientProvider.getMockJsonWriter(), times(3)).append(any());
}

private Rows<String> givenRowWithOffset(int count, int offset) {
private Rows<String> givenRow(int count) {
var data = new ArrayList<String>(count);
IntStream.rangeClosed(1, count)
.forEach(i -> data.add("{\"value\": " + i + "}"));

return new Rows<>(data, offset, stream, testTable);
return new Rows<>(data, -1, stream, testTable);
}

private Function<StreamExecutionEnvironment, DataStream<Rows<String>>> pipeline(List<Rows<String>> data) {
Expand All @@ -215,7 +167,7 @@ private Function<StreamExecutionEnvironment, DataStream<Rows<String>>> pipeline(
private Function<StreamExecutionEnvironment, DataStreamSink<Rows<String>>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function<StreamExecutionEnvironment, DataStream<Rows<String>>> pipeline) {
var sink = BigQueryStreamSink.<String>newJson()
.withClientProvider(mockClientProvider)
.withDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.withRowValueSerializer((JsonRowValueSerializer<String>) String::getBytes)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MockJsonClientProvider implements ClientProvider<JsonStreamWriter>, Serializable {
private static BigQueryWriteClient mockClient = Mockito.mock(BigQueryWriteClient.class);
private static JsonStreamWriter writer = Mockito.mock(JsonStreamWriter.class);

private static AtomicInteger numOfCreatedWriters = new AtomicInteger(0);

private int retryCount = 5;


public void givenRetryCount(int count) {
this.retryCount = count;
}
public void givenStreamDoesNotExist(String streamName) {
Mockito.doThrow(new RuntimeException(new StatusException(Status.NOT_FOUND)))
.when(MockJsonClientProvider.mockClient).getWriteStream(streamName);
Expand Down Expand Up @@ -97,9 +106,14 @@ public void givenAppendingTooLargeBatch() throws Descriptors.DescriptorValidatio
.thenReturn(createAppendRowsResponse());
}

public int getNumOfCreatedWriter() {
return numOfCreatedWriters.get();
}

public static void reset() {
Mockito.reset(MockJsonClientProvider.mockClient);
Mockito.reset(MockJsonClientProvider.writer);
MockJsonClientProvider.numOfCreatedWriters.set(0);
}

private static Exceptions.StreamFinalizedException createFinalizedStreamException() {
Expand Down Expand Up @@ -147,12 +161,13 @@ public BigQueryWriteClient getClient() {

@Override
public JsonStreamWriter getWriter(String streamName, TableId table) {
numOfCreatedWriters.incrementAndGet();
return MockJsonClientProvider.writer;
}


@Override
public WriterSettings writeSettings() {
return WriterSettings.newBuilder().build();
return WriterSettings.newBuilder().withRetryCount(retryCount).build();
}
}

0 comments on commit 4552d3b

Please sign in to comment.