ReetrantLock实现源码

ReetrantLock是基于AQS(AbtractQueuedSynchronizer)实现的。本文将逐渐展开分析ReetrantLock的实现机制以及使用场景。

AQS实现源码

AQS提供了一个框架,用于实现阻塞锁和相关的同步器(semaphores, events等),它们依赖于先入先出(FIFO)等待队列。这个类被设计成大多数类型的同步器的基础,它们依赖于单个原子{@ code int}值来表示同步状态。子类必须定义更改此状态的受保护方法(重写相关的方法),并重新定义该状态值在实现类中的定义。考虑到这些,这个类中的其他方法需要控制所有队列的进出和阻塞机制。子类可以维护其他的状态字段,但是只有通过使用方法{@ link # get state}、{@ link # set state}和{@ link # compare和set state}的原子更新{@ code int}值,才会对同步进行跟踪。

AQS框架要么支持默认的独占模式,要么支持共享模式。当以独占模式获得时,试图通过其他线程获取的尝试无法成功。多个线程的共享模式可以(但不需要)成功。共享模式获得成功时,下一个等待线程(如果存在)还必须确定它是否也能获得。在不同模式中等待的线程共享相同的FIFO队列。通常,实现子类只支持其中一种模式,但都可以在{@ link Read Write Lock}中发挥作用。支持唯一或仅共享模式的子类不必定义支持未使用模式的方法。本文分析的ReetrantLock是一种独占模式的实现,同一时间只能有一个线程执行同步代码块,其它线程将被阻塞在队列中,等待state也就是锁释放之后从队列中释放出来执行代码块。

AQS的使用

使用这个类作为一个同步器的基础,通过state的暴露方法来同步修改状态,同时需要定义下列方法:

  • {@link #tryAcquire},获取锁,cas修改state状态。
  • {@link #tryRelease},释放锁,修改state状态同时将线程移除队列。
  • {@link #tryAcquireShared},获取共享锁。
  • {@link #tryReleaseShared},释放共享锁。
  • {@link #isHeldExclusively},锁是否被独占。
1
2
3
4
5
6
7
8
9
10
 Acquire:
while (!tryAcquire(arg)) {
<em>enqueue thread if it is not already queued</em>;
<em>possibly block current thread</em>;
}
Release:
if (tryRelease(arg))
<em>unblock the first queued thread</em>;
</pre>
(Shared mode is similar but may involve cascading signals.)

这里给一个独占锁的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

class Mutex implements Lock, java.io.Serializable {

// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}

// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// Provides a Condition
Condition newCondition() { return new ConditionObject(); }

// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();

public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}}

AQS队列同步器提供的接口分析

同步器提供的可供重写的方法:

方法名称 描述
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回>=0的值,表示获取成功,反之获取失败。
protected boolean tryReleaseShared(int arg) 共享式释放同步状态。
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般实现会判断当前线程是否独占锁。
protected boolean tryAcquire(int arg) 独占式获取锁,该方法为获取锁操作方法。公平锁和非公平锁有不同的实现,非公平锁直接调用同步器底层实现。公平锁会先判断当前队列是否有线程在等待锁。
protected boolean tryRelease(int arg) 独占式释放同步锁,其它队列中等待的线程将有机会获取到锁。

我们可以根据这些可以重写的方法去自定义同步组件,类似公平锁和非公平锁的定义,实际上是对FairSync和NonfairSync对Sync的tyAcquire方法进行了不同的实现。另外队列同步器还提供了一些模板方法:

方法名称 描述
void acquire(int arg) 独占式获取同步状态,如果获取同步状态成功,进入同步块否则进入等待队列,然后开始循环等待当前线在等待队列中获取到锁,只会记录中断状态不会触发中断。这个方法会调用到tryAcquire方法,可以被自定义同步器重写。
boolean tryAccquireNano(int arg,long nanos) 增加了超时时间限制,如果当前线程在限制的时间内没有获取到锁的话就返回false,否则返回true。
void accqureInrruptibly(int arg) 支持中断模式,当前在,相较于acquire方法该方法同样会进行循环等待队列中获取到锁。但是当它可以响应中断,当其他线程中断当前线程的时候,会抛出InterruptException异常。
Collection getQueuedThreads() 获取等待在同步队列上的线程集合。
boolean tryAccquireSharedNano(int arg,long nanos) 同样的,我们为该方法在共享锁的前提下增加了超时时间。
void acquireShared(int arg) 共享式获取同步状态,主要区别于独占式获取同步状态,这种方式的特别之处在于同一时间可以有多个线程获得同步状态。
void accqureSharedInrruptibly(int arg) 该方法响应中断。
void releasShared(int arg) 共享锁释放同步状态。

队列同步器的方法主要分为三类:

  • 独占式获取与释放同步状态
  • 共享式获取与释放同步状态
  • 查询同步队列中线程的等待情况

同样的,我们提供一个共享锁的实例,类似java.util.concurrent.CountDownLatch CountDownLatch的一种实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() { return getState() != 0; }

protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}

protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}

private final Sync sync = new Sync();
public boolean isSignalled() { return sync.isSignalled(); }
public void signal() { sync.releaseShared(1); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}}

ReetrantLock主要方法

我们通常会用定义一个ReetrantLock,通过调用lock方法和unlock方法来对同步代码块进行加锁和释放锁。我们先来看一下lock方法的调用栈。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

可以看到ReetrantLock提供了两种构造方法,一种会默认初始化一个非公平锁,另一个可以指定公平锁或者非公平锁。公平锁和非公平锁中都有对应的加锁和释放锁实现。观察调用栈可以看到两者的区别。

lock方法调用公平锁和非公平锁

我们可以通过源代码看到公平锁和非公平锁的实现都继承了Sync类,实际上这个类是对AQS类的一种独占锁的实现。所以我们最终实现的ReetrantLock是独占的,可重入的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))//非公平锁调用时就会给一次竞争的机会,如果后来的线程刚好在锁释放的时候去获取锁,那么即使等待队列中还有其它的线程也会被直接忽略,这样的话对队列中等待的线程是不公平的。
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);//实际上是直接调用了AQS中的nonfairTryAcquire(acquires)方法。这个方法是不会维持一个等待竞争锁的线程队列的。
}
}

