brief introduction
In the last article, we briefly introduced Reactor The history of development and the basic Flux and Mono Use , This article will further explore Reactor Advanced usage of , Let's see .
Customize Subscriber
In the previous article we mentioned 4 individual Flux Of subscribe Methods :
Disposable subscribe();
Disposable subscribe(Consumer<? super T> consumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
Disposable subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
These four methods , We need to use lambda Expressions come from definitions consumer,errorConsumer,completeSonsumer and subscriptionConsumer The four Consumer.
It's more complicated to write , It doesn't seem convenient either , Let's think about it , The four Consumer Is it right? Subscriber Defined in interface 4 There is a one-to-one correspondence ?
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
Right , So we have a simpler one subscribe Method :
public final void subscribe(Subscriber<? super T> actual)
This subscribe Method directly receives a Subscriber class . All the functions are realized .
Write it yourself Subscriber It's too troublesome ,Reactor It provides us with a BaseSubscriber Class , It has achieved Subscriber All the functions in , There are other ways to do it .
Let's take a look at BaseSubscriber The definition of :
public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription,
Disposable
Be careful ,BaseSubscriber It's a single use , That means , If it first subscription To Publisher1, then subscription To Publisher2, Then it will be cancelled for the first Publisher The subscription .
because BaseSubscriber Is an abstract class , So we need to inherit it , And rewrite the methods we need to implement ourselves .
Let's take a look at a custom Subscriber:
public class CustSubscriber<T> extends BaseSubscriber<T> {
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
public void hookOnNext(T value) {
System.out.println(value);
request(1);
}
}
BaseSubscriber There are a lot of them with hook Opening method , These methods can be rewritten , and Subscriber Natively defined on Opening method , stay BaseSubscriber All of them are final Of , They can't be rewritten .
Let's look at a definition :
@Override
public final void onSubscribe(Subscription s) {
if (Operators.setOnce(S, this, s)) {
try {
hookOnSubscribe(s);
}
catch (Throwable throwable) {
onError(Operators.onOperatorError(s, throwable, currentContext()));
}
}
}
You can see , It actually calls hook Methods .
above CustSubscriber in , We rewrite two methods , One is hookOnSubscribe, Call when creating a subscription , One is hookOnNext, Upon receipt of onNext Call when signal .
In these methods , It gives us enough room for customization , In the above example, we called request(1), Indicates that another element is requested .
Other hook There are ways : hookOnComplete, hookOnError, hookOnCancel and hookFinally.
Backpressure Handle
We talked about ,reactive stream The biggest feature of is that it can handle Backpressure.
What is? Backpressure Well ? Is that when consumer When you can't handle it , You can inform producer To reduce production speed .
Let's take a look at BaseSubscriber In the default hookOnSubscribe Realization :
protected void hookOnSubscribe(Subscription subscription){
subscription.request(Long.MAX_VALUE);
}
You can see that the default is request An infinite number of values . That is to say, by default, there is no Backpressure.
By rewriting hookOnSubscribe Method , We can customize the processing speed .
except request outside , We can still do that publisher Medium limit subscriber The speed of .
public final Flux<T> limitRate(int prefetchRate) {
return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
}
stay Flux in , We have a limitRate Method , You can set publisher The speed of .
such as subscriber request(100), And then we set up limitRate(10), So at most producer One time only produces 10 Elements .
establish Flux
Next , We're going to explain how to create Flux, Generally speaking, there are 4 Methods to create Flux.
Use generate
The first method is the simplest synchronous creation generate.
Let's look at an example :
public void useGenerate(){
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state);
if (state == 10) sink.complete();
return state + 1;
});
flux.subscribe(System.out::println);
}
Output results :
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
In the example above , We use generate Method to generate elements synchronously .
generate Receive two parameters :
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
The first parameter is stateSupplier, Used to specify the state of initialization .
The second parameter is a generator, For consumption SynchronousSink, And generate new states .
In the example above , Every time we will state+1, All the way up to 10.
And then use subscribe To output all the generated elements .
Use create
Flux There is also a create Method to create Flux,create It can be synchronous or asynchronous , And support multithreading operation .
because create There is no initial state state , So it can be used in multithreading .
create One of the most useful things about this is that you can use third-party asynchronism API and Flux Connect , for instance , We have a custom EventProcessor, When dealing with the corresponding event , Will call to register to Processor Medium listener Some of the ways .
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
How do we make this Listener Response behavior and Flux It's connected ?
public void useCreate(){
EventProcessor myEventProcessor = new EventProcessor();
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
}
Use create That's enough ,create Receive one consumer Parameters :
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
This consumer The essence is to consume FluxSink object .
The above example is in MyEventListener In the event of FluxSink Object to consume .
Use push
push and create equally , It also supports asynchronous operations , But only one thread can call at the same time next, complete perhaps error Method , So it's single threaded .
Use Handle
Handle Different from the three methods above , It's an instance method .
It and generate Is very similar , It's also consumption SynchronousSink object .
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
The difference is that its parameter is a BiConsumer, There is no return value .
Let's look at an example of using :
public void useHandle(){
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i);
if (letter != null)
sink.next(letter);
});
alphabet.subscribe(System.out::println);
}
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
Examples of this article learn-reactive
The author of this article :flydean Program those things
Link to this article :http://www.flydean.com/reactor-core-in-depth/
In this paper, the source :flydean The blog of
Welcome to my official account. :「 Program those things 」 The most popular interpretation , The deepest dry goods , The most concise tutorial , There are so many tricks you don't know about waiting for you to discover !