From 23c5c12ba889653859372152e7e1bf4141602d66 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 13 Nov 2024 15:37:34 +0200 Subject: [PATCH 1/3] Fill jvm.thread.state attribute for jvm.thread.count metric on jdk8 --- .../runtimemetrics/java8/Threads.java | 70 ++++++++++++++++++- .../java8/ThreadsStableSemconvTest.java | 46 ++++++++++-- 2 files changed, 109 insertions(+), 7 deletions(-) diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java index 238c15e37a74..cd0672048cd2 100644 --- a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java @@ -19,12 +19,15 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; import javax.annotation.Nullable; /** @@ -55,11 +58,34 @@ public final class Threads { /** Register observers for java runtime class metrics. */ public static List registerObservers(OpenTelemetry openTelemetry) { - return INSTANCE.registerObservers(openTelemetry, ManagementFactory.getThreadMXBean()); + return INSTANCE.registerObservers(openTelemetry, !isJava9OrNewer() && GET_THREADS != null); + } + + private List registerObservers(OpenTelemetry openTelemetry, boolean useThread) { + if (useThread) { + return registerObservers(openTelemetry, Threads::getThreads); + } + return registerObservers(openTelemetry, ManagementFactory.getThreadMXBean()); } // Visible for testing List registerObservers(OpenTelemetry openTelemetry, ThreadMXBean threadBean) { + return registerObservers( + openTelemetry, + isJava9OrNewer() ? Threads::java9AndNewerCallback : Threads::java8Callback, + threadBean); + } + + // Visible for testing + List registerObservers( + OpenTelemetry openTelemetry, Supplier threadSupplier) { + return registerObservers(openTelemetry, Threads::java8ThreadCallback, threadSupplier); + } + + private static List registerObservers( + OpenTelemetry openTelemetry, + Function> callbackProvider, + T threadInfo) { Meter meter = JmxRuntimeMetricsUtil.getMeter(openTelemetry); List observables = new ArrayList<>(); @@ -68,13 +94,13 @@ List registerObservers(OpenTelemetry openTelemetry, ThreadMXBean .upDownCounterBuilder("jvm.thread.count") .setDescription("Number of executing platform threads.") .setUnit("{thread}") - .buildWithCallback( - isJava9OrNewer() ? java9AndNewerCallback(threadBean) : java8Callback(threadBean))); + .buildWithCallback(callbackProvider.apply(threadInfo))); return observables; } @Nullable private static final MethodHandle THREAD_INFO_IS_DAEMON; + @Nullable private static final Method GET_THREADS; static { MethodHandle isDaemon; @@ -86,6 +112,17 @@ List registerObservers(OpenTelemetry openTelemetry, ThreadMXBean isDaemon = null; } THREAD_INFO_IS_DAEMON = isDaemon; + Method getThreads = null; + // only on jdk8 + if (THREAD_INFO_IS_DAEMON == null) { + try { + getThreads = Thread.class.getDeclaredMethod("getThreads"); + getThreads.setAccessible(true); + } catch (Exception e) { + getThreads = null; + } + } + GET_THREADS = getThreads; } private static boolean isJava9OrNewer() { @@ -104,6 +141,26 @@ private static Consumer java8Callback(ThreadMXBean th }; } + private static Consumer java8ThreadCallback( + Supplier supplier) { + return measurement -> { + Map counts = new HashMap<>(); + for (Thread thread : supplier.get()) { + Attributes threadAttributes = threadAttributes(thread); + counts.compute(threadAttributes, (k, value) -> value == null ? 1 : value + 1); + } + counts.forEach((threadAttributes, count) -> measurement.record(count, threadAttributes)); + }; + } + + private static Thread[] getThreads() { + try { + return requireNonNull((Thread[]) GET_THREADS.invoke(null)); + } catch (Exception e) { + throw new IllegalStateException("Unexpected error happened during Thread#getThreads()", e); + } + } + private static Consumer java9AndNewerCallback( ThreadMXBean threadBean) { return measurement -> { @@ -132,5 +189,12 @@ private static Attributes threadAttributes(ThreadInfo threadInfo) { JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState); } + private static Attributes threadAttributes(Thread thread) { + boolean isDaemon = thread.isDaemon(); + String threadState = thread.getState().name().toLowerCase(Locale.ROOT); + return Attributes.of( + JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState); + } + private Threads() {} } diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java index 55fb6b35e802..d4e9f7d9ff83 100644 --- a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java @@ -20,7 +20,6 @@ import java.lang.management.ThreadMXBean; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledForJreRange; -import org.junit.jupiter.api.condition.EnabledOnJre; import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; @@ -40,8 +39,7 @@ class ThreadsStableSemconvTest { @Mock private ThreadMXBean threadBean; @Test - @EnabledOnJre(JRE.JAVA_8) - void registerObservers_Java8() { + void registerObservers_Java8Jmx() { when(threadBean.getThreadCount()).thenReturn(7); when(threadBean.getDaemonThreadCount()).thenReturn(2); @@ -75,6 +73,46 @@ void registerObservers_Java8() { equalTo(JVM_THREAD_DAEMON, false)))))); } + @Test + @EnabledForJreRange(min = JRE.JAVA_9) + void registerObservers_Java8Thread() { + Thread threadInfo1 = mock(Thread.class, new ThreadInfoAnswer(false, Thread.State.RUNNABLE)); + Thread threadInfo2 = mock(Thread.class, new ThreadInfoAnswer(true, Thread.State.WAITING)); + + Thread[] threads = new Thread[] {threadInfo1, threadInfo2}; + + Threads.INSTANCE + .registerObservers(testing.getOpenTelemetry(), () -> threads) + .forEach(cleanup::deferCleanup); + + testing.waitAndAssertMetrics( + "io.opentelemetry.runtime-telemetry-java8", + "jvm.thread.count", + metrics -> + metrics.anySatisfy( + metricData -> + assertThat(metricData) + .hasInstrumentationScope(EXPECTED_SCOPE) + .hasDescription("Number of executing platform threads.") + .hasUnit("{thread}") + .hasLongSumSatisfying( + sum -> + sum.isNotMonotonic() + .hasPointsSatisfying( + point -> + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(JVM_THREAD_DAEMON, false), + equalTo(JVM_THREAD_STATE, "runnable")), + point -> + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(JVM_THREAD_DAEMON, true), + equalTo(JVM_THREAD_STATE, "waiting")))))); + } + @Test @EnabledForJreRange(min = JRE.JAVA_9) void registerObservers_Java9AndNewer() { @@ -135,7 +173,7 @@ public Object answer(InvocationOnMock invocation) { String methodName = invocation.getMethod().getName(); if (methodName.equals("isDaemon")) { return isDaemon; - } else if (methodName.equals("getThreadState")) { + } else if (methodName.equals("getThreadState") || methodName.equals("getState")) { return state; } return null; From 9983930f468f16dc49f4a68f4eecfcec74881942 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 13 Nov 2024 16:00:17 +0200 Subject: [PATCH 2/3] remove reflection --- .../runtimemetrics/java8/Threads.java | 35 +++++++++---------- .../java8/ThreadsStableSemconvTest.java | 10 ++++++ 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java index cd0672048cd2..d3af5bf00ea9 100644 --- a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java @@ -19,7 +19,6 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -58,7 +57,7 @@ public final class Threads { /** Register observers for java runtime class metrics. */ public static List registerObservers(OpenTelemetry openTelemetry) { - return INSTANCE.registerObservers(openTelemetry, !isJava9OrNewer() && GET_THREADS != null); + return INSTANCE.registerObservers(openTelemetry, !isJava9OrNewer()); } private List registerObservers(OpenTelemetry openTelemetry, boolean useThread) { @@ -100,7 +99,6 @@ private static List registerObservers( } @Nullable private static final MethodHandle THREAD_INFO_IS_DAEMON; - @Nullable private static final Method GET_THREADS; static { MethodHandle isDaemon; @@ -112,17 +110,6 @@ private static List registerObservers( isDaemon = null; } THREAD_INFO_IS_DAEMON = isDaemon; - Method getThreads = null; - // only on jdk8 - if (THREAD_INFO_IS_DAEMON == null) { - try { - getThreads = Thread.class.getDeclaredMethod("getThreads"); - getThreads.setAccessible(true); - } catch (Exception e) { - getThreads = null; - } - } - GET_THREADS = getThreads; } private static boolean isJava9OrNewer() { @@ -153,12 +140,22 @@ private static Consumer java8ThreadCallback( }; } - private static Thread[] getThreads() { - try { - return requireNonNull((Thread[]) GET_THREADS.invoke(null)); - } catch (Exception e) { - throw new IllegalStateException("Unexpected error happened during Thread#getThreads()", e); + // Visible for testing + static Thread[] getThreads() { + ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); + while (threadGroup.getParent() != null) { + threadGroup = threadGroup.getParent(); + } + // use a slightly larger array in case new threads are created + int count = threadGroup.activeCount() + 10; + Thread[] threads = new Thread[count]; + int resultSize = threadGroup.enumerate(threads); + if (resultSize == threads.length) { + return threads; } + Thread[] result = new Thread[resultSize]; + System.arraycopy(threads, 0, result, 0, resultSize); + return result; } private static Consumer java9AndNewerCallback( diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java index d4e9f7d9ff83..3a909a17c824 100644 --- a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java @@ -18,6 +18,9 @@ import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledForJreRange; import org.junit.jupiter.api.condition.JRE; @@ -158,6 +161,13 @@ void registerObservers_Java9AndNewer() { equalTo(JVM_THREAD_STATE, "waiting")))))); } + @Test + void getThreads() { + Thread[] threads = Threads.getThreads(); + Set set = new HashSet<>(Arrays.asList(threads)); + assertThat(set).contains(Thread.currentThread()); + } + static final class ThreadInfoAnswer implements Answer { private final boolean isDaemon; From 6d81538468e7ffbf92214e3b872333571308e85a Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 13 Nov 2024 16:09:41 +0200 Subject: [PATCH 3/3] fix tests --- .../runtimemetrics/java8/ThreadsStableSemconvTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java index 3a909a17c824..7d9eb8aa994e 100644 --- a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/test/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java @@ -23,6 +23,7 @@ import java.util.Set; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.EnabledOnJre; import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; @@ -42,6 +43,7 @@ class ThreadsStableSemconvTest { @Mock private ThreadMXBean threadBean; @Test + @EnabledOnJre(JRE.JAVA_8) void registerObservers_Java8Jmx() { when(threadBean.getThreadCount()).thenReturn(7); when(threadBean.getDaemonThreadCount()).thenReturn(2); @@ -77,7 +79,6 @@ void registerObservers_Java8Jmx() { } @Test - @EnabledForJreRange(min = JRE.JAVA_9) void registerObservers_Java8Thread() { Thread threadInfo1 = mock(Thread.class, new ThreadInfoAnswer(false, Thread.State.RUNNABLE)); Thread threadInfo2 = mock(Thread.class, new ThreadInfoAnswer(true, Thread.State.WAITING));