0%

Java - AQS

AQS(AbstractQueuedSynchronizer),是JDK1.5之后出现一种抽象的同步编程框架,主要是利用一个共享资源变量state和一个CLH同步队列,来实现共享资源的竞争,线程的阻塞唤醒逻辑,以及阻塞线程的存储队列。ReentrantLock、CountDownLatch都是基于AQS来实现的。

CLH是一个虚拟的双向队列,队列仅仅保存节点之间的关联关系。在AQS中,即是将每个请求共享资源的线程封装成一个节点Node,然后保存节点之间的关联关系。

更直白一点描述,就是线程通过CAS去改变共享变量state,如果修改成功,则获取锁成功;如果修改失败,则线程进入等待队列中,等待被唤醒。

框架

AQS的实现方式如下图:

1

state

共享资源变量state,是被private volatile修饰的一个int型变量,访问方式有三种:

  • int getState();

  • void setState(int newState);

  • boolean compareAndSetState(int expect, int update);

    基于Unsafe#compareAndSwapInt方法实现。

资源共享的方式有两种:

  • 独占式(Exclusive):只有单个线程能够成功获取资源并执行,例如ReentrantLock。

  • 共享式(Shared):可以有多个线程获取资源并执行同步代码,例如CountDownLatch。

由于AQS只是一个同步框架,因此自定义的同步器,需要实现AQS中的几个方法,以实现对共享资源的操作方式:

  • isHeldExclusively():该线程是否正在独占资源。
  • tryAcquire(int):独占方式尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

自定义同步器,根据自定义的资源共享方式,选择性的实现上述方法,比如独占式的同步器,需要实现tryAcquire-tryRelease,共享式同步器需要实现tryAcquireShared-tryReleaseShared。当然也可以同时支持独占和共享两种方式,例如ReentrantReadWriteLock。

正是由于子类资源共享的方式不同,所以上述方法并没有被设计成abstract,否则一个独占式同步器也非的重写共享式同步器的两个方法,没啥意义。

CLH队列

CLH是一个虚拟的双向队列,队列将线程封装成一个节点,然后保存到队列中,同时记录着节点之间的关联关系,我们先来看看节点的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 隐去部分代码
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 等待状态,初始值为0,其他可选值是上面的4个值
volatile int waitStatus;

volatile Node prev;
volatile Node next;

// 当前节点持有的线程
volatile Thread thread;

Node nextWaiter;

// 当前节点是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

先来看看两组常量定义:

  • 共享/独占

    1
    2
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    当一个同步器是共享式的,那么在封装线程为节点的时候,会用new Node(Thread.currentThread(), Node.SHARED);当一个同步器是独占式的,那么会使用new Node(Thread.currentThread(), Node.EXCLUSIVE),而从上面Node的Node(Thread thread, Node mode)构造函数可以知道,第二个参数被保存在nextWaiter中,同时还提供了final boolean isShared()方法来判断当前node实例是什么模式。

    如果要定义一个类型,常规的做法会用一个Enum变量,或者一个Boolean等,但是这里采用的是用一个node对象来存储常量或者null,与常规做法不太一致,因为这里的nextWaiter有多种用处。这个变量的定义比较复杂,名义上称之为“下一个等待的节点”,而实际上根据此节点的取值不同,意义也不同:

    • 取值为Node.EXCLUUSIVE时,即是为null的时候,表示当前node实例为独占模式;

    • 取值为Node.SHARED时,即是常量的时候,表示当前node实例为共享模式;

    • 取值为非null,非常量的其他Node节点时,表示Condition等待队列中当前节点的下一个等待节点。

      等待队列的知识会在下面说明,此处先了解一下即可。

  • 节点的等待状态

    节点的等待状态保存在waitStatus中,初始值即使int类型的默认初始值0,表示节点在队列中于等待状态,其他几个状态的定义如下:

    • CANCELLED:1

      表示当前节点实例因为超时或者线程中断而被取消,节点进入了取消状态则不再变化。

    • SIGNAL:-1

      表示当前节点的后继节点是阻塞的,当此节点释放或取消时,会通知后继节点,使后继节点的线程得以运行。

    • CONDITION:-2

      表示当前节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中,状态会被重新设置为0。

    • PROPAGATE:-3

      此状态值通常只涉及到调用了doReleaseShared()方法的头节点,确保releaseShared()方法的调用可以传播到其他的所有节点,简单理解就是共享模式下节点释放的传递标记。

