中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Java多線程:線程池(chi)

一、 背景

線(xian)(xian)程是稀缺(que)資源,如果無限制的(de)創建,不僅(jin)會消耗(hao)系統(tong)資源,還會降低(di)系統(tong)的(de)穩(wen)定性,合理的(de)使用線(xian)(xian)程池可(ke)以對線(xian)(xian)程進行(xing)統(tong)一的(de)分配、調優和監(jian)控(kong),并(bing)有以下好(hao)處:
    第(di)一(yi):降低(di)資源(yuan)消(xiao)(xiao)耗。通過(guo)重復利用已創建的(de)(de)線程降低(di)線程創建和銷毀造成的(de)(de)消(xiao)(xiao)耗。
    第二(er):提(ti)高響(xiang)應速度。當(dang)任務到達(da)時,任務可以(yi)不需要等到線程創建就能立即(ji)執行(xing)。
    第三:提(ti)高線程的可管理性。
 

二、線程池的架(jia)構

三、Executors

    用(yong)于創建線程池(chi), 包含五(wu)種創建線程池(chi)的(de)方法(fa)

newFixedThreadPool(固定大小線(xian)程(cheng)池)

   
    初始化一個定長線程數的線程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊列,不過當線程池沒有可執行任務時,也不會釋放線程,超出的線程會在隊列中等待
 
public static ExecutorService newFixedThreadPool(int nThreads) {  
      return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());  
}
 

newCachedThreadPool(無(wu)界線(xian)程(cheng)池,可以進(jin)行自動線(xian)程(cheng)回收)

    1、初(chu)始化一個可以緩存線程(cheng)的線程(cheng)池,默認緩存60s,使(shi)用(yong)SynchronousQueue作為阻塞隊(dui)列;
    2、newCachedThreadPool在沒有任務(wu)執(zhi)行時(shi),當線程的空閑時(shi)間超過keepAliveTime,會自(zi)動釋放線程資源,當提交新(xin)任務(wu)時(shi),如果(guo)沒有空閑線程,則(ze)創建(jian)新(xin)線程執(zhi)行任務(wu),會導(dao)致一定的系統開銷;
所以,該方(fang)法返回的線(xian)(xian)程(cheng)(cheng)池(chi)是沒(mei)有線(xian)(xian)程(cheng)(cheng)上(shang)限(xian)的,在使用(yong)時一定要(yao)(yao)當心(xin),因為沒(mei)有辦法控制總體的線(xian)(xian)程(cheng)(cheng)數量,而每個線(xian)(xian)程(cheng)(cheng)都是消耗內存(cun)的,這(zhe)可能會導致過(guo)多(duo)的內存(cun)被占用(yong)。建議(yi)盡量不(bu)要(yao)(yao)用(yong)這(zhe)個方(fang)法返回的線(xian)(xian)程(cheng)(cheng)池(chi),而要(yao)(yao)使用(yong)有上(shang)限(xian)的線(xian)(xian)程(cheng)(cheng)池(chi)
 
public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());  
    } 
 
 

newSingleThreadExecutor(單個后(hou)臺線程)

   
    初(chu)始化的(de)(de)線(xian)(xian)程(cheng)池中只有(you)一個(ge)線(xian)(xian)程(cheng),如果該線(xian)(xian)程(cheng)異常結束,會重(zhong)新(xin)創(chuang)建一個(ge)新(xin)的(de)(de)線(xian)(xian)程(cheng)繼續執行任務,唯一的(de)(de)線(xian)(xian)程(cheng)可以保證所提(ti)交(jiao)任務的(de)(de)順序(xu)執行,內部使用LinkedBlockingQueue作為阻塞隊列。
 
public static ExecutorService newSingleThreadExecutor() {  
        return new FinalizableDelegatedExecutorService  
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));  
    } 
 
 

newScheduledThreadPool

  創建(jian)一個固定長度的線程池,而且以延遲或定時的方式來執(zhi)行任務。
 
