Realization of reactor Kafka through spring boot Webflux

Jiedao jdon 2021-05-04 12:36:46
realization reactor kafka spring boot


stay Apache Kafka Introduction , We studied the distributed streaming media platform Apache Kafka. This time, , We will focus on Reactor Kafka, This library can be created from Project Reactor To Kafka Topics Of Reactive Streams, vice versa .

We'll use two small sample applications ,Paymentprocessor Gateway and PaymentValidator. The code for these applications can be found in here find .

Paymentprocessor The gateway provides a small web page , You can generate a random credit card number ( It's obviously a forgery ), And the amount paid . When the user clicks the submit button , The form will be submitted to API.API Have a point at Kafka Reaction flow of unacknowledged transaction topics on the cluster , The other side of the subject of this unconfirmed transaction is PaymentValidator, Listen for incoming messages to verify . then , These messages go through the response pipeline , The verification method prints it to the command line .

adopt Reactive Streams towards Kafka Send a message

Our application is built on Spring 5 and Spring Boot 2 above , Enables us to quickly set up and use Project Reactor.

Gateway The goal of the application is to set up from Web Controller to Kafka Clustered Reactive flow . This means that we need specific dependencies to webflux and reactor-kafka.

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

Spring Webflux RestController Offer payment API, by paymentGateway Class doPayment Method to create a Reactive flow .

/ ** 
     * Call returned Mono Will be sent to Spring Webflux, The latter depends on multi-reactor  The cycle of events and NIO 
     * Processing requests in a non blocking manner , So as to achieve more concurrent requests . The result will be
      Through a named Server Sent Events send out .
     ** /
@PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
    / ** 
          When calling doPayment When the method is used , We send payment information , get Mono <Void> As a response .
          When our payment is successfully sent to Kafka The theme
         ** / 
        return paymentGateway.doPayment(payment);
    }

paymentGateway Need one kafkaProducer, It enables us to put messages as part of the pipeline in Kafka In the theme . It can be used KafkaSender.create Method is easy to create , Pass many producer options .

 public PaymentGatewayImpl() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
        kafkaProducer = KafkaSender.create(producerOptions);
    }

After creating ,kafkaProducer Can be used to easily send our messages to the selected Kafka The theme , Become part of the pipeline started in the controller . Because the message is sent nonblocking to Kafka Clustered , So we can use projects Reactor And will come from Web API A large number of concurrent messages are routed to Kafka.

 @Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
        final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
        String payload = toBinary(payment);
        SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
        return kafkaProducer.send(Mono.just(message)).next();
    }
    private String toBinary(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e);
        }
    }

from Kafka Themes create reaction streams

When there is no consumer monitoring , Sending a message to a topic doesn't make much sense , So our second application will use a reaction pipeline to listen for unacknowledged transaction topics . So , Use KafkaReceiver.create Method creation kafkaReceiver object , Similar to what we created before kafkaProducer Methods .

By using kafkaReceiver.receive Method , We can get receiverRecords Of Flux. In the topic we read, every message goes into receiverRecord in . After flowing into the application , They go further through the reaction tube . then , The messaging processEvent Method , This method calls paymentValidator, This method outputs some information to the console . Last , stay receiverOffset On the call acknowledge Method , towards Kafka The cluster sends an acknowledgement that the message has been processed .

   public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
        this.paymentValidator = paymentValidator;
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("unconfirmed-transactions"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
        kafkaReceiver = KafkaReceiver.create(consumerOptions);
        /**
         * We create a receiver for new unconfirmed transactions
         */
        ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                .doOnNext(r -> {
                    /**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */
                    final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
                    processEvent(paymentEvent);
                    r.receiverOffset().acknowledge();
                })
                .subscribe();
    }
    private void processEvent(PaymentEvent paymentEvent) {
        paymentValidator.calculateResult(paymentEvent);
    }
    private <T> T fromBinary(String object, Class<T> resultType) {
        try {
            return objectMapper.readValue(object, resultType);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

Can be in here Find the code for this example

版权声明
本文为[Jiedao jdon]所创,转载请带上原文链接,感谢
https://qdmana.com/2021/05/20210504123224092M.html

  1. Two way linked list: I'm no longer one-way driving
  2. Vue event and form processing
  3. Reactive TraderCloud实时外汇开源交易平台
  4. Reactive tradercloud real time foreign exchange open source trading platform
  5. Node.js REST API的10个最佳实践
  6. Ten best practices of node.js rest API
  7. Fiddler advanced usage
  8. Process from Vue template to render
  9. Promise up (asynchronous or synchronous)
  10. Principle and implementation of promise
  11. Vs code plug in sharing - run code
  12. Vue practical notes (1) introduction of Ant Design
  13. Vue actual combat notes (2) introduction of element plus
  14. Introduction to webpack
  15. Webpack construction process
  16. Vue notes
  17. The experience and lessons of moving from ruby megalith architecture to go microservice
  18. Using leancloud to add artitalk module to hexo blog
  19. Implementation of chrome request filtering extension
  20. Detailed introduction of beer import declaration elements and label quarantine [import knowledge]
  21. Gallop workflow engine design series 01 process element design
  22. VUE移动端音乐APP学习【十六】:播放器歌词显示开发
  23. Vue Mobile Music App learning [16]: player lyrics display development
  24. jquery cookie
  25. jquery cookie
  26. 体面编码之JavaScript
  27. JavaScript for decent coding
  28. React17 系统精讲 结合TS打造旅游电商平台
  29. React17 system combined with TS to build tourism e-commerce platform
  30. 2021-05-04 hot news
  31. HttpSession对象与Cooike的关系 以及 Cookie对象构造函数问题
  32. gRPC-Web:替代REST的gRPC的Javascript库包
  33. The relationship between httpsession object and cooike and the construction of cookie object
  34. Grpc Web: a JavaScript library package to replace rest grpc
  35. Building reactive rest API with Java - kalpa Senanayake
  36. PDF转HTML工具——用springboot包装pdf2htmlEX命令行工具
  37. Pdf to HTML tool -- Wrapping pdf2htmlex command line tool with springboot
  38. PDF转HTML工具——用springboot包装pdf2htmlEX命令行工具
  39. Pdf to HTML tool -- Wrapping pdf2htmlex command line tool with springboot
  40. Vue.js比jQuery更容易学习
  41. Node.js的Reactor模式 与异步编程
  42. Vue. JS is easier to learn than jQuery
  43. Reactor mode of node.js and asynchronous programming
  44. 详解JavaScript中的正则表达式
  45. Explain regular expressions in JavaScript
  46. 详解JavaScript中的正则表达式
  47. Explain regular expressions in JavaScript
  48. JS: closure
  49. Write your own promise in promises / A + specification
  50. Analysis of the core mechanism of webpack from loader, plugin to egg
  51. On the import and export of webpack
  52. Interpretation of lodash source code (2)
  53. Hexo series (5) writing articles
  54. 有人用过JMeter或用HttpUnit写过测试吗????
  55. Has anyone ever used JMeter or written tests in httpUnit????
  56. JavaScript异步编程4——Promise错误处理
  57. Leetcode 1846. Reduce and rearrange the largest element of an array
  58. JavaScript asynchronous programming 4 -- promise error handling
  59. SQLite是一种经典的无服务器Serverless
  60. 通过Spring Boot Webflux实现Reactor Kafka