ReetrantWriteReadLock读写锁

####ReetrantWriteReadLock读写锁

​ Java的同步器,都基于AQS自定义同步器,那么如果设计一个自定义同步器。需要去适应不同的场景,例如我们在讲到ReetrantLock的实现时,它是基于AQS实现的一个可重入,可中断的锁,获取到锁的线程可以在同步块进行竞争对象的读写操作。但是我们可以设想,如果当前请求的线程多为读操作,那么加锁,释放锁的操作就会非常频繁,A线程在读的时候,B线程只能循环等待A线程释放锁(实际上此时此刻并没有线程进行写操作,并不会造城脏读),这样的话实际上是极大的牺牲了程序响应读的性能。

等待/通知机制

​ 在介绍读写锁的实现之前,介绍下java的线程通信机制wait/notify机制。之前有一篇博客,对比了CountDownLatch和CyclicBarrier,其中一个区别便是CountDownLatch 的主线程需要所有等待子线程完成。而CyclicBarrier建立在等待/通知机制上,实现了线程在等待之后重新被唤醒。

​ 这里我们主要介绍synchronized+object的等待通知机制,建立在两个线程(等待线程和通知线程),可以类比为消费者线程和生产者线程。等待线程(消费者)在生产者完成生产操作之后从wait处继续执行。这里提供一种实现。定义一个消费者线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package NotifyWait;