通過(guo)如上配置(zhi)的(de)線程池的(de)創建方法(fa)源代碼,我(wo)們可以發現:
   1. 除了(le)CachedThreadPool使用(yong)的(de)是直接提交策略的(de)緩沖隊列以外(wai),其(qi)余兩個采用(yong)的(de)都是無界緩沖隊列
   2. 三個線(xian)程池(chi)采用的(de)ThreadPoolExecutor構造方法都是(shi)同一個,使用的(de)都是(shi)默(mo)認的(de)ThreadFactory和handler:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();  
   
 public ThreadPoolExecutor(int corePoolSize,  
                     int maximumPoolSize,  
                     long keepAliveTime,  
                     TimeUnit unit,  
                     BlockingQueue<Runnable> workQueue) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
        Executors.defaultThreadFactory(), defaultHandler);  
}
 

四、ExecutorService任務周期管理(li)接口

    Executor的(de)實現(xian)通常都(dou)會創建線程來執行任務,但(dan)是使用異步方(fang)式來執行任務時(shi),由于之(zhi)前提(ti)交任務的(de)狀態(tai)不是立即(ji)可見的(de),所以如果(guo)要(yao)關閉應用程序時(shi),就需要(yao)將受影響的(de)任務狀態(tai)反(fan)饋給(gei)應用程序。
    為了解(jie)決執行服(fu)務(wu)的生命周期問題,Executor擴展了EecutorService接口,添加了一些(xie)用于生命周期管理的方(fang)法。如(ru)下:
public interface ExecutorService extends Executor {  
    void shutdown();  
    List<Runnable> shutdownNow();  
    boolean isShutdown();  
    boolean isTerminated();  
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
    // 省略部分方法  
}

submit() 與execute()區別

 

1、接收的參數不一樣

  submit()可以接受(shou)runnable無返回值和callable有返回值 
  execute()接受runnable 無(wu)返回值

 

2、submit有返回值,而(er)execute沒有

 

  Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion.

 

  用到返回值的(de)(de)例(li)子,比(bi)如(ru)說我有(you)很多個做validation的(de)(de)task,我希望所有(you)的(de)(de)task執(zhi)行(xing)完(wan),然后每個task告(gao)訴我它的(de)(de)執(zhi)行(xing)結(jie)果,是成(cheng)功還是失敗,如(ru)果是失敗,原因是什么(me)。

 

3、submit方(fang)便Exception處(chu)理

 

  There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don’t have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task’s return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.

 

  意思就(jiu)是如果你在你的(de)task里會拋(pao)出(chu)checked或者unchecked exception,而你又希望外(wai)面的(de)調用者能夠(gou)感知(zhi)這些exception并(bing)做(zuo)出(chu)及時的(de)處理,那(nei)么就(jiu)需(xu)要(yao)用到submit,通過捕獲Future.get拋(pao)出(chu)的(de)異常(chang)。

public class ExecutorServiceTest {  
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        List<Future<String>> resultList = new ArrayList<Future<String>>();  

        // 創建10個任務并執行  
        for (int i = 0; i < 10; i++) {  
            // 使用ExecutorService執行Callable類(lei)型的(de)任務,并將結(jie)果保存(cun)在future變量中  
            Future<String> future = executorService.submit(new TaskWithResult(i));  
            // 將任(ren)務執行(xing)結果(guo)存儲到(dao)List中  
            resultList.add(future);  
        }  
        executorService.shutdown();  

        // 遍(bian)歷(li)任務的結果  
        for (Future<String> fs : resultList) {  
            try {  
                System.out.println(fs.get()); // 打(da)印各個線程(任務)執行的結(jie)果  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (ExecutionException e) {                    
                e.printStackTrace();    
            }  finally {
        executorService.shutdownNow();
       }
        }  
    }  
}  

class TaskWithResult implements Callable<String> {  
    private int id;  

