Preface
Introducing AQS when , There is an inner class called ConditionObject, There was no introduction , And when reading the source code later , You'll find that many places use Condition , You'll be surprised , This Condition What's the use ? Just read today Condition Source code , And then I can figure out Condition What do you do ? Of course, I hope you have read this article AQS、ReentrantLock as well as LockSupport Or have a certain understanding of (
Of course, you can also jump to the end of the article to see the summary).
=>
official account :『 Liu Zhihang 』, Record the skills in work study 、 Development and source notes ; From time to time to share some of the life experience . You are welcome to guide !
Introduce
Object The monitor method of :wait、notify、notifyAll It should be no stranger , In the multithreading scenario , Must be used first synchronized Get lock , Then you can call Object Of wait、notify.
Condition Use , Equivalent to using Lock To replace the synchronized, And then use Condition Replace Object The monitor method of .
Conditions( Also known as conditional queue or conditional variable ) Provides a pause for a thread ( wait for ), Until another thread notifies the blocked thread , Some state conditions may now be true .
Because accessing this shared state information occurs in different threads , So it has to be protected , So some form of lock will be used . The key property provided by the wait condition is that it releases the associated lock atomically , And suspend the current thread , It's like Object.wait equally .
Condition Instances are essentially bound to locks . In order to obtain Condition example , In general use Lock Example of newCondition() Method .
Lock lock = new ReentrantLock();
Condition con = lock.newCondition();
Basic use
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition Instances depend on lock example
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putPtr, takePtr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// put To judge whether it is full
// And the thread is in notFull Conditionally, the queue is blocked
while (count == items.length) {
notFull.await();
}
items[putPtr] = x;
if (++putPtr == items.length) {
putPtr = 0;
}
++count;
// put After success , There are elements in the queue
// Wake up in notEmpty Conditionally queued blocked threads
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
// take when , Found empty
// And the thread is in notEmpty Queue blocking on condition of
while (count == 0) {
notEmpty.await();
}
Object x = items[takePtr];
if (++takePtr == items.length) {
takePtr = 0;
}
--count;
// take success , The queue can't be full
// Wake up in notFull Conditionally queued blocked threads
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
Above is an example of official documentation , A simple BlockingQueue , Understand here , Will find Synchronous queue This logic is used in many places in . The necessary code descriptions have been annotated in the code .
Question question
- Condition and AQS What does it matter ?
- Condition What is the implementation principle of ?
- Condition Waiting queue and AQS What's the difference and connection between synchronization queues of ?
Source code analysis
The basic structure
adopt UML It can be seen that ,Condition It's just an abstract class , Its main implementation logic is in AQS The inner class of ConditionObject Realized . The following is mainly from await and signal Two ways to start , Learn from source code ConditionObject.
establish Condition
Lock lock = new ReentrantLock();
Condition con = lock.newCondition();
In general use lock.newCondition() Create conditional variables .
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
public Condition newCondition() {
return sync.newCondition();
}
// Sync Integrate AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
}
What we use here is ReentrantLock Source code , It's called sync.newCondition(),Sync Inherit AQS, In fact, we created a AQS Internal class ConditionObject Example .
What needs to be noted here is lock Every call lock.newCondition()
There will be a new ConditionObject Instance generation , That is to say one lock You can create multiple Condition example .
Condition Parameters
/** The first node in the conditional queue */
private transient Node firstWaiter;
/** The last node in the conditional queue */
private transient Node lastWaiter;
await Method
await Method , Will cause the current thread to wait , Until a signal is received or interrupted .
With this Condition The associated lock is released by the atom , And for thread scheduling purposes , The current thread is disabled , And it's dormant , Until one of the following four happens :
- Other threads call this Condition Of signal Method , The current thread is chosen to wake up ;
- Other threads call this Condition Of signalAll Method ;
- Other threads interrupt the current thread , And support interrupt thread suspend ;
- False arousal occurs .
In all cases , Before this method can return , The current thread must reacquire the lock associated with this condition . When the thread returns , This lock can be guaranteed to remain .
Now let's look at AQS Internal implementation logic :
public final void await() throws InterruptedException {
// In response to interrupt
if (Thread.interrupted())
throw new InterruptedException();
// Add to end of condition queue ( Waiting in line )
// Inside will create Node.CONDITION Type of Node
Node node = addConditionWaiter();
// Release the lock acquired by the current thread ( By manipulating the state Value )
// If the lock is released, it will be blocked and suspended
int savedState = fullyRelease(node);
int interruptMode = 0;
// The node is no longer in the synchronization queue , Call park Let it hang in the waiting queue
while (!isOnSyncQueue(node)) {
// call park Block suspends the current thread
LockSupport.park(this);
// explain signal Called or thread interrupted , Check the wake-up reason
// If the terminal is awakened , And then jump out of the loop
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// while The loop ends , Threads start to lock
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// Unified handling of interrupts
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await The method steps are as follows :
- establish Node.CONDITION Type of Node And add it to the conditional queue (ConditionQueue) Tail of ;
- Release the lock acquired by the current thread ( By manipulating the state Value )
- Determine whether the current thread is in the synchronization queue (SyncQueue) in , If you are not there, you will use park Hang up .
- After the end of the cycle , Indicates that the synchronization queue is already in progress (SyncQueue) It's in , Wait to get the lock , Just go ahead .
Here we must distinguish the conditional queue from the synchronous queue !!
Condition queue / Waiting in line : namely Condition Queues
Synchronous queue :AQS Queues .
Following pair await It's an important way to read :
- addConditionWaiter() Method
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// Judge the tail node state , If it's cancelled , Clear all canceled nodes
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// Create a new node , The type is Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// Put the new node at the end of the waiting queue
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
addConditionWaiter The method shows that , Just create a type of Node.CONDITION And put it at the end of the conditional queue . At the same time, other conclusions can be drawn through this code :
- Inside the conditional queue Node, It's only used. thread、waitStatus、nextWaiter attribute ;
- A conditional queue is a one-way queue .
As a contrast , Here we compare the conditional queue with the synchronous queue :
AQS The synchronization queue is as follows :
And then look at Condition The condition queue of
waitStatus stay AQS It has been introduced in :
- The default state is 0;
- waitStatus > 0 (CANCELLED 1) Indicates that the node timed out or interrupted , Need to be removed from the queue ;
- waitStatus = -1 SIGNAL The status of the previous node of the current thread is SIGNAL, The current thread needs to be blocked (unpark);
- waitStatus = -2 CONDITION -2 : The node is currently in the conditional queue ;
- waitStatus = -3 PROPAGATE -3 :releaseShared It should be propagated to other nodes , Use in shared lock mode .
- fullyRelease Method (AQS)
final int fullyRelease(Node node) {
boolean failed = true;
try {
// Get the... Of the current node state
int savedState = getState();
// Release the lock
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
fullyRelease The way is by AQS Provided , First get the current state, And then call release Method to release the lock .
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release Method in AQS It is introduced in detail in . Its main function is to release the lock , And it's important to note that :
- fullyRelease All locks will be released at one time , So no matter how many times you re-enter , It's all going to be released here .
- An exception will be thrown here , Mainly when the lock release fails , It's going to be finally Set the node status to Node.CANCELLED.
- isOnSyncQueue(node)
Through the above process , The node has been placed in Condition queue And released what was held lock , And then it will hang up and block , until signal Wake up the . However, when suspending, make sure that the node is no longer in the synchronization queue (SyncQueue) You can only hang if you win .
final boolean isOnSyncQueue(Node node) {
// The current node is a conditional queue node , Or the last node is empty
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
// Go through it from the tail
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
If a node ( Always a node that was initially placed in the conditional queue ) Now waiting to retrieve on the synchronization queue , Then return to true.
The main function of this code is to determine whether the node is in the synchronization queue , If it's not in the synchronization queue , It will be called later park Block the current thread . There's a question here :AQS Synchronization queue and Condition The conditional queue should be irrelevant , Why is it necessary to ensure that the node is not in the synchronization queue before blocking ? because signal perhaps signalAll After waking up the node , The node will be placed in the synchronization queue .
The thread has been blocked here , When other threads call signal perhaps signalAll when , The current thread will be awakened .
It then verifies if the current thread is awakened due to an interrupt , It is assumed that there is no interruption . that while Cyclic isOnSyncQueue(Node node) Must return to true , Indicates that the current node is already in the synchronization queue .
Later, it will call acquireQueued(node, savedState) To get the lock .
final boolean acquireQueued(final Node node, int arg) {
// Whether or not to get resources
boolean failed = true;
try {
// Interrupt state
boolean interrupted = false;
// Infinite loop
for (;;) {
// The node before the current node
final Node p = node.predecessor();
// The previous node is the head node , Description the current node is Head of the node next That's the real first data node ( because head It's a virtual node )
// And then try to get resources
if (p == head && tryAcquire(arg)) {
// After success Point the head pointer to the current node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// p It's not a head node , perhaps The head node failed to get the resource ( It is preempted by other nodes under unfair circumstances )
// Judge node Is it going to be blocked , If you don't get the lock, you're stuck
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Here is the AQS Logic. , You can also read AQS About .
- Continuously obtain whether the previous node of this node is head, because head It's a virtual node , If the previous node of the current node is head node , Then the current node is
The first data node >
;- The first data node keeps getting resources , To be successful , Will head Point to the current node ;
- The current node is not a head node , perhaps
tryAcquire(arg)
Failure ( Failure may be unfair lock ). At this time, it is necessary to determine the state of the previous nodeWhether the current node should be blocked
( Whether the state of the previous node is SIGNAL).
It is worth noting that , When the node is placed in AQS When the synchronization queue of , It's also about competing for resources , Simultaneous setting savedState
Value , This value represents the number of reentries released when the lock was first released .
The overall flow chart is as follows :
signal
public final void signal() {
// Whether the thread is currently held
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// firstWaiter The header node points to the next node in the header of the conditional queue
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// Disconnect the original head node from the synchronization queue
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// Determine whether the node has been canceled before
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// call enq Add to The tail of the synchronization queue
Node p = enq(node);
int ws = p.waitStatus;
// node The previous node of It is amended as follows SIGNAL So you can wake yourself up later
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
enq You can also read AQS Code for
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// The tail node is empty The header node needs to be initialized , At this point, the head and tail nodes are
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// Not empty Loop assignment
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
adopt enq Method to place the node in AQS After the synchronization queue of , To put node Of the previous node waitStatus Set to Node.SIGNAL.signalAll The code is similar to .
summary
Q&A
Q: Condition and AQS What does it matter ?
A: Condition Is based on AQS Realized ,Condition Implementation class of ConditionObject yes AQS An inner class , Share a part of it AQS The logic of .
Q: Condition What is the implementation principle of ?
A: Condition Internal maintenance of a conditional queue , In the case of acquiring a lock , Thread calls await, Threads are placed in the conditional queue and blocked . Until the call signal、signalAll Wake up the thread , After that thread wakes up , It will be put into AQS The synchronization queue of , Participate in the competition for lock resources .
Q: Condition Waiting queue and AQS What's the difference and connection between synchronization queues of ?
A: Condition The waiting queue for is a one-way linked list ,AQS It's a two-way list . There is no clear link between the two . Only after the node is awakened from the blocking state , Will move from the waiting queue to the synchronization queue .
Conclusion
This article is mainly about reading Condition Related code , However, the logic such as thread interrupt is omitted . Interested partners . Can be more in-depth study of the relevant source code .