Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fill jvm.thread.state attribute for jvm.thread.count metric on jdk8 #12724

Merged
merged 3 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -55,11 +57,34 @@ public final class Threads {

/** Register observers for java runtime class metrics. */
public static List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry) {
return INSTANCE.registerObservers(openTelemetry, ManagementFactory.getThreadMXBean());
return INSTANCE.registerObservers(openTelemetry, !isJava9OrNewer());
}

private List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, boolean useThread) {
if (useThread) {
return registerObservers(openTelemetry, Threads::getThreads);
}
return registerObservers(openTelemetry, ManagementFactory.getThreadMXBean());
}

// Visible for testing
List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, ThreadMXBean threadBean) {
return registerObservers(
openTelemetry,
isJava9OrNewer() ? Threads::java9AndNewerCallback : Threads::java8Callback,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the java8Callback is only used in tests

threadBean);
}

// Visible for testing
List<AutoCloseable> registerObservers(
OpenTelemetry openTelemetry, Supplier<Thread[]> threadSupplier) {
return registerObservers(openTelemetry, Threads::java8ThreadCallback, threadSupplier);
}

private static <T> List<AutoCloseable> registerObservers(
OpenTelemetry openTelemetry,
Function<T, Consumer<ObservableLongMeasurement>> callbackProvider,
T threadInfo) {
Meter meter = JmxRuntimeMetricsUtil.getMeter(openTelemetry);
List<AutoCloseable> observables = new ArrayList<>();

Expand All @@ -68,8 +93,7 @@ List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, ThreadMXBean
.upDownCounterBuilder("jvm.thread.count")
.setDescription("Number of executing platform threads.")
.setUnit("{thread}")
.buildWithCallback(
isJava9OrNewer() ? java9AndNewerCallback(threadBean) : java8Callback(threadBean)));
.buildWithCallback(callbackProvider.apply(threadInfo)));

return observables;
}
Expand Down Expand Up @@ -104,6 +128,36 @@ private static Consumer<ObservableLongMeasurement> java8Callback(ThreadMXBean th
};
}

private static Consumer<ObservableLongMeasurement> java8ThreadCallback(
Supplier<Thread[]> supplier) {
return measurement -> {
Map<Attributes, Long> counts = new HashMap<>();
for (Thread thread : supplier.get()) {
Attributes threadAttributes = threadAttributes(thread);
counts.compute(threadAttributes, (k, value) -> value == null ? 1 : value + 1);
}
counts.forEach((threadAttributes, count) -> measurement.record(count, threadAttributes));
};
}

// Visible for testing
static Thread[] getThreads() {
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initially I planned to use reflection to call Thread.getThreads(), but that method is not available on openj9

while (threadGroup.getParent() != null) {
threadGroup = threadGroup.getParent();
}
// use a slightly larger array in case new threads are created
int count = threadGroup.activeCount() + 10;
Thread[] threads = new Thread[count];
int resultSize = threadGroup.enumerate(threads);
if (resultSize == threads.length) {
return threads;
}
Thread[] result = new Thread[resultSize];
System.arraycopy(threads, 0, result, 0, resultSize);
return result;
}

private static Consumer<ObservableLongMeasurement> java9AndNewerCallback(
ThreadMXBean threadBean) {
return measurement -> {
Expand Down Expand Up @@ -132,5 +186,12 @@ private static Attributes threadAttributes(ThreadInfo threadInfo) {
JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState);
}

private static Attributes threadAttributes(Thread thread) {
boolean isDaemon = thread.isDaemon();
String threadState = thread.getState().name().toLowerCase(Locale.ROOT);
return Attributes.of(
JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState);
}

private Threads() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.EnabledOnJre;
Expand All @@ -41,7 +44,7 @@ class ThreadsStableSemconvTest {

@Test
@EnabledOnJre(JRE.JAVA_8)
void registerObservers_Java8() {
void registerObservers_Java8Jmx() {
when(threadBean.getThreadCount()).thenReturn(7);
when(threadBean.getDaemonThreadCount()).thenReturn(2);

Expand Down Expand Up @@ -75,6 +78,45 @@ void registerObservers_Java8() {
equalTo(JVM_THREAD_DAEMON, false))))));
}

@Test
void registerObservers_Java8Thread() {
Thread threadInfo1 = mock(Thread.class, new ThreadInfoAnswer(false, Thread.State.RUNNABLE));
Thread threadInfo2 = mock(Thread.class, new ThreadInfoAnswer(true, Thread.State.WAITING));

Thread[] threads = new Thread[] {threadInfo1, threadInfo2};

Threads.INSTANCE
.registerObservers(testing.getOpenTelemetry(), () -> threads)
.forEach(cleanup::deferCleanup);

testing.waitAndAssertMetrics(
"io.opentelemetry.runtime-telemetry-java8",
"jvm.thread.count",
metrics ->
metrics.anySatisfy(
metricData ->
assertThat(metricData)
.hasInstrumentationScope(EXPECTED_SCOPE)
.hasDescription("Number of executing platform threads.")
.hasUnit("{thread}")
.hasLongSumSatisfying(
sum ->
sum.isNotMonotonic()
.hasPointsSatisfying(
point ->
point
.hasValue(1)
.hasAttributesSatisfying(
equalTo(JVM_THREAD_DAEMON, false),
equalTo(JVM_THREAD_STATE, "runnable")),
point ->
point
.hasValue(1)
.hasAttributesSatisfying(
equalTo(JVM_THREAD_DAEMON, true),
equalTo(JVM_THREAD_STATE, "waiting"))))));
}

@Test
@EnabledForJreRange(min = JRE.JAVA_9)
void registerObservers_Java9AndNewer() {
Expand Down Expand Up @@ -120,6 +162,13 @@ void registerObservers_Java9AndNewer() {
equalTo(JVM_THREAD_STATE, "waiting"))))));
}

@Test
void getThreads() {
Thread[] threads = Threads.getThreads();
Set<Thread> set = new HashSet<>(Arrays.asList(threads));
assertThat(set).contains(Thread.currentThread());
}

static final class ThreadInfoAnswer implements Answer<Object> {

private final boolean isDaemon;
Expand All @@ -135,7 +184,7 @@ public Object answer(InvocationOnMock invocation) {
String methodName = invocation.getMethod().getName();
if (methodName.equals("isDaemon")) {
return isDaemon;
} else if (methodName.equals("getThreadState")) {
} else if (methodName.equals("getThreadState") || methodName.equals("getState")) {
return state;
}
return null;
Expand Down
Loading