Skip to content

Commit

Permalink
Introduces a liveness check as fix/workaround for #29
Browse files Browse the repository at this point in the history
  • Loading branch information
solsson committed Oct 3, 2019
1 parent e28c90f commit c0e5a33
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package se.yolean.kafka.keyvalue;

import javax.inject.Inject;
import javax.inject.Singleton;

import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.Liveness;

/**
* Instead of catching and analyzing org.apache.kafka.common.errors.TimeoutException
* we want to trigger non-liveness if we've never seen proof of a working kafka connection.
* BUT we don't want to go non-live in case kafka fails to respond,
* because such termination could lead to cascading failure.
*/
@Liveness
@Singleton
public class KafkaClientOnceLiveness implements HealthCheck {

@Inject
ConsumerAtLeastOnce consumer;

HealthCheckResponse ok = HealthCheckResponse.builder().name("Had a Kafka connection").up().build();
boolean assigningSuccessWasSeen = false;

@Override
public HealthCheckResponse call() {
if (consumer != null && consumer.stage != null) {
if (consumer.stage.metricValue > ConsumerAtLeastOnce.Stage.Assigning.metricValue) {
assigningSuccessWasSeen = true;
}
if (!assigningSuccessWasSeen && consumer.stage.equals(ConsumerAtLeastOnce.Stage.Assigning)) {
return HealthCheckResponse.builder().name("Had a Kafka connection").down().build();
}
}
return ok;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package se.yolean.kafka.keyvalue;

import static org.junit.jupiter.api.Assertions.*;

import org.eclipse.microprofile.health.HealthCheckResponse.State;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class KafkaClientOnceLivenessTest {

/**
* This is behavior we no longer want if we instead aim to recover from kafka client connection failures, or terminate programmatically
*/
@Test
void testCallAtAssigned() {
KafkaClientOnceLiveness liveness = new KafkaClientOnceLiveness();
liveness.consumer = Mockito.mock(ConsumerAtLeastOnce.class);

assertEquals(true, liveness.call().getState().equals(State.UP),
"Should report live until the opposite is proven");
liveness.consumer.stage = ConsumerAtLeastOnce.Stage.Assigning;
assertEquals(true, liveness.call().getState().equals(State.DOWN),
"Might be ok to trigger non-liveness on the hopefully brief assigning phase");
liveness.consumer.stage = ConsumerAtLeastOnce.Stage.Resetting;
assertEquals(true, liveness.call().getState().equals(State.UP),
"As soon as we're out of Assigning we should be live");
liveness.consumer.stage = ConsumerAtLeastOnce.Stage.Assigning;
assertEquals(true, liveness.call().getState().equals(State.UP),
"From now on we should always be up");
}

}

0 comments on commit c0e5a33

Please sign in to comment.