diff --git a/src/main/java/se/yolean/kafka/keyvalue/KafkaClientOnceLiveness.java b/src/main/java/se/yolean/kafka/keyvalue/KafkaClientOnceLiveness.java
new file mode 100644
index 0000000..d17b4c0
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/keyvalue/KafkaClientOnceLiveness.java
@@ -0,0 +1,58 @@
+package se.yolean.kafka.keyvalue;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import org.eclipse.microprofile.health.HealthCheck;
+import org.eclipse.microprofile.health.HealthCheckResponse;
+import org.eclipse.microprofile.health.Liveness;
+
+/**
+ * Liveness check that reports DOWN only while the consumer is in its Assigning stage
+ * and this check has never observed a stage whose metricValue exceeds Assigning's.
+ *
+ * Instead of catching and analyzing org.apache.kafka.common.errors.TimeoutException
+ * we want to trigger non-liveness if we've never seen proof of a working kafka connection.
+ * BUT we don't want to go non-live in case kafka fails to respond,
+ * because such termination could lead to cascading failure.
+ */
+@Liveness
+@Singleton
+public class KafkaClientOnceLiveness implements HealthCheck {
+
+  private static final String NAME = "Had a Kafka connection";
+
+  @Inject
+  ConsumerAtLeastOnce consumer;
+
+  HealthCheckResponse ok = HealthCheckResponse.builder().name(NAME).up().build();
+
+  /**
+   * One-way latch: set when a stage beyond Assigning is observed, never cleared.
+   * Volatile so the latch stays visible should call() be invoked from different threads.
+   */
+  volatile boolean assigningSuccessWasSeen = false;
+
+  /**
+   * Reports DOWN only while the stage is Assigning and the latch has not been set.
+   *
+   * @return the DOWN response in that case; otherwise the shared UP response, including
+   *         when the consumer or its stage is null
+   */
+  @Override
+  public HealthCheckResponse call() {
+    // Read the stage once so every decision below is based on the same value
+    final ConsumerAtLeastOnce.Stage stage = consumer == null ? null : consumer.stage;
+    if (stage == null) {
+      return ok;
+    }
+    if (stage.metricValue > ConsumerAtLeastOnce.Stage.Assigning.metricValue) {
+      assigningSuccessWasSeen = true;
+    }
+    if (!assigningSuccessWasSeen && stage.equals(ConsumerAtLeastOnce.Stage.Assigning)) {
+      return HealthCheckResponse.builder().name(NAME).down().build();
+    }
+    return ok;
+  }
+
+}
diff --git a/src/test/java/se/yolean/kafka/keyvalue/KafkaClientOnceLivenessTest.java b/src/test/java/se/yolean/kafka/keyvalue/KafkaClientOnceLivenessTest.java
new file mode 100644
index 0000000..83e7ab6
--- /dev/null
+++ b/src/test/java/se/yolean/kafka/keyvalue/KafkaClientOnceLivenessTest.java
@@ -0,0 +1,35 @@
+package se.yolean.kafka.keyvalue;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.eclipse.microprofile.health.HealthCheckResponse.State;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class KafkaClientOnceLivenessTest {
+
+  /**
+   * Checks liveness before the test sets a stage, then at Assigning, Resetting and Assigning again.
+   *
+   * This is behavior we no longer want if we instead aim to recover from kafka client
+   * connection failures, or terminate programmatically.
+   */
+  @Test
+  void testCallAtAssigned() {
+    KafkaClientOnceLiveness liveness = new KafkaClientOnceLiveness();
+    liveness.consumer = Mockito.mock(ConsumerAtLeastOnce.class);
+
+    assertEquals(State.UP, liveness.call().getState(),
+        "Should report live until the opposite is proven");
+    liveness.consumer.stage = ConsumerAtLeastOnce.Stage.Assigning;
+    assertEquals(State.DOWN, liveness.call().getState(),
+        "Might be ok to trigger non-liveness on the hopefully brief assigning phase");
+    liveness.consumer.stage = ConsumerAtLeastOnce.Stage.Resetting;
+    assertEquals(State.UP, liveness.call().getState(),
+        "As soon as we're out of Assigning we should be live");
+    liveness.consumer.stage = ConsumerAtLeastOnce.Stage.Assigning;
+    assertEquals(State.UP, liveness.call().getState(),
+        "From now on we should always be up");
+  }
+
+}