如何看待AQS? 看了Synchronized的实现方式之后,再来看JDK的AQS,感觉就比较简单了,它的行为有点像银行排队,银行有很多窗口处理很多业务,不同的窗口处理不同的业务,比如有个人业务,也有金融业务,也有公司业务等,每个窗口都有很多人排队。一般来讲当你在窗口处理业务的时候是不会有人来打扰你的,不一般的时候是什么时候呢?那就是资料分发窗口,谁来了都给你一张资料,一边看去吧。
而且当你去处理业务的时候,你可能有些资料忘记带了,然后你又要重新去取资料,取完之后回来继续排队。
所以说,代码源于生活,古人诚不欺我。
带着问题看源码
AQS中是如何实现线程阻塞的?
AQS为什么支持多个Condition?
线程唤醒的时候顺序是怎样的?
带着问题去看源码比一猛子扎进源码的海洋好,所以我建议你带着这几个问题去看AQS源码。
CLH队列 AQS是一个抽象类,很多类都是基于它来实现自己的特有的功能的,比如Lock,Semaphore,CountDownLatch等,我们都知道对于一个共享变量,为了线程安全,同一时刻肯定只能有一个线程线程可以写这个变量,也就是只有一个线程能拿到锁。那么那些没有拿到锁的线程是怎么被阻塞的呢?
AQS中维护了一个基于链表实现的FIFO队列(官方叫它CLH锁),那些没有获取到锁的线程会被封装后放入都在这个队列里面,我们来看下这个队列的定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; }
这里共享模式和排它模式是什么意思呢?排它模式是表示同时只能有一个线程获取到锁执行,而共享模式表示可以同时有多个线程执行,比如Semaphore/CountDownLatch。
waitStatus除了默认值外还有三个值,分别代表不同的含义
CANCELLED = 1 : 表示当前节点已经不能在获取锁了,当前节点由于超时或者中断而进入该状态,进入该状态的节点状态不会再发生变化,同时后续还会从队列中移除。
SIGNAL = -1 : 表示当前节点需要去唤醒后继节点。 后继节点入队时,会将当前节点的状态更新为SIGNAL
CONDITION = -2 : 当前节点处于条件队列中,当其他线程调用了condition.signal()后,节点会从条件队列转义到同步队列中
PROPAGATE = -3 : 当共享锁释放的时候,这个状态会被传递到其他后继节点
其实原本的CLH队列是没有prev这个指针的,它存在的意义是那些取消锁获取的线程需要被移除掉。
Synchronized的wait只能有一个条件队列(WaitSet),而AQS则是支持多个条件队列,其中条件队列的关键数据结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ConditionObject implements Condition , java .io .Serializable { private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject () { } public final void signal () { ... } public final void await () throws InterruptedException { ... } }
AQS支持多个Condition的原因就是每个ConditionObject都对应一个队列,所以它可以支持多个Condition。
如果多个线程请求同一把锁,有的线程阻塞在请求锁上面,有的锁阻塞到等待某个条件成立。 那么当持有锁的线程让出锁的时候,哪个线程应该获取到锁呢?
当获取了锁的线程调用了signal()时,它又会不会从条件队列(condition queue)中出队一个node,加入到同步队列中呢?答案是会的。
如果我说AQS分析完毕,你们会不会打我?
哈哈哈,各位放下键盘,扶我起来,我还能在水一会儿。
AQS中除了这两个类之外,还有AQS本身还有几个重要的属性,它们共同构成了AQS这个框架类的基础结构
1 2 3 4 5 6 7 8 9 10 private transient volatile Node head;private transient volatile Node tail;private volatile int state;
看到了head和tail,是不是一个双向链表的含义跃然纸上了。
AQS 中的state变量,对不同的子类有不同的含义,该变量对 ReentrantLock 来说表示加锁状态,对Semaphore来说则是信号量的数量。总之它们都是基于AQS中的state来实现各自独有功能的。
这里的state我觉得更应该叫做资源,就类似于去银行排队时取的票。对于ReentrantLock来说,这个资源只有1个,对于Semaphore或者CountDownLatch来说资源可以有很多个。
Unsafe讲解 unsafe是一个非常强大的类,通过它提供的API,我们可以操作内存,修改对象属性值,甚至内存屏障,加锁解锁都能通过它完成
图片来自于: https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html
AQS中就使用了unsafe来实现线程的阻塞以及修改属性的值,这里把AQS中涉及到unsafe的方法列出来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public native void park (boolean isAbsolute, long time) ;public native void unpark (Object thread) ;public native long objectFieldOffset (Field f) ;public final native boolean compareAndSwapObject (Object o, long offset, Object expected, Object x) ;public final native boolean compareAndSwapInt (Object o, long offset, int expected, int x) ;
park和unpark的底层实现其实还是之前我们讲解过的pthread,这里不再讲解,感兴趣的可以去看看之前关于synchronized的文章。
1 2 3 4 5 6 7 8 9 UNSAFE_ENTRY(void , Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) ... thread->parker()->park(isAbsolute != 0 , time); ... UNSAFE_END
修改对象的属性值,我们用一段代码来演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class UnSafeDemo { public static void main (String[] args) throws Exception { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true ); Unsafe unsafe = (Unsafe) theUnsafe.get(null ); Student student = new Student(); student.setAge(18 ); long ageOffset = unsafe.objectFieldOffset(Student.class.getDeclaredField("age")); unsafe.compareAndSwapInt(student, ageOffset, 18 , 20 ); System.out.println(student.getAge()); } } class Student { int age; public int getAge () { return age; } public void setAge (int age) { this .age = age; } }
上面代码的输出结果为 20
,我们通过首先通过objectFieldOffset获取到age属性的内存偏移地址,然后通过compareAndSwapInt将age属性的值修改为20。
比如对于一维数组a[10],基地址是0x000000,那么a[1]的内存地址就是0x000001 = 0x000000(基地址) + 1(偏移量)
源码分析 其他类如果要想将AQS作为基类以实现自己的功能,只需要根据需求实现以下方法就行了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); } protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively () { throw new UnsupportedOperationException(); }
AQS中获取锁的方法是在acquire()
中,释放锁的方法是release()
,如果我们想要实现自定义的锁的时候,只需要根据自己的需求实现对应的tryXXX方法就行了,为了简化分析,我们只分析独占锁的源码。
获取锁 1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们假设tryAcquire返回false(不关心子类实现),也就是现在锁已经被其他线程占有了。现在又有线程来获取锁肯定是会获取失败的,所以失败的线程会被封装成Node插入到队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
上面逻辑很简单,就是构造一个排它锁模式的node,插入队列中(尾插入)。
但是你尤其需要注意到enq中,当节点第一次插入的时候,head的值是new Node(),它是一个虚拟节点,这个节点本身没有可运行的线程。
在链表中,使用这种虚拟节点很正常,有时候更加有利于操作。
enq执行完成后就形成了这样的链路关系:
acquireQueued的源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
逻辑也比较简单明了,在线程被阻塞之前,如果前一个节点是头节点(head),那么在尝试去获取一次锁,如果成功了就直接返回。
如果还是失败的话,就更新下节点状态,然后将线程阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
线程阻塞的代码更简单
1 2 3 4 5 6 7 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
至此,获取锁的源码就解析完成了。
从上面的代码,我们可以知道 head --> next --> next --> tail
这样的数据结构组成了同步队列,等待获取锁的线程会被封装成node插入到队列中去。
释放锁 release()中主要调用了unparkSuccessor()方法来唤醒后继节点,源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
之前提到过head节点是虚拟节点,所以会调用unsafe.park
去解锁head.next节点
条件变量(Condition) Java 语言内置的管程(synchronized)里只有一个条件变量,而我们的AQS是支持多个条件变量的,Java中定义了一个接口,叫做Condition,AQS的内部类ConditionObject实现了这个接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface Condition { void await () throws InterruptedException ; void signal () ; void signalAll () ; }
Condition中线程等待和通知需要调用await()
、signal()
、signalAll()
,它们的语义和 wait()
、notify()
、notifyAll()
是相同的。但是不一样的是,Condition里只能使用前面的 await()
、signal()
、signalAll()
,而后面的 wait()
、notify()
、notifyAll()
只有在 synchronized 实现的管程里才能使用。
await 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
await()方法的逻辑也比较简单,其执行的主要流程如下
如果当前线程已经被中断则抛出异常,未中断则执行步骤2
将当前线程构造为condition node,并插入到条件队列中
如果condition node不在同步队列中(有可能被唤醒后,移出条件队列了),则调用unsafe.park阻塞当前线程
通过以上代码分析,await之后条件队列会形成下面这样的数据结构
1 firstWaiter --> nextWaiter --> nextWaiter --> lastWaiter
这里的firsetWaiter并不是虚拟节点,当只有一个线程在条件队列中时,firstWaiter == node == lastWaiter
signal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
其实signal的逻辑也是很简单的,不知道为什么写到signal的时候,我就想起了我当时写synchronized的时候,一样的索然无味,无欲无求。我想可能是没人给我点赞。
说回signal,它的主要逻辑如下
将第一个等待节点从等待队列中移除
修改第一个等待节点的waitStatus为0
将节点入队到同步队列,并返回前一个节点
判断前一个节点是否处于CANCELLED或者waitStatus能否正常修改,如果不能则解锁刚入队的节点
你需要注意的是,signal并没有释放自己获得的锁。释放锁的操作仍然是通过release的。
总结 通过源码的分析,我相信你一定可以回答上之前的三个问题。也让我们知道了AQS和Synchronized其实内在实现方式是很类似的。
AQS中有同步队列和条件队列,signal的时候是将节点从条件队列转移到同步队列。 Synchronized中有EntryList和WaitSet。notify的时候将线程从WaitSet移动到EntryList中。
同样的,看源码我个人更喜欢沿着一条线去看,而不是大而全的看,这样更容易把控整个脉络。