Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-tzhang committed Feb 27, 2024
1 parent e74c5b3 commit 94bd539
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.RateLimiter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -57,7 +59,7 @@ public String toString() {
private static final String PERCENTILE99TH = "99thPercentile";
private final TelemetryClient telemetry;
private final String clientName;
private final RateLimiter rateLimiter;
private final Map<String, RateLimiter> rateLimitersMap;

/**
* Default constructor
Expand All @@ -69,8 +71,7 @@ public String toString() {
TelemetryService(CloseableHttpClient httpClient, String clientName, String url) {
this.clientName = clientName;
this.telemetry = (TelemetryClient) TelemetryClient.createSessionlessTelemetry(httpClient, url);
// At most once every second
this.rateLimiter = RateLimiter.create(1.0);
this.rateLimitersMap = new HashMap<>();
}

/** Flush the telemetry buffer and close the telemetry service */
Expand Down Expand Up @@ -127,12 +128,16 @@ public void reportCpuMemoryUsage(Histogram cpuUsage) {
}
}

/** Report the offset token mismatch in a batch */
public void reportBatchOffsetMismatch(
String channelName,
String prevBatchEndOffset,
String startOffset,
String endOffset,
long rowCount) {
// Add a rate limiter to report the mismatch at most once every second per channel
RateLimiter rateLimiter =
rateLimitersMap.computeIfAbsent(channelName, v -> RateLimiter.create(1.0));
if (rateLimiter.tryAcquire()) {
ObjectNode msg = MAPPER.createObjectNode();
msg.put("channel_name", channelName);
Expand Down

0 comments on commit 94bd539

Please sign in to comment.