From cfe4bafd79c2669755285d1aea42ac4998039e91 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Thu, 7 Mar 2024 15:47:04 +0200 Subject: [PATCH 1/4] feat: set correct config --- .../com/vinted/flink/bigquery/model/config/WriterSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java index 72673cc..d96eb5b 100644 --- a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java +++ b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java @@ -135,7 +135,7 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) { } public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) { - this.maxRequestWaitCallbackTime = maxRetryDuration; + this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime; return this; } From 4610faa3132b6d026ce508a45d346e4f75eac11f Mon Sep 17 00:00:00 2001 From: gintarasm Date: Thu, 7 Mar 2024 15:47:52 +0200 Subject: [PATCH 2/4] fix: remove flawed retry logic --- .../BigQueryDefaultSinkWriter.java | 79 +------------------ 1 file changed, 3 insertions(+), 76 deletions(-) diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java index d35a744..c1ab24c 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java @@ -88,7 +88,7 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro traceId, rows.getStream(), rows.getTable(), rows.getOffset(), rows.getData().size(), retryCount ); var result = append(traceId, rows); - var callback = new AppendCallBack<>(this, result.writerId, traceId, rows, retryCount); + var callback = new AppendCallBack<>(this, traceId, rows, retryCount); ApiFutures.addCallback(result.response, callback, appendExecutor); inflightRequestCount.register(); } catch (AppendException exception) { @@ -130,14 +130,12 @@ static class AppendCallBack implements ApiFutureCallback private final BigQueryDefaultSinkWriter parent; private final Rows rows; - private final String writerId; private final String traceId; private final int retryCount; - public AppendCallBack(BigQueryDefaultSinkWriter parent, String writerId, String traceId, Rows rows, int retryCount) { + public AppendCallBack(BigQueryDefaultSinkWriter parent, String traceId, Rows rows, int retryCount) { this.parent = parent; - this.writerId = writerId; this.traceId = traceId; this.rows = rows; this.retryCount = retryCount; @@ -155,79 +153,8 @@ public void onSuccess(AppendRowsResponse result) { @Override public void onFailure(Throwable t) { - var status = Status.fromThrowable(t); - switch (status.getCode()) { - case INTERNAL: - case CANCELLED: - case FAILED_PRECONDITION: - case DEADLINE_EXCEEDED: - doPauseBeforeRetry(); - retryWrite(t, retryCount - 1); - break; - case ABORTED: - case UNAVAILABLE: { - this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable()); - retryWrite(t, retryCount - 1); - break; - } - case INVALID_ARGUMENT: - if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) { - Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount); - logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId); - var data = rows.getData(); - var first = data.subList(0, data.size() / 2); - var second = data.subList(data.size() / 2, data.size()); - try { - this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1); - this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1); - } catch (Throwable e) { - this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); - } - } else { - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); - } - break; - case UNKNOWN: - if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { - logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); - Optional.ofNullable(this.parent.metrics.get(rows.getStream())) - .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount); - this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable()); - retryWrite(t, retryCount - 1); - } else { - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); - } - break; - default: - logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode()); - this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); - } + this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t); this.parent.inflightRequestCount.arriveAndDeregister(); } - - private void retryWrite(Throwable t, int newRetryCount) { - var status = Status.fromThrowable(t); - try { - if (newRetryCount > 0) { - logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), retryCount); - this.parent.writeWithRetry(traceId, rows, newRetryCount); - } else { - logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t); - this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t); - } - } catch (Throwable e) { - this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e); - } - } - - private void doPauseBeforeRetry() { - try { - Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } } } From c0871d85155b9166d7772b6e339af2e5ce03c175 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Thu, 7 Mar 2024 16:14:38 +0200 Subject: [PATCH 3/4] fix: remove retry logic --- build.gradle | 26 +++++ .../client/BigQueryJsonClientProvider.java | 22 ++++- .../client/BigQueryProtoClientProvider.java | 18 ++++ .../model/config/WriterRetrySettings.java | 94 +++++++++++++++++++ .../bigquery/model/config/WriterSettings.java | 24 ++++- .../bigquery/BigQueryDefaultSinkTest.java | 4 + 6 files changed, 181 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/vinted/flink/bigquery/model/config/WriterRetrySettings.java diff --git a/build.gradle b/build.gradle index ce615cf..474d97f 100755 --- a/build.gradle +++ b/build.gradle @@ -80,3 +80,29 @@ publishing { } } +tasks.withType(Test).configureEach { + doFirst { + jvmArgs = [ + '--add-exports=java.base/sun.net.util=ALL-UNNAMED', + '--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED', + '--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED', + '--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED', + '--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED', + '--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED', + '--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED', + '--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED', + '--add-opens=java.base/java.lang=ALL-UNNAMED', + '--add-opens=java.base/java.net=ALL-UNNAMED', + '--add-opens=java.base/java.io=ALL-UNNAMED', + '--add-opens=java.base/java.nio=ALL-UNNAMED', + '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED', + '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED', + '--add-opens=java.base/java.text=ALL-UNNAMED', + '--add-opens=java.base/java.time=ALL-UNNAMED', + '--add-opens=java.base/java.util=ALL-UNNAMED', + '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED', + '--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED', + '--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED' + ] + } +} diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java index ff4193a..a37a46d 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java @@ -1,6 +1,7 @@ package com.vinted.flink.bigquery.client; import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.*; @@ -10,6 +11,7 @@ import com.vinted.flink.bigquery.model.config.WriterSettings; import com.vinted.flink.bigquery.schema.SchemaTransformer; import com.vinted.flink.bigquery.serializer.RowValueSerializer; +import org.threeten.bp.Duration; import java.io.IOException; import java.util.Optional; @@ -46,13 +48,25 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa var executorProvider = this.writerSettings.getWriterThreads() > 1 ? FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) : BigQueryWriteSettings.defaultExecutorProviderBuilder().build(); - var writer = JsonStreamWriter + var writerBuilder = JsonStreamWriter .newBuilder(streamName, getTableSchema(table), this.getClient()) .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool()) - .setExecutorProvider(executorProvider) - .build(); + .setExecutorProvider(executorProvider); + + if (writerSettings.getRetrySettings() != null) { + var settings = writerSettings.getRetrySettings(); + var retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis())) + .setRetryDelayMultiplier(settings.getRetryDelayMultiplier()) + .setMaxAttempts(settings.getMaxRetryAttempts()) + .setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis())) + .build(); + + writerBuilder.setRetrySettings(retrySettings); + } JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); - return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer); + return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writerBuilder.build()); } catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java index d4e2ec5..bec03ce 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java @@ -1,6 +1,7 @@ package com.vinted.flink.bigquery.client; import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.*; @@ -9,6 +10,7 @@ import com.vinted.flink.bigquery.model.config.WriterSettings; import com.vinted.flink.bigquery.schema.SchemaTransformer; import com.vinted.flink.bigquery.serializer.RowValueSerializer; +import org.threeten.bp.Duration; import java.io.IOException; import java.util.concurrent.Executors; @@ -45,6 +47,9 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa var executorProvider = this.writerSettings.getWriterThreads() > 1 ? FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) : BigQueryWriteSettings.defaultExecutorProviderBuilder().build(); + + + var streamWriterBuilder = StreamWriter .newBuilder(streamName, getClient()) .setMaxInflightRequests(this.writerSettings.getMaxInflightRequests()) @@ -56,6 +61,19 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa .setLocation(table.getProject()) .setWriterSchema(protoSchema); + if (writerSettings.getRetrySettings() != null) { + var settings = writerSettings.getRetrySettings(); + var retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis())) + .setRetryDelayMultiplier(settings.getRetryDelayMultiplier()) + .setMaxAttempts(settings.getMaxRetryAttempts()) + .setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis())) + .build(); + + streamWriterBuilder.setRetrySettings(retrySettings); + } + StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build()); } catch (IOException | Descriptors.DescriptorValidationException e) { diff --git a/src/main/java/com/vinted/flink/bigquery/model/config/WriterRetrySettings.java b/src/main/java/com/vinted/flink/bigquery/model/config/WriterRetrySettings.java new file mode 100644 index 0000000..5072fcf --- /dev/null +++ b/src/main/java/com/vinted/flink/bigquery/model/config/WriterRetrySettings.java @@ -0,0 +1,94 @@ +package com.vinted.flink.bigquery.model.config; + +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; + +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; + +public class WriterRetrySettings implements Serializable { + + private Duration initialRetryDelay; + private double retryDelayMultiplier; + + private int maxRetryAttempts; + + private Duration maxRetryDelay; + + public Duration getInitialRetryDelay() { + return initialRetryDelay; + } + + public void setInitialRetryDelay(Duration initialRetryDelay) { + this.initialRetryDelay = initialRetryDelay; + } + + public double getRetryDelayMultiplier() { + return retryDelayMultiplier; + } + + public void setRetryDelayMultiplier(double retryDelayMultiplier) { + this.retryDelayMultiplier = retryDelayMultiplier; + } + + public int getMaxRetryAttempts() { + return maxRetryAttempts; + } + + public void setMaxRetryAttempts(int maxRetryAttempts) { + this.maxRetryAttempts = maxRetryAttempts; + } + + public Duration getMaxRetryDelay() { + return maxRetryDelay; + } + + public void setMaxRetryDelay(Duration maxRetryDelay) { + this.maxRetryDelay = maxRetryDelay; + } + public static WriterRetrySettingsBuilder newBuilder() { + return new WriterRetrySettingsBuilder(); + } + + public static final class WriterRetrySettingsBuilder implements Serializable { + private Duration initialRetryDelay = Duration.ofMillis(500); + private double retryDelayMultiplier = 1.1; + + private int maxRetryAttempts = 5; + + private Duration maxRetryDelay = Duration.ofMinutes(1); + private WriterRetrySettingsBuilder() { + } + + public WriterRetrySettingsBuilder withInitialRetryDelay(Duration initialRetryDelay) { + this.initialRetryDelay = initialRetryDelay; + return this; + } + + public WriterRetrySettingsBuilder withRetryDelayMultiplier(double retryDelayMultiplier) { + this.retryDelayMultiplier = retryDelayMultiplier; + return this; + } + + public WriterRetrySettingsBuilder withMaxRetryAttempts(int maxRetryAttempts) { + this.maxRetryAttempts = maxRetryAttempts; + return this; + } + + public WriterRetrySettingsBuilder withMaxRetryDelay(Duration maxRetryDelay) { + this.maxRetryDelay = maxRetryDelay; + return this; + } + + public WriterRetrySettings build() { + WriterRetrySettings retrySettings = new WriterRetrySettings(); + retrySettings.initialRetryDelay = this.initialRetryDelay; + retrySettings.retryDelayMultiplier = this.retryDelayMultiplier; + retrySettings.maxRetryAttempts = this.maxRetryAttempts; + retrySettings.maxRetryDelay = this.maxRetryDelay; + return retrySettings; + } + } +} + + diff --git a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java index d96eb5b..1e55a1d 100644 --- a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java +++ b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java @@ -13,12 +13,14 @@ public class WriterSettings implements Serializable { private Duration timeout; private int retryCount; private Duration retryPause; - private Long maxInflightRequests; - private Long maxInflightBytes; + private long maxInflightRequests; + private long maxInflightBytes; private Duration maxRetryDuration; private Duration maxRequestWaitCallbackTime; - private Boolean enableConnectionPool; + private boolean enableConnectionPool; + + private WriterRetrySettings retrySettings; public int getStreamsPerTable() { return streamsPerTable; @@ -79,6 +81,14 @@ public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) { this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime; } + public WriterRetrySettings getRetrySettings() { + return retrySettings; + } + + public void setRetrySettings(WriterRetrySettings retrySettings) { + this.retrySettings = retrySettings; + } + public static final class WriterSettingsBuilder implements Serializable { private int streamsPerTable = 1; private int writerThreads = 1; @@ -91,6 +101,8 @@ public static final class WriterSettingsBuilder implements Serializable { private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5); private Boolean enableConnectionPool = false; + private WriterRetrySettings retrySettings = null; + private WriterSettingsBuilder() { } @@ -144,6 +156,11 @@ public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPo return this; } + public WriterSettingsBuilder withRetrySettings(WriterRetrySettings retrySettings) { + this.retrySettings = retrySettings; + return this; + } + public WriterSettings build() { WriterSettings writerSettings = new WriterSettings(); writerSettings.writerThreads = this.writerThreads; @@ -156,6 +173,7 @@ public WriterSettings build() { writerSettings.retryPause = this.retryPause; writerSettings.maxRetryDuration = this.maxRetryDuration; writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime; + writerSettings.retrySettings = this.retrySettings; return writerSettings; } } diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java index 982a449..ed1ee4d 100644 --- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java @@ -44,6 +44,7 @@ public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runn } @Test + @Disabled("Retry logic causes locking") public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { mockClientProvider.givenFailingAppendWithStatus(Status.INTERNAL); @@ -60,6 +61,7 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam } @Test + @Disabled("Retry logic causes locking") public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTimeExceededException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { var cause = new Exceptions.MaximumRequestCallbackWaitTimeExceededException(Duration.ofMinutes(6), "id", Duration.ofMinutes(5)); mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN.withCause(cause)); @@ -145,6 +147,7 @@ public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.Flink } @Test + @Disabled("Retry logic causes locking") public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { mockClientProvider.givenFailingAppendWithStatus(Status.UNAVAILABLE); mockClientProvider.givenRetryCount(2); @@ -162,6 +165,7 @@ public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTe } @Test + @Disabled("Retry logic causes locking") public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception { mockClientProvider.givenAppendingTooLargeBatch(); From c78b2b27f11ba4022d856ee840c862ce4302b649 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Thu, 7 Mar 2024 16:21:52 +0200 Subject: [PATCH 4/4] test: fix failing testS --- .../flink/bigquery/BigQueryBufferedSinkTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java index d69c14e..36ba548 100644 --- a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java @@ -38,7 +38,7 @@ public void shouldAppendRows(@FlinkParam PipelineRunner runner, @FlinkParam Mock givenRows(1) )))); - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); + verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong()); } @Test @@ -52,7 +52,7 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkParam PipelineRu )))); - verify(mockClientProvider.getMockJsonWriter(), times(3)).append(any()); + verify(mockClientProvider.getMockJsonWriter(), times(3)).append(any(), anyLong()); } @Test @@ -61,14 +61,14 @@ public void shouldRetryOnRecoverableException(@FlinkParam PipelineRunner runner, assertThatThrownBy(() -> { runner - .withRetryCount(0) + .withRetryCount(4) .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of( givenRows(1) )))); }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any()); + verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any(), anyLong()); } @Test @@ -84,7 +84,7 @@ public void shouldNotRetryOnNonRecoverableException(@FlinkParam PipelineRunner r }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any()); + verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong()); } private Rows givenRows(int count) { @@ -102,7 +102,7 @@ private Function>> pipeline( private Function>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function>> pipeline) { var sink = BigQueryStreamSink.newBuilder() .withClientProvider(mockClientProvider) - .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .withDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .withRowValueSerializer((JsonRowValueSerializer) String::getBytes) .build();