CountDownLatch和CyclicBarrier

CountDownLatch是基于AQS队列同步器实现的共享锁。CyclicBarrier是基于同步/等待队列实现。

CountDownLatch是单纯通过维持一个state的计数数值,这个数值在线程调用countDownLatch.countDown()的时候会减1,当计数器的值为零的时候,等待的线程会向前执行。假如我们需要保证在主线程继续往前执行的时候,子任务都已经被启动,可以通过CountDownLatch来进行控制。实际上和ReetrantLock是基于同样的原理实现,不过是实现和覆盖了tryAccquireShare和tryReleaseShare的区别。

CyclicBarrier则和CountDownLatch不相同,它是通过Lock和Condition等待/通知机制实现的。在CountDownLatch实现的多线程模型中,各个线程之间是通过同步状态进行通信。CyclicBarrier通过lock+Condition来维持一个同步队列和等待队列(通常将等待在同步块外面的队列称之为同步队列,将等待在某个对象上的队列称之为等待队列),Lock中的Condition对象可以有多个,也就是维护多个等待队列。我们观察CyclicBarrier的实现可以看到它的内部只维持了一个等待队列。

1
2
3
4
5
6
7
8
9
10
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();//维持了一个等待队列,可以通过trip.signal唤醒等待队列中所有的节点加入到同步队列中。
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();

CountDownLatch的使用

我们通过一段代码查看CountDownLatch的主要使用方式,我们定义一个woker线程,作为子任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package CountDownLatch;

import java.util.concurrent.CountDownLatch;

/**
* Created by yqz on 4/3/18.
*/
public class Worker implements Runnable {
private String workerName;
private CountDownLatch countDownLatch;
public Worker(String workerName,CountDownLatch countDownLatch){
this.workerName=workerName;
this.countDownLatch=countDownLatch;
}
@Override
public void run() {
doWorker();
}
private void doWorker(){
try {
Thread.sleep(5000);

} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(this.getWorkerName()+"释放一次共享锁");
countDownLatch.countDown();
}
}

public String getWorkerName() {
return workerName;
}

public void setWorkerName(String workerName) {
this.workerName = workerName;
}
}

通过main方法调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package CountDownLatch;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;

/**
* Created by yqz on 4/3/18.
*/
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch countDownLatch=new CountDownLatch(5);
ArrayList<Thread> arrayListWorker=new ArrayList();
for(int i=0;i<5;i++){
arrayListWorker.add(new Thread(new Worker("worker_"+(i+1),countDownLatch)));
}
for (Thread worker:arrayListWorker){
worker.start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("countDownLatch结束");
}
}

输出的结果:

1
2
3
4
5
6
worker_1释放一次共享锁
worker_4释放一次共享锁
worker_2释放一次共享锁
worker_5释放一次共享锁
worker_3释放一次共享锁
countDownLatch结束

这样的使用方式我们可以看到CountDownLatch实际上保证了当前主线程会等待所有的线程都启动,我们可以通过countDownLatch.countDown()这个方法去动态更改state的数值,主线程只有在state的数值变成0之后才会选择继续向前执行。这样的处理似乎很像thread.join这种等待当前线程执行结束在执行主线程的方式,不同的是使用CountDownLatch你不一定要保证线程已经执行结束进入死亡状态,你可以在线程执行栈中通过countDownLatch.countDown()手动更改这个state的数值,只要这个数值为0,主线程便可以开始运行。

CountDownLatch源代码分析

CountDownLatch实现的几个主要方法,通过内部的sync重写了AQS队列同步器中的获取和释放共享锁的方法。我们通过CountDownLatch的构造方法去设置state的数值。getCount可以获得当前的state值。通过countDown去循环cas减少state的数值,最后的await方法实现循环等待state的值,直到变为零。这里看下await方法的主要源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

CyclicBarrier的使用

对比CountDownLatch的使用我们可以发现CountDownLatch的缺陷是线程是完全互不干扰的,也就是没有相互的协同工作。CyclicBarrier通过lock维护一个同步队列,再通过locks.newCondition获取到的condition对象维护一个等待队列,形成等待/通知机制,conditon的await和signal类似于object的wait和notify。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package CyclicBarrier;