    public TaskWithResult(int id) {  
        this.id = id;  
    }  

    /** 
     * 任務的具體(ti)過程,一旦任務傳給ExecutorService的submit方法(fa)(fa),則該方法(fa)(fa)自動在(zai)一個(ge)線程上執行。 
     *  
     * @return 
     * @throws Exception 
     */  
    public String call() throws Exception {  
        System.out.println("call()方法被自動(dong)調用(yong),干活!!!             " + Thread.currentThread().getName());  
        if (new Random().nextBoolean())  
            throw new TaskException("Meet error in task." + Thread.currentThread().getName());  
        // 一個模擬耗(hao)時(shi)的操(cao)作(zuo)  
        for (int i = 999999999; i > 0; i--)  
            ;  
        return "call()方(fang)法被自動調(diao)用(yong),任務的(de)結(jie)果是(shi):" + id + "    " + Thread.currentThread().getName();  
    }  
}  

class TaskException extends Exception {  
    public TaskException(String message) {  
        super(message);  
    }  
}

 

 

五、ThreadPoolExecutor提交任務流程

線程池(chi)的主要工作流程如下(xia)圖(tu):
 
 
 
類(lei)中(zhong)定義的重要(yao)變量,如下:
 
  1. private final BlockingQueue<Runnable> workQueue;              // 阻塞(sai)隊列  
  2. private final ReentrantLock mainLock = new ReentrantLock();   // 互斥鎖  
  3. private final HashSet<Worker> workers = new HashSet<Worker>();// 線(xian)程集(ji)合.一(yi)個Worker對應一(yi)個線(xian)程  
  4. private final Condition termination = mainLock.newCondition();// 終(zhong)止條(tiao)件  
  5. private int largestPoolSize;           // 線程池中線程數量曾經達(da)到過的(de)最大值。  
  6. private long completedTaskCount;       // 已完成任務數量  
  7. private volatile ThreadFactory threadFactory;     // ThreadFactory對象,用于創(chuang)建(jian)線程。  
  8. private volatile RejectedExecutionHandler handler;// 拒絕策(ce)略的處理(li)句柄  
  9. private volatile long keepAliveTime;   // 線程池維護線程所允許的空閑時間  
  10. private volatile boolean allowCoreThreadTimeOut;  
  11. private volatile int corePoolSize;     // 線(xian)程池維護線(xian)程的最小數量,哪怕是空閑的  
  12. private volatile int maximumPoolSize;  // 線程池維護的最大線程數量

 

其中有幾個(ge)重要(yao)的規則(ze)需要(yao)說明(ming)一下:

1、 corePoolSize與maximumPoolSize  

   線程池(chi)將根據 corePoolSize和(he) maximumPoolSize設置的邊界自(zi)動(dong)調整池(chi)大(da)小,當新任務(wu)在方法 execute() 中提交時:
  • 如果(guo)當前(qian)線(xian)程(cheng)池中的線(xian)程(cheng)數目<corePoolSize,則(ze)每來一個任(ren)務(wu)(wu),就會創建一個線(xian)程(cheng)去(qu)執行(xing)這個任(ren)務(wu)(wu);
  • 如(ru)果(guo)當前線(xian)程池中(zhong)的線(xian)程數目>=corePoolSize,則每來一個任(ren)務,會嘗試將(jiang)(jiang)其(qi)添加到任(ren)務緩存隊列當中(zhong),若添加成(cheng)功(gong),則該(gai)任(ren)務會等(deng)待空閑線(xian)程將(jiang)(jiang)其(qi)取出去(qu)執行(xing);當隊列滿時才創建新線(xian)程去(qu)處(chu)理請求;
  • 如果當前(qian)線程池中的線程數目達到maximumPoolSize,即(ji)隊列已經(jing)滿了,則通過handler所指定的任務拒絕(jue)策略(lve)來處理新請求;
  • 如果線(xian)(xian)程(cheng)(cheng)池中的線(xian)(xian)程(cheng)(cheng)數(shu)量大于(yu)corePoolSize時(shi),并且(qie)某線(xian)(xian)程(cheng)(cheng)空閑時(shi)間超過keepAliveTime,線(xian)(xian)程(cheng)(cheng)將(jiang)被終止,直至線(xian)(xian)程(cheng)(cheng)池中的線(xian)(xian)程(cheng)(cheng)數(shu)目不大于(yu)corePoolSize;
 
