Reactor: in depth understanding of reactor core

floated 2020-11-09 10:49:35
reactor depth understanding reactor core

brief introduction

In the last article, we briefly introduced Reactor The history of development and the basic Flux and Mono Use , This article will further explore Reactor Advanced usage of , Let's see .

Customize Subscriber

In the previous article we mentioned 4 individual Flux Of subscribe Methods :

Disposable subscribe();
Disposable subscribe(Consumer<? super T> consumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);

These four methods , We need to use lambda Expressions come from definitions consumer,errorConsumer,completeSonsumer and subscriptionConsumer The four Consumer.

It's more complicated to write , It doesn't seem convenient either , Let's think about it , The four Consumer Is it right? Subscriber Defined in interface 4 There is a one-to-one correspondence ?

 public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();

Right , So we have a simpler one subscribe Method :

public final void subscribe(Subscriber<? super T> actual)

This subscribe Method directly receives a Subscriber class . All the functions are realized .

Write it yourself Subscriber It's too troublesome ,Reactor It provides us with a BaseSubscriber Class , It has achieved Subscriber All the functions in , There are other ways to do it .

Let's take a look at BaseSubscriber The definition of :

public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription,

Be careful ,BaseSubscriber It's a single use , That means , If it first subscription To Publisher1, then subscription To Publisher2, Then it will be cancelled for the first Publisher The subscription .

because BaseSubscriber Is an abstract class , So we need to inherit it , And rewrite the methods we need to implement ourselves .

Let's take a look at a custom Subscriber:

