diff --git a/event/src/main/java/com/zfoo/event/anno/Bus.java b/event/src/main/java/com/zfoo/event/anno/Bus.java index 0a5b488e0..6f7c60c0e 100644 --- a/event/src/main/java/com/zfoo/event/anno/Bus.java +++ b/event/src/main/java/com/zfoo/event/anno/Bus.java @@ -22,9 +22,6 @@ public enum Bus { AsyncThread, - VirtualThread, - - ManualThread - ; + VirtualThread; } diff --git a/event/src/main/java/com/zfoo/event/manager/EventBus.java b/event/src/main/java/com/zfoo/event/manager/EventBus.java index b2ae2bd79..721a7bd76 100644 --- a/event/src/main/java/com/zfoo/event/manager/EventBus.java +++ b/event/src/main/java/com/zfoo/event/manager/EventBus.java @@ -16,8 +16,11 @@ import com.zfoo.event.model.IEvent; import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject; +import com.zfoo.protocol.util.AssertionUtils; import com.zfoo.protocol.util.RandomUtils; +import com.zfoo.protocol.util.StringUtils; import com.zfoo.protocol.util.ThreadUtils; +import io.netty.util.concurrent.FastThreadLocalThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +30,9 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -38,21 +44,55 @@ public abstract class EventBus { private static final Logger logger = LoggerFactory.getLogger(EventBus.class); /** - * event mapping + * EN: The size of the thread pool. Event's thread pool is often used to do time-consuming operations, so set it a little bigger + * CN: 线程池的大小. event的线程池经常用来做一些耗时的操作,所以要设置大一点 */ - private static final Map, List> receiverMap = new HashMap<>(); + private static final int EXECUTORS_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 4) * 2 + 1; + private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE]; + + private static final CopyOnWriteHashMapLongObject threadMap = new CopyOnWriteHashMapLongObject<>(EXECUTORS_SIZE); /** - * custom thread event receiver + * event mapping */ - public static BiConsumer manualThreadHandler = EventBus::doReceiver; - + private static final Map, List> receiverMap = new HashMap<>(); /** * event exception handler */ public static BiConsumer exceptionHandler = null; public static Consumer noEventReceiverHandler = null; + static { + for (int i = 0; i < executors.length; i++) { + var namedThreadFactory = new EventThreadFactory(i); + var executor = Executors.newSingleThreadExecutor(namedThreadFactory); + executors[i] = executor; + } + } + + public static class EventThreadFactory implements ThreadFactory { + private final int poolNumber; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final ThreadGroup group; + + public EventThreadFactory(int poolNumber) { + this.group = Thread.currentThread().getThreadGroup(); + this.poolNumber = poolNumber; + } + + @Override + public Thread newThread(Runnable runnable) { + var threadName = StringUtils.format("event-p{}-t{}", poolNumber + 1, threadNumber.getAndIncrement()); + var thread = new FastThreadLocalThread(group, runnable, threadName); + thread.setDaemon(false); + thread.setPriority(Thread.NORM_PRIORITY); + thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e)); + var executor = executors[poolNumber]; + AssertionUtils.notNull(executor); + threadMap.put(thread.getId(), executor); + return thread; + } + } /** * Publish the event @@ -74,9 +114,8 @@ public static void post(IEvent event) { for (var receiver : receivers) { switch (receiver.bus()) { case CurrentThread -> doReceiver(receiver, event); - case AsyncThread -> asyncExecute(event.executorHash(), () -> doReceiver(receiver, event)); + case AsyncThread -> execute(event.executorHash(), () -> doReceiver(receiver, event)); // case VirtualThread -> Thread.ofVirtual().name("virtual-on" + clazz.getSimpleName()).start(() -> doReceiver(receiver, event)); - case ManualThread -> manualThreadHandler.accept(receiver, event); } } } @@ -93,14 +132,14 @@ private static void doReceiver(IEventReceiver receiver, IEvent event) { } public static void asyncExecute(Runnable runnable) { - asyncExecute(RandomUtils.randomInt(), runnable); + execute(RandomUtils.randomInt(), runnable); } /** * Use the event thread specified by the hashcode to execute the task */ - public static void asyncExecute(int executorHash, Runnable runnable) { - EventExecutors.execute(executorHash, ThreadUtils.safeRunnable(runnable)); + public static void execute(int executorHash, Runnable runnable) { + executors[Math.abs(executorHash % EXECUTORS_SIZE)].execute(ThreadUtils.safeRunnable(runnable)); } /** @@ -110,9 +149,6 @@ public static void registerEventReceiver(Class eventType, IEve receiverMap.computeIfAbsent(eventType, it -> new ArrayList<>(1)).add(receiver); } - - // ------------------------------------------------------------------------------------------------------------------ - static final CopyOnWriteHashMapLongObject threadMap = new CopyOnWriteHashMapLongObject<>(); public static Executor threadExecutor(long currentThreadId) { return threadMap.getPrimitive(currentThreadId); } diff --git a/event/src/main/java/com/zfoo/event/manager/EventExecutors.java b/event/src/main/java/com/zfoo/event/manager/EventExecutors.java deleted file mode 100644 index 030fa309f..000000000 --- a/event/src/main/java/com/zfoo/event/manager/EventExecutors.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.zfoo.event.manager; - -import com.zfoo.protocol.util.AssertionUtils; -import com.zfoo.protocol.util.StringUtils; -import com.zfoo.protocol.util.ThreadUtils; -import io.netty.util.concurrent.FastThreadLocalThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author godotg - */ -public abstract class EventExecutors { - - private static final Logger logger = LoggerFactory.getLogger(EventExecutors.class); - - /** - * EN: The size of the thread pool. Event's thread pool is often used to do time-consuming operations, so set it a little bigger - * CN: 线程池的大小. event的线程池经常用来做一些耗时的操作,所以要设置大一点 - */ - private static final int EXECUTORS_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 4) * 2 + 1; - - private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE]; - - - static { - for (int i = 0; i < executors.length; i++) { - var namedThreadFactory = new EventThreadFactory(i); - var executor = Executors.newSingleThreadExecutor(namedThreadFactory); - executors[i] = executor; - } - } - - public static class EventThreadFactory implements ThreadFactory { - private final int poolNumber; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final ThreadGroup group; - - public EventThreadFactory(int poolNumber) { - this.group = Thread.currentThread().getThreadGroup(); - this.poolNumber = poolNumber; - } - - @Override - public Thread newThread(Runnable runnable) { - var threadName = StringUtils.format("event-p{}-t{}", poolNumber + 1, threadNumber.getAndIncrement()); - var thread = new FastThreadLocalThread(group, runnable, threadName); - thread.setDaemon(false); - thread.setPriority(Thread.NORM_PRIORITY); - thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e)); - var executor = executors[poolNumber]; - AssertionUtils.notNull(executor); - EventBus.threadMap.put(thread.getId(), executor); - return thread; - } - } - - /** - * Use the event thread specified by the hashcode to execute the task - */ - public static void execute(int executorHash, Runnable runnable) { - executors[Math.abs(executorHash % EXECUTORS_SIZE)].execute(ThreadUtils.safeRunnable(runnable)); - } - -} diff --git a/net/src/test/java/com/zfoo/net/router/SignalBridgeTest.java b/net/src/test/java/com/zfoo/net/router/SignalBridgeTest.java index 69f6b5740..dedb40246 100644 --- a/net/src/test/java/com/zfoo/net/router/SignalBridgeTest.java +++ b/net/src/test/java/com/zfoo/net/router/SignalBridgeTest.java @@ -45,7 +45,7 @@ public void arrayTest() throws InterruptedException { var countDownLatch = new CountDownLatch(executorSize); for (var i = 0; i < executorSize; i++) { - EventBus.asyncExecute(i, new Runnable() { + EventBus.execute(i, new Runnable() { @Override public void run() { addAndRemoveArray(); diff --git a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java index 9416af254..14abbe299 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java +++ b/orm/src/main/java/com/zfoo/orm/cache/EntityCache.java @@ -69,7 +69,7 @@ public void accept(Pair> pair, LazyCache.RemovalCause removalCause) var entity = pnode.getEntity(); @SuppressWarnings("unchecked") var entityClass = (Class) entityDef.getClazz(); - EventBus.asyncExecute(entityClass.hashCode(), new Runnable() { + EventBus.execute(entityClass.hashCode(), new Runnable() { @Override public void run() { var collection = OrmContext.getOrmManager().getCollection(entityClass); diff --git a/orm/src/main/java/com/zfoo/orm/cache/persister/CronOrmPersister.java b/orm/src/main/java/com/zfoo/orm/cache/persister/CronOrmPersister.java index 47938ee40..4cb6757f7 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/persister/CronOrmPersister.java +++ b/orm/src/main/java/com/zfoo/orm/cache/persister/CronOrmPersister.java @@ -75,7 +75,7 @@ private void schedulePersist() { if (!OrmContext.isStop()) { SchedulerBus.schedule(() -> { if (!OrmContext.isStop()) { - EventBus.asyncExecute(entityDef.getClazz().hashCode(), () -> { + EventBus.execute(entityDef.getClazz().hashCode(), () -> { entityCaches.persistAll(); schedulePersist(); }); diff --git a/orm/src/main/java/com/zfoo/orm/cache/persister/TimeOrmPersister.java b/orm/src/main/java/com/zfoo/orm/cache/persister/TimeOrmPersister.java index 4efe6cae1..723ad5d1c 100644 --- a/orm/src/main/java/com/zfoo/orm/cache/persister/TimeOrmPersister.java +++ b/orm/src/main/java/com/zfoo/orm/cache/persister/TimeOrmPersister.java @@ -44,7 +44,7 @@ public TimeOrmPersister(EntityDef entityDef, EntityCache entityCaches) { public void start() { SchedulerBus.scheduleAtFixedRate(() -> { if (!OrmContext.isStop()) { - EventBus.asyncExecute(entityDef.getClazz().hashCode(), () -> entityCaches.persistAll()); + EventBus.execute(entityDef.getClazz().hashCode(), () -> entityCaches.persistAll()); } }, rate, TimeUnit.MILLISECONDS); }