也就是說,處理任務的優先級(ji)為: 
  • 1. 核心線(xian)程corePoolSize > 任務(wu)隊列workQueue > 最大(da)線(xian)程maximumPoolSize,如(ru)果三者都滿了,使用handler處理被拒絕(jue)的(de)任務(wu)。
  • 2. 當池中的(de)(de)線程數(shu)大于(yu)corePoolSize的(de)(de)時(shi)候,多余的(de)(de)線程會等待keepAliveTime長的(de)(de)時(shi)間(jian),如果無請(qing)求(qiu)可處理(li)就自(zi)行銷(xiao)毀。
 

2、 workQueue 

線程池(chi)所使用(yong)的緩沖(chong)隊列,該緩沖(chong)隊列的長度決定了(le)能夠緩沖(chong)的最大(da)數量,緩沖(chong)隊列有三種通用(yong)策略:
   1) 直接(jie)提(ti)交。SynchronousQueue,它將任(ren)務(wu)直接(jie)提(ti)交給線(xian)程執(zhi)行而(er)不(bu)保存它們。在(zai)此,如果不(bu)存在(zai)可用于(yu)立即運行任(ren)務(wu)的線(xian)程,則試圖把(ba)任(ren)務(wu)加入隊列將失敗,因此會構造一個新(xin)的線(xian)程。此策略可以(yi)避(bi)免(mian)在(zai)處理可能具(ju)有內(nei)部依賴(lai)性的請求集(ji)時出現鎖。直接(jie)提(ti)交通(tong)常要求無界 maximumPoolSizes 以(yi)避(bi)免(mian)拒絕(jue)新(xin)提(ti)交的任(ren)務(wu)。
   2) 無界隊(dui)列(lie)。使用無界隊(dui)列(lie)將導致在(zai)(zai)所有 corePoolSize 線程都忙時新任(ren)務(wu)在(zai)(zai)隊(dui)列(lie)中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了(le)。)當每(mei)個任(ren)務(wu)完全獨(du)立(li)于(yu)其他任(ren)務(wu),即任(ren)務(wu)執(zhi)行互不影(ying)響時,適合于(yu)使用無界隊(dui)列(lie);
   3) 有(you)界(jie)(jie)隊列。當使(shi)用(yong)(yong)有(you)限的(de) maximumPoolSizes 時,有(you)界(jie)(jie)隊列(如(ru)(ru) ArrayBlockingQueue)有(you)助于防止資源耗盡(jin),但(dan)是(shi)可(ke)能(neng)較難調整(zheng)和控制。隊列大(da)(da)小(xiao)和最(zui)大(da)(da)池(chi)大(da)(da)小(xiao)可(ke)能(neng)需要相互折衷:使(shi)用(yong)(yong)大(da)(da)型隊列和小(xiao)型池(chi)可(ke)以最(zui)大(da)(da)限度地降(jiang)低 CPU 使(shi)用(yong)(yong)率、操作(zuo)系(xi)統(tong)資源和上下文切換(huan)開銷(xiao),但(dan)是(shi)可(ke)能(neng)導致(zhi)人工降(jiang)低吞吐量(liang)。如(ru)(ru)果任務頻繁阻塞(例如(ru)(ru),如(ru)(ru)果它們是(shi) I/O 邊界(jie)(jie)),則系(xi)統(tong)可(ke)能(neng)超過您許可(ke)的(de)更多線程安排時間(jian)。使(shi)用(yong)(yong)小(xiao)型隊列通常要求(qiu)較大(da)(da)的(de)池(chi)大(da)(da)小(xiao),CPU 使(shi)用(yong)(yong)率較高,但(dan)是(shi)可(ke)能(neng)遇(yu)到(dao)不可(ke)接(jie)受的(de)調度開銷(xiao),這樣(yang)也會降(jiang)低吞吐量(liang).

