前言

ReentrantLock是一个重入锁,提供了公平锁和非公平锁的选择,通过构造方法接受参数fair,为true为公平锁,反之为非公平锁。默认为非公平锁。

问题

在进行源码阅读之前,我有几个问题,带着问题,我们来进一步学习源码,会更加有目的性。

1 如何实现加锁的,lock()方法都做了些什么?

2 是如何实现公平锁的,如何让等待更久的线程,可以优先获取锁呢?好奇

3 lock()、tryLock()、lockInterruptibly()这三个方法有什么区别呢?

整体结构

ReentrantLock实现了Lock接口,Sync抽象类继承了AQS,公平锁类和非公平锁类继承了Sync抽象类。

分析思路

从使用的角度来分析,每一步它都是怎么实现的,抽丝剥茧那么整个流程通了。而后再重点分析核心类。即「构造方法」-> 「lock」->「unlock」-> 「Sync」

构造方法

1
2
3
4
5
6
7
8
9
10
11
private final Sync sync;

// 默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}

// fair == true,为公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

lock()

若初始化为非公平锁,调用lock方法,其实是调用了NonfairSync的lock方法。

ReetrantLock#lock

1
2
3
public void lock() {
sync.lock();
}

compareAndSetState为CAS操作,即:如果同步状态state的值为0,就更新为1,此操作是原子性的。如果更新成功,则设置当前线程为同步锁的独占线程。反之,则执行#acquire(1)操作。

NonfairSync#lock

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

NonfairSync#tryAcquire,重载了AQS中的tryAcquire方法,而其实现为抽象类Sync#nonfairTryAcquire,NonfairSync非公平锁继承了Sync抽象类。

通过tryAcquire尝试获取锁,如果获取失败,将当前线程封装为Node,放到同步队列里去,如果此过程没有发生中断操作,则返回false。反之为true,自身需要再中断一次。

AbstractQueuedSynchronizer#acquire

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

NonfairTryAcquire#tryAcquire

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

获取当前同步状态state,如果state == 0, 没有线程独占同步对象Sync,更新当前同步状态由0 -> acquires,成功则设置同步对象的独占线程为当前线程。如果当前线程 == 同步对象Sync的独占线程,则当前设置同步状态为「当前值 + acquires」,如果重入的次数溢出,则抛出异常。

Sync#nonfairTryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

AbstractQueuedSynchronizer#acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前继节点「① 前继节点可能为空的吧,假设队列里没有节点呢,添加一个节点不就是首节点?」
final Node p = node.predecessor();
// 前继节点为首节点,那么当前节点可以尝试获取锁,采用自旋的方式
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前继节点状态为SIGNAL,表示lock被其他线程占用,需要调用parkAndCheckInterrupt方法来中断当前线程,休息下。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 清除node节点(清除过程是先给节点打上Node标签,再移除节点)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire方法用于判断是否需要中断,如果前继节点处于SIGNA状态则需要唤醒。

AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// ① ws > 0 的状态只有取消状态;
if (ws > 0) {
// 跳过「直接」前继节点,往前遍历直到找到队列等待状态为SIGNAL的「间接」前继节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前继节点为其他的状态时(非取消、非唤醒状态),CAS设置为唤醒状态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

unlock()

若同步锁初始化为非公平锁,即调用NonfairSync的unlock方法,NonfairSync本身没有这个方法,使用继承抽象类Sync的继承抽象类AbstractQueuedSynchronizer的unlock方法。

ReentrantLock#unlock

1
2
3
public void unlock() {
sync.release(1);
}

tryRelease方法,被继承的父类Sync重载了。通过tryRelease方法释放锁,如果返回true,表示锁表成功释放。

AbstractQueuedSynchronizer#release

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒头节点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}

此处是为了唤醒头节点的后继节点

AbstractQueuedSynchronizer#unparkSuccessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// 后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点线程
LockSupport.unpark(s.thread);
}

获取当前同步状态state,如果当前线程不是lock对象的独占线程则抛错。若「state - releases == 0」,设置释放lock的独占线程并更新同步状态state为「state - releases」并返回true,表示锁已被释放,反之返回false,表示当前锁未被释放。

Sync#tryRelease

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

如何实现公平锁的?

FairSync#lock

1
2
3
final void lock() {
acquire(1);
}

FairSync#tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// hasQueuedPredecessors判断是否有前继节点,如果没有才能获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

lock()、tryLock()、lockInterruptibly()的区别