Skip to content

Commit

Permalink
perf[executor]: use & bits operation to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
jaysunxiao committed Aug 11, 2024
1 parent 6b4ad5f commit 4527235
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
8 changes: 5 additions & 3 deletions event/src/main/java/com/zfoo/event/manager/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.protocol.util.ThreadUtils;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.MathUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,7 +47,8 @@ public abstract class EventBus {
* EN: The size of the thread pool. Event's thread pool is often used to do time-consuming operations, so set it a little bigger
* CN: 线程池的大小. event的线程池经常用来做一些耗时的操作,所以要设置大一点
*/
private static final int EXECUTORS_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 4) * 2 + 1;
public static final int EXECUTORS_SIZE = MathUtil.safeFindNextPositivePowerOfTwo(Math.max(Runtime.getRuntime().availableProcessors(), 4) * 2);
private static final int EXECUTOR_MASK = EXECUTORS_SIZE - 1;

private static final ExecutorService[] executors = new ExecutorService[EXECUTORS_SIZE];

Expand Down Expand Up @@ -144,8 +146,8 @@ public static void asyncExecute(int hash, Runnable runnable) {
executorOf(hash).execute(ThreadUtils.safeRunnable(runnable));
}

public static ExecutorService executorOf(int hash){
return executors[Math.abs(hash % EXECUTORS_SIZE)];
public static ExecutorService executorOf(int hash) {
return executors[hash & EXECUTOR_MASK];
}

/**
Expand Down
23 changes: 13 additions & 10 deletions net/src/main/java/com/zfoo/net/task/TaskBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.protocol.util.ThreadUtils;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.MathUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,8 +41,9 @@ public final class TaskBus {
private static final Logger logger = LoggerFactory.getLogger(TaskBus.class);

// EN: The size of the thread pool can also be specified through the provider thread configuration
// CN: 线程池的大小,也可以通过provider thread配置指定
private static final int EXECUTOR_SIZE;
// CN: 线程池的大小,也可以通过provider thread配置指定,会通过EXECUTOR_MASK的与操作提高定位到Executor的性能
public static final int EXECUTOR_SIZE;
private static final int EXECUTOR_MASK;

/**
* EN: Use different thread pools to achieve isolation between thread pools without affecting each other
Expand All @@ -50,11 +52,13 @@ public final class TaskBus {
private static final ExecutorService[] executors;

static {
var localConfig = NetContext.getConfigManager().getLocalConfig();
var providerConfig = localConfig.getProvider();

EXECUTOR_SIZE = (providerConfig == null || StringUtils.isBlank(providerConfig.getThread())) ? (Runtime.getRuntime().availableProcessors() + 1) : Integer.parseInt(providerConfig.getThread());

var providerConfig = NetContext.getConfigManager().getLocalConfig().getProvider();
var expectThreads = (providerConfig == null || StringUtils.isBlank(providerConfig.getThread()))
? Runtime.getRuntime().availableProcessors()
: Integer.parseInt(providerConfig.getThread());
EXECUTOR_SIZE = MathUtil.safeFindNextPositivePowerOfTwo(expectThreads);
// if EXECUTOR_SIZE = 8 then EXECUTOR_MASK = 7 = 0b0111
EXECUTOR_MASK = EXECUTOR_SIZE - 1;
executors = new ExecutorService[EXECUTOR_SIZE];
for (int i = 0; i < executors.length; i++) {
var namedThreadFactory = new TaskThreadFactory(i);
Expand Down Expand Up @@ -89,8 +93,7 @@ public Thread newThread(Runnable runnable) {


private static int calTaskExecutorIndex(int taskExecutorHash) {
// Other hash algorithms can be customized to make the distribution more uniform
return Math.abs(taskExecutorHash % EXECUTOR_SIZE);
return taskExecutorHash & EXECUTOR_MASK;
}

public static int calTaskExecutorHash(Object argument) {
Expand All @@ -113,7 +116,7 @@ public static void execute(Object argument, Runnable runnable) {
execute(calTaskExecutorHash(argument), runnable);
}

public static ExecutorService executorOf(int hash){
public static ExecutorService executorOf(int hash) {
return executors[calTaskExecutorIndex(hash)];
}

Expand Down

0 comments on commit 4527235

Please sign in to comment.