Reactive programming with reactor mono and flux

ZhaoYingChao88 2020-11-13 02:12:36
reactive programming reactor mono flux


Official website :https://projectreactor.io/

course :https://projectreactor.io/docs/core/release/reference/#about-doc

Reactor The type of


Reactor There are two types of ,Flux<T> and Mono<T>.Flux similar RaxJava Of Observable, It can trigger zero to many events , And end processing or trigger error according to actual situation .

Mono Trigger at most one event , It goes with RxJava Of Single and Maybe similar , So you can put Mono<Void> Used to notify when an asynchronous task completes .

Because of the simple difference between the two types , We can easily differentiate between responsive API The type of : From the type of return, we can know that a method will “ Launch and forget ” or “ Ask and wait ”(Mono), Or processing a stream with multiple data items (Flux).

Flux and Mono Some of the operations take advantage of this feature to switch between the two types . for example , call Flux<T> Of single() Method will return a Mono<T>, While using concatWith() How to put two Mono You can get one by stringing together Flux. Similarly , Some operations are right Mono It doesn't make sense ( for example take(n) You'll get n>1 Result ), And some operations only work on Mono That makes sense ( for example or(otherMono)).

Reactor One of the principles of design is to keep API To simplify , And the separation of these two response types , It's expressiveness and API The tradeoff between ease of use .

The idea of reactive programming :


Reactive programming framework mainly adopts observer mode , and SpringReactor At its core is an extension of the observer model . About the observed in the structure of the observer pattern (Observable) And the observer (Subscriber) When in a different threading environment , Because of their different workload , Causing them to generate events is not the same as processing them , There are two kinds of situations :

The observed event is slower , Observers deal with events quickly . Then the observer will wait for the observed to send the event ,( It's like an observer cooking the same rice , Procedures for , There's no problem with that ).
The speed of the observed events is very fast , And observers are slow to deal with it . Then there's a problem , If not dealt with , Events will pile up , Finally, your memory will burst , Cause the program to crash .( It's like the rice produced by the observed is not eaten , The pile will rot ).

