不知道你有没有想过,那些去申请锁的线程都怎样了?有些可能申请到了锁,马上就能执行业务代码。但是如果有一个锁被很多个线程需要,那么这些线程是如何被处理的呢?
今天我们走进synchronized 重量级锁,看看那些没有申请到锁的线程都怎样了。
ps: 如果你不想看分析结果,可以拉到最后,末尾有一张总结图,一图胜千言
之前文章分析过synchroinzed中锁的优化,但是如果存在大量竞争的情况下,那么最终还是都会变成重量级锁。所以我们这里开始直接分析重量级锁的代码。
申请锁 在ObjectMonitor::enter函数中,有很多判断和优化执行的逻辑,但是核心还是通过EnterI函数实际进入队列将将当前线程阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 void ObjectMonitor::EnterI (TRAPS) { Thread * const Self = THREAD; if (TryLock (Self) > 0 ) { assert(_succ != Self, "invariant" ); assert(_owner == Self, "invariant" ); assert(_Responsible != Self, "invariant" ); return ; } if (TrySpin(Self) > 0 ) { assert(_owner == Self, "invariant" ); assert(_succ != Self, "invariant" ); assert(_Responsible != Self, "invariant" ); return ; } ... ObjectWaiter node (Self) ; Self->_ParkEvent->reset(); node._prev = (ObjectWaiter *) 0xBAD ; node.TState = ObjectWaiter::TS_CXQ; ObjectWaiter * nxt; for (;;) { node._next = nxt = _cxq; if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break ; if (TryLock (Self) > 0 ) { assert(_succ != Self, "invariant" ); assert(_owner == Self, "invariant" ); assert(_Responsible != Self, "invariant" ); return ; } } for (;;) { if (TryLock(Self) > 0 ) break ; assert(_owner != Self, "invariant" ); if (_Responsible == Self) { Self->_ParkEvent->park((jlong) recheckInterval); recheckInterval *= 8 ; if (recheckInterval > MAX_RECHECK_INTERVAL) { recheckInterval = MAX_RECHECK_INTERVAL; } } else { Self->_ParkEvent->park(); } ... if (TryLock(Self) > 0 ) break ; ++nWakeups; if (TrySpin(Self) > 0 ) break ; ... } ... UnlinkAfterAcquire(Self, &node); ... }
在入队之前,会调用tryLock尝试通过CAS操作将_owner(当前ObjectMonitor对象锁持有的线程指针)字段设置为Self(指向当前执行的线程),如果设置成功,表示当前线程获得了锁,否则没有。
1 2 3 4 5 6 7 8 int ObjectMonitor::TryLock (Thread * Self) { void * own = _owner; if (own != NULL ) return 0 ; if (Atomic::replace_if_null(Self, &_owner)) { return 1 ; } return -1 ; }
如果tryLock没有成功,又会再次调用tryLock(trySpin中调用了tryLock)去尝试获取锁,因为这样可以告诉操作系统我迫切需要这个资源,希望能尽量分配给我。不过这种亲和力并不是一定能得到保证的协议,只是一种积极的操作。
通过 ObjectWaiter对象将当前线程包裹起来,入到 CXQ 队列的头部
阻塞当前线程(通过pthread_cond_wait)
当线程被唤醒而获取了锁,调用UnlinkAfterAcquire方法将ObjectWaiter从CXQ或者EntryList中移除
核心数据结构 ObjectMonitor对象中保存了 sychronized 阻塞的线程的队列,以及实现了不同的队列调度策略,因此我们有必须先来认识下这个对象的一些重要属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 class ObjectMonitor { volatile markOop _header; void * volatile _owner; volatile jlong _previous_owner_tid; volatile intptr_t _recursions; Thread * volatile _succ; ObjectWaiter * volatile _EntryList; ObjectWaiter * volatile _cxq; ObjectWaiter * volatile _WaitSet; } class ObjectWaiter : public StackObj { public : enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ }; ObjectWaiter * volatile _next; ObjectWaiter * volatile _prev; Thread* _thread; volatile TStates TState; public : ObjectWaiter(Thread* thread); };
看到ObjectWaiter中的_next和_prev你就会明白,这是使用了双向队列实现等待队列的的,但是实际上我们上面的入队操作并没有形成双向列表,形成双向列表是在exit锁的时候。
wait Java Object 类提供了一个基于 native 实现的 wait 和 notify 线程间通讯的方式,JDK中wait/notify/notifyAll全部是通过native实现的,当然到了JVM,它的实现还是在 src/hotspot/share/runtime/objectMonitor.cpp
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 void ObjectMonitor::wait (jlong millis , bool interruptible, TRAPS) { Thread * const Self = THREAD; JavaThread *jt = (JavaThread *)THREAD; ... if (interruptible && Thread::is_interrupted(Self, true ) && !HAS_PENDING_EXCEPTION) { THROW(vmSymbols::java_lang_InterruptedException()); return ; } jt->set_current_waiting_monitor(this ); ObjectWaiter node (Self) ; node.TState = ObjectWaiter::TS_WAIT; ... AddWaiter(&node); exit (true , Self); ... if (interruptible && (Thread::is_interrupted(THREAD, false ) || HAS_PENDING_EXCEPTION)) { } else if (node._notified == 0 ) { if (millis <= 0 ) { Self->_ParkEvent->park(); } else { ret = Self->_ParkEvent->park(millis ); } } ... } inline void ObjectMonitor::AddWaiter (ObjectWaiter* node) { if (_WaitSet == NULL ) { _WaitSet = node; node->_prev = node; node->_next = node; } else { ObjectWaiter* head = _WaitSet; ObjectWaiter* tail = head->_prev; tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; }
上面我把wait的主要方法逻辑列出来了,主要会执行以下步骤
首先判断当前线程是否被中断,如果被中断了需要抛出InterruptedException
如果没有被中断,则会使用当前线程构造ObjectWaiter节点,将其插入双向链表WaitSet的尾部
调用exit,让出锁(让出锁的逻辑会在后面分析)
调用park(实际上是调用pthread_cond_wait)阻塞当前线程
notify 同样的notify的逻辑也是在ObjectMonitory.cpp中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void ObjectMonitor::notify (TRAPS) { CHECK_OWNER(); if (_WaitSet == NULL ) { TEVENT(Empty-Notify); return ; } DTRACE_MONITOR_PROBE(notify, this , object(), THREAD); INotify(THREAD); OM_PERFDATA_OP(Notifications, inc(1 )); }
在notify中首先会判断waitSet是否为空,如果为空,表示没有线程在等待,则直接返回。否则则调用INotify方法。
notifyAll方法实际上是循环调用INotify
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 void ObjectMonitor::INotify (Thread * Self) { Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify" ); ObjectWaiter * iterator = DequeueWaiter(); if (iterator != NULL ) { iterator->TState = ObjectWaiter::TS_ENTER; iterator->_notified = 1 ; iterator->_notifier_tid = JFR_THREAD_ID(Self); ObjectWaiter * list = _EntryList; if (list != NULL ) { assert(list ->_prev == NULL , "invariant" ); assert(list ->TState == ObjectWaiter::TS_ENTER, "invariant" ); assert(list != iterator, "invariant" ); } if (list == NULL ) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator; } else { iterator->TState = ObjectWaiter::TS_CXQ; for (;;) { ObjectWaiter * front = _cxq; iterator->_next = front; if (Atomic::cmpxchg(iterator, &_cxq, front) == front) { break ; } } } iterator->wait_reenter_begin(this ); } Thread::SpinRelease(&_WaitSetLock); }
notify的逻辑比较简单,就是将WaitSet的头节点从队列中移除,如果EntryList为空,则将出队节点放入到EntryList中,如果EntryList不为空,则将节点插入到CXQ列表的头节点。
需要注意的是,notify并没有释放锁,释放锁的逻辑是在exit中
exit 当一个线程获得对象锁成功之后,就可以执行自定义的同步代码块了。执行完成之后会执行到 ObjectMonitor 的 exit 函数中,释放当前对象锁,方便下一个线程来获取这个锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 void ObjectMonitor::exit (bool not_suspended, TRAPS) { Thread * const Self = THREAD; if (THREAD != _owner) { if (THREAD->is_lock_owned((address) _owner)) { assert(_recursions == 0 , "invariant" ); _owner = THREAD; _recursions = 0 ; } else { assert(false , "Non-balanced monitor enter/exit! Likely JNI locking" ); return ; } } if (_recursions != 0 ) { _recursions--; return ; } for (;;) { ... w = _EntryList; if (w != NULL ) { assert(w->TState == ObjectWaiter::TS_ENTER, "invariant" ); ExitEpilog(Self, w); return ; } w = _cxq; ... _EntryList = w; ObjectWaiter * q = NULL ; ObjectWaiter * p; for (p = w; p != NULL ; p = p->_next) { guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant" ); p->TState = ObjectWaiter::TS_ENTER; p->_prev = q; q = p; } w = _EntryList; if (w != NULL ) { guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant" ); ExitEpilog(Self, w); return ; } ... } ... } void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) { _succ = Wakee->_thread; ParkEvent * Trigger = Wakee->_event; Wakee = NULL ; OrderAccess::release_store(&_owner, (void *)NULL ); OrderAccess::fence(); ... Trigger->unpark(); }
exit的逻辑还是比较简单的
如果当前是当前线程要让出锁,那么则查看其重入次数是否为0,不为0则将重入次数减去1,然后直接退出。
如果EntryList不为空,则将EntryList的头元素中的线程唤醒
将cxq指针赋值给EntryList,然后通过循环将cxq链表变成双向链表,然后调用ExitEpilog将CXQ链表的头结点唤醒(实际是通过pthread_cond_signal)
从这里之后,EntryList和CXQ就是同一个了,因为将CXQ赋值给了EntryList了。
需要注意的是这里唤醒的线程会继续执行文章开头的EnterI方法,此时会将ObjectWaiter从EntryList或者CXQ中移除。
实战演示 上面的源码均是基于JDK12,JDK8中的代码关于exit和notify都还有其他策略(选择哪个线程),而从JDK9开始就只保留了默认策略了。
所以下面的Java代码的运行结果无论是在jdk8还是jdk12,得到的结果都是一样的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 Object lock = new Object(); Thread t1 = new Thread(() -> { System.out.println("Thread 1 start!!!!!!" ); synchronized (lock) { try { lock.wait(); } catch (Exception e) { } System.out.println("Thread 1 end!!!!!!" ); } }); Thread t2 = new Thread(() -> { System.out.println("Thread 2 start!!!!!!" ); synchronized (lock) { try { lock.wait(); } catch (Exception e) { } System.out.println("Thread 2 end!!!!!!" ); } }); Thread t3 = new Thread(() -> { System.out.println("Thread 3 start!!!!!!" ); synchronized (lock) { try { lock.wait(); } catch (Exception e) { } System.out.println("Thread 3 end!!!!!!" ); } }); Thread t4 = new Thread(() -> { System.out.println("Thread 4 start!!!!!!" ); synchronized (lock) { try { System.in.read(); } catch (Exception e) { } lock.notify(); lock.notify(); lock.notify(); System.out.println("Thread 4 end!!!!!!" ); } }); Thread t5 = new Thread(() -> { System.out.println("Thread 5 start!!!!!!" ); synchronized (lock) { System.out.println("Thread 5 end!!!!!!" ); } }); Thread t6 = new Thread(() -> { System.out.println("Thread 6 start!!!!!!" ); synchronized (lock) { System.out.println("Thread 6 end!!!!!!" ); } }); Thread t7 = new Thread(() -> { System.out.println("Thread 7 start!!!!!!" ); synchronized (lock) { System.out.println("Thread 7 end!!!!!!" ); } }); t1.start(); sleep_1_second(); t2.start(); sleep_1_second(); t3.start(); sleep_1_second(); t4.start(); sleep_1_second(); t5.start(); sleep_1_second(); t6.start(); sleep_1_second(); t7.start();
上面的代码很简单,我们来分析一下。
线程1,2,3都调用了wait,所以会阻塞,然后WaitSet的链表结构如下:
线程4获取了锁,在等待一个输入
线程5,6,7也在等待锁,所以他们也会把阻塞,所以CXQ链表结构如下:
当线程4输入任意内容,并回车结束后(调用了其中的3个notify方法,但还未释放锁)
线程4让出锁之后,由于EntryList不为空,所以会先唤醒EntryList中的线程1,然后接下来会唤醒CXQ队列中的线程(后面你可以认为CXQ就是EntryList) 所以最终线程执行顺序为 4 1 3 2 7 6 5,我们的输出结果也能验证我们的结论
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Thread 1 start!!!!!! Thread 2 start!!!!!! Thread 3 start!!!!!! Thread 4 start!!!!!! Thread 5 start!!!!!! Thread 6 start!!!!!! Thread 7 start!!!!!! think123 Thread 4 end!!!!!! Thread 1 end!!!!!! Thread 3 end!!!!!! Thread 2 end!!!!!! Thread 7 end!!!!!! Thread 6 end!!!!!! Thread 5 end!!!!!!
一图胜千言