获取/释放共享资源

接下来我们通过源码,看看AQS是如何获取/释放共享资源的,不过在这之前,我们先简单介绍一下LockSupport。

LockSupport

LockSupport类中,有两个主要的方法:

1
2
3
4
5
6
7
8
9
10
11
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

park方法用于将当前线程阻塞,unpark用于将指定线程唤醒,两个方法都调用了UNSAFE类的方法,其方法声明为:

1
2
public native void park(boolean var1, long var2);
public native void unpark(Object var1);

在Linux系统下,上面两个native方法使用的是Posix线程库pthread中的mutex(互斥量),condition(条件变量)来实现的。底层实现细节,可以参考这里

注意,进入调用park之后的线程,线程状态为WAIT,而不是BLOCKED。

许可

在底层实现的Parker类中,有一个_counter字段,用来记录许可,park之后,实际上是在等待一个许可,unpark为某个线程提供一个许可,即是当一个线程调用了park方法(不带超时时间),那么线程会一直等待,直到其他线程调用unpark来提供一个许可。

假设某个线程先调用了unpark(T),那么相当于提前给了线程T一个许可,当T线程再调用park的时候,就可以直接消费这个许可,而不进入阻塞状态。但是这个许可是不能叠加的,unpark(T)调用了多次,线程T第一次park可以不用进入阻塞状态,但是第二次park仍然会进入阻塞。

调用park之后的状态

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
Thread t = new Thread(() -> {
LockSupport.park();
});

t.start();
try {
Thread.sleep(30000);
} catch (InterruptedException ignored) {
}
LockSupport.unpark(t);
}

在线程sleep过程中,使用jstack命令查看,得到如下状态:

1
2
3
4
5
6
7
"Thread-0" #10 prio=5 os_prio=31 tid=0x00007fe48089f800 nid=0x5503 waiting on condition [0x00007000107a5000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at net.cllc.Main.lambda$main$0(Main.java:10)
at net.cllc.Main$$Lambda$1/1023892928.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)

说明LockSupport.park调用之后的线程,是处于WAITING状态。

对中断的响应

如果线程在调用了LockSupport.park()之后,遇到了中断调用,则线程会被唤醒,中断标志位被设置成true,然后继续执行,并不会像Object#wait方法那样抛出InterruptedException异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
Thread t = new Thread(() -> {
System.out.println("1");
LockSupport.park();
System.out.println("2");
System.out.println("3, " + Thread.currentThread().isInterrupted());

});

t.start();
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
}

t.interrupt();
System.out.println("done");
}

上述代码运行之后会打印:

1
2
3
4
1
done
2
3,true

其中,打印了1之后,等待了3秒之后才打印后续三行,并且没有抛出异常。

独占式

获取资源

独占模式下,获取资源的方法源码为:

1
2
3
4
5
6
7
8
9
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

acquire方法为入口方法,首先会调用tryAcquire尝试获取共享资源,如果获取共享资源成功,则直接返回,否则继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

我们先来看看addWaiter方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Node addWaiter(Node mode) {
// 将当前线程封装成一个node节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 若队列不为空,则尝试快速插入队尾
node.prev = pred;
// 利用CAS将当前节点设置为队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}

// 若队列不存在,或者快速插入失败,则调用enq进行常规插入
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 若队列不存在,则先初始化head,即使初始化队列
// 如果队列不存在,则此死循环必定会执行两次,第一次初始化队列,第二次走到下面的else中
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 队列存在,则利用CAS将当前节点设置为队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

线程加入到队列之后,执行acquireQueued函数,自旋线程直到获取锁,或者是将线程阻塞,直到被前一个节点唤醒:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 标记线程是否被中断过
boolean interrupted = false;
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
// 如果前继节点是head节点,则代表队列马上排到自己了,则尝试获取资源
if (p == head && tryAcquire(arg)) {
// 若获取成功,则设置当前节点为head节点,并返回是否中断过
setHead(node);
// 前继节点已经释放了资源,将其next置空,方便GC
p.next = null;
failed = false;
return interrupted;
}
// 若前继节点不是head节点,或者尝试获取资源失败,则做如下逻辑:
// 1. 先调用shouldParkAfterFailedAcquire,判断是否需要阻塞该节点的线程
// 2. 如果不需要,则继续for循环
// 3. 如果需要阻塞,则调用parkAndCheckInterrupt,阻塞线程,并在线程阻塞完毕之后,返回线程是否被中断过
// 4. 根据parkAndCheckInterrupt的返回值,判断是否设置中断变量为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 函数退出时,若当前线程获取资源失败,则调用函数,放弃获取资源
if (failed)
cancelAcquire(node);
}
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前继节点的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前继节点的状态为SIGNAL,则表示前继节点节点结束之后会通知下一个节点
// 则返回true,表示当前节点可以park,然后等待前继节点唤醒
return true;
if (ws > 0) {
// 前继节点的状态大于0,即是CANCELLED,则继续遍历前继节点的前继节点,直到找到一个等待状态不大于0的
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前继节点的状态小于等于0,且不为SIGNAL
// 则将前继节点的等待状态设置为SIGNAL,保证下次自旋时,次函数能够返回true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
// 阻塞线程
LockSupport.park(this);
// 返回线程阻塞过程中,是否被中断过
return Thread.interrupted();
}

