ReadWriteLock
版本JDK7
看本文之前,需要了解 AQS 和 ReentrantLock。下文如果不做特别说明,AQS 都是指 AbstractQueuedSynchronizer。
- 1. 特性
- 2. 解读 ReentrantReadWriteLock
- 3. 使用ReentrantReadWriteLock
- 4. ReentrantLock 与 ReentrantReadWriteLock 之间的区别
1. 特性
ReadWriteLock 翻译过来就是读写锁。读写锁在 同一时刻 允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。
使用场景:读多写少的环境
ReentrantReadWriteLock 是目前官方 JDK 提供 ReadWriteLock 的唯一实现。
ReentrantReadWriteLock 的特性 | 说明 |
---|---|
公平性的选择 | 支持非公平(默认)和公平的锁获取方式,就吞吐量而言,非公平要优于公平 |
重进入 | 该锁支持重进入。以读写线程为例:读线程在获取了读锁之后,(获取读锁的线程)该线程能够再次获取读锁;而写线程在获取写锁之后,能够再次获取写锁,同时也能获取读锁。 |
锁降级 | 遵循下面的次序:1.获取写锁 2. 获取读锁 3.释放写锁。写锁就能降级成为读锁,反之不行。 |
ReentrantReadWriteLock 展示内部工作状态的方法
方法名称 | 描述 |
---|---|
int getReadLockCount() | 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,例如,仅一个线程, 它连续获取了 n 次读锁,那么占有读锁的线程数是 1,但该方法返回 n。 |
int getReadHoldCount() | 返回当前线程获取读锁的次数。使用 ThreadLocal 保存当前线程获取的次数。 |
boolean isWriteLock() | 判断写锁是否被获取 |
int getWriteHoldCount() | 返回当前写锁被获取的次数 |
2. 解读 ReentrantReadWriteLock
ReentrantReadWriteLock 是 Java 并发包里面提供的唯一的一个读写锁实现。
ReadWriteLock 提供了2个抽象方法 readLock() 和 writeLock() 用于获取读锁和写锁。
下面就让我们来看看这个复杂的锁。
2.1 ReentrantReadWriteLock 的结构
从上面的截图中可以看出,ReentrantReadWriteLock 中有很多的内部静态类。下面我们从这几个内部类来熟悉一下其内部结构。
2.1.1 构造方法
ReentrantReadWriteLock 提供了2个构造方法:
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
从上面的代码可以看出,这2个构造方法决定了 ReentrantReadWriteLock 的获取是否是公平性的,同时初始化了 ReadLock 和 WriteLock。
有趣的是,FairSync 与 NonfairSync 之间的唯一不同之处就是各自实现了父类 Sync 的 readerShouldBlock 和 writerShouldBlock 这两个抽象方法(暂时不必关心,后面后面会介绍)。
2.1.2 ReadLock 与 WriteLock
下面来看看 ReadLock 和 WriteLock 这两个类的结构。
注意:ReadLock 与 WriteLock 采用的是两种完全不同的 lock 方式。
锁 | 类型 |
---|---|
ReadLock | 共享模式 - acquireShared(1) |
WriteLock | 独占模式 - acquire(1) |
2.1.3 小结
-
Sync 是 AbstractQueuedSynchronizer 的子类,然后再衍生出了公平模式和非公平模式。
-
Sync 重写了 AbstractQueuedSynchronizer 的 tryRelease、tryAcquire、tryReleaseShared、tryAcquireShared 和 isHeldExclusively 这5个方法。Sync 既能提供独占式获取与释放锁的功能,也拥有共享式获取与释放锁的功能。
-
ReadLock 和 WriteLock 的内部方法几乎都是由 Sync 来实现的。但是 ReadLock 和 WriteLock 分别继承 Sync 的不同获取锁的方式:ReadLock - 共享式;WriteLock - 独占式。
-
ReadLock 和 WriteLock 一样,都是有公平模式和非公平模式。
2.2.1 Sync - 读、写的同步状态获取方式
前面我们介绍过 Sync 的结构,知道 Sync 是同时拥有独占锁和共享锁的。但是 Sync 是如何快速确定读和写的各自的状态呢?答案就是通过位运算。
abstract static class Sync extends AbstractQueuedSynchronizer {
... ...
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// MAX_COUNT == 2^16 - 1,即读写锁各自最大的同步状态数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 转换成二进制 == 00000000 00000000 11111111 11111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 读的同步状态为高16位
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写的同步状态的第16位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
... ...
}
假设当前同步状态为S,
- 读的同步状态为:S
>>>
0x0000FFFF - 写的同步状态为:S & 0x0000FFFF
2.2.2 Sync - 独占式获取与释放的具体实现
下面可能会出现独占锁和写锁这两个词,如果不做特殊说明,这两个就是一个意思。
- 1.独占锁的获取(写锁的获取)
protected final boolean tryAcquire(int acquires) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取当前的同步状态 int c = getState(); // 计算当前写锁的个数 int w = exclusiveCount(c); if (c != 0) { /** * 1. c != 0 && w == 0:说明这个时候拥有读锁,获取写锁失败。 * 此时是不能够获取写锁的,这也证实 ReentrantReadWriteLock 是不支持锁升级的。 * 2. c != 0 && w != 0 && current != getExclusiveOwnerThread():说明写锁被其他线程占用着, * 这也是独占式锁的体现,只有同一线程才可以重入锁。 */ if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果超过 MAX_COUNT 抛出异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 锁重入 setState(c + acquires); return true; } /** * writerShouldBlock 是抽象方法。 * 1. 如果是公平模式下,writerShouldBlock 调用的是 AQS 的 hasQueuedPredecessors() 来查询是否有任何线程等待获取锁的时间长于当前线程。 * hasQueuedPredecessors()返回true,说明存在线程等待时间长于当前线程。 * 2. 如果是非公平模式下,writerShouldBlock 返回 false,然后进行 CAS 尝试将写的状态更新,如果成功,该方法返回true;失败,返回false。 */ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 将当前线程标识为拥有写锁的线程 setExclusiveOwnerThread(current); return true; }
从上面独占锁的获取我们可以归纳出读写锁不能别获取的几种情况:
- getState() != 0:
-
当前线程拥有读锁。
-
当前写锁被正被其他线程占用中。
-
- getState() == 0 且 此时的读写锁是公平模式 且 同步队列中已经有线程在等待获取锁,此时也不能获取写锁。
- getState() != 0:
-
2.独占式的释放(写锁的释放)
/** * @param releases, 要释放的资源的个数 */ protected final boolean tryRelease(int releases) { // 如果当前线程不是拥有写锁的线程,将抛出异常。 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 计算释放之后的同步状态值 int nextc = getState() - releases; // 新写锁的同步状态值如果为0,说明完全释放了。 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); // 更新同步状态值 setState(nextc); return free; }
对于独占锁的释放,你可能会注意到,只有将重入的锁全部释放掉才算是真正的释放了当前线程对写锁的占有。
hasQueuedPredecessors() 解析
前面解析写锁的获取过程的时候,我们可以看到,在公平模式下面,writerShouldBlock() 这个方法内部调用了 hasQueuedPredecessors()。下面,让我们来看看 hasQueuedPredecessors 是如何来判断当前同步队列中是否有线程等待时间长于当前线程。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
代码量很少,但是想要看明白却不太简单。
我归纳了一下 hasQueuedPredecessors 返回值所对应的几种情况:
-
返回 true,只有一种情况:当前同步队列中有等待时间比当前线程还要长的线程在,即存在排在当前线程前面的线程。
-
返回 false,有两种情况:
- 同步队列中没有其他比当前线程等待的久的线程。
- 当前的线程为同步队列中的 head。
简单画了一个图来表示:
2.2.3 Sync - 共享式获取与释放锁的具体实现
如果不做特殊说明,共享锁与读锁代表同一个意思。
-
1.共享式获取锁(读锁的获取)
tryAcquireShared(int) > 0 说明成功获取
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 写锁被其他线程占用,获取读锁失败 return -1; /** * 代码执行到这里有 2 种情况: * 1. 当前同步队列中不存在占用写锁的线程 * 2. 当前同步队列中存在写锁 且 拥有这个写锁的线程就是当前线程 */ // 计算读锁的个数 int r = sharedCount(c); /** * readerShouldBlock() 是一个抽象方法。 * 这个方法得依据现在的同步器是公平模式还是非公平模式。 * 1. 公平模式下 readerShouldBlock() 内部调用的是 hasQueuedPredecessors(),前面已经介绍过,这里就不再多赘述。 * 2. 非公平模式下 readerShouldBlock() 内部调用的是 apparentlyFirstQueuedIsExclusive(),后面会介绍。 */ if (!readerShouldBlock() && r < MAX_COUNT && // 高16位加1 compareAndSetState(c, c + SHARED_UNIT)) { // 进到这里,说明已经CAS成功将同步状态更新 /** 下面开始进行读锁成功获取之后的状态更新操作 **/ if (r == 0) { /** * r == 0,说明当前线程是第一个获取读锁的线程。 * 将 firstReader 设置为当前线程,持有锁数设置为 1。 */ firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // r != 0 且 当前线程为 firstReader,持有锁数加1。 firstReaderHoldCount++; } else { // cachedHoldCounter 是线程本地变量,每个线程都有一个自己的cachedHoldCounter对象。 // cachedHoldCounter 有两个成员变量: // 1. count,记录当前线程的持有读锁的数量。 // 2. tid,记录当前线程的ID HoldCounter rh = cachedHoldCounter; //下面两个if分支线程本地变量都是没有初始化过的 if (rh == null || rh.tid != current.getId()) /** * readHolds 是 ThreadLocal的一个对象。 * 如果当前线程的 cachedHoldCounter 为 null, * 或者,线程本地变量 cachedHoldCounter 储存的 tid 不是当前线程的ID, * 那么给当前线程初始化一个线程本地变量cachedHoldCounter。 */ cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // cachedHoldCounter已经初始化且 count == 0 的时候,将cachedHoldCounter设置为当前线程本地变量。 readHolds.set(rh); rh.count++; } return 1; } // 往下看 return fullTryAcquireShared(current); }
上面的代码中,要进入 if 分支,需要满足:readerShouldBlock() 返回 false 且 同步状态没有溢出(getState() < MAX_COUNT == false) 且 CAS 要成功。
反之,就是进入 fullTryAcquireShared(current) 这个方法。
-
readerShouldBlock() 返回 true的情况:
-
公平模式(FairSync)下,readerShouldBlock() 内部调用 hasQueuedPredecessors()这个方法。 hasQueuedPredecessors() 返回 true,意味着此时同步队列中有其他线程等待锁的时间长于当前线程,需要排队,当前线程是新来的,不能插队,所以不能直接获取锁。
-
非公平模式(NonfaireSync)下,readerShouldBlock() 内部调用了 apparentlyFirstQueuedIsExclusive()。当 apparentlyFirstQueuedIsExclusive() 返回 true,意味着此时head的后继节点是来获取写锁的。
-
-
compareAndSetState(c, c + SHARED_UNIT) CAS 失败,说明此时存在竞争,可能是和另一个读锁获取竞争,也可能是和另一个写锁获取操作竞争。
-
同步状态溢出(getState() > MAX_COUNT)
接着我们来看一下 fullTryAcquireShared(current) 的具体实现:
fullTryAcquireShared 看起来与 tryAcquireShared 的代码有重复的部分,但总体来说,它更加简单。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 进入无限循环 for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) // 其他线程占有写锁,当前线程获取读锁失败,当前线程进入同步队列排队。 return -1; } else if (readerShouldBlock()) { /** * 这个分支就是确保当前应该被阻塞的线程没有重复获取读锁。 * 进入到这里说明 2 点: * 1. exclusiveCount(c) == 0,说明没有线程占有写锁 * 2. readerShouldBlock() 返回ture: * 2.1 公平模式下:在同步队列中有其他线程比当前线程等待的时间还长 * 2.2 非公平模式下:同步队列中 head.next 存在且其对应的线程是打算获取写锁的,这个时候当前获取读锁的线程就应该被阻塞 */ if (firstReader == current) { // 直接跳出分支,进行尝试CAS 获取读锁 // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) { // 获取线程本地变量HoldCounter rh = readHolds.get(); // 如果count == 0,说明HoldCounter是刚初始化出来的,将线程本地变量移除 if (rh.count == 0) readHolds.remove(); } } // 只要count为0,就老老实实进入同步队列排队 if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 进入到这里,说明CAS成功了,当前线程成功获取了读锁 if (sharedCount(c) == 0) { // 此时没有获取写锁的线程存在 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 下面和tryAcquireShared(int unused)一致,不在赘述。 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
总结一下读锁的获取:
-
从读锁的获取细节中可以看出,写锁比读锁优先级更高。
// 写锁被其他线程占有这时,其他尝试获取读锁的线程被阻塞 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1;
在非公平模式下,如果head的后继节点为获取写锁的线程时,当前线程获取读锁的线程也需要进入同步队列中等待。
final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
-
对于同一个线程来说,读写锁支持锁的降级,即先拥有写锁,在获取读锁,最后释放写锁。
-
-
2.共享式释放锁(读锁的释放)
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 当前线程为fisrtReader时 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) // 获取当前线程的线程本地变量 rh = readHolds.get(); // 获取读锁数 int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } // 读锁数递减 --rh.count; } // 无限循环直到CAS成功 for (;;) { int c = getState(); // nextc就是当前读锁数减一 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // nextc == 0,不仅仅意味着读锁全部释放,也意味着写锁全部被释放。 return nextc == 0; } }
读锁的释放要比读锁的获取简单的多,这里简单的总结一下读锁的释放:
读锁的释放主要释放两个方面:
-
更新拥有当前线程的线程本地变量 - HoldCounter.count 减一。如果更新之后 HoldCounter.count 为0,那么需要移除线程本地变量。
-
还需要更新同步状态 state。
-
apparentlyFirstQueuedIsExclusive() 解析
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
apparentlyFirstQueuedIsExclusive 方法就是用来检查同步队列中head的后继节点是否为独占模式。
ReentrantReadWriteLock 的作者 Doug Lea 给写锁更高的优先级:如果遇上获取写锁的线程马上就要获取到锁了(head.next为获取写锁的线程),获取读锁的线程不应该和它抢。否则,可以随便抢。
3. 使用ReentrantReadWriteLock
- 锁降级
public void processData() {
readLock.lock();
if(!update) {
// 必须先释放读锁
readLock.unlock();
// 锁降级从写锁获取到开始
writeLock.lock();
try {
if(!update) {
// 装备数据的流程(略)
update = true;
}
// 再获取读锁
readLock.lock();
} finally {
// 写锁释放
writeLock.unlock();
}
// 锁降级完成
}
try {
// 使用数据的流程(略)
} finally {
readLock.unlock();
}
}
-
下面是一个对ArrayList添加并发功能的例子。
ReadWriteList.java
public class ReadWriteList<E> { private List<E> list = new ArrayList<>(); private ReadWriteLock rwLock = new ReentrantReadWriteLock(); public ReadWriteList(E... initialElements) { list.addAll(Arrays.asList(initialElements)); } public void add(E element) { Lock writeLock = rwLock.writeLock(); writeLock.lock(); try { list.add(element); } finally { writeLock.unlock(); } } public E get(int index) { Lock readLock = rwLock.readLock(); readLock.lock(); try { return list.get(index); } finally { readLock.unlock(); } } public int size() { Lock readLock = rwLock.readLock(); readLock.lock(); try { return list.size(); } finally { readLock.unlock(); } } }
如你所见,该类将 ArrayList 包装为底层数据结构。ReadWriteList 使用读锁来保护对读操作的并发访问(get()和size()方法),并使用写锁来保护对写操作的并发访问(add()方法)。
下面创建一个写线程类往 List 中添加 100 以内的随机数。
Writer.java
public class Writer extends Thread { private ReadWriteList<Integer> sharedList; public Writer(ReadWriteList<Integer> sharedList) { this.sharedList = sharedList; } public void run() { Random random = new Random(); int number = random.nextInt(100); sharedList.add(number); try { Thread.sleep(100); System.out.println(getName() + " -> put: " + number); } catch (InterruptedException ie ) { ie.printStackTrace(); } } }
再创建一个读线程来随机访问 List 已有的元素。
Read.java
public class Reader extends Thread { private ReadWriteList<Integer> sharedList; public Reader(ReadWriteList<Integer> sharedList) { this.sharedList = sharedList; } public void run() { Random random = new Random(); int index = random.nextInt(sharedList.size()); Integer number = sharedList.get(index); System.out.println(getName() + " -> get: " + number); try { Thread.sleep(100); } catch (InterruptedException ie) { ie.printStackTrace(); } } }
下面是测试代码:
ReadWriteLockTest.java
public class ReadWriteLockTest { static final int READER_SIZE = 10; static final int WRITER_SIZE = 2; public static void main(String[] args) { Integer[] initialElements = {33, 28, 86, 99}; ReadWriteList<Integer> sharedList = new ReadWriteList<>(initialElements); for (int i = 0; i < WRITER_SIZE; i++) { new Writer(sharedList).start(); } for (int i = 0; i < READER_SIZE; i++) { new Reader(sharedList).start(); } } }
4. ReentrantLock 与 ReentrantReadWriteLock 之间的区别
-
ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念。显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何“读/读”,“写/读”,“写/写”操作都不能同时发生。但是同样需要强调的一个概念是,锁是有一定的开销的,当并发比较大的时候,锁的开销就比较客观了。所以如果可能的话就尽量少用锁,非要用锁的话就尝试看能否改造为读写锁。
-
ReadWriteLock 描述的是:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。也就是说读写锁使用的场合是一个共享资源被大量读取操作,而只有少量的写操作(修改数据)。
参考
- 《Java并发编程的艺术 - 第五章》
- 通过ReadWriteReentrantLock源代码分析AbstractQueuedSynchronizer共享模式
- ReentrantReadWriteLock深入分析