public class CustSubscriber<T> extends BaseSubscriber<T> {
public void hookOnSubscribe(Subscription subscription) {
public void hookOnNext(T value) {

BaseSubscriber There are a lot of them with hook Opening method , These methods can be rewritten , and Subscriber Natively defined on Opening method , stay BaseSubscriber All of them are final Of , They can't be rewritten .

Let's look at a definition :

public final void onSubscribe(Subscription s) {
if (Operators.setOnce(S, this, s)) {
try {
catch (Throwable throwable) {
onError(Operators.onOperatorError(s, throwable, currentContext()));

You can see , It actually calls hook Methods .

above CustSubscriber in , We rewrite two methods , One is hookOnSubscribe, Call when creating a subscription , One is hookOnNext, Upon receipt of onNext Call when signal .

In these methods , It gives us enough room for customization , In the above example, we called request(1), Indicates that another element is requested .

Other hook There are ways : hookOnComplete, hookOnError, hookOnCancel and hookFinally.

Backpressure Handle

We talked about ,reactive stream The biggest feature of is that it can handle Backpressure.

What is? Backpressure Well ? Is that when consumer When you can't handle it , You can inform producer To reduce production speed .

Let's take a look at BaseSubscriber In the default hookOnSubscribe Realization :

 protected void hookOnSubscribe(Subscription subscription){

You can see that the default is request An infinite number of values . That is to say, by default, there is no Backpressure.

By rewriting hookOnSubscribe Method , We can customize the processing speed .

except request outside , We can still do that publisher Medium limit subscriber The speed of .

 public final Flux<T> limitRate(int prefetchRate) {
return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));

stay Flux in , We have a limitRate Method , You can set publisher The speed of .

such as subscriber request(100), And then we set up limitRate(10), So at most producer One time only produces 10 Elements .

establish Flux

Next , We're going to explain how to create Flux, Generally speaking, there are 4 Methods to create Flux.

Use generate

The first method is the simplest synchronous creation generate.

Let's look at an example :

 public void useGenerate(){
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {"3 x " + state + " = " + 3*state);
if (state == 10) sink.complete();
return state + 1;

Output results :

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

In the example above , We use generate Method to generate elements synchronously .

generate Receive two parameters :

 public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)

The first parameter is stateSupplier, Used to specify the state of initialization .

The second parameter is a generator, For consumption SynchronousSink, And generate new states .

In the example above , Every time we will state+1, All the way up to 10.

And then use subscribe To output all the generated elements .

Use create

Flux There is also a create Method to create Flux,create It can be synchronous or asynchronous , And support multithreading operation .

because create There is no initial state state , So it can be used in multithreading .

create One of the most useful things about this is that you can use third-party asynchronism API and Flux Connect , for instance , We have a custom EventProcessor, When dealing with the corresponding event , Will call to register to Processor Medium listener Some of the ways .

 interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();

How do we make this Listener Response behavior and Flux It's connected ?

 public void useCreate(){
EventProcessor myEventProcessor = new EventProcessor();
Flux<String> bridge = Flux.create(sink -> {
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {;
public void processComplete() {

Use create That's enough ,create Receive one consumer Parameters :

 public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

This consumer The essence is to consume FluxSink object .

The above example is in MyEventListener In the event of FluxSink Object to consume .

Use push

push and create equally , It also supports asynchronous operations , But only one thread can call at the same time next, complete perhaps error Method , So it's single threaded .

Use Handle

Handle Different from the three methods above , It's an instance method .

It and generate Is very similar , It's also consumption SynchronousSink object .

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);

The difference is that its parameter is a BiConsumer, There is no return value .

Let's look at an example of using :

 public void useHandle(){
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i);
if (letter != null);
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;

Examples of this article learn-reactive

The author of this article :flydean Program those things

Link to this article :

In this paper, the source :flydean The blog of

Welcome to my official account. :「 Program those things 」 The most popular interpretation , The deepest dry goods , The most concise tutorial , There are so many tricks you don't know about waiting for you to discover !


  1. [front end -- JavaScript] knowledge point (IV) -- memory leakage in the project (I)
  2. This mechanism in JS
  3. Vue 3.0 source code learning 1 --- rendering process of components
  4. Learning the realization of canvas and simple drawing
  5. gin里获取http请求过来的参数
  6. vue3的新特性
  7. Get the parameters from HTTP request in gin
  8. New features of vue3
  9. vue-cli 引入腾讯地图(最新 api,rocketmq原理面试
  10. Vue 学习笔记(3,免费Java高级工程师学习资源
  11. Vue 学习笔记(2,Java编程视频教程
  12. Vue cli introduces Tencent maps (the latest API, rocketmq)
  13. Vue learning notes (3, free Java senior engineer learning resources)
  14. Vue learning notes (2, Java programming video tutorial)
  15. 【Vue】—props属性
  16. 【Vue】—创建组件
  17. [Vue] - props attribute
  18. [Vue] - create component
  19. 浅谈vue响应式原理及发布订阅模式和观察者模式
  20. On Vue responsive principle, publish subscribe mode and observer mode
  21. 浅谈vue响应式原理及发布订阅模式和观察者模式
  22. On Vue responsive principle, publish subscribe mode and observer mode
  23. Xiaobai can understand it. It only takes 4 steps to solve the problem of Vue keep alive cache component
  24. Publish, subscribe and observer of design patterns
  25. Summary of common content added in ES6 + (II)
  26. No.8 Vue element admin learning (III) vuex learning and login method analysis
  27. Write a mini webpack project construction tool
  28. Shopping cart (front-end static page preparation)
  29. Introduction to the fluent platform
  30. Webpack5 cache
  31. The difference between drop-down box select option and datalist
  32. CSS review (III)
  33. Node.js学习笔记【七】
  34. Node.js learning notes [VII]
  35. Vue Router根据后台数据加载不同的组件(思考-&gt;实现-&gt;不止于实现)
  36. Vue router loads different components according to background data (thinking - & gt; Implementation - & gt; (more than implementation)
  37. 【JQuery框架,Java编程教程视频下载
  38. [jQuery framework, Java programming tutorial video download
  39. Vue Router根据后台数据加载不同的组件(思考-&gt;实现-&gt;不止于实现)
  40. Vue router loads different components according to background data (thinking - & gt; Implementation - & gt; (more than implementation)
  41. 【Vue,阿里P8大佬亲自教你
  42. 【Vue基础知识总结 5,字节跳动算法工程师面试经验
  43. [Vue, Ali P8 teaches you personally
  44. [Vue basic knowledge summary 5. Interview experience of byte beating Algorithm Engineer
  45. 【问题记录】- 谷歌浏览器 Html生成PDF
  46. [problem record] - PDF generated by Google browser HTML
  47. 【问题记录】- 谷歌浏览器 Html生成PDF
  48. [problem record] - PDF generated by Google browser HTML
  49. 【JavaScript】查漏补缺 —数组中reduce()方法
  50. [JavaScript] leak checking and defect filling - reduce() method in array
  51. 【重识 HTML (3),350道Java面试真题分享
  52. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  53. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  54. [re recognize HTML (3) and share 350 real Java interview questions
  55. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  56. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  57. 【重识 HTML ,nginx面试题阿里
  58. 【重识 HTML (4),ELK原来这么简单
  59. [re recognize HTML, nginx interview questions]
  60. [re recognize HTML (4). Elk is so simple