Java多線程:隊(dui)列與阻塞隊(dui)列
1. 什么(me)是阻塞(sai)隊列
阻(zu)(zu)塞(sai)(sai)隊列(lie)(BlockingQueue)是 Java 5 并發新(xin)特性(xing)中(zhong)的(de)(de)內容,阻(zu)(zu)塞(sai)(sai)隊列(lie)的(de)(de)接口是 java.util.concurrent.BlockingQueue,它提供了(le)兩個(ge)附加操作(zuo):當(dang)隊列(lie)中(zhong)為空時,從隊列(lie)中(zhong)獲取元素(su)(su)的(de)(de)操作(zuo)將被阻(zu)(zu)塞(sai)(sai);當(dang)隊列(lie)滿時,向隊列(lie)中(zhong)添加元素(su)(su)的(de)(de)操作(zuo)將被阻(zu)(zu)塞(sai)(sai)。
阻塞(sai)隊列常用(yong)于生(sheng)產者(zhe)和(he)消費者(zhe)的場(chang)景,生(sheng)產者(zhe)是(shi)往(wang)隊列里添加元(yuan)素(su)的線程(cheng),消費者(zhe)是(shi)從(cong)隊列里拿元(yuan)素(su)的線程(cheng)。阻塞(sai)隊列就是(shi)生(sheng)產者(zhe)存放元(yuan)素(su)的容器。
阻塞隊列(lie)提供了四種操作(zuo)方法:

- 拋(pao)(pao)出(chu)(chu)異(yi)常(chang):當隊列(lie)滿時(shi),再(zai)向隊列(lie)中插入元素,則(ze)會(hui)拋(pao)(pao)出(chu)(chu)IllegalStateException異(yi)常(chang)。當隊列(lie)空時(shi),再(zai)向隊列(lie)中獲(huo)取元素,則(ze)會(hui)拋(pao)(pao)出(chu)(chu)NoSuchElementException異(yi)常(chang)。
- 返回(hui)特殊值:當(dang)隊(dui)列(lie)滿(man)時,向(xiang)隊(dui)列(lie)中(zhong)添加元(yuan)(yuan)素,則(ze)返回(hui)false,否則(ze)返回(hui)true。當(dang)隊(dui)列(lie)為空時,向(xiang)隊(dui)列(lie)中(zhong)獲取元(yuan)(yuan)素,則(ze)返回(hui)null,否則(ze)返回(hui)元(yuan)(yuan)素。
- 一直(zhi)阻(zu)(zu)塞(sai):當(dang)(dang)阻(zu)(zu)塞(sai)隊列(lie)滿時,如(ru)果生產者向(xiang)隊列(lie)中插(cha)入元素,則隊列(lie)會一直(zhi)阻(zu)(zu)塞(sai)當(dang)(dang)前線(xian)程(cheng),直(zhi)到隊列(lie)可用或響(xiang)應中斷退(tui)出(chu)。當(dang)(dang)阻(zu)(zu)塞(sai)隊列(lie)為空時,如(ru)果消費者線(xian)程(cheng)向(xiang)阻(zu)(zu)塞(sai)隊列(lie)中獲(huo)取數據,則隊列(lie)會一直(zhi)阻(zu)(zu)塞(sai)當(dang)(dang)前線(xian)程(cheng),直(zhi)到隊列(lie)空閑或響(xiang)應中斷退(tui)出(chu)。
- 超(chao)時(shi)退(tui)出(chu):當隊列滿時(shi),如果(guo)生產(chan)線(xian)程向隊列中添(tian)加元素,則隊列會阻塞(sai)生產(chan)線(xian)程一(yi)段時(shi)間(jian),超(chao)過(guo)指(zhi)定(ding)的時(shi)間(jian)則退(tui)出(chu)返回false。當隊列為空時(shi),消費線(xian)程從隊列中移除元素,則隊列會阻塞(sai)一(yi)段時(shi)間(jian),如果(guo)超(chao)過(guo)指(zhi)定(ding)時(shi)間(jian)退(tui)出(chu)返回null。
2. Java中(zhong)的阻塞(sai)隊列
JDK7提供了(le)7個阻(zu)塞隊列。分別是

