首頁 > 軟體

ThreadPoolExecutor引數的用法及說明

2023-11-24 22:00:10

ThreadPoolExecutor引數說明

一、ThreadPoolExecutor核心引數說明

1、corePoolSize:核心執行緒數

* 核心執行緒會一直存活,及時沒有任務需要執行

* 當執行緒數小於核心執行緒數時,即使有執行緒空閒,執行緒池也會優先建立新執行緒處理

* 設定allowCoreThreadTimeout=true(預設false)時,核心執行緒會超時關閉

2、queueCapacity:任務佇列容量(阻塞佇列)

* 當核心執行緒數達到最大時,新任務會放在佇列中排隊等待執行

3、maxPoolSize:最大執行緒數

* 當執行緒數>=corePoolSize,且任務佇列已滿時。執行緒池會建立新執行緒來處理任務

* 當執行緒數=maxPoolSize,且任務佇列已滿時,執行緒池會拒絕處理任務而丟擲異常

4、 keepAliveTime:執行緒空閒時間

* 當執行緒空閒時間達到keepAliveTime時,執行緒會退出,直到執行緒數量=corePoolSize

* 如果allowCoreThreadTimeout=true,則會直到執行緒數量=0

5、allowCoreThreadTimeout:允許核心執行緒超時

6、rejectedExecutionHandler:任務拒絕處理器

* 兩種情況會拒絕處理任務:

  • - 當執行緒數已經達到maxPoolSize,切佇列已滿,會拒絕新任務
  • - 當執行緒池被呼叫shutdown()後,會等待執行緒池裡的任務執行完畢,再shutdown。如果在呼叫shutdown()和執行緒池真正shutdown之間提交任務,會拒絕新任務

* 執行緒池會呼叫rejectedExecutionHandler來處理這個任務。如果沒有設定預設是AbortPolicy,會丟擲異常

* ThreadPoolExecutor類有幾個內部實現類來處理這類情況:

  • - AbortPolicy 丟棄任務,拋執行時異常
  • - CallerRunsPolicy 執行任務
  • - DiscardPolicy 忽視,什麼都不會發生
  • - DiscardOldestPolicy 從佇列中踢出最先進入佇列(最後一個執行)的任務

* 實現RejectedExecutionHandler介面,可自定義處理器

二、ThreadPoolExecutor執行順序

執行緒池按以下行為執行任務

1. 當執行緒數小於核心執行緒數時,建立執行緒。

2. 當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列。

3. 當執行緒數大於等於核心執行緒數,且任務佇列已滿

  • -1 若執行緒數小於最大執行緒數,建立執行緒
  • -2 若執行緒數等於最大執行緒數,丟擲異常,拒絕任務

三、ThreadPoolExecutor如何設定引數

1、預設值

* corePoolSize=1 * queueCapacity=Integer.MAX_VALUE * maxPoolSize=Integer.MAX_VALUE * keepAliveTime=60s * allowCoreThreadTimeout=false * rejectedExecutionHandler=AbortPolicy()

2、如何來設定

* 需要根據幾個值來決定

  • - tasks :每秒的任務數,假設為1000
  • - taskcost:每個任務花費時間,假設為0.1s
  • - responsetime:系統允許容忍的最大響應時間,假設為1s

* 做幾個計算

- corePoolSize = 每秒需要多少個執行緒處理?

* 一顆CPU核心同一時刻只能執行一個執行緒,然後作業系統切換上下文,核心開始執行另一個執行緒的程式碼,以此類推,超過cpu核心數,就會放入佇列,如果佇列也滿了,就另起一個新的執行緒執行,所有推薦:corePoolSize = ((cpu核心數 * 2) + 有效磁碟數),java可以使用Runtime.getRuntime().availableProcessors()獲取cpu核心數

- queueCapacity = (coreSizePool/taskcost)*responsetime

* 計算可得 queueCapacity = corePoolSize/0.1*1。意思是佇列裡的執行緒可以等待1s,超過了的需要新開執行緒來執行

* 切記不能設定為Integer.MAX_VALUE,這樣佇列會很大,執行緒數只會保持在corePoolSize大小,當任務陡增時,不能新開執行緒來執行,響應時間會隨之陡增。

- maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)

* 計算可得 maxPoolSize = (1000-corePoolSize)/10,即(每秒並行數-corePoolSize大小) / 10

* (最大任務數-佇列容量)/每個執行緒每秒處理能力 = 最大執行緒數

  • - rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理
  • - keepAliveTime和allowCoreThreadTimeout採用預設通常能滿足

ThreadPoolExecutor引數allowCoreThreadTimeOut

ThreadPoolExecutor的執行流程有一點可能被吐槽過,就是隻有快取佇列已經滿了的時候才會使用到maxPoolSize建立新的執行緒.也就是說如果corePoolSize設為0的時候,要等到佇列滿了,才會建立執行緒去執行任務

之前有被問到,希望沒有任務的時候執行緒池裡的執行緒可以停掉。可能對效能和資源有過考慮的人都會想到這個問題吧

今天看JDK原始碼的時候發現了ThreadPoolExecutor在1.6的時候已經支援了

allowCoreThreadTimeOut引數就是為此設計的

    /**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting {@code true}. This method
     * should in general be called before the pool is actively used.
     *
     * @param value {@code true} if should time out, else {@code false}
     * @throws IllegalArgumentException if value is {@code true}
     *         and the current keep-alive time is not greater than zero
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

在ThreadPoolExecutor建構函式的註釋上也有明確說明:corePoolSize 的數量會一直保持,即使這些執行緒是空閒的,除非設定了allowCoreThreadTimeOut

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

當然,在Executors的靜態工廠裡面的newCachedThreadPool提供了另外一種思路

coreSize為0
SynchronousQueue最多隻能有一個任務在佇列裡面

也就是說這個執行緒池的任務會被立即分配一個執行緒去處理,如果沒有空閒的執行緒會立即建立執行緒。

在空閒的時候,執行緒數量會減少直至為0,這一點倒是滿足了要求。可是佇列中最多隻會快取一個任務,當任務的處理速度慢於任務進入執行緒池的速度時,執行緒數量就會不斷膨脹。如果maxPoolSize設定成一個比較小的數位時,可能就會有大量任務被拒絕策略處理。

所以正如註釋中所說,newCachedThreadPool只適合於任務處理速度很快的場景下。比如做一些計算,不需要依賴其它服務

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

總結

以上為個人經驗,希望能給大家一個參考,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com