Skip to content

Commit

Permalink
[LI-HOTFIX] Add log de-duplicate for ConfigException() in parseAndVal…
Browse files Browse the repository at this point in the history
…idateAddresses() (#504) (#505)

* add log dedup for ConfigException() in parseAndValidateAddresses

* dedup warn
  • Loading branch information
Q1Liu authored Mar 6, 2024
1 parent 84e7571 commit 1c0e2f9
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
31 changes: 27 additions & 4 deletions clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
Expand All @@ -39,6 +41,8 @@

public final class ClientUtils {
private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
private static final long DUPLICATE_WINDOW_MS = 1000; // 1 second
private static final Map<String, Long> ERROR_DEDUPLICATION_CACHE = new ConcurrentHashMap<>();

private ClientUtils() {
}
Expand Down Expand Up @@ -71,15 +75,18 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url
String resolvedCanonicalName = inetAddress.getCanonicalHostName();
InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
String message = String.format("Couldn't resolve server %s from %s as DNS resolution of the canonical hostname %s failed for %s",
url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
dedupeAndHandleMessage(message, false);
} else {
addresses.add(address);
}
}
} else {
InetSocketAddress address = new InetSocketAddress(host, port);
if (address.isUnresolved()) {
log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
String message = String.format("Couldn't resolve server %s from %s as DNS resolution failed for %s", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
dedupeAndHandleMessage(message, false);
} else {
addresses.add(address);
}
Expand All @@ -93,11 +100,27 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url
}
}
if (addresses.isEmpty())
throw new ConfigException("No resolvable bootstrap server in provided urls: " +
String.join(",", urls));
dedupeAndHandleMessage("No resolvable bootstrap server in provided urls: " + String.join(",", urls), true);
return addresses;
}

public static void dedupeAndHandleMessage(String message, Boolean isError) {
long currentTime = System.currentTimeMillis();
if (!isDuplicateError(message, currentTime)) {
ERROR_DEDUPLICATION_CACHE.put(message, currentTime);
if (isError) {
throw new ConfigException(message);
} else {
log.warn(message);
}
}
}

private static boolean isDuplicateError(String message, long currentTime) {
Long previousTime = ERROR_DEDUPLICATION_CACHE.get(message);
return previousTime != null && (currentTime - previousTime) < DUPLICATE_WINDOW_MS;
}

/**
* @param config client configs
* @return configured ChannelBuilder based on the configs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -113,6 +114,25 @@ public void testResolveDnsLookupAllIps() throws UnknownHostException {
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
}

@Test
public void testParseAndValidateAddressesDedupesErrors() {
int expectedNumberOfErrors = 1;
int actualNumberOfErrors = 0;
String expectedErrorMessage = "No resolvable bootstrap server in provided urls: ";

for (int i = 0; i < 10; i++) {
try {
ClientUtils.parseAndValidateAddresses(Collections.emptyList());
} catch (ConfigException e) {
assertEquals(expectedErrorMessage, e.getMessage());
actualNumberOfErrors++;
}
}

// Verify that only one error was thrown during the loop
assertEquals(expectedNumberOfErrors, actualNumberOfErrors);
}

private List<InetSocketAddress> checkWithoutLookup(String... url) {
return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT);
}
Expand Down

0 comments on commit 1c0e2f9

Please sign in to comment.