For the convenience of the following understanding Mono and Flux, It can also be understood as Publisher( The publisher can also be understood as the observed ) Actively push data to Subscriber( Subscribers can also be called observers ), If Publisher It's too fast , More than the Subscriber Processing speed of , How to deal with it . That's when it comes Backpressure( Back pressure ----- In an asynchronous scenario , When the observed sends events much faster than the observer's processing speed , A strategy that tells the upstream observer to slow down the transmission speed )


Reactor Main classes :


stay Reactor in , Not many classes are often used , There are mainly the following two :

Mono  Realized  org.reactivestreams.Publisher  Interface , representative 0 To 1 Publishers of elements (Publisher).
Flux  It's also implemented  org.reactivestreams.Publisher  Interface , representative 0 To N Publishers of elements (Subscriber).
Classes that might be used :

Scheduler  Represents the scheduler behind the reactive flow , Usually implemented by various thread pools .

Reactive Streams、Srping Reactor and Spring Flux(Web Flux) The relationship between


Reactive Streams Is the specification

Reactor Realized Reactive Streams.

Web Flux With Reactor Based on , Realization Web Domain reactive programming framework

Reactive programming (Reactive Programming) This new programming paradigm is becoming more and more popular with developers . stay Java What's more popular in the community is RxJava and RxJava 2. This article will introduce another new reactive programming library Reactor.

Responsive Manifesto

I don't know if it's a tribute to the Agile Manifesto , The responsive manifesto also includes 4 Group key words :

Java

  • Responsive: Responsive . The system is required to be able to respond in a timely manner at any time .

  • Resilient: Recoverable . Ask the system to make a mistake , Can also maintain responsiveness .

  • Elastic: scalable . The system is required to be able to maintain responsiveness under various loads .

  • Message Driven: Message driven . The system is required to connect components through asynchronous messages .

You can see , For any responsive system , The first thing to ensure is responsiveness , Otherwise, it's not a reactive system . In that sense , It's a blue screen Windows The system is obviously not a reactive system .

 

On the left is the traditional basis of Servlet Of Spring Web MVC frame , The right side is 5.0 The new version is based on Reactive Streams Of Spring WebFlux frame , From top to bottom is Router Functions,WebFlux,Reactive Streams Three new components .

  • Router Functions: For standard @Controller,@RequestMapping Wait for the standard Spring MVC annotation , Provide a functional style of API, Used to create Router,Handler and Filter.

  • WebFlux: Core components , Coordinate upstream and downstream components to provide responsive programming support .

  • Reactive Streams: A support back pressure (Backpressure) Asynchronous data stream processing standard of , Mainstream implementation has RxJava and Reactor,Spring WebFlux The default integration is Reactor. 

stay Web The choice of containers ,Spring WebFlux Both support like Tomcat,Jetty Such a traditional container ( The premise is to support Servlet 3.1 Non-Blocking IO API), And support like Netty,Undertow Asynchronous containers like that . No matter what kind of container ,Spring WebFlux Both input and output streams will be adapted to Flux<DataBuffer> Format , In order to have a unified treatment .

It is worth mentioning that , Except for the new Router Functions Interface ,Spring WebFlux At the same time, support the use of old Spring MVC Note statement Reactive Controller. And traditional MVC Controller Different ,Reactive Controller The operation is non blocking ServerHttpRequest and ServerHttpResponse, Instead of Spring MVC Inside HttpServletRequest and HttpServletResponse.

Spring 5 - Spring webflux

It's a new non blocking function Reactive Web frame , Can be used to create asynchronous , Non blocking , Event driven services , And it's very scalable . Block up ( ineluctable ) Style code migrated to functional non blocking Reactive Style code .

  • new spring-webflux modular , One is based on reactive Of spring-webmvc, Completely asynchronous non blocking , Designed to use enent-loop The execution model and the traditional thread pool model .

  • Reactive Description in spring-core Such as encoding and decoding

  • spring-core Related infrastructure , such as Encode and Decoder Can be used to encode and decode data streams ;DataBuffer have access to java ByteBuffer perhaps Netty ByteBuf;ReactiveAdapterRegistry It can provide transport layer support for related libraries .

  • stay spring-web The bag contains HttpMessageReade and HttpMessageWrite

 

Reactive Based on event driven ( Event mode or subscriber mode ), Be similar to Netty Asynchronous event programming model , Do different things for different events . All information is processed through a programming model , It's like water moving in a pipe ( Compare the event here to the current ) 

All the information is packaged into a Channel, This channel It's like flowing in a pipe , Processed by these processors in the pipeline .

For example, famous React Front frame fit redux Flow model , Wrap the information returned by the server as action Data flow , And then according to action To map to the page , The page follows action To change . Pages and data are the same thing in this pipeline , Being combed one by one , Exhibition

Reactor Do reactive programming

Introduction to reactive programming

Reactive programming comes from the spread of data flow and change , It means that the underlying execution model is responsible for automatically propagating changes through data flow . For example, evaluate a simple expression c=a+b, When a perhaps b When the value of changes , Traditional programming paradigms need to address a+b Recalculate to get c Value . If reactive programming is used , When a perhaps b When the value of changes ,c The value of will be updated automatically . Reactive programming was first developed by .NET On the platform Reactive Extensions (Rx) Library to implement . Later moved to Java After the platform came the famous RxJava library , And produced a lot of other programming languages on the corresponding implementation . On the basis of these implementations, the later reactive flow (Reactive Streams) standard . This specification defines the relevant interfaces of reactive flow , And will be integrated into Java 9 in .

In the traditional programming paradigm , We usually use iterators (Iterator) Pattern to traverse a sequence . This traversal is controlled by the caller , It's a pull . Each time by the caller through next() Method to get the next value in the sequence . The use of reactive flow is the push way , Common publishers - Subscriber pattern . When new data is generated by publishers , The data is pushed to subscribers for processing . Various operations can be added to the reaction flow to process the data , Form data processing chain . This processing chain, which is added in an explicit way, will only be implemented when the subscriber has subscription operations .

The first important concept in reactive flow is negative pressure (backpressure). In the basic message push mode , When the publisher of a message generates data too fast , It will make the processing speed of message subscribers unable to keep up with the speed of generation , Which puts a lot of pressure on subscribers . When the pressure is too high , It may cause the subscriber's own crash , The cascade effect may even cause the whole system to be paralyzed . The function of negative pressure is to provide a feedback channel from subscribers to producers . Subscribers can go through request() Method to declare the number of messages it can process at one time , And the producer will only produce the corresponding amount of messages , Until the next time request() Method call . It's actually a push-pull pattern .

Reactor brief introduction

aforementioned RxJava Kuo is JVM The pioneer of reactive programming , It is also the basis of reactive flow specification .RxJava 2 stay RxJava On the basis of a lot of updates . however RxJava There are also disadvantages in the library .RxJava Produced before reactive flow specifications , Although it can be converted with the interface of reactive flow , But because of the underlying implementation , It's not very intuitive to use .RxJava 2 Integration with specifications is considered in design and Implementation , But in order to keep up with RxJava The compatibility of , Many places are not intuitive when they are used .Reactor It is a library designed and implemented completely based on reactive flow specification , No, RxJava Such a historical burden , In use, it is more intuitive and easy to understand .Reactor It's also Spring 5 The basis of reactive programming in . Learn and master Reactor Can better understand Spring 5 Related concepts in .

stay Java Used in program Reactor The library is very simple , Just go through Maven or Gradle To add a pair of io.projectreactor:reactor-core You can rely on

Flux and Mono

Flux and Mono yes Reactor Two basic concepts in .Flux It means containing 0 To N Asynchronous sequence of elements . There are three different types of message notifications that can be included in this sequence : Normal message with element 、 End of sequence messages and sequence error messages . When a message notification is generated , Corresponding methods in subscribers onNext(), onComplete() and onError() Will be called .Mono It means containing 0 perhaps 1 Asynchronous sequence of elements . The sequence can also contain and Flux The same three types of message notifications .Flux and Mono Can be converted between . To a Flux Sequence count operation , The result is a Mono<Long> object . Take two. Mono Sequence merge , What you get is a Flux object .

Flux

launch 0 To N Asynchrony of elements " The emitter

 

  • Flux<T> It's a standard Publisher<T>, Express 0 To N Asynchronous sequence of emitters , Optionally to complete a signal or terminate in error . And Reactive Streams It's the same in the specification , These three types of signals are converted to... For downstream subscribers onNext、onComplete or onError Method call .
  • In this wide range of possible signals ,Flux It's universal reactive type . Be careful , All events , Even termination events , It's all optional : No, onNext event , however onComplete An event represents an empty finite sequence , But remove onComplete And you have an infinite empty sequence ( In addition to the tests about cancellation , Not particularly useful ). Again , Infinite sequences are not necessarily empty . for example ,Flux.interval(Duration) Produce a Flux<Long>, It's infinite , Send regular data from the clock .

Mono

launch 0 To 1 Asynchrony of elements " The emitter

 

  • Mono<T> It's a special Publisher<T>, It sends out at most one item , And then optionally with onComplete Signal or onError The signal is over .
  • It only provides for Flux A subset of the operators of , And some operators ( Especially those who will Mono Operator combined with another publisher ) Switch to Flux.
  • for example ,Mono#concatWith(Publisher) Return to one Flux , and Mono#then(Mono) Then go back to the other Mono.
  • Be careful ,Mono It can be used to express only the concept of completion ( Be similar to Runnable) No value asynchronous process . To create a , Please use Mono<Void>.

Create a function

  • create

Programmatically create... With multiple launch capabilities Flux,
Elements pass through FluxSink API In a synchronous or asynchronous manner .

eg:

 

 Flux.create((t) -> {
t.next("create");
t.next("create1");
t.complete();
}).subscribe(System.out::println);
  • generate

Create a... Programmatically Flux, adopt consumer Callbacks generate signals one by one ;generate in next It can only be adjusted 1 Time , Otherwise, an error will be reported reactor.core.Exceptions$ErrorCallbackNotImplemented

 

eg:

 

 Flux.generate(t -> {
t.next("generate");
// Be careful generate in next Can only call 1 Time
t.complete();
}).subscribe(System.out::println);
  • just

Create a Flux, It emits the provided elements , Then finish .

 

eg:

 

 // Single element
Flux.just("just").subscribe(System.out::println);
// Multiple elements
Flux.just("just", "just1", "just2").subscribe(System.out::println);
  • from

use Flux API To decorate with Publisher, adopt Publisher Create a Flux

 

eg:

 

 //Flux->Flux
Flux.from(Flux.just("just", "just1", "just2"))
.subscribe(System.out::println);
//Mono->Mono
Flux.from(Mono.just("just")).subscribe(System.out::println);
  • fromArray

Create a Flux, It issues the items contained in the provided array .

 

eg:

 

 Flux.fromArray(new String[] { "arr", "arr", "arr", "arr" })
.subscribe(System.out::println);
  • fromIterable

Create one by one Flux, It sends out the Iterable Items included in . Will be for each subscriber Create a new Iterable.

 

eg:

 

Set<String> v = new HashSet<>();
v.add("1");
v.add("2");
v.add("3");
Flux.fromIterable(() -> v.iterator()).subscribe(System.out::println);
  • fromStream

Create a Flux, It sends out the Stream Items included in . please remember ,Stream Can't be reused , It could be a problem . Multiple subscriptions or re subscriptions ( Such as repeat or retry)Stream yes closed Cancelled by the operator , To make a mistake or complete .

 

  • defer

Every time I get Flux Conduct Subscription when , Delay in providing Publisher, So the actual source instantiation is delayed , Until each subscription and Supplier You can create subscriber specific instances .
however , If the supplier does not generate a new instance , This operator will effectively start from Publisher Work .

 

eg:

 

Flux.defer(() -> Flux.just("just", "just1", "just2"))
.subscribe(System.out::println);
  • interval

Create a Flux, It uses 0 Start firing long value and increment
The time interval specified on the global timer . If demand doesn't come in time , One OnError Will be used to signal .IllegalStateException Specify the information that cannot be sent . Under normal circumstances ,Flux It will never be finished .

 

eg:

 

Flux.interval(Duration.of(500, ChronoUnit.MILLIS))
.subscribe(System.out::println);
// Prevent premature exit of the program , Let's play one. CountDownLatch stop
CountDownLatch latch = new CountDownLatch(1);
latch.await();
  • empty

Create a Flux, Complete without launching any projects .

 

eg:

 

Flux.empty().subscribe(System.out::println);
  • error

Create a Flux, It terminates with the specified error immediately after the subscription .

 

eg:

Flux.error(new RuntimeException()).subscribe(System.out::println);
  • never

Create a Flux, It will never send out any data 、 Error or completion signal .

eg:

Flux.never().subscribe(System.out::println);
  • range

Build a Flux, It will only send out one count A sequence of increasing integers , from start Start . in other words , stay start( contain ) and start + count( exclude ) Give an integer between , Then finish .

eg:

 

Flux.range(0, 100).subscribe(System.out::println);

 

establish Flux

There are many different ways to create Flux Sequence .

Flux Class static methods

The first way is through Flux Static methods in a class .

  • just(): You can specify all the elements contained in the sequence . created Flux The sequence will automatically end after these elements are published .
  • fromArray(),fromIterable() and fromStream(): From an array 、Iterable Object or Stream Object Flux object .
  • empty(): Create a without any elements , Just publish the sequence of the end message .
  • error(Throwable error): Create a sequence that contains only error messages .
  • never(): Create a sequence that does not contain any message notifications .
  • range(int start, int count): Create include from start Initial count Number of Integer Sequence of objects .
  • interval(Duration period) and interval(Duration delay, Duration period): Create an include from 0 Start incremental Long Sequence of objects . The contained elements are published at specified intervals . Apart from the interval , You can also specify the delay time before the start element is published .
  • intervalMillis(long period) and intervalMillis(long delay, long period): And interval() The method works the same , But this method uses milliseconds to specify the time interval and delay time .

Code list 1 Examples of the use of these methods are given in .

detailed list 1. adopt Flux Class static method creation Flux Sequence

1

2

3

4

5

6

Flux.just("Hello", "World").subscribe(System.out::println);

Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);

