Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable replacing the ThreadPoolExecutor implementation #2063

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
public synchronized void shutdown(boolean processRemainingTasks) {
inShutdown = true;
for (AutomaticWorkQueue q : namedQueues.values()) {
if (q instanceof AutomaticWorkQueueImpl) {
AutomaticWorkQueueImpl impl = (AutomaticWorkQueueImpl)q;
if (q instanceof AutomaticWorkQueueImpl impl) {
if (impl.isShared()) {
synchronized (impl) {
impl.removeSharedUser();
Expand Down Expand Up @@ -171,15 +170,14 @@ public AutomaticWorkQueue getNamedWorkQueue(String name) {
}
public final void addNamedWorkQueue(String name, AutomaticWorkQueue q) {
namedQueues.put(name, q);
if (q instanceof AutomaticWorkQueueImpl) {
AutomaticWorkQueueImpl impl = (AutomaticWorkQueueImpl)q;
if (q instanceof AutomaticWorkQueueImpl impl) {
if (impl.isShared()) {
synchronized (impl) {
if (impl.getShareCount() == 0
&& imanager != null
&& imanager.getMBeanServer() != null) {
try {
imanager.register(new WorkQueueImplMBeanWrapper((AutomaticWorkQueueImpl)q, this));
imanager.register(new WorkQueueImplMBeanWrapper(impl, this));
} catch (JMException jmex) {
LOG.log(Level.WARNING, jmex.getMessage(), jmex);
}
Expand All @@ -188,21 +186,20 @@ public final void addNamedWorkQueue(String name, AutomaticWorkQueue q) {
}
} else if (imanager != null) {
try {
imanager.register(new WorkQueueImplMBeanWrapper((AutomaticWorkQueueImpl)q, this));
imanager.register(new WorkQueueImplMBeanWrapper(impl, this));
} catch (JMException jmex) {
LOG.log(Level.WARNING, jmex.getMessage(), jmex);
}
}
}
}

private AutomaticWorkQueue createAutomaticWorkQueue() {
protected AutomaticWorkQueue createAutomaticWorkQueue() {
AutomaticWorkQueue q = new AutomaticWorkQueueImpl(DEFAULT_QUEUE_NAME);
addNamedWorkQueue(DEFAULT_QUEUE_NAME, q);
return q;
}


class WQLifecycleListener implements BusLifeCycleListener {
public void initComplete() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -78,7 +80,7 @@ public class AutomaticWorkQueueImpl implements AutomaticWorkQueue {
boolean shared;
int sharedCount;

private List<PropertyChangeListener> changeListenerList;
private final List<PropertyChangeListener> changeListenerList;

public AutomaticWorkQueueImpl() {
this(DEFAULT_MAX_QUEUE_SIZE);
Expand All @@ -104,6 +106,7 @@ public AutomaticWorkQueueImpl(int mqs,
long dequeueTimeout) {
this(mqs, initialThreads, highWaterMark, lowWaterMark, dequeueTimeout, "default");
}

public AutomaticWorkQueueImpl(int mqs,
int initialThreads,
int highWaterMark,
Expand Down Expand Up @@ -152,24 +155,13 @@ public int getShareCount() {
protected synchronized ThreadPoolExecutor getExecutor() {
if (executor == null) {
threadFactory = createThreadFactory(name);
executor = new ThreadPoolExecutor(lowWaterMark,
executor = createThreadPoolExecutor(lowWaterMark,
highWaterMark,
TimeUnit.MILLISECONDS.toMillis(dequeueTimeout),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueueSize),
threadFactory) {
@Override
protected void terminated() {
ThreadFactory f = executor.getThreadFactory();
if (f instanceof AWQThreadFactory) {
((AWQThreadFactory)f).shutdown();
}
if (watchDog != null) {
watchDog.shutdown();
}
}
};

new LinkedBlockingQueue<>(maxQueueSize),
threadFactory,
watchDog);

if (LOG.isLoggable(Level.FINE)) {
StringBuilder buf = new StringBuilder(128).append("Constructing automatic work queue with:\n")
Expand Down Expand Up @@ -262,11 +254,13 @@ static class DelayedTaskWrapper implements Delayed, Runnable {
trigger = System.currentTimeMillis() + delay;
}

@Override
public long getDelay(TimeUnit unit) {
long n = trigger - System.currentTimeMillis();
return unit.convert(n, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed delayed) {
long other = ((DelayedTaskWrapper)delayed).trigger;
int returnValue;
Expand All @@ -280,13 +274,14 @@ public int compareTo(Delayed delayed) {
return returnValue;
}

@Override
public void run() {
work.run();
}

}

class WatchDog extends Thread {
protected class WatchDog extends Thread {
DelayQueue<DelayedTaskWrapper> delayQueue;
AtomicBoolean shutdown = new AtomicBoolean(false);

Expand All @@ -300,6 +295,7 @@ public void shutdown() {
interrupt();
}

@Override
public void run() {
DelayedTaskWrapper task;
try {
Expand All @@ -322,7 +318,8 @@ public void run() {
}

}
class AWQThreadFactory implements ThreadFactory {

protected class AWQThreadFactory implements ThreadFactory {
final AtomicInteger threadNumber = new AtomicInteger(1);
ThreadGroup group;
String name;
Expand Down Expand Up @@ -386,6 +383,7 @@ public void setName(String s) {
threadFactory.setName(s);
}
}

public String getName() {
return name;
}
Expand Down Expand Up @@ -421,7 +419,7 @@ public void run() {
//The ThreadPoolExecutor in the JDK doesn't expand the number
//of threads until the queue is full. However, we would
//prefer the number of threads to expand immediately and
//only uses the queue if we've reached the maximum number
//only use the queue if we've reached the maximum number
//of threads.
ThreadPoolExecutor ex = getExecutor();
ex.execute(r);
Expand Down Expand Up @@ -487,7 +485,6 @@ public void shutdown(boolean processRemainingWorkItems) {
}
}


/**
* Gets the maximum size (capacity) of the backing queue.
* @return the maximum size (capacity) of the backing queue.
Expand All @@ -504,7 +501,6 @@ public long getSize() {
return executor == null ? 0 : executor.getQueue().size();
}


public boolean isEmpty() {
return executor == null || executor.getQueue().isEmpty();
}
Expand Down Expand Up @@ -566,24 +562,28 @@ public boolean isShutdown() {
}
return executor.isShutdown();
}

public int getLargestPoolSize() {
if (executor == null) {
return 0;
}
return executor.getLargestPoolSize();
}

public int getPoolSize() {
if (executor == null) {
return 0;
}
return executor.getPoolSize();
}

public int getActiveCount() {
if (executor == null) {
return 0;
}
return executor.getActiveCount();
}

public void update(Dictionary<String, String> config) {
String s = config.get("highWaterMark");
if (s != null) {
Expand All @@ -606,6 +606,7 @@ public void update(Dictionary<String, String> config) {
this.maxQueueSize = Integer.parseInt(s);
}
}

public Dictionary<String, String> getProperties() {
Dictionary<String, String> properties = new Hashtable<>();
NumberFormat nf = NumberFormat.getIntegerInstance();
Expand All @@ -617,4 +618,33 @@ public Dictionary<String, String> getProperties() {
properties.put("queueSize", nf.format(getLowWaterMark()));
return properties;
}

protected ThreadPoolExecutor createThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
WatchDog watchDog
) {
return new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
unit.toMillis(keepAliveTime),
unit,
workQueue,
threadFactory) {

@Override
protected void terminated() {
ThreadFactory f = this.getThreadFactory();
if (f instanceof AWQThreadFactory awqThreadFactory) {
awqThreadFactory.shutdown();
}
if (watchDog != null) {
watchDog.shutdown();
}
}
};
}
}
2 changes: 1 addition & 1 deletion core/src/main/resources/META-INF/cxf/bus-extensions.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
org.apache.cxf.bus.managers.PhaseManagerImpl:org.apache.cxf.phase.PhaseManager:true
org.apache.cxf.bus.managers.WorkQueueManagerImpl:org.apache.cxf.workqueue.WorkQueueManager:true
org.apache.cxf.bus.managers.CXFBusLifeCycleManager:org.apache.cxf.buslifecycle.BusLifeCycleManager:true
org.apache.cxf.bus.managers.CXFBusLifeCycleManager:org.apache.cxf.buslifecycle.BusLifeCycleManager:true
org.apache.cxf.bus.managers.ServerRegistryImpl:org.apache.cxf.endpoint.ServerRegistry:true
org.apache.cxf.bus.managers.EndpointResolverRegistryImpl:org.apache.cxf.endpoint.EndpointResolverRegistry:true
org.apache.cxf.bus.managers.HeaderManagerImpl:org.apache.cxf.headers.HeaderManager:true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.apache.cxf.tracing.opentelemetry;

import org.apache.cxf.tracing.opentelemetry.internal.CurrentContextThreadPoolExecutor;
import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OpenTelemetryAutomaticWorkQueueImpl extends AutomaticWorkQueueImpl {

@Override
protected ThreadPoolExecutor createThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
AutomaticWorkQueueImpl.WatchDog watchDog
) {
return new CurrentContextThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory) {

@Override
protected void terminated() {
ThreadFactory f = this.getThreadFactory();
if (f instanceof AWQThreadFactory awqThreadFactory) {
awqThreadFactory.shutdown();
}
if (watchDog != null) {
watchDog.shutdown();
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.cxf.tracing.opentelemetry;

import org.apache.cxf.Bus;
import org.apache.cxf.bus.managers.WorkQueueManagerImpl;
import org.apache.cxf.workqueue.AutomaticWorkQueue;

public class OpenTelemetryWorkQueueManagerImpl extends WorkQueueManagerImpl {

public OpenTelemetryWorkQueueManagerImpl(Bus bus) {
super(bus);
}

@Override
public synchronized AutomaticWorkQueue getAutomaticWorkQueue() {
return new OpenTelemetryAutomaticWorkQueueImpl();
}
}
Loading