Skip to content

Commit

Permalink
revert[event]: revert vent
Browse files Browse the repository at this point in the history
  • Loading branch information
jaysunxiao committed Apr 10, 2024
1 parent ad67c2f commit 966fe14
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 91 deletions.
5 changes: 1 addition & 4 deletions event/src/main/java/com/zfoo/event/anno/Bus.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ public enum Bus {

AsyncThread,

VirtualThread,

ManualThread
;
VirtualThread;

}
62 changes: 49 additions & 13 deletions event/src/main/java/com/zfoo/event/manager/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
import com.zfoo.event.model.IEvent;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.collection.concurrent.CopyOnWriteHashMapLongObject;
import com.zfoo.protocol.util.AssertionUtils;
import com.zfoo.protocol.util.RandomUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.protocol.util.ThreadUtils;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,6 +30,9 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand All @@ -38,21 +44,55 @@ public abstract class EventBus {
private static final Logger logger = LoggerFactory.getLogger(EventBus.class);

/**
* event mapping
* EN: The size of the thread pool. Event's thread pool is often used to do time-consuming operations, so set it a little bigger
* CN: 线程池的大小. event的线程池经常用来做一些耗时的操作,所以要设置大一点
*/
private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMap = new HashMap<>();
private static final int EXECUTORS_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 4) * 2 + 1;

private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE];

private static final CopyOnWriteHashMapLongObject<ExecutorService> threadMap = new CopyOnWriteHashMapLongObject<>(EXECUTORS_SIZE);
/**
* custom thread event receiver
* event mapping
*/
public static BiConsumer<IEventReceiver, IEvent> manualThreadHandler = EventBus::doReceiver;

private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMap = new HashMap<>();
/**
* event exception handler
*/
public static BiConsumer<IEventReceiver, IEvent> exceptionHandler = null;
public static Consumer<IEvent> noEventReceiverHandler = null;

static {
for (int i = 0; i < executors.length; i++) {
var namedThreadFactory = new EventThreadFactory(i);
var executor = Executors.newSingleThreadExecutor(namedThreadFactory);
executors[i] = executor;
}
}

public static class EventThreadFactory implements ThreadFactory {
private final int poolNumber;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;

public EventThreadFactory(int poolNumber) {
this.group = Thread.currentThread().getThreadGroup();
this.poolNumber = poolNumber;
}

@Override
public Thread newThread(Runnable runnable) {
var threadName = StringUtils.format("event-p{}-t{}", poolNumber + 1, threadNumber.getAndIncrement());
var thread = new FastThreadLocalThread(group, runnable, threadName);
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setUncaughtExceptionHandler((t, e) -> logger.error(t.toString(), e));
var executor = executors[poolNumber];
AssertionUtils.notNull(executor);
threadMap.put(thread.getId(), executor);
return thread;
}
}

/**
* Publish the event
Expand All @@ -74,9 +114,8 @@ public static void post(IEvent event) {
for (var receiver : receivers) {
switch (receiver.bus()) {
case CurrentThread -> doReceiver(receiver, event);
case AsyncThread -> asyncExecute(event.executorHash(), () -> doReceiver(receiver, event));
case AsyncThread -> execute(event.executorHash(), () -> doReceiver(receiver, event));
// case VirtualThread -> Thread.ofVirtual().name("virtual-on" + clazz.getSimpleName()).start(() -> doReceiver(receiver, event));
case ManualThread -> manualThreadHandler.accept(receiver, event);
}
}
}
Expand All @@ -93,14 +132,14 @@ private static void doReceiver(IEventReceiver receiver, IEvent event) {
}

public static void asyncExecute(Runnable runnable) {
asyncExecute(RandomUtils.randomInt(), runnable);
execute(RandomUtils.randomInt(), runnable);
}

/**
* Use the event thread specified by the hashcode to execute the task
*/
public static void asyncExecute(int executorHash, Runnable runnable) {
EventExecutors.execute(executorHash, ThreadUtils.safeRunnable(runnable));
public static void execute(int executorHash, Runnable runnable) {
executors[Math.abs(executorHash % EXECUTORS_SIZE)].execute(ThreadUtils.safeRunnable(runnable));
}

/**
Expand All @@ -110,9 +149,6 @@ public static void registerEventReceiver(Class<? extends IEvent> eventType, IEve
receiverMap.computeIfAbsent(eventType, it -> new ArrayList<>(1)).add(receiver);
}


// ------------------------------------------------------------------------------------------------------------------
static final CopyOnWriteHashMapLongObject<ExecutorService> threadMap = new CopyOnWriteHashMapLongObject<>();
public static Executor threadExecutor(long currentThreadId) {
return threadMap.getPrimitive(currentThreadId);
}
Expand Down
70 changes: 0 additions & 70 deletions event/src/main/java/com/zfoo/event/manager/EventExecutors.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void arrayTest() throws InterruptedException {

var countDownLatch = new CountDownLatch(executorSize);
for (var i = 0; i < executorSize; i++) {
EventBus.asyncExecute(i, new Runnable() {
EventBus.execute(i, new Runnable() {
@Override
public void run() {
addAndRemoveArray();
Expand Down
2 changes: 1 addition & 1 deletion orm/src/main/java/com/zfoo/orm/cache/EntityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void accept(Pair<PK, PNode<E>> pair, LazyCache.RemovalCause removalCause)
var entity = pnode.getEntity();
@SuppressWarnings("unchecked")
var entityClass = (Class<E>) entityDef.getClazz();
EventBus.asyncExecute(entityClass.hashCode(), new Runnable() {
EventBus.execute(entityClass.hashCode(), new Runnable() {
@Override
public void run() {
var collection = OrmContext.getOrmManager().getCollection(entityClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private void schedulePersist() {
if (!OrmContext.isStop()) {
SchedulerBus.schedule(() -> {
if (!OrmContext.isStop()) {
EventBus.asyncExecute(entityDef.getClazz().hashCode(), () -> {
EventBus.execute(entityDef.getClazz().hashCode(), () -> {
entityCaches.persistAll();
schedulePersist();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public TimeOrmPersister(EntityDef entityDef, EntityCache<?, ?> entityCaches) {
public void start() {
SchedulerBus.scheduleAtFixedRate(() -> {
if (!OrmContext.isStop()) {
EventBus.asyncExecute(entityDef.getClazz().hashCode(), () -> entityCaches.persistAll());
EventBus.execute(entityDef.getClazz().hashCode(), () -> entityCaches.persistAll());
}
}, rate, TimeUnit.MILLISECONDS);
}
Expand Down

0 comments on commit 966fe14

Please sign in to comment.