中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Java多線程:隊(dui)列與阻塞隊(dui)列

1. 什么(me)是阻塞(sai)隊列

阻(zu)(zu)塞(sai)(sai)隊列(lie)(BlockingQueue)是 Java 5 并發新(xin)特性(xing)中(zhong)的(de)(de)內容,阻(zu)(zu)塞(sai)(sai)隊列(lie)的(de)(de)接口是 java.util.concurrent.BlockingQueue,它提供了(le)兩個(ge)附加操作(zuo):當(dang)隊列(lie)中(zhong)為空時,從隊列(lie)中(zhong)獲取元素(su)(su)的(de)(de)操作(zuo)將被阻(zu)(zu)塞(sai)(sai);當(dang)隊列(lie)滿時,向隊列(lie)中(zhong)添加元素(su)(su)的(de)(de)操作(zuo)將被阻(zu)(zu)塞(sai)(sai)。

阻塞(sai)隊列常用(yong)于生(sheng)產者(zhe)和(he)消費者(zhe)的場(chang)景,生(sheng)產者(zhe)是(shi)往(wang)隊列里添加元(yuan)素(su)的線程(cheng),消費者(zhe)是(shi)從(cong)隊列里拿元(yuan)素(su)的線程(cheng)。阻塞(sai)隊列就是(shi)生(sheng)產者(zhe)存放元(yuan)素(su)的容器。

阻塞隊列(lie)提供了四種操作(zuo)方法:

  • 拋(pao)(pao)出(chu)(chu)異(yi)常(chang):當隊列(lie)滿時(shi),再(zai)向隊列(lie)中插入元素,則(ze)會(hui)拋(pao)(pao)出(chu)(chu)IllegalStateException異(yi)常(chang)。當隊列(lie)空時(shi),再(zai)向隊列(lie)中獲(huo)取元素,則(ze)會(hui)拋(pao)(pao)出(chu)(chu)NoSuchElementException異(yi)常(chang)。
  • 返回(hui)特殊值:當(dang)隊(dui)列(lie)滿(man)時,向(xiang)隊(dui)列(lie)中(zhong)添加元(yuan)(yuan)素,則(ze)返回(hui)false,否則(ze)返回(hui)true。當(dang)隊(dui)列(lie)為空時,向(xiang)隊(dui)列(lie)中(zhong)獲取元(yuan)(yuan)素,則(ze)返回(hui)null,否則(ze)返回(hui)元(yuan)(yuan)素。
  • 一直(zhi)阻(zu)(zu)塞(sai):當(dang)(dang)阻(zu)(zu)塞(sai)隊列(lie)滿時,如(ru)果生產者向(xiang)隊列(lie)中插(cha)入元素,則隊列(lie)會一直(zhi)阻(zu)(zu)塞(sai)當(dang)(dang)前線(xian)程(cheng),直(zhi)到隊列(lie)可用或響(xiang)應中斷退(tui)出(chu)。當(dang)(dang)阻(zu)(zu)塞(sai)隊列(lie)為空時,如(ru)果消費者線(xian)程(cheng)向(xiang)阻(zu)(zu)塞(sai)隊列(lie)中獲(huo)取數據,則隊列(lie)會一直(zhi)阻(zu)(zu)塞(sai)當(dang)(dang)前線(xian)程(cheng),直(zhi)到隊列(lie)空閑或響(xiang)應中斷退(tui)出(chu)。
  • 超(chao)時(shi)退(tui)出(chu):當隊列滿時(shi),如果(guo)生產(chan)線(xian)程向隊列中添(tian)加元素,則隊列會阻塞(sai)生產(chan)線(xian)程一(yi)段時(shi)間(jian),超(chao)過(guo)指(zhi)定(ding)的時(shi)間(jian)則退(tui)出(chu)返回false。當隊列為空時(shi),消費線(xian)程從隊列中移除元素,則隊列會阻塞(sai)一(yi)段時(shi)間(jian),如果(guo)超(chao)過(guo)指(zhi)定(ding)時(shi)間(jian)退(tui)出(chu)返回null。

2. Java中(zhong)的阻塞(sai)隊列

JDK7提供了(le)7個阻(zu)塞隊列。分別是

下面(mian)分別簡單介紹一下:

  1. ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程的請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】

  2. LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度為Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。
  3. PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
  4. DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)
  5. SynchronousQueue: 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后會被回收。
  6. LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,相當于其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
  7. LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程并發時,可以將鎖的競爭最多降到一半。