Flux.empty().subscribe(System.out::println);

Flux.range(1, 10).subscribe(System.out::println);

Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);

Flux.intervalMillis(1000).subscribe(System.out::println);

These static methods above are suitable for simple sequence generation , When complex logic is needed for sequence generation , Should be used generate() or create() Method .

generate() Method

generate() Methods are generated in a synchronous and one by one way Flux Sequence . The sequence is generated by calling the provided SynchronousSink Object's next(),complete() and error(Throwable) Method to complete . The meaning of generation by generation is in the specific generation logic ,next() Method can only be called at most once . In some cases , Sequence generation may be stateful , Some state objects are needed . You can use generate() Another form of method generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator), among stateSupplier Used to provide the initial state object . In sequence generation , The state object will act as generator The first parameter used is passed in , The state object can be modified in the corresponding logic for use in the next generation .

In the code list 2 in , The first sequence is generated by next() Method to produce a simple value , And then through complete() Method to end the sequence . If you don't call complete() Method , What is produced is an infinite sequence . The state object in the generation logic of the second sequence is a ArrayList object . The actual value produced is a random number . The resulting random number is added to ArrayList in . When there is 10 Number of hours , adopt complete() Method to end the sequence .

detailed list 2. Use generate() Method generation Flux Sequence

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

