diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index f2a813af27a67..e5b62c3bba83c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.clients; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SaslConfigs; @@ -39,6 +41,8 @@ public final class ClientUtils { private static final Logger log = LoggerFactory.getLogger(ClientUtils.class); + private static final long DUPLICATE_WINDOW_MS = 1000; // 1 second + private static final Map ERROR_DEDUPLICATION_CACHE = new ConcurrentHashMap<>(); private ClientUtils() { } @@ -71,7 +75,9 @@ public static List parseAndValidateAddresses(List url String resolvedCanonicalName = inetAddress.getCanonicalHostName(); InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); if (address.isUnresolved()) { - log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); + String message = String.format("Couldn't resolve server %s from %s as DNS resolution of the canonical hostname %s failed for %s", + url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); + dedupeAndHandleMessage(message, false); } else { addresses.add(address); } @@ -79,7 +85,8 @@ public static List parseAndValidateAddresses(List url } else { InetSocketAddress address = new InetSocketAddress(host, port); if (address.isUnresolved()) { - log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); + String message = String.format("Couldn't resolve server %s from %s as DNS resolution failed for %s", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); + dedupeAndHandleMessage(message, false); } else { addresses.add(address); } @@ -93,11 +100,27 @@ public static List parseAndValidateAddresses(List url } } if (addresses.isEmpty()) - throw new ConfigException("No resolvable bootstrap server in provided urls: " + - String.join(",", urls)); + dedupeAndHandleMessage("No resolvable bootstrap server in provided urls: " + String.join(",", urls), true); return addresses; } + public static void dedupeAndHandleMessage(String message, Boolean isError) { + long currentTime = System.currentTimeMillis(); + if (!isDuplicateError(message, currentTime)) { + ERROR_DEDUPLICATION_CACHE.put(message, currentTime); + if (isError) { + throw new ConfigException(message); + } else { + log.warn(message); + } + } + } + + private static boolean isDuplicateError(String message, long currentTime) { + Long previousTime = ERROR_DEDUPLICATION_CACHE.get(message); + return previousTime != null && (currentTime - previousTime) < DUPLICATE_WINDOW_MS; + } + /** * @param config client configs * @return configured ChannelBuilder based on the configs. diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java index 5f545b901d911..9a33e852baef9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigException; @@ -113,6 +114,25 @@ public void testResolveDnsLookupAllIps() throws UnknownHostException { assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1); } + @Test + public void testParseAndValidateAddressesDedupesErrors() { + int expectedNumberOfErrors = 1; + int actualNumberOfErrors = 0; + String expectedErrorMessage = "No resolvable bootstrap server in provided urls: "; + + for (int i = 0; i < 10; i++) { + try { + ClientUtils.parseAndValidateAddresses(Collections.emptyList()); + } catch (ConfigException e) { + assertEquals(expectedErrorMessage, e.getMessage()); + actualNumberOfErrors++; + } + } + + // Verify that only one error was thrown during the loop + assertEquals(expectedNumberOfErrors, actualNumberOfErrors); + } + private List checkWithoutLookup(String... url) { return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT); }