Java中線程(cheng)安(an)全的(de)(de)(de)內(nei)置(zhi)隊(dui)列(lie)(lie)還(huan)有兩個:ConcurrentLinkedQueue和(he)LinkedTransferQueue,它們使用(yong)了(le)(le)CAS這(zhe)種(zhong)無(wu)(wu)鎖的(de)(de)(de)方式來實現(xian)了(le)(le)線程(cheng)安(an)全的(de)(de)(de)隊(dui)列(lie)(lie)。無(wu)(wu)鎖的(de)(de)(de)方式性(xing)能(neng)好,但是(shi)(shi)隊(dui)列(lie)(lie)是(shi)(shi)無(wu)(wu)界的(de)(de)(de),用(yong)在生(sheng)產系(xi)統(tong)中,生(sheng)產者生(sheng)產速度過(guo)快(kuai),可能(neng)導致(zhi)內(nei)存溢出。有界的(de)(de)(de)阻塞隊(dui)列(lie)(lie)ArrayBlockingQueue和(he)LinkedBlockingQueue,為(wei)了(le)(le)減少(shao)Java的(de)(de)(de)垃圾回收對系(xi)統(tong)性(xing)能(neng)的(de)(de)(de)影響,會盡量(liang)選擇(ze)array/heap格式的(de)(de)(de)數據結構。這(zhe)樣的(de)(de)(de)話就(jiu)只(zhi)剩(sheng)下ArrayBlockingQueue。(先(xian)埋(mai)個坑在這(zhe)兒(er),近來接觸到(dao)了(le)(le)disruptor,感覺妙不可言。)

3. 阻塞隊列(lie)的實現(xian)原理(li)

這里(li)分(fen)析下ArrayBlockingQueue的實現(xian)原理。

構造方法:

ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair);
ArrayBlockingQueue(int capacity, boolean fair, Collection<;? extends E> c)

 

ArrayBlockingQueue提供了(le)三種構造(zao)方(fang)法(fa),參數含義(yi)如下:

  • capacity:容量,即隊列大小。
  • fair:是否公平鎖。
  • c:隊列初始化元(yuan)素,順序按(an)照Collection遍歷順序。

插入元素

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

 

從(cong)源(yuan)碼可以看出,生(sheng)產(chan)者(zhe)首先獲得鎖(suo)lock,然后(hou)(hou)判斷(duan)隊列是否已經滿(man)了,如果滿(man)了,則等待,直到被喚(huan)醒,然后(hou)(hou)調(diao)用enqueue插入(ru)元(yuan)素。

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

 

以上是enqueue的實現,實現的操(cao)作是插入(ru)元素到一個環(huan)形數組,然后喚(huan)醒notEmpty上阻塞的線(xian)程。

獲取元素

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

 

從源碼可以看出,消費者首先(xian)獲(huo)得鎖(suo),然后判斷隊(dui)列是(shi)否(fou)為空,為空,則等待(dai),直(zhi)到被喚醒,然后調用(yong)dequeue獲(huo)取(qu)元素。

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

 

以(yi)上是dequeue的實現(xian),獲取環形數組當前(qian)takeIndex的元素,并及(ji)時將當前(qian)元素置為(wei)null,設置下一次takeIndex的值takeIndex++,然后(hou)喚醒notFull上阻塞(sai)的線(xian)程。

還有其他方法offer(E e)poll()add(E e)remove()、 offer(E e, long timeout, TimeUnit unit)等的實現,因為常用take和put,這些方法就不一一贅述了。

4. 阻塞隊列的基本使用

使(shi)用阻塞(sai)隊列實現生產(chan)者-消費者模式:

/**
 * Created by noly on 2017/5/19.
 */
public class BlockingQueueTest {

    public static void main (String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

        Consumer consumer = new Consumer(queue);
        Producer producer = new Producer(queue);

        producer.start();
        consumer.start();
    }
}

class Consumer extends Thread {
    private ArrayBlockingQueue<Integer> queue;
    public Consumer(ArrayBlockingQueue<Integer> queue){
        this.queue = queue;
    }
    @Override
    public void run() {
        while(true) {
            try {
                Integer i = queue.take();
                System.out.println("消(xiao)費者從隊列(lie)取(qu)出元(yuan)素:" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Producer extends Thread {
    private ArrayBlockingQueue<Integer> queue;
    public Producer(ArrayBlockingQueue<Integer> queue){
        this.queue = queue;
    }
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                queue.put(i);
                System.out.println("生產者向隊(dui)列插入元素:" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

如(ru)果不使用阻(zu)塞(sai)隊列,使用Object.wait()和Object.notify()、非阻(zu)塞(sai)隊列實現生產者(zhe)-消(xiao)費者(zhe)模式(shi),考慮線程間(jian)的通訊,會非常(chang)麻煩。

參考資料(liao):

    

    

    

    

posted @ 2017-09-28 16:27  ^_TONY_^  閱讀(2559)  評論(0)    收藏  舉報