Flux.generate(sink -> {

    sink.next("Hello");

    sink.complete();

}).subscribe(System.out::println);

 

 

final Random random = new Random();

Flux.generate(ArrayList::new, (list, sink) -> {

    int value = random.nextInt(100);

    list.add(value);

    sink.next(value);

    if (list.size() == 10) {

        sink.complete();

    }

    return list;

}).subscribe(System.out::println);

create() Method

create() Methods and generate() The difference in the method is that FluxSink object .FluxSink Support synchronous and asynchronous message generation , And you can generate multiple elements in one call . In the code list 3 in , In a single call, all of 10 Elements .

detailed list 3. Use create() Method generation Flux Sequence

1

2

3

4

5

6

Flux.create(sink -> {

    for (int i = 0; i < 10; i++) {

        sink.next(i);

    }

    sink.complete();

}).subscribe(System.out::println);

establish Mono

Mono The creation method of is the same as that described before Flux More similar .Mono Class also contains some and Flux The same static method in class . These methods include just(),empty(),error() and never() etc. . In addition to these methods ,Mono There are also some unique static methods .

  • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable() and fromSupplier(): Respectively from the Callable、CompletionStage、CompletableFuture、Runnable and Supplier Created in Mono.
  • delay(Duration duration) and delayMillis(long duration): Create a Mono Sequence , After the specified delay time , Generate numbers 0 As the only value .
  • ignoreElements(Publisher<T> source): Create a Mono Sequence , Ignore as source Publisher All elements in , Only the end message .
  • justOrEmpty(Optional<? extends T> data) and justOrEmpty(T data): From a Optional The object may be null Object to create Mono. Only Optional Object contains value or object is not null when ,Mono The sequence produces the corresponding element .

You can also use create() Method to use MonoSink To create Mono. Code list 4 The creation of Mono Examples of sequences .

detailed list 4. establish Mono Sequence

1

2

3

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);

Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

The operator

and RxJava equally ,Reactor The power is that you can add a variety of different operators declaratively on a reactive flow . The following is a classification of the important operators .

buffer and bufferTimeout

The purpose of these two operators is to collect the elements in the current flow into the collection , And set objects as new elements in the stream . Different conditions can be specified when collecting : The maximum number of elements contained or the time interval between collections . Method buffer() Use only one condition , and bufferTimeout() You can specify two conditions at the same time . You can use... When specifying time intervals Duration Object or milliseconds , That is to use bufferMillis() or bufferTimeoutMillis() Two methods .

In addition to the number of elements and time intervals , You can also use bufferUntil and bufferWhile Operators to collect . The parameters of these two operators represent the conditions to be satisfied by the elements in each set Predicate object .bufferUntil It will be collected until Predicate Return to true. bring Predicate return true You can choose to add that element to the current collection or the next collection ;bufferWhile Only when the Predicate return true Only when you collect . Once the value is false, Will start the next collection immediately .

