Skip to content

Commit

Permalink
chore[router]: comment
Browse files Browse the repository at this point in the history
  • Loading branch information
jaysunxiao committed Nov 24, 2023
1 parent 8dcd0c3 commit b5112ef
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions net/src/main/java/com/zfoo/net/router/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,19 @@ public void receive(Session session, Object packet, @Nullable Object attachment)
* <p>
* zfoo中通过对线程池的细粒度控制,从而实现了Actor模型。
* 为了简单,可以把Actor可以理解为一个用户或者一个玩家。
* 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(taskExecutorHash)永远会得到一致的结果,
* 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(taskExecutorHash)永远会得到一致的结果(如果没有uid就用session id)
* 从而保证同一个用户或者玩家的请求总能通过taskExecutorHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。
* <p>
* zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = taskExecutorHash。
* <p>
* 在zfoo这套线程模型中,保证了服务器所接收到的Packet(最终被包装成PacketReceiverTask任务),永远只会在同一条线程处理,
* TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的taskExecutorHash计算。
* <p>
* IAttachment的不同,taskExecutorHash也不同:
* GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定
* SignalAttachment:taskExecutorHash通过IRouter和IConsumer的argument参数指定
* 这种流水线做法对cpu缓存非常友好,java线程能大部分时间跑在一个cpu核心,而玩家又和线程一一对应,这样就可以最大限度提高cpu缓存命中率。
* cpu的cache越大命中率就越高,性能提高就越明显。
* <p>
* 单线程热点问题,在负载足够大的情况下,比如5000人同时在线的8核服务器,因为样本足够大每个核心分配的人数差距并不会太大。
* 概率论告诉我们样本大的话分布是均匀的,小概率事件的单线程问题可以忽略,实在不行就加线程。
*/
public void dispatchBySession(PacketReceiverTask task) {
var session = task.getSession();
Expand Down Expand Up @@ -297,8 +299,8 @@ public <T> AsyncAnswer<T> asyncAsk(Session session, Object packet, @Nullable Cla
asyncAnswer.setSignalAttachment(clientSignalAttachment);

clientSignalAttachment.getResponseFuture()
// 因此超时的情况,返回的是null
.completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS).thenApply(answer -> {
.completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS) // 因此超时的情况,返回的是null
.thenApply(answer -> {
if (answer == null) {
throw new NetTimeOutException("async ask [{}] timeout exception", packet.getClass().getSimpleName());
}
Expand All @@ -311,7 +313,8 @@ public <T> AsyncAnswer<T> asyncAsk(Session session, Object packet, @Nullable Cla
throw new UnexpectedProtocolException("client expect protocol:[{}], but found protocol:[{}]", answerClass, answer.getClass().getName());
}
return answer;
}).whenCompleteAsync((answer, throwable) -> {
})
.whenCompleteAsync((answer, throwable) -> {
// 注意:进入这个方法的时机是:在上面的receive方法中,由于是asyncAsk的消息,attachment不为空,会调用CompletableFuture的complete方法
try {
SignalBridge.removeSignalAttachment(clientSignalAttachment);
Expand Down

0 comments on commit b5112ef

Please sign in to comment.