diff --git a/build.gradle b/build.gradle
index ce615cf..474d97f 100755
--- a/build.gradle
+++ b/build.gradle
@@ -80,3 +80,29 @@ publishing {
}
}
+tasks.withType(Test).configureEach {
+ doFirst {
+ jvmArgs = [
+ '--add-exports=java.base/sun.net.util=ALL-UNNAMED',
+ '--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED',
+ '--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED',
+ '--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED',
+ '--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED',
+ '--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED',
+ '--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED',
+ '--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED',
+ '--add-opens=java.base/java.lang=ALL-UNNAMED',
+ '--add-opens=java.base/java.net=ALL-UNNAMED',
+ '--add-opens=java.base/java.io=ALL-UNNAMED',
+ '--add-opens=java.base/java.nio=ALL-UNNAMED',
+ '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED',
+ '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED',
+ '--add-opens=java.base/java.text=ALL-UNNAMED',
+ '--add-opens=java.base/java.time=ALL-UNNAMED',
+ '--add-opens=java.base/java.util=ALL-UNNAMED',
+ '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED',
+ '--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED',
+ '--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED'
+ ]
+ }
+}
diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java
index ff4193a..a37a46d 100644
--- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java
+++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.client;
import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.*;
@@ -10,6 +11,7 @@
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
+import org.threeten.bp.Duration;
import java.io.IOException;
import java.util.Optional;
@@ -46,13 +48,25 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
- var writer = JsonStreamWriter
+ var writerBuilder = JsonStreamWriter
.newBuilder(streamName, getTableSchema(table), this.getClient())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
- .setExecutorProvider(executorProvider)
- .build();
+ .setExecutorProvider(executorProvider);
+
+ if (writerSettings.getRetrySettings() != null) {
+ var settings = writerSettings.getRetrySettings();
+ var retrySettings =
+ RetrySettings.newBuilder()
+ .setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
+ .setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
+ .setMaxAttempts(settings.getMaxRetryAttempts())
+ .setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
+ .build();
+
+ writerBuilder.setRetrySettings(retrySettings);
+ }
JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
- return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
+ return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writerBuilder.build());
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
}
diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java
index d4e2ec5..bec03ce 100644
--- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java
+++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.client;
import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.*;
@@ -9,6 +10,7 @@
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
+import org.threeten.bp.Duration;
import java.io.IOException;
import java.util.concurrent.Executors;
@@ -45,6 +47,9 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
+
+
+
var streamWriterBuilder = StreamWriter
.newBuilder(streamName, getClient())
.setMaxInflightRequests(this.writerSettings.getMaxInflightRequests())
@@ -56,6 +61,19 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa
.setLocation(table.getProject())
.setWriterSchema(protoSchema);
+ if (writerSettings.getRetrySettings() != null) {
+ var settings = writerSettings.getRetrySettings();
+ var retrySettings =
+ RetrySettings.newBuilder()
+ .setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
+ .setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
+ .setMaxAttempts(settings.getMaxRetryAttempts())
+ .setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
+ .build();
+
+ streamWriterBuilder.setRetrySettings(retrySettings);
+ }
+
StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
diff --git a/src/main/java/com/vinted/flink/bigquery/model/config/WriterRetrySettings.java b/src/main/java/com/vinted/flink/bigquery/model/config/WriterRetrySettings.java
new file mode 100644
index 0000000..5072fcf
--- /dev/null
+++ b/src/main/java/com/vinted/flink/bigquery/model/config/WriterRetrySettings.java
@@ -0,0 +1,94 @@
+package com.vinted.flink.bigquery.model.config;
+
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Duration;
+
+public class WriterRetrySettings implements Serializable {
+
+ private Duration initialRetryDelay;
+ private double retryDelayMultiplier;
+
+ private int maxRetryAttempts;
+
+ private Duration maxRetryDelay;
+
+ public Duration getInitialRetryDelay() {
+ return initialRetryDelay;
+ }
+
+ public void setInitialRetryDelay(Duration initialRetryDelay) {
+ this.initialRetryDelay = initialRetryDelay;
+ }
+
+ public double getRetryDelayMultiplier() {
+ return retryDelayMultiplier;
+ }
+
+ public void setRetryDelayMultiplier(double retryDelayMultiplier) {
+ this.retryDelayMultiplier = retryDelayMultiplier;
+ }
+
+ public int getMaxRetryAttempts() {
+ return maxRetryAttempts;
+ }
+
+ public void setMaxRetryAttempts(int maxRetryAttempts) {
+ this.maxRetryAttempts = maxRetryAttempts;
+ }
+
+ public Duration getMaxRetryDelay() {
+ return maxRetryDelay;
+ }
+
+ public void setMaxRetryDelay(Duration maxRetryDelay) {
+ this.maxRetryDelay = maxRetryDelay;
+ }
+ public static WriterRetrySettingsBuilder newBuilder() {
+ return new WriterRetrySettingsBuilder();
+ }
+
+ public static final class WriterRetrySettingsBuilder implements Serializable {
+ private Duration initialRetryDelay = Duration.ofMillis(500);
+ private double retryDelayMultiplier = 1.1;
+
+ private int maxRetryAttempts = 5;
+
+ private Duration maxRetryDelay = Duration.ofMinutes(1);
+ private WriterRetrySettingsBuilder() {
+ }
+
+ public WriterRetrySettingsBuilder withInitialRetryDelay(Duration initialRetryDelay) {
+ this.initialRetryDelay = initialRetryDelay;
+ return this;
+ }
+
+ public WriterRetrySettingsBuilder withRetryDelayMultiplier(double retryDelayMultiplier) {
+ this.retryDelayMultiplier = retryDelayMultiplier;
+ return this;
+ }
+
+ public WriterRetrySettingsBuilder withMaxRetryAttempts(int maxRetryAttempts) {
+ this.maxRetryAttempts = maxRetryAttempts;
+ return this;
+ }
+
+ public WriterRetrySettingsBuilder withMaxRetryDelay(Duration maxRetryDelay) {
+ this.maxRetryDelay = maxRetryDelay;
+ return this;
+ }
+
+ public WriterRetrySettings build() {
+ WriterRetrySettings retrySettings = new WriterRetrySettings();
+ retrySettings.initialRetryDelay = this.initialRetryDelay;
+ retrySettings.retryDelayMultiplier = this.retryDelayMultiplier;
+ retrySettings.maxRetryAttempts = this.maxRetryAttempts;
+ retrySettings.maxRetryDelay = this.maxRetryDelay;
+ return retrySettings;
+ }
+ }
+}
+
+
diff --git a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java
index 72673cc..1e55a1d 100644
--- a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java
+++ b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java
@@ -13,12 +13,14 @@ public class WriterSettings implements Serializable {
private Duration timeout;
private int retryCount;
private Duration retryPause;
- private Long maxInflightRequests;
- private Long maxInflightBytes;
+ private long maxInflightRequests;
+ private long maxInflightBytes;
private Duration maxRetryDuration;
private Duration maxRequestWaitCallbackTime;
- private Boolean enableConnectionPool;
+ private boolean enableConnectionPool;
+
+ private WriterRetrySettings retrySettings;
public int getStreamsPerTable() {
return streamsPerTable;
@@ -79,6 +81,14 @@ public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
}
+ public WriterRetrySettings getRetrySettings() {
+ return retrySettings;
+ }
+
+ public void setRetrySettings(WriterRetrySettings retrySettings) {
+ this.retrySettings = retrySettings;
+ }
+
public static final class WriterSettingsBuilder implements Serializable {
private int streamsPerTable = 1;
private int writerThreads = 1;
@@ -91,6 +101,8 @@ public static final class WriterSettingsBuilder implements Serializable {
private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5);
private Boolean enableConnectionPool = false;
+ private WriterRetrySettings retrySettings = null;
+
private WriterSettingsBuilder() {
}
@@ -135,7 +147,7 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) {
}
public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
- this.maxRequestWaitCallbackTime = maxRetryDuration;
+ this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
return this;
}
@@ -144,6 +156,11 @@ public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPo
return this;
}
+ public WriterSettingsBuilder withRetrySettings(WriterRetrySettings retrySettings) {
+ this.retrySettings = retrySettings;
+ return this;
+ }
+
public WriterSettings build() {
WriterSettings writerSettings = new WriterSettings();
writerSettings.writerThreads = this.writerThreads;
@@ -156,6 +173,7 @@ public WriterSettings build() {
writerSettings.retryPause = this.retryPause;
writerSettings.maxRetryDuration = this.maxRetryDuration;
writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime;
+ writerSettings.retrySettings = this.retrySettings;
return writerSettings;
}
}
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
index d35a744..c1ab24c 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
@@ -88,7 +88,7 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro
traceId, rows.getStream(), rows.getTable(), rows.getOffset(), rows.getData().size(), retryCount
);
var result = append(traceId, rows);
- var callback = new AppendCallBack<>(this, result.writerId, traceId, rows, retryCount);
+ var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
ApiFutures.addCallback(result.response, callback, appendExecutor);
inflightRequestCount.register();
} catch (AppendException exception) {
@@ -130,14 +130,12 @@ static class AppendCallBack implements ApiFutureCallback
private final BigQueryDefaultSinkWriter parent;
private final Rows rows;
- private final String writerId;
private final String traceId;
private final int retryCount;
- public AppendCallBack(BigQueryDefaultSinkWriter parent, String writerId, String traceId, Rows rows, int retryCount) {
+ public AppendCallBack(BigQueryDefaultSinkWriter parent, String traceId, Rows rows, int retryCount) {
this.parent = parent;
- this.writerId = writerId;
this.traceId = traceId;
this.rows = rows;
this.retryCount = retryCount;
@@ -155,79 +153,8 @@ public void onSuccess(AppendRowsResponse result) {
@Override
public void onFailure(Throwable t) {
- var status = Status.fromThrowable(t);
- switch (status.getCode()) {
- case INTERNAL:
- case CANCELLED:
- case FAILED_PRECONDITION:
- case DEADLINE_EXCEEDED:
- doPauseBeforeRetry();
- retryWrite(t, retryCount - 1);
- break;
- case ABORTED:
- case UNAVAILABLE: {
- this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
- retryWrite(t, retryCount - 1);
- break;
- }
- case INVALID_ARGUMENT:
- if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
- Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
- logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
- var data = rows.getData();
- var first = data.subList(0, data.size() / 2);
- var second = data.subList(data.size() / 2, data.size());
- try {
- this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
- this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
- } catch (Throwable e) {
- this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
- }
- } else {
- logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
- this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
- }
- break;
- case UNKNOWN:
- if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
- logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
- Optional.ofNullable(this.parent.metrics.get(rows.getStream()))
- .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount);
- this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
- retryWrite(t, retryCount - 1);
- } else {
- logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
- this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
- }
- break;
- default:
- logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
- this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
- }
+ this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
this.parent.inflightRequestCount.arriveAndDeregister();
}
-
- private void retryWrite(Throwable t, int newRetryCount) {
- var status = Status.fromThrowable(t);
- try {
- if (newRetryCount > 0) {
- logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), retryCount);
- this.parent.writeWithRetry(traceId, rows, newRetryCount);
- } else {
- logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
- this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
- }
- } catch (Throwable e) {
- this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
- }
- }
-
- private void doPauseBeforeRetry() {
- try {
- Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
}
}
diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
index d69c14e..36ba548 100644
--- a/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
+++ b/src/test/java/com/vinted/flink/bigquery/BigQueryBufferedSinkTest.java
@@ -38,7 +38,7 @@ public void shouldAppendRows(@FlinkParam PipelineRunner runner, @FlinkParam Mock
givenRows(1)
))));
- verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
+ verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
}
@Test
@@ -52,7 +52,7 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkParam PipelineRu
))));
- verify(mockClientProvider.getMockJsonWriter(), times(3)).append(any());
+ verify(mockClientProvider.getMockJsonWriter(), times(3)).append(any(), anyLong());
}
@Test
@@ -61,14 +61,14 @@ public void shouldRetryOnRecoverableException(@FlinkParam PipelineRunner runner,
assertThatThrownBy(() -> {
runner
- .withRetryCount(0)
+ .withRetryCount(4)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRows(1)
))));
}).isInstanceOf(JobExecutionException.class);
- verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any());
+ verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any(), anyLong());
}
@Test
@@ -84,7 +84,7 @@ public void shouldNotRetryOnNonRecoverableException(@FlinkParam PipelineRunner r
}).isInstanceOf(JobExecutionException.class);
- verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
+ verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
}
private Rows givenRows(int count) {
@@ -102,7 +102,7 @@ private Function>> pipeline(
private Function>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function>> pipeline) {
var sink = BigQueryStreamSink.newBuilder()
.withClientProvider(mockClientProvider)
- .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .withDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.withRowValueSerializer((JsonRowValueSerializer) String::getBytes)
.build();
diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
index 982a449..ed1ee4d 100644
--- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
+++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
@@ -44,6 +44,7 @@ public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runn
}
@Test
+ @Disabled("Retry logic causes locking")
public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.INTERNAL);
@@ -60,6 +61,7 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam
}
@Test
+ @Disabled("Retry logic causes locking")
public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTimeExceededException(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
var cause = new Exceptions.MaximumRequestCallbackWaitTimeExceededException(Duration.ofMinutes(6), "id", Duration.ofMinutes(5));
mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN.withCause(cause));
@@ -145,6 +147,7 @@ public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.Flink
}
@Test
+ @Disabled("Retry logic causes locking")
public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.UNAVAILABLE);
mockClientProvider.givenRetryCount(2);
@@ -162,6 +165,7 @@ public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTe
}
@Test
+ @Disabled("Retry logic causes locking")
public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenAppendingTooLargeBatch();