After reading AQS, the condition principle can't be less!

Liu Zhihang 2020-11-08 23:46:56
reading aqs condition principle


Preface


Introducing AQS when , There is an inner class called ConditionObject, There was no introduction , And when reading the source code later , You'll find that many places use Condition , You'll be surprised , This Condition What's the use ? Just read today Condition Source code , And then I can figure out Condition What do you do ? Of course, I hope you have read this article AQS、ReentrantLock as well as LockSupport Or have a certain understanding of ( Of course, you can also jump to the end of the article to see the summary ).


official account :『 Liu Zhihang 』, Record the skills in work study 、 Development and source notes ; From time to time to share some of the life experience . You are welcome to guide !

Introduce

Object The monitor method of :wait、notify、notifyAll It should be no stranger , In the multithreading scenario , Must be used first synchronized Get lock , Then you can call Object Of wait、notify.

Condition Use , Equivalent to using Lock To replace the synchronized, And then use Condition Replace Object The monitor method of .

Conditions( Also known as conditional queue or conditional variable ) Provides a pause for a thread ( wait for ), Until another thread notifies the blocked thread , Some state conditions may now be true .

Because accessing this shared state information occurs in different threads , So it has to be protected , So some form of lock will be used . The key property provided by the wait condition is that it releases the associated lock atomically , And suspend the current thread , It's like Object.wait equally .

Condition Instances are essentially bound to locks . In order to obtain Condition example , In general use Lock Example of newCondition() Method .

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

Basic use

class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition Instances depend on lock example
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putPtr, takePtr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// put To judge whether it is full
// And the thread is in notFull Conditionally, the queue is blocked
while (count == items.length) {
notFull.await();
}
items[putPtr] = x;
if (++putPtr == items.length) {
putPtr = 0;
}
++count;
// put After success , There are elements in the queue
// Wake up in notEmpty Conditionally queued blocked threads
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
// take when , Found empty
// And the thread is in notEmpty Queue blocking on condition of
while (count == 0) {
notEmpty.await();
}
Object x = items[takePtr];
if (++takePtr == items.length) {
takePtr = 0;
}
--count;
// take success , The queue can't be full
// Wake up in notFull Conditionally queued blocked threads
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

Above is an example of official documentation , A simple BlockingQueue , Understand here , Will find Synchronous queue This logic is used in many places in . The necessary code descriptions have been annotated in the code .

Question question

  1. Condition and AQS What does it matter ?
  2. Condition What is the implementation principle of ?
  3. Condition Waiting queue and AQS What's the difference and connection between synchronization queues of ?

Source code analysis

The basic structure

condition-uml-rMKuf3

adopt UML It can be seen that ,Condition It's just an abstract class , Its main implementation logic is in AQS The inner class of ConditionObject Realized . The following is mainly from await and signal Two ways to start , Learn from source code ConditionObject.

establish Condition

Lock lock = new ReentrantLock();
Condition con = lock.newCondition();

In general use lock.newCondition() Create conditional variables .

public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
public Condition newCondition() {
return sync.newCondition();
}
// Sync Integrate AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
}

What we use here is ReentrantLock Source code , It's called sync.newCondition(),Sync Inherit AQS, In fact, we created a AQS Internal class ConditionObject Example .

What needs to be noted here is lock Every call lock.newCondition() There will be a new ConditionObject Instance generation , That is to say one lock You can create multiple Condition example .

Condition Parameters

/** The first node in the conditional queue */
private transient Node firstWaiter;
/** The last node in the conditional queue */
private transient Node lastWaiter;

await Method

await Method , Will cause the current thread to wait , Until a signal is received or interrupted .

With this Condition The associated lock is released by the atom , And for thread scheduling purposes , The current thread is disabled , And it's dormant , Until one of the following four happens :

  1. Other threads call this Condition Of signal Method , The current thread is chosen to wake up ;
  2. Other threads call this Condition Of signalAll Method ;
  3. Other threads interrupt the current thread , And support interrupt thread suspend ;
  4. False arousal occurs .

In all cases , Before this method can return , The current thread must reacquire the lock associated with this condition . When the thread returns , This lock can be guaranteed to remain .

Now let's look at AQS Internal implementation logic :

public final void await() throws InterruptedException {
// In response to interrupt
if (Thread.interrupted())
throw new InterruptedException();
// Add to end of condition queue ( Waiting in line )
// Inside will create Node.CONDITION Type of Node
Node node = addConditionWaiter();
// Release the lock acquired by the current thread ( By manipulating the state Value )
// If the lock is released, it will be blocked and suspended
int savedState = fullyRelease(node);
int interruptMode = 0;
// The node is no longer in the synchronization queue , Call park Let it hang in the waiting queue
while (!isOnSyncQueue(node)) {
// call park Block suspends the current thread
LockSupport.park(this);
// explain signal Called or thread interrupted , Check the wake-up reason
// If the terminal is awakened , And then jump out of the loop
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// while The loop ends , Threads start to lock
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// Unified handling of interrupts
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

await The method steps are as follows :

  1. establish Node.CONDITION Type of Node And add it to the conditional queue (ConditionQueue) Tail of ;
  2. Release the lock acquired by the current thread ( By manipulating the state Value )
  3. Determine whether the current thread is in the synchronization queue (SyncQueue) in , If you are not there, you will use park Hang up .
  4. After the end of the cycle , Indicates that the synchronization queue is already in progress (SyncQueue) It's in , Wait to get the lock , Just go ahead .

Here we must distinguish the conditional queue from the synchronous queue !!

Condition queue / Waiting in line : namely Condition Queues
Synchronous queue :AQS Queues .

Following pair await It's an important way to read :

  • addConditionWaiter() Method
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// Judge the tail node state , If it's cancelled , Clear all canceled nodes
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// Create a new node , The type is Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// Put the new node at the end of the waiting queue
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

addConditionWaiter The method shows that , Just create a type of Node.CONDITION And put it at the end of the conditional queue . At the same time, other conclusions can be drawn through this code :

  1. Inside the conditional queue Node, It's only used. thread、waitStatus、nextWaiter attribute ;
  2. A conditional queue is a one-way queue .

As a contrast , Here we compare the conditional queue with the synchronous queue :

condition-node-7yUQjE

AQS The synchronization queue is as follows :

condition-aqs-n5Fs85

And then look at Condition The condition queue of

condition-condition-A97bUS

waitStatus stay AQS It has been introduced in :

  1. The default state is 0;
  2. waitStatus > 0 (CANCELLED 1) Indicates that the node timed out or interrupted , Need to be removed from the queue ;
  3. waitStatus = -1 SIGNAL The status of the previous node of the current thread is SIGNAL, The current thread needs to be blocked (unpark);
  4. waitStatus = -2 CONDITION -2 : The node is currently in the conditional queue ;
  5. waitStatus = -3 PROPAGATE -3 :releaseShared It should be propagated to other nodes , Use in shared lock mode .
  • fullyRelease Method (AQS)
final int fullyRelease(Node node) {
boolean failed = true;
try {
// Get the... Of the current node state
int savedState = getState();
// Release the lock
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

fullyRelease The way is by AQS Provided , First get the current state, And then call release Method to release the lock .

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

release Method in AQS It is introduced in detail in . Its main function is to release the lock , And it's important to note that :

  1. fullyRelease All locks will be released at one time , So no matter how many times you re-enter , It's all going to be released here .
  2. An exception will be thrown here , Mainly when the lock release fails , It's going to be finally Set the node status to Node.CANCELLED.
  • isOnSyncQueue(node)

Through the above process , The node has been placed in Condition queue And released what was held lock , And then it will hang up and block , until signal Wake up the . However, when suspending, make sure that the node is no longer in the synchronization queue (SyncQueue) You can only hang if you win .

final boolean isOnSyncQueue(Node node) {
// The current node is a conditional queue node , Or the last node is empty
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
// Go through it from the tail
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

If a node ( Always a node that was initially placed in the conditional queue ) Now waiting to retrieve on the synchronization queue , Then return to true.

The main function of this code is to determine whether the node is in the synchronization queue , If it's not in the synchronization queue , It will be called later park Block the current thread . There's a question here :AQS Synchronization queue and Condition The conditional queue should be irrelevant , Why is it necessary to ensure that the node is not in the synchronization queue before blocking ? because signal perhaps signalAll After waking up the node , The node will be placed in the synchronization queue .

The thread has been blocked here , When other threads call signal perhaps signalAll when , The current thread will be awakened .

It then verifies if the current thread is awakened due to an interrupt , It is assumed that there is no interruption . that while Cyclic isOnSyncQueue(Node node) Must return to true , Indicates that the current node is already in the synchronization queue .

Later, it will call acquireQueued(node, savedState) To get the lock .

final boolean acquireQueued(final Node node, int arg) {
// Whether or not to get resources
boolean failed = true;
try {
// Interrupt state
boolean interrupted = false;
// Infinite loop
for (;;) {
// The node before the current node
final Node p = node.predecessor();
// The previous node is the head node , Description the current node is Head of the node next That's the real first data node ( because head It's a virtual node )
// And then try to get resources
if (p == head && tryAcquire(arg)) {
// After success Point the head pointer to the current node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// p It's not a head node , perhaps The head node failed to get the resource ( It is preempted by other nodes under unfair circumstances )
// Judge node Is it going to be blocked , If you don't get the lock, you're stuck
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

Here is the AQS Logic. , You can also read AQS About .

  1. Continuously obtain whether the previous node of this node is head, because head It's a virtual node , If the previous node of the current node is head node , Then the current node is The first data node >;
  2. The first data node keeps getting resources , To be successful , Will head Point to the current node ;
  3. The current node is not a head node , perhaps tryAcquire(arg) Failure ( Failure may be unfair lock ). At this time, it is necessary to determine the state of the previous node Whether the current node should be blocked ( Whether the state of the previous node is SIGNAL).

It is worth noting that , When the node is placed in AQS When the synchronization queue of , It's also about competing for resources , Simultaneous setting savedState Value , This value represents the number of reentries released when the lock was first released .

The overall flow chart is as follows :

condition-await-q4EBQx

signal

public final void signal() {
// Whether the thread is currently held
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// firstWaiter The header node points to the next node in the header of the conditional queue
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// Disconnect the original head node from the synchronization queue
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// Determine whether the node has been canceled before
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// call enq Add to The tail of the synchronization queue
Node p = enq(node);
int ws = p.waitStatus;
// node The previous node of It is amended as follows SIGNAL So you can wake yourself up later
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

enq You can also read AQS Code for

private Node enq(final Node node) {
for (;;) {
Node t = tail;
// The tail node is empty The header node needs to be initialized , At this point, the head and tail nodes are
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// Not empty Loop assignment
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

adopt enq Method to place the node in AQS After the synchronization queue of , To put node Of the previous node waitStatus Set to Node.SIGNAL.signalAll The code is similar to .

summary

Q&A

Q: Condition and AQS What does it matter ?

A: Condition Is based on AQS Realized ,Condition Implementation class of ConditionObject yes AQS An inner class , Share a part of it AQS The logic of .

Q: Condition What is the implementation principle of ?

A: Condition Internal maintenance of a conditional queue , In the case of acquiring a lock , Thread calls await, Threads are placed in the conditional queue and blocked . Until the call signal、signalAll Wake up the thread , After that thread wakes up , It will be put into AQS The synchronization queue of , Participate in the competition for lock resources .

Q: Condition Waiting queue and AQS What's the difference and connection between synchronization queues of ?
A: Condition The waiting queue for is a one-way linked list ,AQS It's a two-way list . There is no clear link between the two . Only after the node is awakened from the blocking state , Will move from the waiting queue to the synchronization queue .

Conclusion

This article is mainly about reading Condition Related code , However, the logic such as thread interrupt is omitted . Interested partners . Can be more in-depth study of the relevant source code .

版权声明
本文为[Liu Zhihang]所创,转载请带上原文链接,感谢

  1. [front end -- JavaScript] knowledge point (IV) -- memory leakage in the project (I)
  2. This mechanism in JS
  3. Vue 3.0 source code learning 1 --- rendering process of components
  4. Learning the realization of canvas and simple drawing
  5. gin里获取http请求过来的参数
  6. vue3的新特性
  7. Get the parameters from HTTP request in gin
  8. New features of vue3
  9. vue-cli 引入腾讯地图(最新 api,rocketmq原理面试
  10. Vue 学习笔记(3,免费Java高级工程师学习资源
  11. Vue 学习笔记(2,Java编程视频教程
  12. Vue cli introduces Tencent maps (the latest API, rocketmq)
  13. Vue learning notes (3, free Java senior engineer learning resources)
  14. Vue learning notes (2, Java programming video tutorial)
  15. 【Vue】—props属性
  16. 【Vue】—创建组件
  17. [Vue] - props attribute
  18. [Vue] - create component
  19. 浅谈vue响应式原理及发布订阅模式和观察者模式
  20. On Vue responsive principle, publish subscribe mode and observer mode
  21. 浅谈vue响应式原理及发布订阅模式和观察者模式
  22. On Vue responsive principle, publish subscribe mode and observer mode
  23. Xiaobai can understand it. It only takes 4 steps to solve the problem of Vue keep alive cache component
  24. Publish, subscribe and observer of design patterns
  25. Summary of common content added in ES6 + (II)
  26. No.8 Vue element admin learning (III) vuex learning and login method analysis
  27. Write a mini webpack project construction tool
  28. Shopping cart (front-end static page preparation)
  29. Introduction to the fluent platform
  30. Webpack5 cache
  31. The difference between drop-down box select option and datalist
  32. CSS review (III)
  33. Node.js学习笔记【七】
  34. Node.js learning notes [VII]
  35. Vue Router根据后台数据加载不同的组件(思考->实现->不止于实现)
  36. Vue router loads different components according to background data (thinking - & gt; Implementation - & gt; (more than implementation)
  37. 【JQuery框架,Java编程教程视频下载
  38. [jQuery framework, Java programming tutorial video download
  39. Vue Router根据后台数据加载不同的组件(思考->实现->不止于实现)
  40. Vue router loads different components according to background data (thinking - & gt; Implementation - & gt; (more than implementation)
  41. 【Vue,阿里P8大佬亲自教你
  42. 【Vue基础知识总结 5,字节跳动算法工程师面试经验
  43. [Vue, Ali P8 teaches you personally
  44. [Vue basic knowledge summary 5. Interview experience of byte beating Algorithm Engineer
  45. 【问题记录】- 谷歌浏览器 Html生成PDF
  46. [problem record] - PDF generated by Google browser HTML
  47. 【问题记录】- 谷歌浏览器 Html生成PDF
  48. [problem record] - PDF generated by Google browser HTML
  49. 【JavaScript】查漏补缺 —数组中reduce()方法
  50. [JavaScript] leak checking and defect filling - reduce() method in array
  51. 【重识 HTML (3),350道Java面试真题分享
  52. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  53. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  54. [re recognize HTML (3) and share 350 real Java interview questions
  55. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  56. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  57. 【重识 HTML ,nginx面试题阿里
  58. 【重识 HTML (4),ELK原来这么简单
  59. [re recognize HTML, nginx interview questions]
  60. [re recognize HTML (4). Elk is so simple