下面(mian)分別簡單介紹一下:
-
ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程的請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】
- LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度為Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。
- PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
- DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)
- SynchronousQueue: 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后會被回收。
- LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,相當于其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
-
LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程并發時,可以將鎖的競爭最多降到一半。
Java中線程(cheng)安(an)全的(de)(de)(de)內(nei)置(zhi)隊(dui)列(lie)(lie)還(huan)有兩個:ConcurrentLinkedQueue和(he)LinkedTransferQueue,它們使用(yong)了(le)(le)CAS這(zhe)種(zhong)無(wu)(wu)鎖的(de)(de)(de)方式來實現(xian)了(le)(le)線程(cheng)安(an)全的(de)(de)(de)隊(dui)列(lie)(lie)。無(wu)(wu)鎖的(de)(de)(de)方式性(xing)能(neng)好,但是(shi)(shi)隊(dui)列(lie)(lie)是(shi)(shi)無(wu)(wu)界的(de)(de)(de),用(yong)在生(sheng)產系(xi)統(tong)中,生(sheng)產者生(sheng)產速度過(guo)快(kuai),可能(neng)導致(zhi)內(nei)存溢出。有界的(de)(de)(de)阻塞隊(dui)列(lie)(lie)ArrayBlockingQueue和(he)LinkedBlockingQueue,為(wei)了(le)(le)減少(shao)Java的(de)(de)(de)垃圾回收對系(xi)統(tong)性(xing)能(neng)的(de)(de)(de)影響,會盡量(liang)選擇(ze)array/heap格式的(de)(de)(de)數據結構。這(zhe)樣的(de)(de)(de)話就(jiu)只(zhi)剩(sheng)下ArrayBlockingQueue。(先(xian)埋(mai)個坑在這(zhe)兒(er),近來接觸到(dao)了(le)(le)disruptor,感覺妙不可言。)
3. 阻塞隊列(lie)的實現(xian)原理(li)
這里(li)分(fen)析下ArrayBlockingQueue的實現(xian)原理。
構造方法:
ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair);
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
ArrayBlockingQueue提供了(le)三種構造(zao)方(fang)法(fa),參數含義(yi)如下:
- capacity:容量,即隊列大小。
- fair:是否公平鎖。
- c:隊列初始化元(yuan)素,順序按(an)照Collection遍歷順序。
插入元素:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
從(cong)源(yuan)碼可以看出,生(sheng)產(chan)者(zhe)首先獲得鎖(suo)lock,然后(hou)(hou)判斷(duan)隊列是否已經滿(man)了,如果滿(man)了,則等待,直到被喚(huan)醒,然后(hou)(hou)調(diao)用enqueue插入(ru)元(yuan)素。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
以上是enqueue的實現,實現的操(cao)作是插入(ru)元素到一個環(huan)形數組,然后喚(huan)醒notEmpty上阻塞的線(xian)程。
獲取元素:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
從源碼可以看出,消費者首先(xian)獲(huo)得鎖(suo),然后判斷隊(dui)列是(shi)否(fou)為空,為空,則等待(dai),直(zhi)到被喚醒,然后調用(yong)dequeue獲(huo)取(qu)元素。
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
以(yi)上是dequeue的實現(xian),獲取環形數組當前(qian)takeIndex的元素,并及(ji)時將當前(qian)元素置為(wei)null,設置下一次takeIndex的值takeIndex++,然后(hou)喚醒notFull上阻塞(sai)的線(xian)程。
還有其他方法offer(E e)、poll()、add(E e)、remove()、 offer(E e, long timeout, TimeUnit unit)等的實現,因為常用take和put,這些方法就不一一贅述了。
4. 阻塞隊列的基本使用
使(shi)用阻塞(sai)隊列實現生產(chan)者-消費者模式:
/**
* Created by noly on 2017/5/19.
*/
public class BlockingQueueTest {
public static void main (String[] args) {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
Consumer consumer = new Consumer(queue);
Producer producer = new Producer(queue);
producer.start();
consumer.start();
}
}
class Consumer extends Thread {
private ArrayBlockingQueue<Integer> queue;
public Consumer(ArrayBlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
while(true) {
try {
Integer i = queue.take();
System.out.println("消(xiao)費者從隊列(lie)取(qu)出元(yuan)素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread {
private ArrayBlockingQueue<Integer> queue;
public Producer(ArrayBlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
queue.put(i);
System.out.println("生產者向隊(dui)列插入元素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
如(ru)果不使用阻(zu)塞(sai)隊列,使用Object.wait()和Object.notify()、非阻(zu)塞(sai)隊列實現生產者(zhe)-消(xiao)費者(zhe)模式(shi),考慮線程間(jian)的通訊,會非常(chang)麻煩。
參考資料(liao):
