From 672c917476376e71bcf1b5554c50eb1ac6ab4efc Mon Sep 17 00:00:00 2001 From: gintarasm Date: Wed, 6 Mar 2024 09:44:49 +0200 Subject: [PATCH 1/2] fix: add callback timeout config --- .../client/BigQueryJsonClientProvider.java | 6 ++---- .../client/BigQueryProtoClientProvider.java | 3 ++- .../bigquery/metric/BigQueryStreamMetrics.java | 10 ++++++++++ .../bigquery/model/config/WriterSettings.java | 17 +++++++++++++++++ .../flink/bigquery/sink/BigQuerySinkWriter.java | 1 + .../BigQueryDefaultSinkWriter.java | 2 ++ 6 files changed, 34 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java index 0590b7a..ff4193a 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java @@ -3,10 +3,8 @@ import com.google.api.gax.core.FixedExecutorProvider; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.*; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.protobuf.Descriptors; import com.vinted.flink.bigquery.model.config.Credentials; import com.vinted.flink.bigquery.model.config.WriterSettings; @@ -53,7 +51,7 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool()) .setExecutorProvider(executorProvider) .build(); - + JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer); } catch 
(Descriptors.DescriptorValidationException | IOException | InterruptedException e) { throw new RuntimeException(e); diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java index 85b1759..b4a827a 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java @@ -51,10 +51,11 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa .setMaxInflightBytes(this.writerSettings.getMaxInflightBytes()) .setMaxRetryDuration(this.writerSettings.getMaxRetryDuration()) .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool()) + .setChannelProvider(BigQueryWriteSettings.defaultTransportChannelProvider()) .setExecutorProvider(executorProvider) .setLocation(table.getProject()) .setWriterSchema(protoSchema); - + StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build()); } catch (IOException | Descriptors.DescriptorValidationException e) { throw new RuntimeException(e); diff --git a/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java b/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java index 7a3223e..d49c87c 100644 --- a/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java +++ b/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java @@ -8,6 +8,8 @@ public class BigQueryStreamMetrics { private double batchSizeInMb = 0.0; private long splitBatchCount = 0; + private int timeoutCount = 0; + public BigQueryStreamMetrics(String streamName) { this.streamName = streamName; } @@ -42,4 +44,12 @@ public double getBatchSizeInMb() { public long getSplitBatchCount() { return splitBatchCount; } + + public int getTimeoutCount() { + return 
timeoutCount; + } + + public void incrementTimeoutCount() { + this.timeoutCount++; + } } diff --git a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java index e3d3ecb..72673cc 100644 --- a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java +++ b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java @@ -16,6 +16,8 @@ public class WriterSettings implements Serializable { private Long maxInflightRequests; private Long maxInflightBytes; private Duration maxRetryDuration; + + private Duration maxRequestWaitCallbackTime; private Boolean enableConnectionPool; public int getStreamsPerTable() { @@ -69,6 +71,14 @@ public static WriterSettingsBuilder newBuilder() { return new WriterSettingsBuilder(); } + public Duration getMaxRequestWaitCallbackTime() { + return maxRequestWaitCallbackTime; + } + + public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) { + this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime; + } + public static final class WriterSettingsBuilder implements Serializable { private int streamsPerTable = 1; private int writerThreads = 1; @@ -78,6 +88,7 @@ public static final class WriterSettingsBuilder implements Serializable { private Long maxInflightRequests = 1000L; private Long maxInflightBytes = 100L * 1024L * 1024L; // 100Mb. 
private Duration maxRetryDuration = Duration.ofMinutes(5); + private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5); private Boolean enableConnectionPool = false; private WriterSettingsBuilder() { @@ -123,6 +134,11 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) { return this; } + public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) { + this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime; + return this; + } + public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPool) { this.enableConnectionPool = enableConnectionPool; return this; @@ -139,6 +155,7 @@ public WriterSettings build() { writerSettings.maxInflightRequests = this.maxInflightRequests; writerSettings.retryPause = this.retryPause; writerSettings.maxRetryDuration = this.maxRetryDuration; + writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime; return writerSettings; } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java index 35bdc17..c8bf70e 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java @@ -111,6 +111,7 @@ public void write(Rows rows, Context context) { group.gauge("batch_count", metric::getBatchCount); group.gauge("batch_size_mb", metric::getBatchSizeInMb); group.gauge("split_batch_count", metric::getSplitBatchCount); + group.gauge("callback_timeouts", metric::getTimeoutCount); return metric; }); diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java index a7de3a0..d35a744 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java +++
b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java @@ -191,6 +191,8 @@ public void onFailure(Throwable t) { case UNKNOWN: if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); + Optional.ofNullable(this.parent.metrics.get(rows.getStream())) + .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount); this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable()); retryWrite(t, retryCount - 1); } else { From 8d89378b2b105b2e88c020f987a91adc13bedae2 Mon Sep 17 00:00:00 2001 From: gintarasm Date: Wed, 6 Mar 2024 10:49:16 +0200 Subject: [PATCH 2/2] feat: add inflight time metric --- .../client/BigQueryProtoClientProvider.java | 1 + .../flink/bigquery/client/BigQueryStreamWriter.java | 2 ++ .../flink/bigquery/client/JsonStreamWriter.java | 5 +++++ .../flink/bigquery/client/ProtoStreamWriter.java | 5 +++++ .../flink/bigquery/sink/BigQuerySinkWriter.java | 13 ++++++++++++- 5 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java index b4a827a..d4e2ec5 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java @@ -55,6 +55,7 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa .setExecutorProvider(executorProvider) .setLocation(table.getProject()) .setWriterSchema(protoSchema); + StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build()); } catch (IOException | Descriptors.DescriptorValidationException e) { diff
--git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java index 9485adb..0b934cb 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java @@ -11,5 +11,7 @@ public interface BigQueryStreamWriter extends AutoCloseable { String getStreamName(); String getWriterId(); + + long getInflightWaitSeconds(); boolean isClosed(); } diff --git a/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java index c26983b..652cc4a 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java @@ -42,6 +42,11 @@ public ApiFuture append(Rows data, long offset) { } } + @Override + public long getInflightWaitSeconds() { + return writer.getInflightWaitSeconds(); + } + @Override public String getStreamName() { return writer.getStreamName(); diff --git a/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java index f651476..978b492 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java @@ -38,6 +38,11 @@ public ApiFuture append(Rows data, long offset) { return writer.append(prows, offset); } + @Override + public long getInflightWaitSeconds() { + return writer.getInflightWaitSeconds(); + } + @Override public String getStreamName() { return writer.getStreamName(); diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java index c8bf70e..b7702ab 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java +++ 
b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java @@ -69,12 +69,22 @@ public BigQuerySinkWriter( } + private void registerInflightMetric(String streamName, BigQueryStreamWriter writer) { + var group = metricGroup + .addGroup("stream", streamName) + .addGroup("writer_id", writer.getWriterId()); + + group.gauge("inflight_wait_seconds", writer::getInflightWaitSeconds); + } + protected final BigQueryStreamWriter streamWriter(String traceId, String streamName, TableId table) { var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next()); return streamMap.computeIfAbsent(streamWithIndex, name -> { logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex); // Stream name can't contain index - return this.clientProvider.getWriter(streamName, table, rowSerializer); + var writer = this.clientProvider.getWriter(streamName, table, rowSerializer); + registerInflightMetric(streamName, writer); + return writer; }); } @@ -91,6 +101,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName); } newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer); + registerInflightMetric(streamName, newWriter); } return newWriter; });