再看下公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);//公平锁线程在获取锁的时候直接加入队列,等待前面的线程被执行完之后顺序调用。
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

我们在进入acquire方法,可以看到非公平锁实际上是直接调用的Sync中nonfairTryAcquire的实现,源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//ReetrantLock是可重入锁,同一个线程可以再次进入,但是会将state作累加。
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

而公平锁调用的是自己实现的tryAcquire(int acquires)方法,上面关于公平锁的静态类中可以看到。对比之后发现公平锁在判断的时候多了一句:

1
2
3
4
5
6
7
if (c == 0) {
if (!hasQueuedPredecessors() &&//这个增加的判断也是保证公平
compareAndSetState(0, acquires)) {//这里是通过cas去写state的状态也就是去占用锁
setExclusiveOwnerThread(current);//成功获取锁之后记录独占锁线程为当前线程
return true;
}
}

hasQueuedPredecessors()的实现如下,如果当前线程在队列中还有前置线程返回true,不获得准入条件。如果当前线程处于队列的头部,或者队列为空,则返回false,获得准入条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

总结一下,公平锁和非公平锁相比,保证了FIFO的原则,但是代价是进行了大量的线程切换。而非公平锁虽然可能造成线程 “饥饿”,但是极少的线程切换,同样也保证了更大的吞吐量。

tryLock()方法和lock方法

看一下tryLock()方法,实际上调用Sync的nonfairTryAcquire方法,该方法相较于lock方法精简了一些。对比,一下,少了一段:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//这里如果获取失败会尝试将线程加入等待队列
selfInterrupt();
}
  • tryLock()方法默认是非公平方式去调用的,只是一次简单的cas加锁尝试。如果失败了线程不会再被执行,拥有再次竞争锁的机会。
  • lock()方法的非公平方式如果获取锁失败会将当前线程加入等待队列。如果失败了线程等待之后还有再次获得锁的机会。

这样public boolean tryLock(long timeout, TimeUnit unit),这个方法也就更容易理解了,增加了一个timeout时间。

1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));//默认unit.toNanos(timeout)时间内超时,首先会将这个线程加入队列,然后进行循环自旋,如果时间内线程没有排到队首,会返回一个false。
}

sync.tryAcquireNanos(1, unit.toNanos(timeout)),该方法具体实现如下,咋一看代码很容易将其误认为是在时间段内不断cas自旋去获取锁,如果时间内失败返回false。实际上是会将当前的线程加入等待队列的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);//会将该线程作为一个node加入到等待队列。Node.EXCLUSIVE定义为独占模式。
boolean failed = true;
try {
for (;;) {//这里开始循环等待加入的线程排到队首,可能会在时间段内排到也可能排不到。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

下面是addWaiter(Node mode),就是生成一个独占的线程节点加入到等待队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

unlock方法释放锁

调用unlock方法,实际上调用的是AQS中的release(1)方法,这个只会把state的值-1,源代码如下:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

其中tryRelease(arg)方法执行如下:

1
2
3
4
5
6
7
8
9
10
11
12
13

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//如果当前state保留的值已经变为零,锁释放成功
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

InterruptedException可中断获取锁

ReetrantLock提供了一种可中断获取锁的方式,也是实现了定义在lock中的接口,该方法很容易理解,在获取锁的过程中会响应中断,那么这种方式和普通的lock方式有什么本质区别。我们看一下这种方式的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();//这里会响应中断,会直接抛异常给上层,在lock中只是维护了一个状态值,进行了状态值的修改,并不会跑出异常到上层。最终回返回false
}
} finally {
if (failed)
cancelAcquire(node);
}
}

结论:ReentrantLock的中断和非中断加锁模式的区别在于:线程尝试获取锁操作失败后,在等待过程中,如果该线程被其他线程中断了,它是如何响应中断请求的。lock方法会忽略中断请求,继续获取锁直到成功;而lockInterruptibly则直接抛出中断异常来立即dsa响应中断,由上层调用者处理中断。

总结

本文主要对ReetrantLock的源码进行了一些分析,ReetrantLock是基于AQS实现的一种独占,可重入的锁,可以自己的定义公平锁或者非公平锁。ReetrantLock通过维护AQS类内部的一个volatile state变量来记录锁的状态,通过cas去竞争,同时记录当前持有锁的线程。支持可重入,当当前持有该锁的线程尝试再次进入的时候会通过准入条件,并将state增加1。实际上公平锁和非公平锁主要是当前线程是否会进入线程等待的队列,AQS在底层维护了一个线程的等待队列,基于先进先出的原则去调度这些等待获取锁的队列。如果非公平锁的话直接调用lock去加锁,可能面临失败的情况,如果失败了,就是一次cas自旋失败,同步代码块并不会被执行。如果用trylock方法的话可以尝试增加一个超时时间,会将当前的线程加入到等待队列中然后在超市时间内等待被调度,这种场景在某些情况下还是很适用的。