3、ThreadFactory  

  &nbsp; 使用 ThreadFactory 創建新線程(cheng)(cheng)。通過提供不(bu)同的 ThreadFactory,可以改變線程(cheng)(cheng)的名稱、線程(cheng)(cheng)組、優(you)先級、守護進程(cheng)(cheng)狀態等(deng)等(deng)。如果從 newThread 返回(hui) null 時(shi) ThreadFactory 未能(neng)創建線程(cheng)(cheng),則(ze)執行(xing)程(cheng)(cheng)序將繼續運行(xing),但不(bu)能(neng)執行(xing)任何(he)任務。
public interface ThreadFactory {  
    Thread newThread(Runnable r);  
}
  
而構造(zao)方法中的threadFactory對象(xiang),是通過 Executors.defaultThreadFactory()返(fan)回(hui)的。 

4、RejectedExecutionHandler   

    當(dang)Executor已(yi)(yi)經關(guan)閉(即執(zhi)行了executorService.shutdown()方法后(hou)),并且Executor將有限邊界用于最大線(xian)程(cheng)和(he)工(gong)作隊列(lie)容(rong)量,且已(yi)(yi)經飽和(he)時,在方法execute()中提交的新(xin)任務(wu)將被拒(ju)絕.
   在以上述(shu)情(qing)況(kuang)下(xia),execute 方(fang)法(fa)(fa)將調用RejectedExecutionHandler.rejectedExecution() 方(fang)法(fa)(fa)。
下面提供了四種預(yu)定義的處理程序策略:
       1) AbortPolicy            直接拋出異常(chang) RejectedExecutionException;
    2) CallerRunsPolicy   &nbsp;   用調用者(zhe)所在的線(xian)程來執行(xing)任務
    3) DiscardPolicy          不能執行的任務將被(bei)刪除(chu);
    4) DiscardOldestPolicy    如(ru)果(guo)執行程序尚(shang)未(wei)關閉,則位于工作(zuo)隊列頭(tou)部的任務將被刪(shan)除(chu),然后重(zhong)試執行程序(如(ru)果(guo)再次失敗,則重(zhong)復(fu)此過程)。

5、keepAliveTime

線程(cheng)空閑(xian)時的存活時間,即當線程(cheng)沒有(you)任務執行時,繼續存活的時間;默認情(qing)況下,該參數只(zhi)在線程(cheng)數大于corePoolSize時才有(you)用;

六、線程池的關閉