若acquireQueued函数最终返回true,则执行selfInterrupt函数进行自我中断:

1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

除了上面最基础的acquire方法外,AQS还提供了几种其他的独占式获取资源的方式:

  • 获取资源(响应中断)

    上面的acquire方法中,如果遇到线程中断,则最终会调用线程的自我中断方法,除此之外,AQS还提供了一个acquireInterruptibly入口方法,与上面的区别在于,如果线程在park期间遇到了中断,则会抛出一个异常:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null;
    failed = false;
    return;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    // 区别在这里
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
  • 获取资源(带超时时间,响应中断)

    入口方法为tryAcquireNanos,源码为:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    private boolean doAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (nanosTimeout <= 0L)
    return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return true;
    }
    nanosTimeout = deadline - System.nanoTime();
    if (nanosTimeout <= 0L)
    return false;
    if (shouldParkAfterFailedAcquire(p, node) &&
    nanosTimeout > spinForTimeoutThreshold)
    LockSupport.parkNanos(this, nanosTimeout);
    if (Thread.interrupted())
    throw new InterruptedException();
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    区别在于加入了超时的机制,这里不再详细说明。

释放资源

资源释放的入口为如下函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

如果释放失败,则返回false;如果成功,并且head节点不为空,且head节点的等待状态不为0,则唤醒后继节点。

unparkSuccessor的函数源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将当前节点的等待状态重置为0
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 若后继节点为空,或者后继节点的状态大于0(即是CANCELLED)
// 则从尾端开始,查找第一个非空,且状态小于等于0的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 若找到正确的后继节点,则将其唤醒
LockSupport.unpark(s.thread);
}

若被唤醒的线程所在节点的前继节点不是头结点,经过shouldParkAfterFailedAcquire的调整,也会移动到等待队列的前面,直到其前继节点为头结点。

共享式

理解了独占模式下的获取,释放资源之后,共享式的就比较简单了。

获取资源

入口函数:

1
2
3
4
5
6
7
8
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

先说说tryAcquireShared函数,这个函数是需要子类自己去实现对共享资源的获取方式,当返回值大于0,表示共享资源获取成功,并且还有剩余资源;当返回值等于0,表示获取共享资源成功,但是无剩余资源;当返回值小于0,则表示获取失败。

acquireShared函数首先尝试获取资源,如果获取成功,则直接返回;否则调用doAcquireShared进入队列等待资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private void doAcquireShared(int arg) {
// 将当前线程封装成一个节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 记录线程是否中断过
boolean interrupted = false;
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
if (p == head) {
// 若前继节点为head节点,则尝试获取共享资源
int r = tryAcquireShared(arg);
if (r >= 0) {
// 若获取资源成功,并且还有剩余资源
// 则设置当前节点为head节点,然后唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
// 如果线程被中断过,则执行自我中断
selfInterrupt();
failed = false;
return;
}
}
// 这两个方法上面已经介绍过,这里不再赘述
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 函数退出时,若当前线程获取资源失败,则调用函数,放弃获取资源
cancelAcquire(node);
}
}

