Skip to content

Commit

Permalink
add simple retry strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
SylvainJuge committed Feb 1, 2024
1 parent f9cd0ce commit 4147e98
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
*/
package co.elastic.apm.agent.jmx;

import co.elastic.apm.agent.tracer.configuration.TimeDuration;
import co.elastic.apm.agent.tracer.configuration.TimeDurationValueConverter;
import org.stagemonitor.configuration.ConfigurationOption;
import org.stagemonitor.configuration.ConfigurationOptionProvider;

import java.util.Collections;
import java.util.List;

import static co.elastic.apm.agent.tracer.configuration.RangeValidator.isNotInRange;

public class JmxConfiguration extends ConfigurationOptionProvider {

private ConfigurationOption<List<JmxMetric>> captureJmxMetrics = ConfigurationOption.<List<JmxMetric>>builder(JmxMetric.TokenValueConverter.INSTANCE, List.class)
Expand Down Expand Up @@ -137,4 +141,16 @@ public class JmxConfiguration extends ConfigurationOptionProvider {
/**
 * Exposes the configuration option holding the list of JMX metrics to capture.
 *
 * @return the {@code captureJmxMetrics} option itself (not its current value)
 */
ConfigurationOption<List<JmxMetric>> getCaptureJmxMetrics() {
    return this.captureJmxMetrics;
}

/**
 * Internal option ({@code jmx_failed_retry_interval}) controlling the retry of failed JMX metric registrations.
 * Defaults to {@code 0m}.
 * <p>
 * The validator is built from {@code isNotInRange(1ms, 59s)}; presumably it rejects values inside that range so
 * that only the default or intervals of roughly one minute and above are accepted — TODO confirm against
 * RangeValidator.
 * <p>
 * NOTE(review): the option is declared dynamic, but the tracker code visible in this change reads the value once
 * when scheduling the retry task — confirm whether runtime changes are expected to take effect.
 */
private final ConfigurationOption<TimeDuration> delayFailedRegistrationRetry = TimeDurationValueConverter.durationOption("m")
.key("jmx_failed_retry_interval")
.tags("internal")
.description("If set to a value greater or equal to 1m, the agent will retry failed JMX metric registrations.")
.addValidator(isNotInRange(TimeDuration.of("1ms"), TimeDuration.of("59s")))
.dynamic(true)
.buildWithDefault(TimeDuration.of("0m"));

/**
 * Exposes the configuration option that holds the retry interval for failed JMX metric registrations.
 *
 * @return the {@code delayFailedRegistrationRetry} option itself (not its current value)
 */
public ConfigurationOption<TimeDuration> getDelayFailedRegistrationRetry() {
    return this.delayFailedRegistrationRetry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import co.elastic.apm.agent.metrics.DoubleSupplier;
import co.elastic.apm.agent.metrics.Labels;
import co.elastic.apm.agent.metrics.MetricRegistry;
import co.elastic.apm.agent.tracer.GlobalLocks;
import co.elastic.apm.agent.sdk.internal.util.ExecutorUtils;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import co.elastic.apm.agent.tracer.GlobalLocks;
import co.elastic.apm.agent.tracer.configuration.TimeDuration;
import org.stagemonitor.configuration.ConfigurationOption;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -54,6 +56,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class JmxMetricTracker extends AbstractLifecycleListener {
Expand All @@ -68,9 +71,17 @@ public class JmxMetricTracker extends AbstractLifecycleListener {
@Nullable
private volatile NotificationListener listener;

// JMX metrics whose registration could not be compiled; filled by compileJmxMetricRegistrations and
// drained by the retry task. Backed by a synchronized list (see constructor).
private final List<JmxMetric> failedMetrics;

// Scheduler running the periodic registration retry; stays null unless init() creates it
// (only done when the retry interval option is not at its default). Shut down in stop().
@Nullable
private ScheduledExecutorService retryExecutor;

public JmxMetricTracker(ElasticApmTracer tracer) {
jmxConfiguration = tracer.getConfig(JmxConfiguration.class);
metricRegistry = tracer.getMetricRegistry();

// using a synchronized list so adding to the list does not require synchronization
failedMetrics = Collections.synchronizedList(new ArrayList<JmxMetric>());
}

@Override
Expand Down Expand Up @@ -175,19 +186,43 @@ synchronized void init(final MBeanServer platformMBeanServer) {
jmxConfiguration.getCaptureJmxMetrics().addChangeListener(new ConfigurationOption.ChangeListener<List<JmxMetric>>() {
/**
 * Re-synchronizes the registered JMX metrics when the capture option changes.
 * Both the previous and the new option values are compiled into registrations; the registrations
 * returned by {@code removeAll(old, new)} are registered and those returned by
 * {@code removeAll(new, old)} are unregistered.
 *
 * @param configurationOption the option that changed
 * @param oldValue            JMX metrics configured before the change
 * @param newValue            JMX metrics configured after the change
 */
@Override
public void onChange(ConfigurationOption<?> configurationOption, List<JmxMetric> oldValue, List<JmxMetric> newValue) {
    // failures while re-compiling the previous value are of no interest, hence the throw-away list
    List<JmxMetric> registrationErrors = new ArrayList<JmxMetric>();
    List<JmxMetricRegistration> oldRegistrations = compileJmxMetricRegistrations(oldValue, platformMBeanServer, registrationErrors);

    List<JmxMetricRegistration> newRegistrations;
    // clear and re-fill under the list's own monitor so the failed-metrics list only reflects the new value
    synchronized (failedMetrics) {
        failedMetrics.clear();
        newRegistrations = compileJmxMetricRegistrations(newValue, platformMBeanServer, failedMetrics);
    }

    for (JmxMetricRegistration addedRegistration : removeAll(oldRegistrations, newRegistrations)) {
        addedRegistration.register(platformMBeanServer, metricRegistry);
    }
    for (JmxMetricRegistration deletedRegistration : removeAll(newRegistrations, oldRegistrations)) {
        deletedRegistration.unregister(metricRegistry);
    }
}
});
register(jmxConfiguration.getCaptureJmxMetrics().get(), platformMBeanServer);

ConfigurationOption<TimeDuration> failedRetryConfig = jmxConfiguration.getDelayFailedRegistrationRetry();
long retryMillis = failedRetryConfig.getValue().getMillis();
if (!failedRetryConfig.isDefault()) {
retryExecutor = ExecutorUtils.createSingleThreadSchedulingDaemonPool("jmx-retry");
retryExecutor.scheduleAtFixedRate(new Runnable() {
/**
 * Retries the registration of every JMX metric currently recorded as failed.
 * The failed list is snapshotted and cleared under its own monitor; metrics that fail again are
 * added back to it by {@code register}.
 */
@Override
public void run() {
    List<JmxMetric> failed = JmxMetricTracker.this.failedMetrics;
    synchronized (failed) {
        if (failed.isEmpty()) {
            // nothing to retry
            return;
        }
        List<JmxMetric> toRetry = new ArrayList<>(failed);
        failed.clear();
        try {
            register(toRetry, platformMBeanServer, failed);
        } catch (RuntimeException e) {
            // an exception escaping a scheduleAtFixedRate task suppresses all further executions,
            // so log it here to keep the periodic retry alive
            logger.error("Failed to retry JMX metric registration for {}", toRetry, e);
        }
    }
}
}, retryMillis, retryMillis, TimeUnit.MILLISECONDS);
}

register(jmxConfiguration.getCaptureJmxMetrics().get(), platformMBeanServer, failedMetrics);
}

private void registerMBeanNotificationListener(final MBeanServer server) {
Expand Down Expand Up @@ -217,7 +252,7 @@ private void addMBean(ObjectName mBeanName, JmxMetric jmxMetric) {
ObjectName metricName = jmxMetric.getObjectName();
if (metricName.apply(mBeanName) || matchesJbossStatisticsPool(mBeanName, metricName, server)) {
logger.debug("MBean added at runtime: {}", jmxMetric.getObjectName());
register(Collections.singletonList(jmxMetric), server);
register(Collections.singletonList(jmxMetric), server, failedMetrics);
}
}

Expand Down Expand Up @@ -280,25 +315,33 @@ private static <T> List<T> removeAll(List<T> removeFromThis, List<T> toRemove) {
return result;
}

private void register(List<JmxMetric> jmxMetrics, MBeanServer server) {
for (JmxMetricRegistration registration : compileJmxMetricRegistrations(jmxMetrics, server)) {
private void register(List<JmxMetric> jmxMetrics, MBeanServer server, List<JmxMetric> failedMetrics) {
for (JmxMetricRegistration registration : compileJmxMetricRegistrations(jmxMetrics, server, failedMetrics)) {
registration.register(server, metricRegistry);
}
}

/**
* A single {@link JmxMetric} can yield multiple {@link JmxMetricRegistration}s if the {@link JmxMetric} contains multiple attributes
*
* @param jmxMetrics JMX metrics to register
* @param server MBean server
* @param failedMetrics list of JMX metrics that failed to register (out)
*/
private List<JmxMetricRegistration> compileJmxMetricRegistrations(List<JmxMetric> jmxMetrics, MBeanServer server) {
List<JmxMetricRegistration> registrations = new ArrayList<>();
private List<JmxMetricRegistration> compileJmxMetricRegistrations(List<JmxMetric> jmxMetrics, MBeanServer server, List<JmxMetric> failedMetrics) {
List<JmxMetricRegistration> globalRegistrations = new ArrayList<>();
for (JmxMetric jmxMetric : jmxMetrics) {
List<JmxMetricRegistration> metricRegistrations = new ArrayList<>();
try {
addJmxMetricRegistration(jmxMetric, registrations, server);
addJmxMetricRegistration(jmxMetric, metricRegistrations, server);
globalRegistrations.addAll(metricRegistrations);
} catch (Exception e) {
failedMetrics.add(jmxMetric);
logger.error("Failed to register JMX metric {}", jmxMetric.toString(), e);
}

}
return registrations;
return globalRegistrations;
}

private static void addJmxMetricRegistration(final JmxMetric jmxMetric, List<JmxMetricRegistration> registrations, MBeanServer server) throws JMException {
Expand Down Expand Up @@ -471,5 +514,8 @@ public void stop() throws Exception {
if (logManagerPropertyPoller != null) {
logManagerPropertyPoller.interrupt();
}
if (retryExecutor != null) {
ExecutorUtils.shutdownAndWaitTermination(retryExecutor);
}
}
}

0 comments on commit 4147e98

Please sign in to comment.