CountDownLatch和CyclicBarrier区别
CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,把它们放在一起介绍是因为它们之间有点像,又很不同。
CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点。
CyclicBarrier 是一组线程之间互相等待,只要有一个线程没有完成,其他线程都要等待,更像是你和你老婆不离不弃。
对于CountDownLatch来说,重点是那一个线程, 是它在等待,而另外那N个线程在把“某个事情”做完之后可以继续等待,可以终止。
而对于CyclicBarrier来说,重点是那**一组(N个)**线程,他们之间任何一个没有完成,所有的线程都必须等待。
除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。
但CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。
CountDownLatch使用
我们假设旅游团有3个游客,团长要等到游客都到齐了之后才能出发去下一个景点
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| CountDownLatch latch = new CountDownLatch(3); ExecutorService executor = Executors.newFixedThreadPool(3);
IntStream.rangeClosed(1, 3).forEach(i -> { executor.execute(() -> { System.out.println("游客" + i + "到了集合地点"); latch.countDown(); }); });
latch.await();
System.out.println("所有人员都已经到齐了,出发去下个景点");
|
首先创建了一个 CountDownLatch,计数器的初始值等于 3,之后每当一个团员到达就对计数器执行减 1操作(latch.countDown()实现)。在主线程中,我们通过调用 latch.await() 来实现对计数器等于 0 的等待。
CountDownLatch源码分析
CountDownLatch是通过AQS来实现的,这里的计数器的值实际上是AQS中State的值,也就是我们的state的值会被初始化为我们传入的值。
当我们调用coutnDown的时候实际上是减去state的值(计数器减1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public void countDown() { sync.releaseShared(1); }
private static final class Sync extends AbstractQueuedSynchronizer {
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
}
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
|
调用await的时候,会判断当前state的值是否等于0,如果等于0,就代表其他线程已经执行完成了,可以接着往下执行。否则就阻塞当前线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
|
至此逻辑就很清晰了。 当我们调用countDown()方法的时候,会将AQS中的state的值减去1,当state值变为0的时候会唤醒CLH队列中阻塞的线程。当我们调用await()方法的时候,会判断state的值是否等于0,如果等于0则继续往下执行。如果不等于0则线程被阻塞,等待被唤醒(countDown()方法中会唤醒)。
CyclicBarrier使用
周末的时候,我和我老婆一起去吃烧烤,用CyclicBarrier来描述是这样的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
ExecutorService executor = Executors.newFixedThreadPool(1);
CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(() -> System.out.println("到齐了,出发吃烧烤")); });
new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("半个小时后,think123媳妇儿准备好了"); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start();
new Thread(() -> { System.out.println("think123准备好了"); try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start();
|
CyclicBarrier源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class CyclicBarrier {
private static class Generation { boolean broken = false; }
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private final int parties; private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count; }
|
上面展示了CyclicBarrier的重要属性,它的属性名称还是挺有趣的。
为了实现一组线程相互等待,使用到了lock和condition,而parties则是表明一组线程的个数(计数器),count表示当前有多少个线程还未执行完成。 barrierCommand表示当所有线程都就绪时,需要回调的函数。而generation是为了实现计数器循环利用,你可以理解为版本。
接下来我们看看await方法是如何实现的,下面的代码我只保留了核心代码逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public int await() { return dowait(false, 0L); } private int dowait(boolean timed, long nanos) {
final ReentrantLock lock = this.lock;
lock.lock(); final Generation g = generation;
int index = --count;
if (index == 0) { final Runnable command = barrierCommand;
if (command != null) command.run();
ranAction = true; nextGeneration(); return 0; }
for (;;) { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); if (g != generation) return index; } lock.unlock();
}
private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
|
await的逻辑很简单,主要就是判断当前线程是否是最后一个执行完成的线程,如果是最后一个,则需要执行回调函数,然后唤醒其他所有被阻塞的线程并重置计数器。
如果不是最后一个执行完的,则阻塞当前线程。
尤其需要注意的CyclicBarrier的回调函数执行在一个回合里最后执行await()的线程上,而且是同步调用回调函数,调用完之后,才会开始第二回合。
所以回调函数如果不另开一线程异步执行,就起不到性能优化的作用了。
写到最后
你告诉我,要是你你怎么选?
如果觉得好看,记得三连哟。