From d743405c966a7e4ed234b1cc6f7f767cf41bfc17 Mon Sep 17 00:00:00 2001 From: Adriano Machado <60320+ammachado@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:55:54 -0400 Subject: [PATCH 1/2] Enable replacing the ThreadPoolExecutor implementation for OpenTelemetry thread context propagation --- .../bus/managers/WorkQueueManagerImpl.java | 16 ++--- .../cxf/workqueue/AutomaticWorkQueueImpl.java | 59 +++++++++++++------ 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/cxf/bus/managers/WorkQueueManagerImpl.java b/core/src/main/java/org/apache/cxf/bus/managers/WorkQueueManagerImpl.java index f66948e8c61..85441d583ab 100644 --- a/core/src/main/java/org/apache/cxf/bus/managers/WorkQueueManagerImpl.java +++ b/core/src/main/java/org/apache/cxf/bus/managers/WorkQueueManagerImpl.java @@ -74,7 +74,7 @@ public final void setBus(Bus bus) { imanager = bus.getExtension(InstrumentationManager.class); if (null != imanager) { try { - imanager.register(new WorkQueueManagerImplMBeanWrapper(this)); + imanager.register(createManagedBeanWrapper()); } catch (JMException jmex) { LOG.log(Level.WARNING, jmex.getMessage(), jmex); } @@ -101,6 +101,10 @@ public final void setBus(Bus bus) { } } + protected WorkQueueManagerImplMBeanWrapper createManagedBeanWrapper() { + return new WorkQueueManagerImplMBeanWrapper(this); + } + public synchronized AutomaticWorkQueue getAutomaticWorkQueue() { AutomaticWorkQueue defaultQueue = getNamedWorkQueue(DEFAULT_QUEUE_NAME); if (defaultQueue == null) { @@ -171,15 +175,14 @@ public AutomaticWorkQueue getNamedWorkQueue(String name) { } public final void addNamedWorkQueue(String name, AutomaticWorkQueue q) { namedQueues.put(name, q); - if (q instanceof AutomaticWorkQueueImpl) { - AutomaticWorkQueueImpl impl = (AutomaticWorkQueueImpl)q; + if (q instanceof AutomaticWorkQueueImpl impl) { if (impl.isShared()) { synchronized (impl) { if (impl.getShareCount() == 0 && imanager != null && imanager.getMBeanServer() != null) { try { - imanager.register(new WorkQueueImplMBeanWrapper((AutomaticWorkQueueImpl)q, this)); + imanager.register(new WorkQueueImplMBeanWrapper(impl, this)); } catch (JMException jmex) { LOG.log(Level.WARNING, jmex.getMessage(), jmex); } @@ -188,7 +191,7 @@ public final void addNamedWorkQueue(String name, AutomaticWorkQueue q) { } } else if (imanager != null) { try { - imanager.register(new WorkQueueImplMBeanWrapper((AutomaticWorkQueueImpl)q, this)); + imanager.register(new WorkQueueImplMBeanWrapper(impl, this)); } catch (JMException jmex) { LOG.log(Level.WARNING, jmex.getMessage(), jmex); } @@ -196,13 +199,12 @@ public final void addNamedWorkQueue(String name, AutomaticWorkQueue q) { } } - private AutomaticWorkQueue createAutomaticWorkQueue() { + protected AutomaticWorkQueue createAutomaticWorkQueue() { AutomaticWorkQueue q = new AutomaticWorkQueueImpl(DEFAULT_QUEUE_NAME); addNamedWorkQueue(DEFAULT_QUEUE_NAME, q); return q; } - class WQLifecycleListener implements BusLifeCycleListener { public void initComplete() { diff --git a/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java b/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java index 67dd3b4ddb8..716943c37ff 100644 --- a/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java +++ b/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java @@ -29,6 +29,8 @@ import java.util.Dictionary; import java.util.Hashtable; import java.util.List; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.LinkedBlockingQueue; @@ -78,7 +80,7 @@ public class AutomaticWorkQueueImpl implements AutomaticWorkQueue { boolean shared; int sharedCount; - private List changeListenerList; + private final List changeListenerList; public AutomaticWorkQueueImpl() { this(DEFAULT_MAX_QUEUE_SIZE); @@ -104,6 +106,7 @@ public AutomaticWorkQueueImpl(int mqs, long dequeueTimeout) { this(mqs, initialThreads, highWaterMark, lowWaterMark, dequeueTimeout, "default"); } + public AutomaticWorkQueueImpl(int mqs, int initialThreads, int highWaterMark, @@ -152,24 +155,13 @@ public int getShareCount() { protected synchronized ThreadPoolExecutor getExecutor() { if (executor == null) { threadFactory = createThreadFactory(name); - executor = new ThreadPoolExecutor(lowWaterMark, + executor = createThreadPoolExecutor(lowWaterMark, highWaterMark, TimeUnit.MILLISECONDS.toMillis(dequeueTimeout), TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(maxQueueSize), - threadFactory) { - @Override - protected void terminated() { - ThreadFactory f = executor.getThreadFactory(); - if (f instanceof AWQThreadFactory) { - ((AWQThreadFactory)f).shutdown(); - } - if (watchDog != null) { - watchDog.shutdown(); - } - } - }; - + new LinkedBlockingQueue<>(maxQueueSize), + threadFactory, + watchDog); if (LOG.isLoggable(Level.FINE)) { StringBuilder buf = new StringBuilder(128).append("Constructing automatic work queue with:\n") @@ -262,11 +254,13 @@ static class DelayedTaskWrapper implements Delayed, Runnable { trigger = System.currentTimeMillis() + delay; } + @Override public long getDelay(TimeUnit unit) { long n = trigger - System.currentTimeMillis(); return unit.convert(n, TimeUnit.MILLISECONDS); } + @Override public int compareTo(Delayed delayed) { long other = ((DelayedTaskWrapper)delayed).trigger; int returnValue; @@ -280,6 +274,7 @@ public int compareTo(Delayed delayed) { return returnValue; } + @Override public void run() { work.run(); } @@ -300,6 +295,7 @@ public void shutdown() { interrupt(); } + @Override public void run() { DelayedTaskWrapper task; try { @@ -421,7 +417,7 @@ public void run() { //The ThreadPoolExecutor in the JDK doesn't expand the number //of threads until the queue is full. However, we would //prefer the number of threads to expand immediately and - //only uses the queue if we've reached the maximum number + //only use the queue if we've reached the maximum number //of threads. ThreadPoolExecutor ex = getExecutor(); ex.execute(r); @@ -617,4 +613,33 @@ public Dictionary getProperties() { properties.put("queueSize", nf.format(getLowWaterMark())); return properties; } + + protected ThreadPoolExecutor createThreadPoolExecutor( + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + WatchDog watchDog + ) { + return new ThreadPoolExecutor(corePoolSize, + maximumPoolSize, + unit.toMillis(keepAliveTime), + unit, + workQueue, + threadFactory) { + + @Override + protected void terminated() { + ThreadFactory f = this.getThreadFactory(); + if (f instanceof AWQThreadFactory awqThreadFactory) { + awqThreadFactory.shutdown(); + } + if (watchDog != null) { + watchDog.shutdown(); + } + } + }; + } } From a977e689d59b7815b616b4afed281d94721fea61 Mon Sep 17 00:00:00 2001 From: Adriano Machado <60320+ammachado@users.noreply.github.com> Date: Wed, 11 Sep 2024 20:36:02 -0400 Subject: [PATCH 2/2] Add OpenTelemetry support on WorkQueueManager Signed-off-by: Adriano Machado <60320+ammachado@users.noreply.github.com> --- .../bus/managers/WorkQueueManagerImpl.java | 9 +- .../cxf/workqueue/AutomaticWorkQueueImpl.java | 13 ++- .../resources/META-INF/cxf/bus-extensions.txt | 2 +- .../OpenTelemetryAutomaticWorkQueueImpl.java | 37 +++++++++ .../OpenTelemetryWorkQueueManagerImpl.java | 17 ++++ .../CurrentContextThreadPoolExecutor.java | 83 +++++++++++++++++++ .../resources/META-INF/cxf/bus-extensions.txt | 1 + 7 files changed, 150 insertions(+), 12 deletions(-) create mode 100644 integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/OpenTelemetryAutomaticWorkQueueImpl.java create mode 100644 integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/OpenTelemetryWorkQueueManagerImpl.java create mode 100644 integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/internal/CurrentContextThreadPoolExecutor.java create mode 100644 integration/tracing/tracing-opentelemetry/src/main/resources/META-INF/cxf/bus-extensions.txt diff --git a/core/src/main/java/org/apache/cxf/bus/managers/WorkQueueManagerImpl.java b/core/src/main/java/org/apache/cxf/bus/managers/WorkQueueManagerImpl.java index 85441d583ab..fa9a53b8c54 100644 --- a/core/src/main/java/org/apache/cxf/bus/managers/WorkQueueManagerImpl.java +++ b/core/src/main/java/org/apache/cxf/bus/managers/WorkQueueManagerImpl.java @@ -74,7 +74,7 @@ public final void setBus(Bus bus) { imanager = bus.getExtension(InstrumentationManager.class); if (null != imanager) { try { - imanager.register(createManagedBeanWrapper()); + imanager.register(new WorkQueueManagerImplMBeanWrapper(this)); } catch (JMException jmex) { LOG.log(Level.WARNING, jmex.getMessage(), jmex); } @@ -101,10 +101,6 @@ public final void setBus(Bus bus) { } } - protected WorkQueueManagerImplMBeanWrapper createManagedBeanWrapper() { - return new WorkQueueManagerImplMBeanWrapper(this); - } - public synchronized AutomaticWorkQueue getAutomaticWorkQueue() { AutomaticWorkQueue defaultQueue = getNamedWorkQueue(DEFAULT_QUEUE_NAME); if (defaultQueue == null) { @@ -116,8 +112,7 @@ public synchronized AutomaticWorkQueue getAutomaticWorkQueue() { public synchronized void shutdown(boolean processRemainingTasks) { inShutdown = true; for (AutomaticWorkQueue q : namedQueues.values()) { - if (q instanceof AutomaticWorkQueueImpl) { - AutomaticWorkQueueImpl impl = (AutomaticWorkQueueImpl)q; + if (q instanceof AutomaticWorkQueueImpl impl) { if (impl.isShared()) { synchronized (impl) { impl.removeSharedUser(); diff --git a/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java b/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java index 716943c37ff..0bf0b172396 100644 --- a/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java +++ b/core/src/main/java/org/apache/cxf/workqueue/AutomaticWorkQueueImpl.java @@ -281,7 +281,7 @@ public void run() { } - class WatchDog extends Thread { + protected class WatchDog extends Thread { DelayQueue delayQueue; AtomicBoolean shutdown = new AtomicBoolean(false); @@ -318,7 +318,8 @@ public void run() { } } - class AWQThreadFactory implements ThreadFactory { + + protected class AWQThreadFactory implements ThreadFactory { final AtomicInteger threadNumber = new AtomicInteger(1); ThreadGroup group; String name; @@ -382,6 +383,7 @@ public void setName(String s) { threadFactory.setName(s); } } + public String getName() { return name; } @@ -483,7 +485,6 @@ public void shutdown(boolean processRemainingWorkItems) { } } - /** * Gets the maximum size (capacity) of the backing queue. * @return the maximum size (capacity) of the backing queue. @@ -500,7 +501,6 @@ public long getSize() { return executor == null ? 0 : executor.getQueue().size(); } - public boolean isEmpty() { return executor == null || executor.getQueue().isEmpty(); } @@ -562,24 +562,28 @@ public boolean isShutdown() { } return executor.isShutdown(); } + public int getLargestPoolSize() { if (executor == null) { return 0; } return executor.getLargestPoolSize(); } + public int getPoolSize() { if (executor == null) { return 0; } return executor.getPoolSize(); } + public int getActiveCount() { if (executor == null) { return 0; } return executor.getActiveCount(); } + public void update(Dictionary config) { String s = config.get("highWaterMark"); if (s != null) { @@ -602,6 +606,7 @@ public void update(Dictionary config) { this.maxQueueSize = Integer.parseInt(s); } } + public Dictionary getProperties() { Dictionary properties = new Hashtable<>(); NumberFormat nf = NumberFormat.getIntegerInstance(); diff --git a/core/src/main/resources/META-INF/cxf/bus-extensions.txt b/core/src/main/resources/META-INF/cxf/bus-extensions.txt index 3677c28b5a6..71874261045 100644 --- a/core/src/main/resources/META-INF/cxf/bus-extensions.txt +++ b/core/src/main/resources/META-INF/cxf/bus-extensions.txt @@ -1,6 +1,6 @@ org.apache.cxf.bus.managers.PhaseManagerImpl:org.apache.cxf.phase.PhaseManager:true org.apache.cxf.bus.managers.WorkQueueManagerImpl:org.apache.cxf.workqueue.WorkQueueManager:true -org.apache.cxf.bus.managers.CXFBusLifeCycleManager:org.apache.cxf.buslifecycle.BusLifeCycleManager:true +org.apache.cxf.bus.managers.CXFBusLifeCycleManager:org.apache.cxf.buslifecycle.BusLifeCycleManager:true org.apache.cxf.bus.managers.ServerRegistryImpl:org.apache.cxf.endpoint.ServerRegistry:true org.apache.cxf.bus.managers.EndpointResolverRegistryImpl:org.apache.cxf.endpoint.EndpointResolverRegistry:true org.apache.cxf.bus.managers.HeaderManagerImpl:org.apache.cxf.headers.HeaderManager:true diff --git a/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/OpenTelemetryAutomaticWorkQueueImpl.java b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/OpenTelemetryAutomaticWorkQueueImpl.java new file mode 100644 index 00000000000..e3a2bb6576a --- /dev/null +++ b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/OpenTelemetryAutomaticWorkQueueImpl.java @@ -0,0 +1,37 @@ +package org.apache.cxf.tracing.opentelemetry; + +import org.apache.cxf.tracing.opentelemetry.internal.CurrentContextThreadPoolExecutor; +import org.apache.cxf.workqueue.AutomaticWorkQueueImpl; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class OpenTelemetryAutomaticWorkQueueImpl extends AutomaticWorkQueueImpl { + + @Override + protected ThreadPoolExecutor createThreadPoolExecutor( + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + AutomaticWorkQueueImpl.WatchDog watchDog + ) { + return new CurrentContextThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory) { + + @Override + protected void terminated() { + ThreadFactory f = this.getThreadFactory(); + if (f instanceof AWQThreadFactory awqThreadFactory) { + awqThreadFactory.shutdown(); + } + if (watchDog != null) { + watchDog.shutdown(); + } + } + }; + } +} diff --git a/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/OpenTelemetryWorkQueueManagerImpl.java b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/OpenTelemetryWorkQueueManagerImpl.java new file mode 100644 index 00000000000..58786fb6323 --- /dev/null +++ b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/OpenTelemetryWorkQueueManagerImpl.java @@ -0,0 +1,17 @@ +package org.apache.cxf.tracing.opentelemetry; + +import org.apache.cxf.Bus; +import org.apache.cxf.bus.managers.WorkQueueManagerImpl; +import org.apache.cxf.workqueue.AutomaticWorkQueue; + +public class OpenTelemetryWorkQueueManagerImpl extends WorkQueueManagerImpl { + + public OpenTelemetryWorkQueueManagerImpl(Bus bus) { + super(bus); + } + + @Override + public synchronized AutomaticWorkQueue getAutomaticWorkQueue() { + return new OpenTelemetryAutomaticWorkQueueImpl(); + } +} diff --git a/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/internal/CurrentContextThreadPoolExecutor.java b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/internal/CurrentContextThreadPoolExecutor.java new file mode 100644 index 00000000000..b30461f037d --- /dev/null +++ b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/internal/CurrentContextThreadPoolExecutor.java @@ -0,0 +1,83 @@ +package org.apache.cxf.tracing.opentelemetry.internal; + +import io.opentelemetry.context.Context; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class CurrentContextThreadPoolExecutor extends ThreadPoolExecutor { + + public CurrentContextThreadPoolExecutor( + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory + ) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return super.newTaskFor(Context.current().wrap(runnable), value); + } + + @Override + protected RunnableFuture newTaskFor(Callable callable) { + return super.newTaskFor(Context.current().wrap(callable)); + } + + @Override + public Future submit(Runnable task) { + return super.submit(Context.current().wrap(task)); + } + + @Override + public Future submit(Runnable task, T result) { + return super.submit(Context.current().wrap(task), result); + } + + @Override + public Future submit(Callable task) { + return super.submit(Context.current().wrap(task)); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return super.invokeAny(wrap(tasks)); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return super.invokeAny(wrap(tasks), timeout, unit); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return super.invokeAll(wrap(tasks)); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return super.invokeAll(wrap(tasks), timeout, unit); + } + + @Override + public void execute(Runnable command) { + super.execute(Context.current().wrap(command)); + } + + protected static Collection> wrap(Collection> tasks) { + return tasks.stream().map(task -> Context.current().wrap(task)).toList(); + } +} diff --git a/integration/tracing/tracing-opentelemetry/src/main/resources/META-INF/cxf/bus-extensions.txt b/integration/tracing/tracing-opentelemetry/src/main/resources/META-INF/cxf/bus-extensions.txt new file mode 100644 index 00000000000..460aede153b --- /dev/null +++ b/integration/tracing/tracing-opentelemetry/src/main/resources/META-INF/cxf/bus-extensions.txt @@ -0,0 +1 @@ +org.apache.cxf.tracing.opentelemetry.OpenTelemetryWorkQueueManagerImpl:org.apache.cxf.workqueue.WorkQueueManager:true