Thread and scheduler in reactor

floated 2020-11-11 09:44:22
thread scheduler reactor


brief introduction

Today we are going to introduce Reactor Multithreading model and timer model in ,Reactor We've already introduced , It's actually an extension of the observer pattern .

So essentially ,Reactor It's not about multithreading . You can use it in multithreading or not in multithreading .

Today, I will introduce you how to do it in Reactor Multithreading and timer model are used in .

Thread Multithreading

Let's take a look at the Flux An example of the creation of :

 Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state);
if (state == 10) sink.complete();
return state + 1;
});
flux.subscribe(System.out::println);

You can see , Whether it's Flux generator still subscriber, They are actually running in the same thread .

If we want to subscribe Occurs in a new thread , We need to start a new thread , And then within the thread subscribe operation .

 Mono<String> mono = Mono.just("hello ");
Thread t = new Thread(() -> mono
.map(msg -> msg + "thread ")
.subscribe(v ->
System.out.println(v + Thread.currentThread().getName())
)
);
t.start();
t.join();

In the example above ,Mono Create in main thread , and subscribe It happened in the newly launched Thread in .

Schedule Timer

In many cases , our publisher It is necessary to call some methods regularly , To produce elements .Reactor Provides a new Schedule Class is responsible for the generation and management of timed tasks .

Scheduler It's an interface :

public interface Scheduler extends Disposable 

It defines some methods that must be implemented in timers :

For example, immediately executed :

Disposable schedule(Runnable task);

Delayed execution :

default Disposable schedule(Runnable task, long delay, TimeUnit unit)

And regularly :

default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)

Schedule There is a tool class called Schedules, It provides multiple creation Scheduler Methods , Its essence is to ExecutorService and ScheduledExecutorService encapsulate , Think of it as Supplier To create Schedule.

Just look at it Schedule That's right ExecutorService Encapsulation .

Schedulers Tool class

Schedulers Tool classes provide many useful tool classes , Let's introduce in detail :

Schedulers.immediate():

The submitted Runnable Will immediately execute in the current thread .

Schedulers.single():

Use the same thread to perform all tasks .

Schedulers.boundedElastic():

Create a reusable thread pool , If the thread in the thread pool has not been used for a long time , Then it will be recycled .boundedElastic There will be a maximum number of threads , Generally speaking, it is CPU cores x 10. If there is no available worker Threads , The submitted task will be put in the queue to wait .

Schedulers.parallel():

Create a fixed number of worker threads , Sum of numbers CPU The kernel correlation of .

Schedulers.fromExecutorService(ExecutorService):

Create from an existing thread pool Scheduler.

Schedulers.newXXX:

Schedulers It provides a lot of new Opening method , To create all kinds of Scheduler.

Let's look at one Schedulers Specific application of , We can specify specific Scheduler To produce elements :

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

publishOn and subscribeOn

publishOn and subscribeOn It is mainly used for switching Scheduler Execution context .

Let's start with a conclusion , In chain calls ,publishOn Switchable Scheduler, however subscribeOn It doesn't work .

It's because of the real publish-subscribe The relationship is only in subscriber Start subscribe It's time to set up .

Let's take a look at the usage of these two methods :

publishOn

publishOn Can be in the chain call process , Conduct publish Handoff :

 @Test
public void usePublishOn() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i + ":"+ Thread.currentThread())
.publishOn(s)
.map(i -> "value " + i+":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
System.out.println(Thread.currentThread());
Thread.sleep(5000);
}

Above we created a name called parallel-scheduler Of scheduler.

And then I created one Flux,Flux First made a map operation , Then switch the execution context to parallel-scheduler, Finally, right executed once map operation .

Last , We use a new thread to do subscribe Output .

Let's look at the output first :

Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]

You can see , The name of the main thread is Thread.Subscriber The name of the thread is ThreadA.

So in publishOn Before ,map The thread used is ThreadA. And in the publishOn after ,map The thread used is switched to parallel-scheduler Thread pool .

subscribeOn

subscribeOn It's used to switch Subscriber Execution context , No matter subscribeOn Which part of the call chain appears , It will eventually apply to the entire call chain .

Let's look at an example :

 @Test