import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* Created by yqz on 4/4/18.
*/
public class CyclicBarrierWorker implements Runnable {
CyclicBarrier cyclicBarrier;
int index;
public CyclicBarrierWorker(CyclicBarrier cyclicBarrier,int index) {
this.cyclicBarrier = cyclicBarrier;
this.index=index;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 用户开始抽奖,持续"+(index+1)+"秒" + new Date());
try {
Thread.sleep((index+1) * 1000);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有用户抽奖完毕,颁发奖项。为用户" + Thread.currentThread().getName() + "颁奖。" + new Date());
}
}

main函数启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package CyclicBarrier;

import java.util.Date;
import java.util.concurrent.CyclicBarrier;

/**
* Created by yqz on 4/3/18.
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
System.out.println("5个用户开始抽奖" + new Date());
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for(int i = 0; i < 5; i ++) {
final int index = i;
new Thread(new Thread(new CyclicBarrierWorker(cyclicBarrier,index)), "Thread-" + i).start();
}
}
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
5个用户开始抽奖Wed Apr 04 14:31:18 CST 2018
Thread-0 用户开始抽奖,持续1秒Wed Apr 04 14:31:19 CST 2018
Thread-1 用户开始抽奖,持续2秒Wed Apr 04 14:31:19 CST 2018
Thread-2 用户开始抽奖,持续3秒Wed Apr 04 14:31:19 CST 2018
Thread-3 用户开始抽奖,持续4秒Wed Apr 04 14:31:19 CST 2018
Thread-4 用户开始抽奖,持续5秒Wed Apr 04 14:31:19 CST 2018
所有用户抽奖完毕,颁发奖项。为用户Thread-4颁奖。Wed Apr 04 14:31:24 CST 2018
所有用户抽奖完毕,颁发奖项。为用户Thread-0颁奖。Wed Apr 04 14:31:24 CST 2018
所有用户抽奖完毕,颁发奖项。为用户Thread-1颁奖。Wed Apr 04 14:31:24 CST 2018
所有用户抽奖完毕,颁发奖项。为用户Thread-2颁奖。Wed Apr 04 14:31:24 CST 2018
所有用户抽奖完毕,颁发奖项。为用户Thread-3颁奖。Wed Apr 04 14:31:24 CST 2018

CyclicBarrier源码分析

这里主要分析cyclicBarrier.await方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//所有进入同步队列获取锁
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;//计数器的值减1
if (index == 0) { // tripped,如果计数值已经为零代表所有的线程都已经进入了condition对象的等待队列,可以开始通知等待队列进入同步队列。
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();//内部会调用signalAll方法通知等待队列。
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await(); //放到Conditon的等待队列里,同时释放锁,让其他线程执行await方法
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)//说明执行了nextGeneration方法,计数器到了0
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

private void nextGeneration() {
// 唤醒Conditon等待队列上的所有线程
trip.signalAll();
// 计数器值变成原始值,重新开始
count = parties;
// generation被重新构造
generation = new Generation();
}

整个执行的过程,线程在调用了await方法之后,会通过lock方式上锁,然后线程将计数器的值减1。如果计数器的值到达0,说明conditon维护的等待队列已满,调用nextGeneration唤醒condition等待队列上的所有线程。加入到同步队列中,然后返回index,释放锁,之后同步队列中的节点线程就可以被唤醒了。如果线程的计数器没有到达0,那更简单,这个时候其他线程会调用condition的await方法进入等待队列。

总结

这里总结下两者的主要区别:

  1. 使用方式:CountDownLatch使用时主线程等待子线程将同步状态置为0,CyclicBarrier所有线程在等待队列上等待被通知。

  2. 实现方式:CountDownLatch底层使用的是共享锁,CyclicBarrier底层使用的是ReentrantLock和这个lock的条件对象Condition,通过等待通知机制(同步队列+等待队列)实现。

  3. 主要用途:CountDownLatch用完之后就不能再次使用了,CyclicBarrier用完之后可以通过reset再次使用。例如在多线程计算的时候如果失败了,CyclicBarrier可以重新计算。

  4. API:CyclicBarrier可以通过isBroken方法查看阻塞的线程是否被中断。