/**
* Created by yqz on 8/7/18.
*/
public class WaitThread implements Runnable {

String threadName;//线程名
Object object;//加锁
WaitThread(String threadName,Object object){
this.threadName=threadName;
this.object=object;
}

@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (object) {
//看下是否执行了两次
System.out.println("消费者线程进入同步块,flag:"+NotifyWaitThread.flag);
while (!NotifyWaitThread.flag) {
try {
System.out.println(threadName+"消费者检测到生产者数据未生产完毕,开始等待....");
object.wait();//从同步队列中被唤醒的时候会重新从这里开始执行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(threadName+"消费者检测到生产者得数据已经生产完毕,开始执行生产者线程...");
}
}

定义一个生产者线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package NotifyWait;

/**
* Created by yqz on 8/7/18. 通知线程(类比生产者,去通知消费者,线程已经执行完毕,获取到所要的结果)
* 通常在多个网络请求的过程中我们可以通过异步的方式分别去执行请求线程
* 线程之间如果有数据结果依赖可以基于这种模型去等待通知(消费者等待生产者的数据请求结果)
* 这里通过flag来模拟标示生产者是否已经将数据生产完毕
*/

public class NotifyThread implements Runnable {
String threadName;//线程名
Object object;//加锁
NotifyThread(String threadName,Object object){
this.threadName=threadName;
this.object=object;
}
@Override
public void run() {
synchronized (object){
//执行数据的生产过程,过程结束之后进行内存刷新,更新数据刷新成功标志
System.out.println(threadName+"开始生产数据");
NotifyWaitThread.flag=true;
object.notifyAll();
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (object)
{
System.out.println(threadName+"生产者再次尝试获取锁");
}
}
}
}

开启主线程启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
package NotifyWait;

/**
* Created by yqz on 8/7/18.
*/
public class NotifyWaitThread {
protected static volatile boolean flag=false;
private static Object object=new Object();
public static void main(String[] args) {
new Thread(new WaitThread("WaitThread",object)).start();
new Thread(new NotifyThread("NotifyThread",object)).start();
}
}

主线程启动之后看下执行的结果,消费线程休眠10s,生产者线程先获得锁:

1
2
3
4
NotifyThread开始生产数据
NotifyThread生产者再次尝试获取锁
消费者线程进入同步块,flag:true
WaitThread消费者检测到生产者得数据已经生产完毕,开始执行生产者线程...

消费线程先获得锁:

1
2
3
4
5
消费者线程进入同步块,flag:false
WaitThread消费者检测到生产者数据未生产完毕,开始等待....
NotifyThread开始生产数据
NotifyThread生产者再次尝试获取锁
WaitThread消费者检测到生产者得数据已经生产完毕,开始执行生产者线程...

​ 这里需要注意的一点是,在java的等待通知机制中是必须结合synchronized关键字的(锁的获取和释放)。调用object.wait(),object.notify(),object.notifyAll(),这些方法需要先对object对象进行加锁。下面总结一个经典的等待通知机制范式,等待方(消费者)需要遵循如下原则:

1.获取对象的锁。

2.如果条件不满足,那么调用对象的wait方法,被通知到再次进行条件检查。

3.条件满足之后执行相应的逻辑。

1
2
3
4
5
6
synchronized(对象){
while(条件不满足){//volatile修饰,保证变量的线程可见性
对象.wait();
}
对应的处理逻辑
}

通知方需要遵循如下原则:

1.获得对象的锁。

2.改变判断条件。

3.通知所有等待在对象上的线程重新开始执行

1
2
3
4
sychronized(对象){
改变条件
对象.notiryAll();
}

我们可以看到java的等待通知机制完成了单个线程与单个线程的通信,控制代码的执行顺序。这在我们设计一个读写锁的时候,写的时候不能读,读的时候不能写,在写操作完成之后是可以通过这种方式通知读线程可以继续读的。但是基于这种范式的局限性,实际上通过sychronized关键字去修饰的代码块已经是串行执行。而我们的读写锁最终需要保证的是读锁(共享锁)多个线程可重入,而写锁(独占锁)不可重入。获取读锁需要判断当前没有写线程,获取写锁需要保证当前无读线程。

ReetrantWriteReadLock的应用

根据ReetrantWriteReadLock的特性,我们可以封装一个线程不安全的hashMap。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package ReetrantWriteAndReadLock;

import java.util.HashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 基于读写锁封装一个hashmap
* Created by yqz on 8/9/18.
*/
public class ReetrantWriteAndReadLockExp {
//还是定义为volatile吧,不然在ReetrantWriteAndReadLockExp.get触发类加载,创建对象还未完成,就已经被另一个线程空指针调用。volatile的内存屏障可以保证new成功一个对象,写成功之后才被读。
//这种模式在定义一个可能被并发获取的单例模式对象的时候,也需要定义为volatile,用内存屏障保证不会空指针。
private static volatile HashMap<String,String> hashMap=new HashMap<String, String>();
private static volatile ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static volatile Lock readLock=readWriteLock.readLock();
private static volatile Lock writeLock=readWriteLock.writeLock();

public static final Object get(String key){
readLock.lock();
try {
return hashMap.get(key);
}finally {
readLock.unlock();
}
}

public static final void put(String key,String value){
writeLock.lock();
try {
hashMap.put(key,value);
}finally {
writeLock.unlock();
}
}

}

读写锁的设计

​ 读写锁需要在一个int类型的变量(AQS中的state变量)上维护读锁的状态和写锁的状态。按位分割,高十六位表示读锁的状态,低十六位表示写锁的状态,这两部分在判断读锁和写锁状态的时候需要按位去取。这样只需要保证整个int变量的线程可见性即可。可以看下jdk源码是如何去获取读锁和写锁的,获取读锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.//如果写锁被其他线程持有,获取读锁失败
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.//读线程不阻塞,线程通过cas去累加读锁的count
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)//如果当前写锁的状态不为零,并且不是当前的线程持有锁(那么该读锁即使是共享锁,也是获取失败的)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&//readerShouldBlock是判断当前锁是不是公平锁,如果是公平锁的话需要等待前置的线程队列,也就是会完全阻塞掉,循环等待调用。如果获取成功了读锁则增加读锁计数状态。
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//这里循环阻塞,循环获取读锁,循环cas。
return fullTryAcquireShared(current);
}

获取写锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.//如果读锁的数量不为零或者写锁的数量不为零并且不是当前的线程持有这个写锁。
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)//如果持有写锁的线程可重入最大次数,获取失败。
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
//如果当前写锁的状态为零,并且是非公平锁,而且通过cas成功获取到了锁。那么将写锁的持有线程改写为当前线程。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

总结

​ 读写锁通过将读写分离,读锁作为一个共享锁,写锁作为一个独占锁,两个锁同时由一个volatile的int state维护,高位维护读锁,低位维护写锁。同时读锁和写锁都可以设置为公平和非公平。通过运用读写锁,可以在并发过程中保证读的正确性以及响应速度。读写锁非常适用于读的频率很高,但是写的频率很小的场景。