Code list 5 given buffer Examples of use of related operators . The first line of the statement outputs 5 Contains 20 Array of elements ; The second line outputs 2 One contains 10 Array of elements ; The third line of the statement outputs 5 Contains 2 Array of elements . Every time an even number is encountered, the current collection ends ; The fourth line outputs 5 Contains 1 Array of elements , The array contains only even numbers .

It should be noted that , In the code list 5 in , First, through toStream() Method to Flux Sequence to Java 8 Medium Stream object , Re pass forEach() Method to output . This is because sequence generation is asynchronous , And into Stream Object to ensure that the main thread does not exit until the sequence generation is complete , So you can correctly output all the elements in the sequence .

detailed list 5. buffer Examples of use of related operators

1

2

3

4

Flux.range(1, 100).buffer(20).subscribe(System.out::println);

Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);

Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

filter

Filter the elements contained in the convection , Just leave contentment Predicate The element that specifies the condition . Code list 6 The statement in the output is 1 To 10 All even numbers in .

detailed list 6. filter Examples of operator usage

1

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

window

window The function of the operator is similar to buffer, The difference is window The operator is to collect elements from the current stream into another Flux In sequence , So the return value type is Flux<Flux<T>>. In the code list 7 in , The output of the two lines is 5 And 2 individual UnicastProcessor character . This is because window The flow generated by the operator contains UnicastProcessor Class object , and UnicastProcessor Class toString The output of the method is UnicastProcessor character .

detailed list 7. window Examples of operator usage

1

2

Flux.range(1, 100).window(20).subscribe(System.out::println);

Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

zipWith

zipWith The operator merges the elements in the current flow with those in another flow in a one-to-one way . You can do nothing when merging , So what we get is an element of type Tuple2 The flow of ; You can also pass a BiFunction Function to process the merged elements , The element type of the resulting stream is the return value of the function .

In the code list 8 in , The elements contained in the two flows are a,b and c,d. first zipWith The operator does not use the merge function , So the element type in the result stream is Tuple2; the second zipWith The operation changes the element type to... By combining functions String.

detailed list 8. zipWith Examples of operator usage

1

2

3

4

5

6

Flux.just("a", "b")

        .zipWith(Flux.just("c", "d"))

        .subscribe(System.out::println);

Flux.just("a", "b")

        .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))

        .subscribe(System.out::println);

take

take The series operator is used to extract elements from the current stream . There are many ways to extract .

  • take(long n),take(Duration timespan) and takeMillis(long timespan): Extract... At a specified amount or interval .
  • takeLast(long n): Extract the last... In the stream N Elements .
  • takeUntil(Predicate<? super T> predicate): Extract elements until Predicate return true.
  • takeWhile(Predicate<? super T> continuePredicate): When Predicate return true Only when the extraction .
  • takeUntilOther(Publisher<?> other): Extract elements until another stream begins to produce elements .

In the code list 9 in , The first line of statements outputs numbers 1 To 10; The second line of the statement outputs numbers 991 To 1000; The third line of statements outputs numbers 1 To 9; The fourth line of the statement outputs numbers 1 To 10, bring Predicate return true The elements of are also included .

detailed list 9. take Examples of series operators

1

2

3

4

Flux.range(1, 1000).take(10).subscribe(System.out::println);

Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);

Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);

Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

reduce and reduceWith

reduce and reduceWith Operators accumulate all elements contained in the stream , Get a... That contains the results of the calculation Mono Sequence . The cumulative operation is through a BiFunction To represent the . You can specify an initial value during operation . If there is no initial value , The first element of the sequence is the initial value .

In the code list 10 in , The first line of statements adds elements in the stream , The result is 5050; The second line is also an addition operation , But through a Supplier The initial value is given as 100, So the result is 5150.

detailed list 10. reduce and reduceWith Examples of operator usage

1

2

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);

Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

merge and mergeSequential

merge and mergeSequential Operators are used to combine multiple streams into one Flux Sequence . The difference is merge Merge... In the actual order in which all elements in the stream are generated , and mergeSequential In the order in which all streams are subscribed , Merge in streams .

Code list 11 We used merge and mergeSequential The operator . The streams that are merged are every 100 Milliseconds produce an element , But each element in the second stream is produced later than the first stream 50 millisecond . In the use of merge The result of , The elements from the two streams are intertwined in chronological order ; While using mergeSequential The result stream of is the first to produce all the elements in the first stream , And then all the elements in the second stream .

detailed list 11. merge and mergeSequential Examples of operator usage

1

2

3

4

5

6

Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

        .toStream()

        .forEach(System.out::println);

Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

        .toStream()

        .forEach(System.out::println);

flatMap and flatMapSequential

flatMap and flatMapSequential The operator converts each element of the flow into a flow , Then merge all the elements in the stream .flatMapSequential and flatMap The difference between mergeSequential and merge The difference between them is the same .

In the code list 12 in , The elements in the stream are converted every 100 Milliseconds produce a different number of streams , Merge again . Because the first stream contains fewer elements , So at the beginning of the result stream, the elements of the two streams are intertwined , And then there's only the elements in the second stream .

detailed list 12. flatMap Examples of operator usage

1

2

3

4

Flux.just(5, 10)

        .flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

        .toStream()

        .forEach(System.out::println);

concatMap

