Skip to content

Commit

Permalink
fix(discovery): k8s discovery synchronization for stale lost targets
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewazores committed Oct 9, 2024
1 parent 2d4631b commit 7cd8f49
Showing 1 changed file with 44 additions and 39 deletions.
83 changes: 44 additions & 39 deletions src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import org.jboss.logging.Logger;

@ApplicationScoped
public class KubeApiDiscovery {
public class KubeApiDiscovery implements ResourceEventHandler<Endpoints> {
public static final String REALM = "KubernetesApi";

public static final String DISCOVERY_NAMESPACE_LABEL_KEY = "discovery.cryostat.io/namespace";
Expand Down Expand Up @@ -98,6 +98,8 @@ public class KubeApiDiscovery {
@ConfigProperty(name = "cryostat.discovery.kubernetes.resync-period")
Duration informerResyncPeriod;

private final Object txLock = new Object();

private final LazyInitializer<HashMap<String, SharedIndexInformer<Endpoints>>> nsInformers =
new LazyInitializer<HashMap<String, SharedIndexInformer<Endpoints>>>() {
@Override
Expand All @@ -116,12 +118,12 @@ protected HashMap<String, SharedIndexInformer<Endpoints>> initialize()
client.endpoints()
.inNamespace(ns)
.inform(
new EndpointsHandler(),
KubeApiDiscovery.this,
informerResyncPeriod.toMillis()));
logger.debugv(
"Started Endpoints SharedInformer for"
+ " namespace \"{0}\"",
ns);
"Started Endpoints SharedInformer for namespace"
+ " \"{0}\" with resync period {1}",
ns, informerResyncPeriod);
});
return result;
}
Expand Down Expand Up @@ -178,6 +180,43 @@ boolean available() {
return false;
}

@Override
public void onAdd(Endpoints endpoints) {
synchronized (txLock) {
logger.debugv(
"Endpoint {0} created in namespace {1}",
endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace());
QuarkusTransaction.joiningExisting()
.run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace()));
}
}

@Override
public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
synchronized (txLock) {
logger.debugv(
"Endpoint {0} modified in namespace {1}",
newEndpoints.getMetadata().getName(),
newEndpoints.getMetadata().getNamespace());
QuarkusTransaction.joiningExisting()
.run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace()));
}
}

@Override
public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) {
synchronized (txLock) {
logger.debugv(
"Endpoint {0} deleted in namespace {1}",
endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace());
if (deletedFinalStateUnknown) {
logger.warnv("Deleted final state unknown: {0}", endpoints);
}
QuarkusTransaction.joiningExisting()
.run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace()));
}
}

private boolean isCompatiblePort(EndpointPort port) {
return jmxPortNames.orElse(EMPTY_PORT_NAMES).contains(port.getName())
|| jmxPortNumbers.orElse(EMPTY_PORT_NUMBERS).contains(port.getPort());
Expand Down Expand Up @@ -531,40 +570,6 @@ boolean kubeApiAvailable() {
}
}

private final class EndpointsHandler implements ResourceEventHandler<Endpoints> {
@Override
public void onAdd(Endpoints endpoints) {
logger.debugv(
"Endpoint {0} created in namespace {1}",
endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace());
QuarkusTransaction.joiningExisting()
.run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace()));
}

@Override
public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
logger.debugv(
"Endpoint {0} modified in namespace {1}",
newEndpoints.getMetadata().getName(),
newEndpoints.getMetadata().getNamespace());
QuarkusTransaction.joiningExisting()
.run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace()));
}

@Override
public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) {
logger.debugv(
"Endpoint {0} deleted in namespace {1}",
endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace());
if (deletedFinalStateUnknown) {
logger.warnv("Deleted final state unknown: {0}", endpoints);
return;
}
QuarkusTransaction.joiningExisting()
.run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace()));
}
}

private static record EndpointDiscoveryEvent(
String namespace, Target target, ObjectReference objRef, EventKind eventKind) {
static EndpointDiscoveryEvent from(
Expand Down

0 comments on commit 7cd8f49

Please sign in to comment.