Using java.util.concurrent.locks.LockSupport



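While reading the internals of AQS, I noticed that much of it relies on the java.util.concurrent.locks.LockSupport class. For example, CountDownLatch.await uses it to block, and the take method of a blocking queue also uses it when the calling thread has to block. This article looks at its main usage.
1. Thread states in brief
When first learning about threads, the thread states are usually introduced as follows: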




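1. NEW

The thread has been created but has not started executing. For example, after Thread t1 = new Thread(); t1 is a thread in the NEW state.

2. RUNNABLE

Once start() is called on a new thread, it is in the RUNNABLE state. A RUNNABLE thread may be executing in the Java virtual machine, or it may be waiting for processor resources: a thread must obtain CPU time before it can run the body of its run() method, and otherwise waits in the queue.

3. BLOCKED

A thread that is waiting for a monitor lock in order to enter a synchronized block or method is in the BLOCKED state.

4. WAITING

A thread that has called Object.wait() without a timeout, Thread.join() without a timeout, or LockSupport.park() is in the WAITING state.

5. TIMED_WAITING

A thread that has called, with a positive waiting time, Object.wait(), Thread.join(), Thread.sleep(), LockSupport.parkNanos(), or LockSupport.parkUntil() is in the TIMED_WAITING state.

6. TERMINATED

Once the run() method has returned, or the thread has otherwise been terminated, the thread is in the TERMINATED state. A terminated thread cannot run again.



As the list shows, a thread enters the WAITING state after it calls park.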
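The states above can be observed from another thread with Thread.getState(). The following is a minimal sketch, not part of the original article; the class name ThreadStateSketch and its helper methods are made up for illustration. It polls the state instead of sleeping, and the worker parks inside a loop because park is allowed to return for no reason.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: observes thread states around park calls.
public class ThreadStateSketch {

    // Polls until the thread reports the wanted state, giving up after about two seconds.
    static Thread.State awaitState(Thread t, Thread.State wanted) {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        Thread.State s;
        while ((s = t.getState()) != wanted && System.nanoTime() < deadline) {
            Thread.yield();
        }
        return s;
    }

    // Starts a worker that keeps calling parkCall until released and returns the state observed.
    static Thread.State stateWhile(Runnable parkCall, Thread.State wanted) {
        AtomicBoolean release = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            while (!release.get()) {   // re-check the condition after every return from park
                parkCall.run();
            }
        });
        t.setDaemon(true);
        t.start();
        Thread.State seen = awaitState(t, wanted);
        release.set(true);
        LockSupport.unpark(t);         // wake the worker so it can exit
        return seen;
    }

    static Thread.State stateBeforeStart() {
        return new Thread(() -> { }).getState();
    }

    static Thread.State stateWhileParked() {
        return stateWhile(() -> LockSupport.park(), Thread.State.WAITING);
    }

    static Thread.State stateWhileParkedWithTimeout() {
        return stateWhile(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)),
                          Thread.State.TIMED_WAITING);
    }

    static Thread.State stateAfterRun() {
        Thread t = new Thread(() -> { });
        t.start();
        return awaitState(t, Thread.State.TERMINATED);
    }

    public static void main(String[] args) {
        System.out.println("before start:        " + stateBeforeStart());
        System.out.println("inside park():       " + stateWhileParked());
        System.out.println("inside parkNanos():  " + stateWhileParkedWithTimeout());
        System.out.println("after run() returns: " + stateAfterRun());
    }
}
```

On a typical JVM the four lines should report NEW, WAITING, TIMED_WAITING and TERMINATED, although the polling helper gives up after two seconds on a heavily loaded machine.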
2. Main API
1. The main API
The source code is as follows:





/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent.locks;
import sun.misc.Unsafe;

/**
 * Basic thread blocking primitives for creating locks and other
 * synchronization classes.
 *
 * <p>This class associates, with each thread that uses it, a permit
 * (in the sense of the {@link java.util.concurrent.Semaphore
 * Semaphore} class). A call to {@code park} will return immediately
 * if the permit is available, consuming it in the process; otherwise
 * it <em>may</em> block.  A call to {@code unpark} makes the permit
 * available, if it was not already available. (Unlike with Semaphores
 * though, permits do not accumulate. There is at most one.)
 *
 * <p>Methods {@code park} and {@code unpark} provide efficient
 * means of blocking and unblocking threads that do not encounter the
 * problems that cause the deprecated methods {@code Thread.suspend}
 * and {@code Thread.resume} to be unusable for such purposes: Races
 * between one thread invoking {@code park} and another thread trying
 * to {@code unpark} it will preserve liveness, due to the
 * permit. Additionally, {@code park} will return if the caller's
 * thread was interrupted, and timeout versions are supported. The
 * {@code park} method may also return at any other time, for "no
 * reason", so in general must be invoked within a loop that rechecks
 * conditions upon return. In this sense {@code park} serves as an
 * optimization of a "busy wait" that does not waste as much time
 * spinning, but must be paired with an {@code unpark} to be
 * effective.
 *
 * <p>The three forms of {@code park} each also support a
 * {@code blocker} object parameter. This object is recorded while
 * the thread is blocked to permit monitoring and diagnostic tools to
 * identify the reasons that threads are blocked. (Such tools may
 * access blockers using method {@link #getBlocker(Thread)}.)
 * The use of these forms rather than the original forms without this
 * parameter is strongly encouraged. The normal argument to supply as
 * a {@code blocker} within a lock implementation is {@code this}.
 *
 * <p>These methods are designed to be used as tools for creating
 * higher-level synchronization utilities, and are not in themselves
 * useful for most concurrency control applications.  The {@code park}
 * method is designed for use only in constructions of the form:
 *
 *  <pre> {@code
 * while (!canProceed()) { ... LockSupport.park(this); }}</pre>
 *
 * where neither {@code canProceed} nor any other actions prior to the
 * call to {@code park} entail locking or blocking.  Because only one
 * permit is associated with each thread, any intermediary uses of
 * {@code park} could interfere with its intended effects.
 *
 * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out
 * non-reentrant lock class:
 *  <pre> {@code
 * class FIFOMutex {
 *   private final AtomicBoolean locked = new AtomicBoolean(false);
 *   private final Queue<Thread> waiters
 *     = new ConcurrentLinkedQueue<Thread>();
 *
 *   public void lock() {
 *     boolean wasInterrupted = false;
 *     Thread current = Thread.currentThread();
 *     waiters.add(current);
 *
 *     // Block while not first in queue or cannot acquire lock
 *     while (waiters.peek() != current ||
 *            !locked.compareAndSet(false, true)) {
 *       LockSupport.park(this);
 *       if (Thread.interrupted()) // ignore interrupts while waiting
 *         wasInterrupted = true;
 *     }
 *
 *     waiters.remove();
 *     if (wasInterrupted)          // reassert interrupt status on exit
 *       current.interrupt();
 *   }
 *
 *   public void unlock() {
 *     locked.set(false);
 *     LockSupport.unpark(waiters.peek());
 *   }
 * }}</pre>
 */
public class LockSupport {
    private LockSupport() {} // Cannot be instantiated.

    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

    /**
     * Makes available the permit for the given thread, if it
     * was not already available.  If the thread was blocked on
     * {@code park} then it will unblock.  Otherwise, its next call
     * to {@code park} is guaranteed not to block. This operation
     * is not guaranteed to have any effect at all if the given
     * thread has not been started.
     *
     * @param thread the thread to unpark, or {@code null}, in which case
     *        this operation has no effect
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call returns
     * immediately; otherwise
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @since 1.6
     */
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The specified waiting time elapses; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the elapsed time
     * upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param nanos the maximum number of nanoseconds to wait
     * @since 1.6
     */
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread; or
     *
     * <li>The specified deadline passes; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the current time
     * upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param deadline the absolute time, in milliseconds from the Epoch,
     *        to wait until
     * @since 1.6
     */
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }

    /**
     * Returns the blocker object supplied to the most recent
     * invocation of a park method that has not yet unblocked, or null
     * if not blocked.  The value returned is just a momentary
     * snapshot -- the thread may have since unblocked or blocked on a
     * different blocker object.
     *
     * @param t the thread
     * @return the blocker
     * @throws NullPointerException if argument is null
     * @since 1.6
     */
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of three
     * things happens:
     *
     * <ul>
     *
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     */
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The specified waiting time elapses; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the elapsed time
     * upon return.
     *
     * @param nanos the maximum number of nanoseconds to wait
     */
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The specified deadline passes; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the current time
     * upon return.
     *
     * @param deadline the absolute time, in milliseconds from the Epoch,
     *        to wait until
     */
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

    /**
     * Returns the pseudo-randomly initialized or updated secondary seed.
     * Copied from ThreadLocalRandom due to package access restrictions.
     */
    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13;   // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        }
        else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;
    private static final long parkBlockerOffset;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }

}
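The class documentation in the source above says that park may return for no reason and should therefore be called in a loop that re-checks its condition. The following is a minimal sketch of that idiom for a one-shot gate with a single waiting thread. The class ParkLoopSketch and its methods await, signal and demo are hypothetical names chosen for illustration; none of this comes from the JDK.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical one-shot gate for a single waiter, built on park/unpark.
public class ParkLoopSketch {
    private final AtomicBoolean ready = new AtomicBoolean(false); // the condition being waited for
    private volatile Thread waiter;                               // the thread to wake, if any

    // Blocks the calling thread until signal() has been called.
    public void await() {
        waiter = Thread.currentThread();   // publish ourselves before checking the condition
        boolean interrupted = false;
        while (!ready.get()) {             // re-check after every return from park
            LockSupport.park(this);        // "this" is recorded as the blocker for diagnostics
            if (Thread.interrupted()) {    // clear the flag, otherwise park would not block again
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // reassert interrupt status on exit
        }
    }

    // Opens the gate and wakes the waiter if one has registered.
    public void signal() {
        ready.set(true);                   // set the condition first
        LockSupport.unpark(waiter);        // unpark(null) has no effect
    }

    // Runs one waiter and one signaller; returns true if the waiter got through.
    static boolean demo() {
        ParkLoopSketch gate = new ParkLoopSketch();
        Thread t = new Thread(gate::await);
        t.start();
        gate.signal();
        try {
            t.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter released: " + demo());
    }
}
```

The order of operations matters here: the waiter publishes itself before checking the flag, and the signaller sets the flag before reading the waiter field, so whichever thread runs first, the waiter either sees the flag or receives the permit.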

2. Usage




import java.util.concurrent.locks.LockSupport;

public class PlainTest {

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("111222");
            LockSupport.park();               // blocks until main calls unpark
            System.out.println("222333");
            try {
                Thread.sleep(3 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("333444");
        });
        thread.start();

        Thread.sleep(1 * 1000);
        // The worker is parked at this point
        System.out.println("thread.getState(): " + thread.getState() + "\t1");
        LockSupport.unpark(thread);
        Thread.sleep(1 * 1000);
        // The worker is inside Thread.sleep at this point
        System.out.println("thread.getState(): " + thread.getState() + "\t2");
        Thread.sleep(3 * 1000);
        // The worker has finished at this point
        System.out.println("thread.getState(): " + thread.getState() + "\t3");
    }
}



Output:

111222
thread.getState(): WAITING	1
222333
thread.getState(): TIMED_WAITING	2
333444
thread.getState(): TERMINATED	3



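2. The park method
park suspends the current thread, which enters the WAITING or TIMED_WAITING state. The thread resumes when another thread calls unpark on it, when another thread interrupts it, or, for the timed variants, when the specified time is reached. As the documentation above notes, the call may also return spuriously.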
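One detail worth illustrating is that an interrupt makes park return normally: no InterruptedException is thrown, and the interrupt status stays set. The following is a minimal sketch with a made-up class name, ParkInterruptSketch, and is not taken from the original article.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: shows that interrupting a parked thread wakes it up.
public class ParkInterruptSketch {

    // Returns true if the worker left park with its interrupt status set.
    static boolean parkReturnsOnInterrupt() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            // Loop because park may also return spuriously; leave only once interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();    // returns without throwing when the thread is interrupted
            }
            sawInterrupt.set(true);
        });
        t.start();
        t.interrupt();                 // works whether or not the worker has reached park yet
        try {
            t.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("park returned after interrupt: " + parkReturnsOnInterrupt());
    }
}
```

Because park does not clear the interrupt status, a caller that wants to keep waiting has to clear it with Thread.interrupted() first, as the FIFOMutex sample in the class documentation does.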
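1. park can optionally take a blocker argument. When one is supplied, tools can report what the current thread is blocked on.
The blocker is stored through Unsafe: parkBlockerOffset holds the offset of the parkBlocker field within the Thread object, and the value is written to java.lang.Thread#parkBlocker.
java.lang.Thread#parkBlocker: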




/**
 * The argument supplied to the current call to
 * java.util.concurrent.locks.LockSupport.park.
 * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker
 * Accessed using java.util.concurrent.locks.LockSupport.getBlocker
 */
volatile Object parkBlocker;



(1) Without a blocker:




import java.util.concurrent.locks.LockSupport;

public class PlainTest {

    public static void main(String[] args) {
        LockSupport.park();
    }
}



Thread information from jstack:




"main" #1 prio=5 os_prio=0 tid=0x00000224f146d000 nid=0x3524 waiting on condition [0x00000081e89fe000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at PlainTest.main(PlainTest.java:6)



(2) With a blocker
Code:




import java.util.concurrent.locks.LockSupport;

public class PlainTest {

    public static void main(String[] args) {
        LockSupport.park(new Object());
    }
}



Result:




"main" #1 prio=5 os_prio=0 tid=0x000002402c6ad800 nid=0x4a14 waiting on condition [0x000000d0ed6ff000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000000076caeab80> (a java.lang.Object)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at PlainTest.main(PlainTest.java:6)
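The same blocker that jstack prints can also be read programmatically with LockSupport.getBlocker. The following is a minimal sketch, assuming a worker thread that parks on a known object; the class name BlockerSketch and its helper method are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: reads the blocker of a parked thread.
public class BlockerSketch {

    // Returns true if getBlocker reported the object the worker parked on.
    static boolean blockerIsVisible() {
        Object blocker = new Object();
        AtomicBoolean release = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            while (!release.get()) {
                LockSupport.park(blocker);   // records blocker in Thread#parkBlocker while parked
            }
        });
        t.setDaemon(true);
        t.start();

        // getBlocker is only a momentary snapshot, so poll for up to about two seconds.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        Object seen;
        while ((seen = LockSupport.getBlocker(t)) != blocker && System.nanoTime() < deadline) {
            Thread.yield();
        }

        release.set(true);
        LockSupport.unpark(t);               // let the worker exit
        return seen == blocker;
    }

    public static void main(String[] args) {
        System.out.println("blocker visible: " + blockerIsVisible());
    }
}
```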



(3) With a timeout




import java.util.concurrent.locks.LockSupport;

public class PlainTest {

    public static void main(String[] args) {
        LockSupport.parkNanos(Long.MAX_VALUE);
    }
}



Result:




"main" #1 prio=5 os_prio=0 tid=0x000001e39c82c800 nid=0x3e70 waiting on condition [0x000000215d5ff000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
at PlainTest.main(PlainTest.java:6)



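3. unpark takes the thread to unblock
unpark releases a thread from blocking. Note that unpark may also be called before park: in that case the subsequent park call simply does not block. Calling unpark several times has the same effect as calling it once, and it satisfies only a single park.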
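The claim that permits do not accumulate can be checked on the current thread alone. The following is a rough sketch; the class name PermitSketch is made up, and the timing it measures is only indicative, since parkNanos is allowed to return early for no reason.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: two unpark calls still yield only one permit.
public class PermitSketch {

    // Returns roughly how long, in milliseconds, the second park call waited.
    static long secondParkMillis() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);    // makes the permit available
        LockSupport.unpark(self);    // permit is already available, so nothing changes

        LockSupport.park();          // consumes the single permit and returns at once

        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200)); // no permit left: waits
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("second park waited about " + secondParkMillis() + " ms");
    }
}
```

If permits did accumulate, the second call would return immediately; in practice it normally waits for close to the full 200 ms.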
The next section looks at how this works.
3. How it works
1. How park works
park ultimately calls sun.misc.Unsafe#park, which is a native method:



public native void park(boolean var1, long var2);



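It takes two parameters: the first (isAbsolute) says whether the time is absolute, and the second is the time itself.
Within LockSupport, only parkUntil, for example java.util.concurrent.locks.LockSupport#parkUntil(java.lang.Object, long), passes true for the first parameter, meaning the time is an absolute deadline in milliseconds since the Epoch. The other methods pass false, meaning the time is relative and given in nanoseconds.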
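The difference between the two time formats shows up at the call site. The following is a minimal sketch; the class name TimedParkSketch and its two helper methods are hypothetical. Both helpers wait in a loop, because a timed park may return before the time is up. The relative form has to recompute the remaining time on each pass, while the absolute form can reuse the same deadline.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: relative (parkNanos) versus absolute (parkUntil) timed waits.
public class TimedParkSketch {

    // Relative wait: parkNanos takes a duration in nanoseconds.
    static long waitRelativeMillis(long millis) {
        long start = System.nanoTime();
        long deadline = start + TimeUnit.MILLISECONDS.toNanos(millis);
        long remaining;
        while ((remaining = deadline - System.nanoTime()) > 0) {
            LockSupport.parkNanos(remaining);   // recompute the remaining time after each return
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Absolute wait: parkUntil takes a deadline in milliseconds since the Epoch.
    static long waitAbsoluteMillis(long millis) {
        long start = System.currentTimeMillis();
        long deadline = start + millis;
        while (System.currentTimeMillis() < deadline) {
            LockSupport.parkUntil(deadline);    // the same deadline can be passed again
        }
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        System.out.println("relative wait took " + waitRelativeMillis(100) + " ms");
        System.out.println("absolute wait took " + waitAbsoluteMillis(100) + " ms");
    }
}
```

Note that if the calling thread has its interrupt status set, park returns immediately every time, so these loops would spin until the deadline rather than sleep.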
Next, look at the C++ methods it calls.
1. The method in \openjdk\hotspot\src\share\vm\prims\unsafe.cpp:




UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_BEGIN(
      (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
      (uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    oop obj = thread->current_park_blocker();
    event.set_klass((obj != NULL) ? obj->klass() : NULL);
    event.set_timeout(time);
    event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
    event.commit();
  }
UNSAFE_END



The core is the line thread->parker()->park(isAbsolute != 0, time); it obtains the thread's Parker through parker() and then calls park on it. (Every thread object has its own Parker object.)
The Parker class: openjdk\hotspot\src\share\vm\runtime\park.hpp




class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association

public:
  Parker() : PlatformParker() {
    _counter       = 0 ;
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

};



The _counter field is the one that does the important work.
The parent class holds the mutex and condition variables: \openjdk\hotspot\src\os\linux\vm\os_linux.hpp




class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // which cond is in use: -1, 0, 1
    pthread_mutex_t _mutex [1] ;
    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

#endif // OS_LINUX_VM_OS_LINUX_HPP



2. The park method is delegated to an implementation specific to the operating system:
For example: openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp




void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  // If _counter is greater than 0, a permit is available: return immediately
  if (Atomic::xchg(0, &_counter) > 0) return;

  // Optional fast-exit: Check interrupt before trying to wait
  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // First, demultiplex/decode time arguments
  timespec absTime;
  // If time is negative, or the time is absolute and equal to 0, return immediately
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    // Warning: this code might be exposed to the old Solaris time
    // round-down bugs.  Grep "roundingFix" for details.
    // Convert the time and store it in absTime
    unpackTime(&absTime, isAbsolute, time);
  }

  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: _mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  // Return immediately if the thread has been interrupted, or if trylock on
  // the mutex fails (for example because another thread holds it)
  if (Thread::is_interrupted(thread, false) ||
      os::Solaris::mutex_trylock(_mutex) != 0) {
    return;
  }

  int status ;

  // If _counter is greater than 0 at this point, a permit is available: reset it to 0
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    // Unlock the mutex
    status = os::Solaris::mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Solaris::allowdebug_blocked_signals();
  thr_sigsetmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  // Do this the hard way by blocking ...
  // See http://monaco.sfbay/detail.jsf?cr=5094058.
  // TODO-FIXME: for Solaris SPARC set fprs.FEF=0 prior to parking.
  // Only for SPARC >= V8PlusA
#if defined(__sparc) && defined(COMPILER2)
  if (ClearFPUAtPark) { _mark_fpu_nosave() ; }
#endif

  if (time == 0) {
    status = os::Solaris::cond_wait (_cond, _mutex) ;
  } else {
    status = os::Solaris::cond_timedwait (_cond, _mutex, &absTime);
  }
  // Note that an untimed cond_wait() can sometimes return ETIME on older
  // versions of the Solaris.
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  thr_sigsetmask(SIG_SETMASK, &oldsigs, NULL);
#endif
  _counter = 0 ;
  status = os::Solaris::mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}



(1) ThreadBlockInVM tbivm(jt); changes the thread state to blocked. The statement constructs a ThreadBlockInVM object named tbivm with jt as its argument.
\openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp




class ThreadBlockInVM : public ThreadStateTransition {
 public:
  ThreadBlockInVM(JavaThread *thread)
  : ThreadStateTransition(thread) {
    // Once we are blocked vm expects stack to be walkable
    thread->frame_anchor()->make_walkable(thread);
    trans_and_fence(_thread_in_vm, _thread_blocked);
  }
  ~ThreadBlockInVM() {
    trans_and_fence(_thread_blocked, _thread_in_vm);
    // We don't need to clear_walkable because it will happen automagically when we return to java
  }
};



The trans_and_fence call then reaches transition_and_fence in D:\study\sourcecode\openjdk\openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp:




  // transition_and_fence must be used on any thread state transition
  // where there might not be a Java call stub on the stack, in
  // particular on Windows where the Structured Exception Handler is
  // set up in the call stub. os::write_memory_serialize_page() can
  // fault and we can't recover from it on Windows without a SEH in
  // place.
  static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) {
    assert(thread->thread_state() == from, "coming from wrong thread state");
    assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states");
    // Change to transition state (assumes total store ordering!  -Urs)
    thread->set_thread_state((JavaThreadState)(from + 1));

    // Make sure new state is seen by VM thread
    if (os::is_MP()) {
      if (UseMembar) {
        // Force a fence between the write above and read below
        OrderAccess::fence();
      } else {
        // Must use this rather than serialization page in particular on Windows
        InterfaceSupport::serialize_memory(thread);
      }
    }

    if (SafepointSynchronize::do_call_back()) {
      SafepointSynchronize::block(thread);
    }
    thread->set_thread_state(to);

    CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();)
  }



The thread states are defined in openjdk\hotspot\src\share\vm\utilities\globalDefinitions.hpp:




enum JavaThreadState {
  _thread_uninitialized     =  0, // should never happen (missing initialization)
  _thread_new               =  2, // just starting up, i.e., in process of being initialized
  _thread_new_trans         =  3, // corresponding transition state (not used, included for completness)
  _thread_in_native         =  4, // running in native code
  _thread_in_native_trans   =  5, // corresponding transition state
  _thread_in_vm             =  6, // running in VM
  _thread_in_vm_trans       =  7, // corresponding transition state
  _thread_in_Java           =  8, // running in Java or in stub code
  _thread_in_Java_trans     =  9, // corresponding transition state (not used, included for completness)
  _thread_blocked           = 10, // blocked in vm
  _thread_blocked_trans     = 11, // corresponding transition state
  _thread_max_state         = 12  // maximum thread state+1 - used for statistics allocation
};



2. How unpark works
The unpark method in \openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp is as follows:




void Parker::unpark() {
  int s, status ;
  // Acquire the mutex
  status = os::Solaris::mutex_lock (_mutex) ;
  assert (status == 0, "invariant") ;
  // s records the previous value of _counter
  s = _counter;
  // Set _counter to 1 (make the permit available)
  _counter = 1;
  // Release the mutex
  status = os::Solaris::mutex_unlock (_mutex) ;
  assert (status == 0, "invariant") ;

  // If the previous _counter was 0, no permit was pending, so a thread may be
  // blocked in park waiting for the signal. Signal the condition variable; a
  // thread waiting in park then continues with the code after its wait.
  if (s < 1) {
    status = os::Solaris::cond_signal (_cond) ;
    assert (status == 0, "invariant") ;
  }
}



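unpark itself just sets _counter to 1 and signals the thread blocked on the condition that it can stop waiting. If unpark is called several times in a row, s < 1 no longer holds after the first call, so the signalling branch is skipped.
cond_signal resolves to the following code in \openjdk\hotspot\src\os\solaris\vm\os_solaris.hpp: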




static int cond_signal(cond_t *cv){ return _cond_signal(cv); }




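Summary: every thread has a Parker object whose _counter field can be regarded as the permit. Each park call waits for that permit (waits for the variable to become 1) and consumes it by resetting it to 0; calling unpark sets the permit variable to 1.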
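The permit logic in this summary can be modelled in a few lines of plain Java. The following is a toy sketch only, not how HotSpot implements Parker: the class ToyParker is a made-up name, and it substitutes a Java monitor (synchronized, wait, notify) for the native mutex and condition variable. It also leaves out the lock-free fast path, timeouts and safepoint handling seen in the C++ code.

```java
// Hypothetical toy model of the _counter permit; not the HotSpot implementation.
public class ToyParker {
    private int counter = 0;   // 0 = no permit, 1 = permit available (never more than 1)

    // Waits until a permit is available, then consumes it.
    public synchronized void park() {
        try {
            while (counter == 0) {
                wait();        // stands in for cond_wait on the native condition variable
            }
        } catch (InterruptedException e) {
            // Like the real park, return instead of throwing and keep the interrupt status set.
            Thread.currentThread().interrupt();
        }
        counter = 0;           // consume the permit
    }

    // Makes the permit available; repeated calls do not accumulate.
    public synchronized void unpark() {
        int old = counter;
        counter = 1;
        if (old < 1) {
            notify();          // stands in for cond_signal
        }
    }

    // Current permit count, for inspection only.
    public synchronized int permits() {
        return counter;
    }

    public static void main(String[] args) {
        ToyParker p = new ToyParker();
        p.unpark();
        p.unpark();            // second call changes nothing
        System.out.println("permits after two unparks: " + p.permits());
        p.park();              // returns at once because a permit is available
        System.out.println("permits after park: " + p.permits());
    }
}
```

Unlike LockSupport, this model uses one ToyParker per waiting party rather than one per thread managed by the VM, so it only illustrates the counter semantics.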