concatMap The operator is also used to convert each element of a flow into a flow , Then merge all streams . And flatMap The difference is ,concatMap The transformed flow will be merged according to the order of elements in the original flow ; And flatMapSequential The difference is ,concatMap The subscription to the transformed stream is dynamic , and flatMapSequential All streams have been subscribed before the merge .

Code list 13 With the code list 12 similar , Just put flatMap Instead of concatMap, The result stream contains all the elements of the first stream and the second stream in turn .

detailed list 13. concatMap Examples of operator usage

1

2

3

4

Flux.just(5, 10)

        .concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

        .toStream()

        .forEach(System.out::println);

combineLatest

combineLatest The operator merges all the newly generated elements in the stream into a new element , As an element in the returned result stream . As long as new elements are created in any one of these streams , The merge operation will be performed once , As a result, new elements are created in the flow . stay Code list 14 in , The newly generated elements in the stream are collected into an array , adopt Arrays.toString Method to convert an array to String.

detailed list 14. combineLatest Examples of operator usage

1

2

3

4

5

Flux.combineLatest(

        Arrays::toString,

        Flux.intervalMillis(100).take(5),

        Flux.intervalMillis(50, 100).take(5)

).toStream().forEach(System.out::println);

Message processing

When it needs to be handled Flux or Mono The news in , As shown in the previous code listing , Can pass subscribe Method to add the corresponding subscription logic . Calling subscribe Method can specify the type of message to be processed . You can only process the normal messages contained in it , You can also process error messages and completion messages at the same time . Code list 15 Pass through subscribe() Method handles both normal and error messages .

detailed list 15. adopt subscribe() Method to handle normal and error messages

1

2

3

Flux.just(1, 2)

        .concatWith(Mono.error(new IllegalStateException()))

        .subscribe(System.out::println, System.err::println);

Normal message processing is relatively simple . When an error occurs , There are many different strategies . The first strategy is through onErrorReturn() Method returns a default value . In the code list 16 in , When an error occurs , Flow will produce default values 0.

detailed list 16. Returns the default value when an error occurs

1

2

3

4

Flux.just(1, 2)

        .concatWith(Mono.error(new IllegalStateException()))

        .onErrorReturn(0)

        .subscribe(System.out::println);

The second strategy is through switchOnError() Method to use another stream to generate elements . In the code list 17 in , When an error occurs , Will produce Mono.just(0) The corresponding stream , That's numbers 0.

detailed list 17. Use another stream when an error occurs

1

2

3

4

Flux.just(1, 2)

        .concatWith(Mono.error(new IllegalStateException()))

        .switchOnError(Mono.just(0))

        .subscribe(System.out::println);

The third strategy is through onErrorResumeWith() Method to select the flow of generated elements to be used according to different exception types . In the code list 18 in , Depending on the exception type, different flows are returned as the data source when an error occurs . Because the type of exception is IllegalArgumentException, The elements produced are -1.

detailed list 18. When an error occurs, the flow is selected according to the exception type

1

2

3

4

5

6

7

8

9

10

11

Flux.just(1, 2)

        .concatWith(Mono.error(new IllegalArgumentException()))

        .onErrorResumeWith(e -> {

            if (e instanceof IllegalStateException) {

                return Mono.just(0);

            } else if (e instanceof IllegalArgumentException) {

                return Mono.just(-1);

            }

            return Mono.empty();

        })

        .subscribe(System.out::println);

When an error occurs , You can also use retry Operator to retry . The retry action is implemented by subscribing to the sequence again . In the use of retry Operator can specify the number of retries . Code list 19 The number of retries specified in is 1, The output is 1,2,1,2 And error messages .

detailed list 19. Use retry Operator to retry

1

2

3

4

Flux.just(1, 2)

        .concatWith(Mono.error(new IllegalStateException()))

        .retry(1)

        .subscribe(System.out::println);

Scheduler

The reactive flow and the various operations that can be performed on it are described earlier , Through the scheduler (Scheduler) You can specify how these operations are performed and the thread in which they are performed . There are several different scheduler implementations .

  • Current thread , adopt Schedulers.immediate() Method to create .
  • A single reusable thread , adopt Schedulers.single() Method to create .
  • Use elastic thread pool , adopt Schedulers.elastic() Method to create . Threads in the thread pool can be reused . When needed , New threads will be created . If a thread is idle for too long , It will be destroyed . The scheduler is suitable for I/O Operation related flow processing .
  • Use thread pools optimized for parallel operations , adopt Schedulers.parallel() Method to create . The number of threads depends on CPU The number of cores . The scheduler is suitable for computing intensive flow processing .
  • Use a scheduler that supports task scheduling , adopt Schedulers.timer() Method to create .
  • From what already exists ExecutorService Object to create a scheduler , adopt Schedulers.fromExecutorService() Method to create .

Some operators already use certain types of schedulers by default . such as intervalMillis() Method to create a stream using the Schedulers.timer() Scheduler created . adopt publishOn() and subscribeOn() Method can switch the scheduler that performs the operation . among publishOn() Method switches the execution of the operator , and subscribeOn() Method switches the way elements in the stream are executed .

In the code list 20 in , Use create() Method to create a new Flux object , The only element included is the name of the current thread . Then there are two pairs publishOn() and map() Method , Its function is to switch the scheduler in execution first , Add the current thread name as a prefix . Finally through subscribeOn() Method to change the way the flow is executed when it is generated . The result of running is [elastic-2] [single-1] parallel-1. The name of the innermost thread parallel-1 From the Schedulers.parallel() Scheduler , The middle thread name single-1 From the first map Before operation Schedulers.single() Scheduler , The name of the outermost thread elastic-2 From the second map Before operation Schedulers.elastic() Scheduler .

detailed list 20. Use scheduler to switch operator execution

1

2

3

4

5

6

7

8

9

10

11

Flux.create(sink -> {

    sink.next(Thread.currentThread().getName());

    sink.complete();

})

.publishOn(Schedulers.single())

.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

.publishOn(Schedulers.elastic())

.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

.subscribeOn(Schedulers.parallel())

.toStream()

.forEach(System.out::println);

test

In the use of Reactor When the code is tested , Need to use io.projectreactor.addons:reactor-test library .

Use StepVerifier

A typical scenario for testing is for a sequence , Verify that the elements contained in it are as expected .StepVerifier It can be used to verify the elements contained in the sequence one by one . In the code list 21 in , The stream that needs to be validated contains a and b Two elements . adopt StepVerifier.create() Method to package a flow and then verify it .expectNext() Method is used to declare the value of the next element in the stream expected at test time , and verifyComplete() Method to verify whether the flow ends normally . A similar approach is verifyError() To verify that the stream terminated due to an error .

detailed list 21. Use StepVerifier Verify the elements in the flow

1

2

3

4

StepVerifier.create(Flux.just("a", "b"))

        .expectNext("a")

        .expectNext("b")

        .verifyComplete();

Operation test time

Some sequences are generated in time , Like every other 1 Minutes to create a new element . During the test , It is not possible to spend the actual time waiting for each element to be generated . You need to use StepVerifier Virtual time function provided . adopt StepVerifier.withVirtualTime() Method to create a virtual clock StepVerifier. adopt thenAwait(Duration) Method to move the virtual clock forward .

In the code list 22 in , The stream that needs to be validated contains two elements that are generated every one day , And the first element generation delay is 4 Hours . Through StepVerifier.withVirtualTime() Method package flow after ,expectNoEvent() Method is used to verify in 4 No messages were generated within hours , Then verify the first element 0 produce ; next thenAwait() Method to move the virtual clock forward one day , Then verify the second element 1 produce ; Finally, verify that the flow ends normally .

detailed list 22. Operation test time

1

2

3

4

5

6

7

StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))

        .expectSubscription()

        .expectNoEvent(Duration.ofHours(4))

        .expectNext(0L)

        .thenAwait(Duration.ofDays(1))

        .expectNext(1L)

        .verifyComplete();

Use TestPublisher

TestPublisher Its function is to control the generation of elements in the flow , Even in case of violation of reaction flow specifications . In the code list 23 in , adopt create() Method to create a new TestPublisher object , And then use next() Method to produce elements , Use complete() Method to end the flow .TestPublisher It is mainly used to test operators created by developers themselves .

detailed list 23. Use TestPublisher Create the flow used by the test

1

2

3

4

5

6

7

8

9

final TestPublisher<String> testPublisher = TestPublisher.create();

testPublisher.next("a");

testPublisher.next("b");

testPublisher.complete();

 

StepVerifier.create(testPublisher)

        .expectNext("a")

        .expectNext("b")

        .expectComplete();

debugging

Because of the difference between reactive programming paradigm and traditional programming paradigm , Use Reactor It is difficult to debug the code when there is a problem . In order to better help developers debug ,Reactor The corresponding auxiliary functions are provided .

Enable debug mode

When you need to get more flow related execution information , You can add a code listing at the beginning of the program 24 To enable debugging mode . After debugging mode is enabled , All operators hold extra information about the execution chain when they execute . When an error occurs , This information will be output as part of the exception stack information . Through this information, we can analyze the specific problem in the execution of which operator .

detailed list 24. Enable debug mode

1

Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());

But when debugging mode is enabled , There's a cost to recording this extra information . It's usually only after a mistake , Consider enabling debug mode . But when debugging mode is enabled to find the problem , Previous mistakes may not be easy to reproduce . In order to reduce the possible cost , You can limit the debugging mode to be enabled only for certain types of operators .

Use checkpoints

Another way is through checkpoint Operator to enable debug mode for a particular stream processing chain . Code list 25 in , stay map The operator is followed by a name test Check point of . When an error occurs , The checkpoint name appears in the exception stack information . For important or complex flow processing chains in programs , Checkpoints can be enabled at critical locations to help locate possible problems .

detailed list 25. Use checkpoint The operator

1

Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);

logging

Another practical function in development and debugging is to log stream related events . This can be done by adding log Operator to implement . In the code list 26 in , Added log Operator and specifies the name of the log classification .

detailed list 26. Use log Operators record events

1

Flux.range(1, 2).log("Range").subscribe(System.out::println);

In the actual run time , The resulting output is like a code listing 27 Shown .

detailed list 27. log Log generated by operator

1

2

3

4

5

6

7

8

13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

13:07:56.753 [main] INFO Range - | request(unbounded)

