Skip to content

本项目是基于JDK实现的动态线程池,通过它我们可以实现运行时对线程池进行监控与配置。

License

Notifications You must be signed in to change notification settings

kc-co9/dynamic-threadpool

Repository files navigation

动态线程池

概述

在计算机编程中,线程池是一种用于程序并发执行的软件设计模式。在线程池中会维护多个线程(等待被分配),以此来避免频繁地创建、销毁线程(执行短暂任务)而导致的延迟增加和性能下降。

其中,为了更加高效率的使用线程池,我们一般会将执行的任务分为CPU密集型和IO密集型,并基于这种分类设置不同的线程池参数,即:

  • 对于CPU密集型的任务,在N个处理器的系统上,当线程池的大小设置为N+1时,通常能实现最优的利用率(对于额外的线程可以在线程池中某个线程由于某种原因被暂停时发挥作用,以至于CPU时钟周期不会被浪费)。

  • 对于IO密集型的任务,由于线程不会一直在执行,因此线程池相比于CPU密集型规模应该更大。而对于这种类型的线程池可以通过以下公式进行计算:

    线程池大小 = N * U * (1`+ W/C)  
    其中: N = CPU核数, U = CPU利用率, 0 <= U <= 1 ,W/C = 等待时间 / 计算时间
    

从上述公式可得出如果任务等待时间越长,线程池应该设置的越大,以至于更好的利用CPU,因为等待时间越长CPU的空闲时间就越长。其中,对于当前设备的CPU核数可通过Runtime.getRuntime().availableProcessors()方法获得。

此线程池大小的计算方法源自于《Java并发编程实践》。

但是,由于上述计算方式过于理论化,在实际应用中很难确定其计算时间等待时间等,因此这种方式存在一定的争议。

在翻阅大量的资料后,笔者仍然没有找到线程池配置参数的最佳计算方式,并且随着系统运行、项目迭代等各种动态因素的影响下通过一次计算就能得到最合适的参数显然是不现实的。对于这种参数随着环境动态变化的场景,显然我们可以借鉴配置中心的思路,即,将线程池的配置参数动态化。这样我们就可以根据环境的变化,对线程池参数快速地进行调整了。

设计

在阅读动态线程池的设计思路前,我们首先来看看Java中的线程池ThreadPoolExecutor是如何运转的,具体如下所示:

   task:  XXX                          +-----+-----+-----+-----+
          XXX           2.submit to q  |     |     | XXX | XXX |
                       +-------------->+     |     | XXX | XXX +----+
                       |               +-----+-----+-----+-----+    |
                       |                                            |
                       |                                            | poll()/take()
                       |                                            |
                       |                                            v
                       |                                +-----------+-----------+
                       |                                |                       |
                       |                                |  +----------------+   |
+----------+      +----+----+      1.submit to corepool |  |                |   |
|   main   +----->+ execute +-----+---------------------------> XXX  XXX    +--------------------+
+----------+      +----+----+     |                     |  |    XXX  XXX    |   |                |
                       |          |                     |  |                |   |                |
                       |          |                     |  |                |   |                v
                       |          |                     |  |                |   |          +-----+------+
                       |          |                     |  |    corePool    |   |          |            |
                       |          |                     |  +----------------+   |          |    exit    |
                       |          |                     |                       |          |            |
                       |          +---------------------------> XXX  XXX        |          +-----+------+
                       |           3.submit to maxpool  |       XXX  XXX        |                ^
                       |                                |                       |                |
                       |                                |       XXX  XXX        |                |
                       |                                |       XXX  XXX        +----------------+
                       |                                |                       |
                       |                                |      maximumPool      |
                       |                                +-----------------------+
                       |              +--------------+
                       |              |              |
                       +------------->+    reject    |
                        4.reject task |              |
                                      +--------------+

简单来说,当一个新任务提交到execute方法时:

  • 如果正在运行的线程数小于corePoolSize,则会创建一个新的线程去处理这个请求(即使存在其他工作线程处于空闲状态)。
  • 如果正在运行的线程数大于corePoolSize但小于maximumPoolSize,并且任务队列未饱和(未满),则会将任务添加到队列中进行排队(如无空闲线程)。
  • 如果正在运行的线程数大于corePoolSize但小于maximumPoolSize,并且任务队列已饱和(已满),则会创建一个新线程去处理这个请求(如无空闲线程)。
  • 如果正在运行的线程数大于等于maximumPoolSize,并且任务队列已饱和(已满),则会执行拒绝策略(如无空闲线程)。

其中,如果线程池当前具有多于corePoolSize个线程,对于这些超出corePoolSize的线程则会在其空闲时间超过keepAliveTime时被终止。

于此同时,ThreadPoolExecutor也提供了这些参数的getter/setter方法让我们可以进行动态地获取和设置。

方法 说明
getThreadFactory
setThreadFactory
获取/设置用于创建线程的线程工厂。
getRejectedExecutionHandler
setRejectedExecutionHandler
获取/设置用于执行拒绝策略的处理器。
getCorePoolSize
setCorePoolSize
获取/设置core线程数(可覆盖构造时的设置)。如果设置的新值小于当前值,则超过的线程会在下一次空闲时被终止;如果设置的新值大于当前值,则将会开始执行等待队列中的任务(如有)。
prestartCoreThread
prestartAllCoreThreads
预启动一个/所有core线程,默认情况下只有当新任务执行时才会启动core线程。
allowsCoreThreadTimeOut
获取/设置在keep-alive时间内core线程是否允许超时终止。
getMaximumPoolSize
setMaximumPoolSize
获取/设置线程池最大线程数(可覆盖构造时的设置)。如果设置的新值小于当前值,则超出的线程会在空闲状态时被终止。
getKeepAliveTime
setKeepAliveTime
获取/设置线程池中线程的最大空闲时间,默认情况下只作用于非core线程。

另外,ThreadPoolExecutor还提供了一些数据统计字段/方法让我们可以获知线程池的运行状况。

方法 说明
getPoolSize 获取当前线程池的线程数量。
getActiveCount 获取当前线程池正在执行任务的线程数量(大约)。
getLargestPoolSize 获取当前线程池曾经出现过的最大线程数。
getTaskCount 获取当前线程池已经被调度执行的任务总数量(大约)。
getCompletedTaskCount 获取当前线程池已经被执行完成的任务总数量(大约)。

最终,我们通过ThreadPoolExecutor的这些参数/方法进行监控与配置,整体设计思路如下所示:

                                                    +--------------------------------+
                                                    |          managerment           |
                                                    | +------------+  +------------+ |
                                                    | |            |  |            | |
                                                    | | server mng |  |executor mng| |
                                                    | |            |  |            | |
                                                    | +------------+  +------------+ |
                                                    +--------------------------------+
                                                                  |    ^
                                                                  |    |
+----------------------------------+                     configure|    |display
|         business server          |                              |    |                     +------------+
|  +----------------------------+  |                              v    |                     |            |
|  |         exeuctor           |  |  report cfg/stat   +---------+----+----------+  store   |            |
|  | +---------+   +----------+ |  +------------------->+                         +--------->+            |
|  | |         |   |          | |  |                    |  monitor/config center  |          |  database  |
|  | |fetch cfg|   |report cfg| |  +<-------------------+                         +<---------+            |
|  | |         |   |          | |  |    fetch update    +-------------------------+  fetch   |            |
|  | +---------+   +----------+ |  |                                                         |            |
|  +----------------------------+  |                                                         +------------+
+----------------------------------+

架构

在了解完设计思路后,下面我们再来看看整个动态线程池的功能架构,即:

+------------------------------------------------------------------+
|                                                                  |
|   +----------------------------------------------------------+   |
|   |                                                          |   |
|   |                     Spring Security                      |   |
|   |                                                          |   |
|   +----------------------------------------------------------+   |
|                                                                  |
|   +-------------------------+  +-----------------------------+   |
|   |       server mng        |  |       executor mng          |   |
|   | +---------+ +---------+ |  | +-----------+ +-----------+ |   |
|   | |  insert | | update  | |  | |           | |           | |   |
|   | +---------+ +---------+ |  | |           | |           | |   |
|   | +---------------------+ |  | |  monitor  | | configure | |   |
|   | |                     | |  | |           | |           | |   |
|   | |       monitor       | |  | |           | |           | |   |
|   | |                     | |  | +-----------+ +-----------+ |   |
|   | +---------------------+ |  | |-----------| |-----------| |   |
|   +-------------------------+  +-----------------------------+   |
|                                                                  |
|   +----------------------------------------------------------+   |
|   | +-------------------------+ +-------------------------+  |   |
|   | |                         | |                         |  |   |
|   | |         MySQL           | |         Redis           |  |   |
|   | |                         | |                         |  |   |
|   | +-------------------------+ +-------------------------+  |   |
|   +----------------------------------------------------------+   |
|                                                                  |
+------------------------------------------------------------------+

参考

About

本项目是基于JDK实现的动态线程池,通过它我们可以实现运行时对线程池进行监控与配置。

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published