diff --git a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java index e9ccd2abe..9fe573fbb 100644 --- a/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java +++ b/src/main/java/io/cryostat/discovery/KubeApiDiscovery.java @@ -69,7 +69,7 @@ import org.jboss.logging.Logger; @ApplicationScoped -public class KubeApiDiscovery { +public class KubeApiDiscovery implements ResourceEventHandler { public static final String REALM = "KubernetesApi"; public static final String DISCOVERY_NAMESPACE_LABEL_KEY = "discovery.cryostat.io/namespace"; @@ -98,6 +98,8 @@ public class KubeApiDiscovery { @ConfigProperty(name = "cryostat.discovery.kubernetes.resync-period") Duration informerResyncPeriod; + private final Object txLock = new Object(); + private final LazyInitializer>> nsInformers = new LazyInitializer>>() { @Override @@ -116,12 +118,12 @@ protected HashMap> initialize() client.endpoints() .inNamespace(ns) .inform( - new EndpointsHandler(), + KubeApiDiscovery.this, informerResyncPeriod.toMillis())); logger.debugv( - "Started Endpoints SharedInformer for" - + " namespace \"{0}\"", - ns); + "Started Endpoints SharedInformer for namespace" + + " \"{0}\" with resync period {1}", + ns, informerResyncPeriod); }); return result; } @@ -178,6 +180,43 @@ boolean available() { return false; } + @Override + public void onAdd(Endpoints endpoints) { + synchronized (txLock) { + logger.debugv( + "Endpoint {0} created in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + } + } + + @Override + public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { + synchronized (txLock) { + logger.debugv( + "Endpoint {0} modified in namespace {1}", + newEndpoints.getMetadata().getName(), + newEndpoints.getMetadata().getNamespace()); + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); + } + } + + @Override + public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { + synchronized (txLock) { + logger.debugv( + "Endpoint {0} deleted in namespace {1}", + endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); + if (deletedFinalStateUnknown) { + logger.warnv("Deleted final state unknown: {0}", endpoints); + } + QuarkusTransaction.joiningExisting() + .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); + } + } + private boolean isCompatiblePort(EndpointPort port) { return jmxPortNames.orElse(EMPTY_PORT_NAMES).contains(port.getName()) || jmxPortNumbers.orElse(EMPTY_PORT_NUMBERS).contains(port.getPort()); @@ -531,40 +570,6 @@ boolean kubeApiAvailable() { } } - private final class EndpointsHandler implements ResourceEventHandler { - @Override - public void onAdd(Endpoints endpoints) { - logger.debugv( - "Endpoint {0} created in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); - } - - @Override - public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) { - logger.debugv( - "Endpoint {0} modified in namespace {1}", - newEndpoints.getMetadata().getName(), - newEndpoints.getMetadata().getNamespace()); - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(newEndpoints.getMetadata().getNamespace())); - } - - @Override - public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) { - logger.debugv( - "Endpoint {0} deleted in namespace {1}", - endpoints.getMetadata().getName(), endpoints.getMetadata().getNamespace()); - if (deletedFinalStateUnknown) { - logger.warnv("Deleted final state unknown: {0}", endpoints); - return; - } - QuarkusTransaction.joiningExisting() - .run(() -> handleObservedEndpoints(endpoints.getMetadata().getNamespace())); - } - } - private static record EndpointDiscoveryEvent( String namespace, Target target, ObjectReference objRef, EventKind eventKind) { static EndpointDiscoveryEvent from(