13:07:56.754 [main] INFO Range - | onNext(1)

1

13:07:56.754 [main] INFO Range - | onNext(2)

2

13:07:56.754 [main] INFO Range - | onComplete()

“ cold ” And “ heat ” Sequence

All the previous code listings created were cold sequences . Cold sequence means that no matter when subscribers subscribe to the sequence , Always receive all messages generated in the sequence . And the corresponding thermal sequence , It's constantly generating news , Subscribers can only get messages generated after their subscription .

In the code list 28 in , The original sequence contains 10 The interval is 1 Second element . adopt publish() How to put a Flux Object conversion to ConnectableFlux object . Method autoConnect() The function of is when ConnectableFlux The message is generated when the object has a subscriber . Code source.subscribe() The purpose of is to subscribe to the ConnectableFlux object , Let it start generating data . Then the current thread sleeps 5 Second , The second subscriber can only get the last 5 Elements , So the output is numbers 5 To 9.

detailed list 28. Thermal sequence

1

2

3

4

5

6

7

8

9

final Flux<Long> source = Flux.intervalMillis(1000)

        .take(10)

        .publish()

        .autoConnect();

source.subscribe();

Thread.sleep(5000);

source

        .toStream()

        .forEach(System.out::println);

Conclusion

Reactive programming paradigm for developers accustomed to the traditional programming paradigm , It's a challenge to change the way of thinking , It's also a chance full of more possibilities .Reactor As a new Java library , It can be used as the basis of reactive application . This paper deals with Reactor Library made a detailed introduction , Include Flux and Mono Sequence creation 、 Use of common operators 、 Scheduler 、 Error handling and testing and debugging techniques .

The resources

版权声明
本文为[ZhaoYingChao88]所创,转载请带上原文链接,感谢

  1. [front end -- JavaScript] knowledge point (IV) -- memory leakage in the project (I)
  2. This mechanism in JS
  3. Vue 3.0 source code learning 1 --- rendering process of components
  4. Learning the realization of canvas and simple drawing
  5. gin里获取http请求过来的参数
  6. vue3的新特性
  7. Get the parameters from HTTP request in gin
  8. New features of vue3
  9. vue-cli 引入腾讯地图(最新 api,rocketmq原理面试
  10. Vue 学习笔记(3,免费Java高级工程师学习资源
  11. Vue 学习笔记(2,Java编程视频教程
  12. Vue cli introduces Tencent maps (the latest API, rocketmq)
  13. Vue learning notes (3, free Java senior engineer learning resources)
  14. Vue learning notes (2, Java programming video tutorial)
  15. 【Vue】—props属性
  16. 【Vue】—创建组件
  17. [Vue] - props attribute
  18. [Vue] - create component
  19. 浅谈vue响应式原理及发布订阅模式和观察者模式
  20. On Vue responsive principle, publish subscribe mode and observer mode
  21. 浅谈vue响应式原理及发布订阅模式和观察者模式
  22. On Vue responsive principle, publish subscribe mode and observer mode
  23. Xiaobai can understand it. It only takes 4 steps to solve the problem of Vue keep alive cache component
  24. Publish, subscribe and observer of design patterns
  25. Summary of common content added in ES6 + (II)
  26. No.8 Vue element admin learning (III) vuex learning and login method analysis
  27. Write a mini webpack project construction tool
  28. Shopping cart (front-end static page preparation)
  29. Introduction to the fluent platform
  30. Webpack5 cache
  31. The difference between drop-down box select option and datalist
  32. CSS review (III)
  33. Node.js学习笔记【七】
  34. Node.js learning notes [VII]
  35. Vue Router根据后台数据加载不同的组件(思考-&gt;实现-&gt;不止于实现)
  36. Vue router loads different components according to background data (thinking - & gt; Implementation - & gt; (more than implementation)
  37. 【JQuery框架,Java编程教程视频下载
  38. [jQuery framework, Java programming tutorial video download
  39. Vue Router根据后台数据加载不同的组件(思考-&gt;实现-&gt;不止于实现)
  40. Vue router loads different components according to background data (thinking - & gt; Implementation - & gt; (more than implementation)
  41. 【Vue,阿里P8大佬亲自教你
  42. 【Vue基础知识总结 5,字节跳动算法工程师面试经验
  43. [Vue, Ali P8 teaches you personally
  44. [Vue basic knowledge summary 5. Interview experience of byte beating Algorithm Engineer
  45. 【问题记录】- 谷歌浏览器 Html生成PDF
  46. [problem record] - PDF generated by Google browser HTML
  47. 【问题记录】- 谷歌浏览器 Html生成PDF
  48. [problem record] - PDF generated by Google browser HTML
  49. 【JavaScript】查漏补缺 —数组中reduce()方法
  50. [JavaScript] leak checking and defect filling - reduce() method in array
  51. 【重识 HTML (3),350道Java面试真题分享
  52. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  53. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  54. [re recognize HTML (3) and share 350 real Java interview questions
  55. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  56. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  57. 【重识 HTML ,nginx面试题阿里
  58. 【重识 HTML (4),ELK原来这么简单
  59. [re recognize HTML, nginx interview questions]
  60. [re recognize HTML (4). Elk is so simple