private void setHeadAndPropagate(Node node, int propagate) {
// 记录原来的head节点
Node h = head;
// 设置最新的head节点
setHead(node);

// 判断如果有剩余资源,且其他条件都满足,则唤醒后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

上面唤醒后继节点时,是调用doReleaseShared来实现的,我们在释放资源中继续讲解。

至此,共享式获取资源就结束了,与独占式的区别主要在于:

  • 独占式是在释放的时候唤醒后继节点;而共享式是获取资源之后如果有剩余资源,则唤醒后继节点;
  • 独占式是在入口函数的地方,判断如果中断过,则调用自我中断;而共享式是在获取共享资源之后就判断是否中断过。

释放资源

释放资源的入口函数为:

1
2
3
4
5
6
7
8
9
10
11
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

先调用tryReleaseShared释放资源,如果失败,则返回false;否则则唤醒等待队列里的其他线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 获取同步资源成功之后,在setHeadAndPropagate函数中会调用此函数,唤醒其他线程
// 释放资源的releaseShared也会调用此函数,唤醒其他线程
// 因此这里可能会出现并发的情况,所以需要CAS控制
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 等待状态为0,表明是队列的最后一个节点,那么CAS为PROPAGATE,表明下一次tryShared时,需要传播
// 如果失败说明有新后继节点将其改为了SIGNAL后挂起了,那么继续循环传播
continue;
}
// 函数唯一的出口
// 如果head改变了,说明有新的排队的线程获取到了锁,于是继续循环唤醒后续节点
// 如果head没变,说明没有需要唤醒的节点了
if (h == head)
break;
}
}

上面这段代码,可能并没有那么容易理解,更详细的共享锁释放逻辑解读,可以参考这里

AQS子类

举两个比较常用的,基于AQS实现的锁的例子。

独占锁-ReentrantLock

ReentrantLock继承自Lock接口,类中有个内部类Sync,继承自AQS,然后Sync又有两个子类FairSync和NonfairSync,用于实现公平锁和非公平锁,在ReentrantLock的构造函数中可以选择公平还是非公平模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReentrantLock implements Lock {

abstract static class Sync extends AbstractQueuedSynchronizer {
}

// 非公平锁
static final class NonfairSync extends Sync {
}

// 公平锁
static final class FairSync extends Sync {
}

public ReentrantLock() {
sync = new NonfairSync();
}


public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

public void lock() {
sync.lock();
}
}

我们先通过非公平锁来说一下ReentrantLock,非公平锁的资源操作逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果state为0,则表示锁空闲,利用CAS获取锁
if (compareAndSetState(0, acquires)) {
// 如果获取成功,设置owner信息,然后返回
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 如果获取失败,则说明所已经被占用
// 判断占用锁的线程是否是当前线程
// 如果是,则累加state,相当于标记重入次数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 只有当state为0的时候,才表示锁被成功释放
free = true;
setExclusiveOwnerThread(null);
}
// 如果重入了n次,但是只释放了n-1次,则这里的c=1,仍然表示锁未成功释放
setState(c);
return free;
}
}

