diff --git a/src/main/java/net/snowflake/ingest/connection/TelemetryService.java b/src/main/java/net/snowflake/ingest/connection/TelemetryService.java index 1e69e5799..15e2a0c5a 100644 --- a/src/main/java/net/snowflake/ingest/connection/TelemetryService.java +++ b/src/main/java/net/snowflake/ingest/connection/TelemetryService.java @@ -11,6 +11,8 @@ import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; import com.google.common.util.concurrent.RateLimiter; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; @@ -57,7 +59,7 @@ public String toString() { private static final String PERCENTILE99TH = "99thPercentile"; private final TelemetryClient telemetry; private final String clientName; - private final RateLimiter rateLimiter; + private final Map rateLimitersMap; /** * Default constructor @@ -69,8 +71,7 @@ public String toString() { TelemetryService(CloseableHttpClient httpClient, String clientName, String url) { this.clientName = clientName; this.telemetry = (TelemetryClient) TelemetryClient.createSessionlessTelemetry(httpClient, url); - // At most once every second - this.rateLimiter = RateLimiter.create(1.0); + this.rateLimitersMap = new HashMap<>(); } /** Flush the telemetry buffer and close the telemetry service */ @@ -127,12 +128,16 @@ public void reportCpuMemoryUsage(Histogram cpuUsage) { } } + /** Report the offset token mismatch in a batch */ public void reportBatchOffsetMismatch( String channelName, String prevBatchEndOffset, String startOffset, String endOffset, long rowCount) { + // Add a rate limiter to report the mismatch at most once every second per channel + RateLimiter rateLimiter = + rateLimitersMap.computeIfAbsent(channelName, v -> RateLimiter.create(1.0)); if (rateLimiter.tryAcquire()) { ObjectNode msg = MAPPER.createObjectNode(); msg.put("channel_name", channelName);