public void useSubscribeOn() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i + ":" + Thread.currentThread())
.subscribeOn(s)
.map(i -> "value " + i + ":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
Thread.sleep(5000);
}

alike , In the example above , We used two map, And then in two map Used a subscribeOn To switch subscribe Execution context .

Look at the output :

value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]

You can see , Either way map, It's all used. It's switched parallel-scheduler.

Examples of this article learn-reactive

The author of this article :flydean Program those things

Link to this article :http://www.flydean.com/reactor-thread-scheduler/

In this paper, the source :flydean The blog of

Welcome to my official account. :「 Program those things 」 The most popular interpretation , The deepest dry goods , The most concise tutorial , There are so many tricks you don't know about waiting for you to discover !

版权声明
本文为[floated]所创,转载请带上原文链接,感谢

  1. [front end -- JavaScript] knowledge point (IV) -- memory leakage in the project (I)
  2. This mechanism in JS
  3. Vue 3.0 source code learning 1 --- rendering process of components
  4. Learning the realization of canvas and simple drawing
  5. gin里获取http请求过来的参数
  6. vue3的新特性
  7. Get the parameters from HTTP request in gin
  8. New features of vue3
  9. vue-cli 引入腾讯地图(最新 api,rocketmq原理面试
  10. Vue 学习笔记(3,免费Java高级工程师学习资源
  11. Vue 学习笔记(2,Java编程视频教程
  12. Vue cli introduces Tencent maps (the latest API, rocketmq)
  13. Vue learning notes (3, free Java senior engineer learning resources)
  14. Vue learning notes (2, Java programming video tutorial)
  15. 【Vue】—props属性
  16. 【Vue】—创建组件
  17. [Vue] - props attribute
  18. [Vue] - create component
  19. 浅谈vue响应式原理及发布订阅模式和观察者模式
  20. On Vue responsive principle, publish subscribe mode and observer mode
  21. 浅谈vue响应式原理及发布订阅模式和观察者模式
  22. On Vue responsive principle, publish subscribe mode and observer mode
  23. Xiaobai can understand it. It only takes 4 steps to solve the problem of Vue keep alive cache component
  24. Publish, subscribe and observer of design patterns
  25. Summary of common content added in ES6 + (II)
  26. No.8 Vue element admin learning (III) vuex learning and login method analysis
  27. Write a mini webpack project construction tool
  28. Shopping cart (front-end static page preparation)
  29. Introduction to the fluent platform
  30. Webpack5 cache
  31. The difference between drop-down box select option and datalist
  32. CSS review (III)
  33. Node.js学习笔记【七】
  34. Node.js learning notes [VII]
  35. Vue Router根据后台数据加载不同的组件(思考-&gt;实现-&gt;不止于实现)
  36. Vue router loads different components according to background data (thinking - & gt; Implementation - & gt; (more than implementation)
  37. 【JQuery框架,Java编程教程视频下载
  38. [jQuery framework, Java programming tutorial video download
  39. Vue Router根据后台数据加载不同的组件(思考-&gt;实现-&gt;不止于实现)
  40. Vue router loads different components according to background data (thinking - & gt; Implementation - & gt; (more than implementation)
  41. 【Vue,阿里P8大佬亲自教你
  42. 【Vue基础知识总结 5,字节跳动算法工程师面试经验
  43. [Vue, Ali P8 teaches you personally
  44. [Vue basic knowledge summary 5. Interview experience of byte beating Algorithm Engineer
  45. 【问题记录】- 谷歌浏览器 Html生成PDF
  46. [problem record] - PDF generated by Google browser HTML
  47. 【问题记录】- 谷歌浏览器 Html生成PDF
  48. [problem record] - PDF generated by Google browser HTML
  49. 【JavaScript】查漏补缺 —数组中reduce()方法
  50. [JavaScript] leak checking and defect filling - reduce() method in array
  51. 【重识 HTML (3),350道Java面试真题分享
  52. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  53. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  54. [re recognize HTML (3) and share 350 real Java interview questions
  55. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  56. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  57. 【重识 HTML ,nginx面试题阿里
  58. 【重识 HTML (4),ELK原来这么简单
  59. [re recognize HTML, nginx interview questions]
  60. [re recognize HTML (4). Elk is so simple