&nbsp;   通過(guo)調用線程(cheng)池的(de)shutdown或shutdownNow方法來關閉(bi)線程(cheng)池,但是(shi)它們的(de)實(shi)現原理不同:
    shutdown只是(shi)將線(xian)程(cheng)池的狀態設置成SHUTDOWN狀態,然后中(zhong)斷(duan)所(suo)有沒有正在(zai)執行(xing)任務的線(xian)程(cheng)。
    shutdownNow是遍(bian)歷線(xian)(xian)(xian)程(cheng)池中(zhong)的(de)工作線(xian)(xian)(xian)程(cheng),然后(hou)逐(zhu)個(ge)調用(yong)線(xian)(xian)(xian)程(cheng)的(de)interrupt方法來中(zhong)斷(duan)線(xian)(xian)(xian)程(cheng),所以無(wu)法響應中(zhong)斷(duan)的(de)任務(wu)(wu)可能永(yong)遠無(wu)法終止。shutdownNow會首先(xian)將線(xian)(xian)(xian)程(cheng)池的(de)狀態設置成STOP,然后(hou)嘗試停止所有的(de)正在(zai)執行或暫停任務(wu)(wu)的(de)線(xian)(xian)(xian)程(cheng),并返回等待(dai)執行任務(wu)(wu)的(de)列表。
    只(zhi)要(yao)調(diao)用(yong)了這兩個(ge)關(guan)(guan)閉(bi)(bi)方法的其(qi)中(zhong)一個(ge),isShutdown方法就(jiu)會返回true。當所(suo)有的任務都已關(guan)(guan)閉(bi)(bi)后,才表示線(xian)(xian)程池關(guan)(guan)閉(bi)(bi)成功,這時調(diao)用(yong)isTerminaed方法會返回true。至于我們應(ying)該(gai)調(diao)用(yong)哪一種方法來(lai)關(guan)(guan)閉(bi)(bi)線(xian)(xian)程池,應(ying)該(gai)由提交到(dao)線(xian)(xian)程池的任務特性決定(ding),通常調(diao)用(yong)shutdown來(lai)關(guan)(guan)閉(bi)(bi)線(xian)(xian)程池,如果任務不(bu)一定(ding)需要(yao)執行完,則(ze)可以調(diao)用(yong)shutdownNow。
 

七、線程池(chi)的配置(zhi)

可以(yi)從以(yi)下幾個角度(du)來進行(xing)分析:
    1. ;任務(wu)的性質(zhi):CPU密集型(xing)(xing)任務(wu),IO密集型(xing)(xing)任務(wu)和混合型(xing)(xing)任務(wu)。
 &nbsp;  2. 任務的優先級:高,中和(he)低(di)。
    3. 任(ren)務(wu)的執行時(shi)間:長,中和短。
    4. 任務的依(yi)賴(lai)性:是否依(yi)賴(lai)其他系(xi)統(tong)資(zi)源(yuan),如數據庫連(lian)接。
    CPU密集(ji)型(xing)任務:配置盡可能少的線程數量,如配置Ncpu+1個線程的線程池。
    IO密集(ji)型任務:由于需要等待IO操作,線程并不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。
    混合型的任務:如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高于串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。
    我(wo)們可以通過Runtime.getRuntime().availableProcessors()方法(fa)獲得(de)當前設備(bei)的CPU個數。
    優(you)先級(ji)(ji)不(bu)同(tong)的(de)(de)任(ren)(ren)務可以使用優(you)先級(ji)(ji)隊(dui)列PriorityBlockingQueue來處理。它可以讓優(you)先級(ji)(ji)高的(de)(de)任(ren)(ren)務先得到執行,需要注(zhu)意的(de)(de)是如(ru)果一直有優(you)先級(ji)(ji)高的(de)(de)任(ren)(ren)務提交(jiao)到隊(dui)列里,那么優(you)先級(ji)(ji)低(di)的(de)(de)任(ren)(ren)務可能(neng)永遠不(bu)能(neng)執行。
    執(zhi)行時間(jian)不同的任務(wu)(wu)可以交給(gei)不同規模的線程池(chi)來處理,或(huo)者(zhe)也可以使(shi)用優先級隊列,讓執(zhi)行時間(jian)短的任務(wu)(wu)先執(zhi)行。
    依賴數(shu)據庫連接(jie)池的任務,因為(wei)線程(cheng)提交SQL后需要等(deng)待(dai)數(shu)據庫返回結果,如果等(deng)待(dai)的時(shi)(shi)間(jian)越(yue)長CPU空閑時(shi)(shi)間(jian)就(jiu)越(yue)長,那么線程(cheng)數(shu)應該設(she)置越(yue)大,這樣(yang)才能(neng)更好(hao)的利用CPU。
    建議使用有界隊列,有界隊列能(neng)增加系(xi)統的穩定性和預警能(neng)力,可以(yi)根據需要設大一點
posted @ 2017-09-28 21:50  ^_TONY_^  閱讀(1213)  評論(0)    收藏  舉報