Java多線程:線程池(chi)
一、 背景
二、線程池的架(jia)構

三、Executors
newFixedThreadPool(固定大小線(xian)程(cheng)池)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool(無(wu)界線(xian)程(cheng)池,可以進(jin)行自動線(xian)程(cheng)回收)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor(單個后(hou)臺線程)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
四、ExecutorService任務周期管理(li)接口
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// 省略部分方法
}
submit() 與execute()區別
1、接收的參數不一樣
submit()可以接受(shou)runnable無返回值和callable有返回值
execute()接受runnable 無(wu)返回值
2、submit有返回值,而(er)execute沒有
Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion.
用到返回值的(de)(de)例(li)子,比(bi)如(ru)說我有(you)很多個做validation的(de)(de)task,我希望所有(you)的(de)(de)task執(zhi)行(xing)完(wan),然后每個task告(gao)訴我它的(de)(de)執(zhi)行(xing)結(jie)果,是成(cheng)功還是失敗,如(ru)果是失敗,原因是什么(me)。
3、submit方(fang)便Exception處(chu)理
There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don’t have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task’s return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.
意思就(jiu)是如果你在你的(de)task里會拋(pao)出(chu)checked或者unchecked exception,而你又希望外(wai)面的(de)調用者能夠(gou)感知(zhi)這些exception并(bing)做(zuo)出(chu)及時的(de)處理,那(nei)么就(jiu)需(xu)要(yao)用到submit,通過捕獲Future.get拋(pao)出(chu)的(de)異常(chang)。
public class ExecutorServiceTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<String>> resultList = new ArrayList<Future<String>>(); // 創建10個任務并執行 for (int i = 0; i < 10; i++) { // 使用ExecutorService執行Callable類(lei)型的(de)任務,并將結(jie)果保存(cun)在future變量中 Future<String> future = executorService.submit(new TaskWithResult(i)); // 將任(ren)務執行(xing)結果(guo)存儲到(dao)List中 resultList.add(future); } executorService.shutdown(); // 遍(bian)歷(li)任務的結果 for (Future<String> fs : resultList) { try { System.out.println(fs.get()); // 打(da)印各個線程(任務)執行的結(jie)果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally {
executorService.shutdownNow();
} } } } class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } /** * 任務的具體(ti)過程,一旦任務傳給ExecutorService的submit方法(fa)(fa),則該方法(fa)(fa)自動在(zai)一個(ge)線程上執行。 * * @return * @throws Exception */ public String call() throws Exception { System.out.println("call()方法被自動(dong)調用(yong),干活!!! " + Thread.currentThread().getName()); if (new Random().nextBoolean()) throw new TaskException("Meet error in task." + Thread.currentThread().getName()); // 一個模擬耗(hao)時(shi)的操(cao)作(zuo) for (int i = 999999999; i > 0; i--) ; return "call()方(fang)法被自動調(diao)用(yong),任務的(de)結(jie)果是(shi):" + id + " " + Thread.currentThread().getName(); } } class TaskException extends Exception { public TaskException(String message) { super(message); } }
五、ThreadPoolExecutor提交任務流程

1. private final BlockingQueue<Runnable> workQueue; // 阻塞(sai)隊列
2. private final ReentrantLock mainLock = new ReentrantLock(); // 互斥鎖
3. private final HashSet<Worker> workers = new HashSet<Worker>();// 線(xian)程集(ji)合.一(yi)個Worker對應一(yi)個線(xian)程
4. private final Condition termination = mainLock.newCondition();// 終(zhong)止條(tiao)件
5. private int largestPoolSize; // 線程池中線程數量曾經達(da)到過的(de)最大值。
6. private long completedTaskCount; // 已完成任務數量
7. private volatile ThreadFactory threadFactory; // ThreadFactory對象,用于創(chuang)建(jian)線程。
8. private volatile RejectedExecutionHandler handler;// 拒絕策(ce)略的處理(li)句柄
9. private volatile long keepAliveTime; // 線程池維護線程所允許的空閑時間
10. private volatile boolean allowCoreThreadTimeOut;
11. private volatile int corePoolSize; // 線(xian)程池維護線(xian)程的最小數量,哪怕是空閑的
12. private volatile int maximumPoolSize; // 線程池維護的最大線程數量
1、 corePoolSize與maximumPoolSize
- 如果(guo)當前(qian)線(xian)程(cheng)池中的線(xian)程(cheng)數目<corePoolSize,則(ze)每來一個任(ren)務(wu)(wu),就會創建一個線(xian)程(cheng)去(qu)執行(xing)這個任(ren)務(wu)(wu);
- 如(ru)果(guo)當前線(xian)程池中(zhong)的線(xian)程數目>=corePoolSize,則每來一個任(ren)務,會嘗試將(jiang)(jiang)其(qi)添加到任(ren)務緩存隊列當中(zhong),若添加成(cheng)功(gong),則該(gai)任(ren)務會等(deng)待空閑線(xian)程將(jiang)(jiang)其(qi)取出去(qu)執行(xing);當隊列滿時才創建新線(xian)程去(qu)處(chu)理請求;
- 如果當前(qian)線程池中的線程數目達到maximumPoolSize,即(ji)隊列已經(jing)滿了,則通過handler所指定的任務拒絕(jue)策略(lve)來處理新請求;
- 如果線(xian)(xian)程(cheng)(cheng)池中的線(xian)(xian)程(cheng)(cheng)數(shu)量大于(yu)corePoolSize時(shi),并且(qie)某線(xian)(xian)程(cheng)(cheng)空閑時(shi)間超過keepAliveTime,線(xian)(xian)程(cheng)(cheng)將(jiang)被終止,直至線(xian)(xian)程(cheng)(cheng)池中的線(xian)(xian)程(cheng)(cheng)數(shu)目不大于(yu)corePoolSize;
- 1. 核心線(xian)程corePoolSize > 任務(wu)隊列workQueue > 最大(da)線(xian)程maximumPoolSize,如(ru)果三者都滿了,使用handler處理被拒絕(jue)的(de)任務(wu)。
- 2. 當池中的(de)(de)線程數(shu)大于(yu)corePoolSize的(de)(de)時(shi)候,多余的(de)(de)線程會等待keepAliveTime長的(de)(de)時(shi)間(jian),如果無請(qing)求(qiu)可處理(li)就自(zi)行銷(xiao)毀。
2、 workQueue
3、ThreadFactory
public interface ThreadFactory { Thread newThread(Runnable r); }
