From d595921f03c23b778bcd5d5a36afe685d6643aae Mon Sep 17 00:00:00 2001 From: Benjamin Billet Date: Thu, 25 Jul 2019 12:41:42 +0200 Subject: [PATCH] Synchronize healtchecks and operator state --- .../java/nb/kafka/operator/HealthServer.java | 26 +++++-------------- .../java/nb/kafka/operator/KafkaOperator.java | 15 +++++++++++ src/main/java/nb/kafka/operator/Main.java | 2 +- .../operator/KafkaOperatorStateTest.java | 23 +++++++++++++++- .../kafka/operator/PartitionedTopicTest.java | 5 ---- 5 files changed, 45 insertions(+), 26 deletions(-) delete mode 100644 src/test/java/nb/kafka/operator/PartitionedTopicTest.java diff --git a/src/main/java/nb/kafka/operator/HealthServer.java b/src/main/java/nb/kafka/operator/HealthServer.java index 03f108b..a850267 100644 --- a/src/main/java/nb/kafka/operator/HealthServer.java +++ b/src/main/java/nb/kafka/operator/HealthServer.java @@ -2,46 +2,34 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sun.net.httpserver.HttpServer; // NOSONAR -import nb.kafka.operator.model.OperatorError; - @SuppressWarnings("restriction") public final class HealthServer { + private final KafkaOperator operator; - private HealthServer() { + public HealthServer(KafkaOperator operator) { + this.operator = operator; } private static final Logger logger = LoggerFactory.getLogger(HealthServer.class); - public static void start(AppConfig config) throws IOException { - HttpServer server = HttpServer.create(new InetSocketAddress(config.getHealthsPort()), 0); + public void start(int port) throws IOException { + HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); server.createContext("/healthy", exchange -> exchange.sendResponseHeaders(200, -1)); server.createContext("/ready", exchange -> { - if (isKafkaReachable(config)) { + if (operator.checkOperatorState()) { exchange.sendResponseHeaders(200, -1); } else { exchange.sendResponseHeaders(500, -1); } }); - logger.info("Starting health checks server on port: {}", config.getHealthsPort()); + logger.info("Starting health checks server on port: {}", port); server.start(); } - - private static boolean isKafkaReachable(AppConfig config) { - try (KafkaAdmin ka = new KafkaAdminImpl(config)) { - ka.listTopics(); - return true; - } catch (TimeoutException | InterruptedException | ExecutionException e) { // NOSONAR - logger.error(String.format(OperatorError.KAFKA_UNREACHABLE.toString(), config.getBootstrapServers())); - return false; - } - } } \ No newline at end of file diff --git a/src/main/java/nb/kafka/operator/KafkaOperator.java b/src/main/java/nb/kafka/operator/KafkaOperator.java index 6167656..b349bb0 100644 --- a/src/main/java/nb/kafka/operator/KafkaOperator.java +++ b/src/main/java/nb/kafka/operator/KafkaOperator.java @@ -15,6 +15,7 @@ import io.micrometer.core.instrument.Gauge; import nb.kafka.operator.importer.ConfigMapImporter; import nb.kafka.operator.importer.TopicImporter; +import nb.kafka.operator.model.OperatorError; import nb.kafka.operator.util.MeterManager; import nb.kafka.operator.util.TopicValidator; import nb.kafka.operator.watch.ConfigMapWatcher; @@ -108,6 +109,20 @@ public void importTopics() { topicImporter.importTopics(); } + public boolean checkOperatorState() { + if (operatorState == State.FAILED) { + return false; + } + try (KafkaAdmin ka = new KafkaAdminImpl(config)) { + ka.listTopics(); + return true; + } catch (TimeoutException | InterruptedException | ExecutionException e) { // NOSONAR + operatorState = State.FAILED; + log.error(String.format(OperatorError.KAFKA_UNREACHABLE.toString(), config.getBootstrapServers())); + return false; + } + } + private void manageTopic(Topic topic) { if (topic == null){ return; diff --git a/src/main/java/nb/kafka/operator/Main.java b/src/main/java/nb/kafka/operator/Main.java index 95373b8..b3ef276 100644 --- a/src/main/java/nb/kafka/operator/Main.java +++ b/src/main/java/nb/kafka/operator/Main.java @@ -27,8 +27,8 @@ public static void main(String[] args) throws IOException { setupJmxRegistry(config.getOperatorId()); Runnable stopHttpServer = setupPrometheusRegistry(config.getMetricsPort()); - HealthServer.start(config); KafkaOperator operator = new KafkaOperator(config); + new HealthServer(operator).start(config.getHealthsPort()); Runtime.getRuntime().addShutdownHook(new Thread(operator::shutdown)); Runtime.getRuntime().addShutdownHook(new Thread(stopHttpServer)); diff --git a/src/test/java/nb/kafka/operator/KafkaOperatorStateTest.java b/src/test/java/nb/kafka/operator/KafkaOperatorStateTest.java index 55c8784..678b011 100644 --- a/src/test/java/nb/kafka/operator/KafkaOperatorStateTest.java +++ b/src/test/java/nb/kafka/operator/KafkaOperatorStateTest.java @@ -1,5 +1,6 @@ package nb.kafka.operator; +import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doThrow; @@ -7,6 +8,7 @@ import static org.mockito.Mockito.when; import java.util.Arrays; +import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -17,10 +19,11 @@ import nb.kafka.operator.watch.ConfigMapWatcher; public class KafkaOperatorStateTest { - private AppConfig config = AppConfig.defaultConfig(); + private AppConfig config; @BeforeEach void setUp() { + config = AppConfig.defaultConfig(); config.setBootstrapServers("localhost:9092"); } @@ -75,4 +78,22 @@ public void failureWatchStateTest() throws Throwable { assertEquals(State.FAILED, operator.getState()); } + + @Test + public void healthCheckFailedStateTest() throws Throwable { + config.setKafkaTimeoutMs(100); + + KubernetesClient kubeClientMock = mock(KubernetesClient.class); + KafkaAdmin kafkaAdminMock = mock(KafkaAdmin.class); + ConfigMapWatcher watcherMock = mock(ConfigMapWatcher.class); + doThrow(ExecutionException.class).when(kafkaAdminMock).listTopics(); + + KafkaOperator operator = new KafkaOperator(config, kubeClientMock, kafkaAdminMock, watcherMock, + MeterManager.defaultMeterManager()); + + assertEquals(State.CREATED, operator.getState()); + + assertFalse(operator.checkOperatorState()); + assertEquals(State.FAILED, operator.getState()); + } } diff --git a/src/test/java/nb/kafka/operator/PartitionedTopicTest.java b/src/test/java/nb/kafka/operator/PartitionedTopicTest.java deleted file mode 100644 index 9504c56..0000000 --- a/src/test/java/nb/kafka/operator/PartitionedTopicTest.java +++ /dev/null @@ -1,5 +0,0 @@ -import static org.junit.Assert.*; - -public class PartitionedTopicTest { - -} \ No newline at end of file