static final class NonfairSync extends Sync {
final void lock() {
// 非公平锁,调用lock的时候,先直接尝试用CAS获取锁
if (compareAndSetState(0, 1))
// 如果获取成功,则设置owner,然后返回
setExclusiveOwnerThread(Thread.currentThread());
else
// 否则走资源获取逻辑
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
// 非公平锁,tryAcquire直接调用Sync的nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
}

公平锁的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 公平锁
static final class FairSync extends Sync {
final void lock() {
// 区别1
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 区别2
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

公平锁与非公平锁的区别在于:

  • 区别1:公平锁的lock,不会直接用CAS尝试获取锁;
  • 区别2:当state为0的时候,公平锁会通过hasQueuedPredecessors函数判断是否有前继节点,如果没有,才会尝试获取锁。

共享锁-CountDownLatch

CountDownLatch的源码不多,我们直接上关键源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
for (;;) {
// 自旋的方式释放资源,直到这次资源释放成功
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void countDown() {
sync.releaseShared(1);
}
}

CountDownLatch有点将资源的获取-释放逻辑倒过来的意思。初始化的时候,传入一个int值,作为初始count,每调用一次countDown释放资源,则count减1,在count减到0之前,调用await方法,会因为tryAcquireShared返回false而被阻塞,直到资源释放为0之后,唤醒await的线程,此时tryAcquireShared方法便能返回true。

CountDownLatch使用姿势比较简单,网上例子也偏多,这里就不再举例了。

条件队列

AQS框架中,除了state和CLH实现的同步器之外,还有一个基于独占锁实现的条件队列,其结构大致如下图:

1

只有CLH队列中的节点有资格竞争state,条件队列中的节点会在调用了Condition#await方法之后,从CLH队列移动到条件队列;在调用了Condition#signal方法之后,从条件队列移动到CLH队列。

很多阻塞队列的实现都是基于条件队列的,例如LinkedBlockingQueue,以下为take方法的相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public interface Condition {
// 当前线程进入等待状态直到被唤醒或者中断
// 其他变种方法无非是加入了超时时间或者对中断的处理
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;

// 唤醒单个阻塞线程
void signal();
// 唤醒所有阻塞线程
void signalAll();
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
// AQS中的内部类ConditionObject实现了Condition接口
public class ConditionObject implements Condition {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
}

public class ReentrantLock implements Lock {
private final Sync sync;

// 调用ReentrantLock#newCondition,实际上是new了一个AQS中的ConditionObject
public Condition newCondition() {
return sync.newCondition();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
}

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
// 创建一个独占式锁
private final ReentrantLock takeLock = new ReentrantLock();
// 在该锁上,创建一个条件队列。即是AQS中的ConditionObject1
private final Condition notEmpty = takeLock.newCondition();

public E take() throws InterruptedException {
E x;
int c = -1;
// 首先进行加锁
takeLock.lockInterruptibly();
try {
// 如果队列是空的,则进行等待
notEmpty.await();

// 到这里说明等待被唤醒,队列有数据,则取出数据
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 如果还有剩余,则唤醒等待的线程
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}

return x;
}
}

条件队列是基于独占锁的,因此调用await之前必须要先获取独占锁,接下来我们看看await的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void await() throws InterruptedException {
// 当前线程如果被中断,则直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到条件队列中
Node node = addConditionWaiter();
// 由于调用await之前,一定是先获取到了独占锁的,因此这里需要把独占锁释放掉
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 循环判断,如果当前节点不在同步队列中(SyncQueue,即是CLH队列),则说明在等待队列中,则阻塞当前线程
LockSupport.park(this);
// 阻塞被唤醒之后,先判断是否是被中断了,如果是,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 如果否,则再次进入while判断,如果是被唤醒的,则再次while判断不会成立,则跳出while循环
// 唤醒逻辑中,会将节点从等待队列移出,追加到同步队列的队尾,因此再次while判断不会成立
}
// 到这里,表示线程被唤醒了或者被中断了
// 调用acquireQueued尝试获取同步资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 到这里说明线程重新获得了独占锁
if (node.nextWaiter != null)
// 删除条件队列中被取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 根据不同模式处理中断
reportInterruptAfterWait(interruptMode);
}

接下来细化await中的一些函数调用逻辑,先看看addConditionWaiter函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 如果最后一个节点被取消,则删除队列中被取消的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 将当前线程封装成一个节点,状态为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

// 遍历链表,删除取消的节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

接下来看看fullyRelease:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前的state,并一并释放
int savedState = getState();
// 独占锁的释放资源入口函数
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

isOnSyncQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean isOnSyncQueue(Node node) {
// 快速判断1:节点状态或者节点没有前置节点
// 同步队列是有头节点的,而条件队列没有
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
if (node.next != null) // If has successor, it must be on queue
return true;

return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
// 注意这里用的是tail,这是因为条件队列中的节点是被加入到同步队列尾部,这样查找更快
// 从同步队列尾节点开始向前查找当前节点,如果找到则说明在,否则不在
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

checkInterruptWhileWaiting:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private int checkInterruptWhileWaiting(Node node) {
// 如果现在没有被中断,则正常返回0
// 否则进入同步队列,并返回是由中断导致的唤醒,还是由正常signal导致的唤醒
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

// 该方法返回true表示节点由中断加入同步队列,返回false表示由signal加入同步队列
final boolean transferAfterCancelledWait(Node node) {
// 设置节点状态为0,如果成功则加入同步队列,并返回true
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}

// 如果上面设置失败,说明节点已经被signal唤醒,由于signal操作会将节点加入同步队列,我们只需自旋等待即可
while (!isOnSyncQueue(node))
// 让出CPU
Thread.yield();
return false;
}

reportInterruptAfterWait:

1
2
3
4
5
6
7
8
private void reportInterruptAfterWait(int interruptMode)
// 根据中断时机选择抛出异常或者设置线程中断状态
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

参考



-=全文完=-