通过Spring Boot Webflux实现Reactor Kafka

解道jdon 2021-05-04 12:34:53
spring 实现 boot reactor webflux


Apache Kafka简介中,我们研究了分布式流媒体平台Apache Kafka。这一次,我们将关注Reactor Kafka,这个库可以创建从Project Reactor到Kafka Topics的Reactive Streams,反之亦然。

我们将使用两个小型示例应用程序,Paymentprocessor Gateway和PaymentValidator。这些应用程序的代码可以在这里找到。

Paymentprocessor网关提供了一个小网页,可以生成一个随机的信用卡号码(显然是伪造的),以及支付金额。当用户单击提交按钮时,表单将提交给网关的API。API具有针对Kafka群集上的未确认事务主题的反应流,这个未确认事务的主题的另外一边消费者是PaymentValidator,监听要验证的传入消息。然后,这些消息通过响应管道,验证方法将其打印到命令行。

通过Reactive Streams向Kafka发送消息

我们的应用程序构建在Spring 5和Spring Boot 2之上,使我们能够快速设置和使用Project Reactor。

Gateway应用程序的目标是设置从Web控制器到Kafka集群的Reactive流。这意味着我们需要特定的依赖关系来弹簧webflux和reactor-kafka。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

Spring Webflux RestController提供支付API,为paymentGateway类的doPayment方法创建一个Reactive流。

/ ** 
     *调用返回的Mono将被发送到Spring Webflux,后者依赖于multi-reactor 事件循环和NIO 
     *以非阻塞方式处理请求,从而实现更多的并发请求。结果将
     通过一个名为Server Sent Events 发送。
     ** /
@PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
    / ** 
         当调用doPayment方法时,我们发送付款信息,获得Mono <Void>作为响应。
         当我们的付款成功发送事件到Kafka主题
         ** / 
        return paymentGateway.doPayment(payment);
    }

paymentGateway需要一个kafkaProducer,它使我们能够将消息作为管道的一部分放在Kafka主题中。它可以使用KafkaSender.create方法轻松创建,传递许多生产者选项。

 public PaymentGatewayImpl() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
        kafkaProducer = KafkaSender.create(producerOptions);
    }

创建之后,kafkaProducer可以用来轻松地将我们的消息发送到选择的Kafka主题,成为控制器中启动的管道的一部分。因为消息是以非阻塞方式发送到Kafka集群的,所以我们可以使用项目Reactor的事件循环接收并将来自Web API的大量并发消息路由到Kafka。

 @Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
        final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
        String payload = toBinary(payment);
        SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
        return kafkaProducer.send(Mono.just(message)).next();
    }
    private String toBinary(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e);
        }
    }

从Kafka主题创建反应流

当没有消费者监听时,向主题发送消息没有多大意义,因此我们的第二个应用程序将使用一个反应管道来监听未确认的事务主题。为此,使用KafkaReceiver.create方法创建kafkaReceiver对象,类似于我们之前创建kafkaProducer的方法。

通过使用kafkaReceiver.receive方法,我们可以获得receiverRecords的Flux。进入我们读取的主题中每条消息都放入receiverRecord中。流入应用程序后,它们会进一步通过反应管道。然后,这些消息传递processEvent方法,该方法调用paymentValidator,该方法将一些信息输出到控制台。最后,在receiverOffset上调用acknowledge方法,向Kafka集群发送一条消息已被处理的确认。

   public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
        this.paymentValidator = paymentValidator;
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("unconfirmed-transactions"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
        kafkaReceiver = KafkaReceiver.create(consumerOptions);
        /**
         * We create a receiver for new unconfirmed transactions
         */
        ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                .doOnNext(r -> {
                    /**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */
                    final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
                    processEvent(paymentEvent);
                    r.receiverOffset().acknowledge();
                })
                .subscribe();
    }
    private void processEvent(PaymentEvent paymentEvent) {
        paymentValidator.calculateResult(paymentEvent);
    }
    private <T> T fromBinary(String object, Class<T> resultType) {
        try {
            return objectMapper.readValue(object, resultType);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

可以在此处找到此示例的代码

版权声明
本文为[解道jdon]所创,转载请带上原文链接,感谢
https://www.jdon.com/52083

  1. JS: event flow
  2. Front end performance optimization: rearrangement and redrawing
  3. JS - deep and shallow copy
  4. JavaScript异步编程3——Promise的链式使用
  5. JavaScript asynchronous programming 3 -- chain use of promise
  6. Vue.js组件的使用
  7. The use of vue.js component
  8. How to judge whether a linked list has links
  9. Element UI custom theme configuration
  10. Text image parallax effect HTML + CSS + JS
  11. Spring的nohttp宣言:消灭http://
  12. Vue3 intermediate guide - composition API
  13. Analysis of URL
  14. These 10 widgets that every developer must know
  15. Spring's nohttp Manifesto: eliminate http://
  16. Learn more about JS prototypes
  17. Refer to await to JS to write an await error handling
  18. A short article will directly let you understand what the event loop mechanism is
  19. Vue3 uses mitt for component communication
  20. Characteristics and thinking of ES6 symbol
  21. Two way linked list: I'm no longer one-way driving
  22. Vue event and form processing
  23. Reactive TraderCloud实时外汇开源交易平台
  24. Reactive tradercloud real time foreign exchange open source trading platform
  25. Node.js REST API的10个最佳实践
  26. Ten best practices of node.js rest API
  27. Fiddler advanced usage
  28. Process from Vue template to render
  29. Promise up (asynchronous or synchronous)
  30. Principle and implementation of promise
  31. Vs code plug in sharing - run code
  32. Vue practical notes (1) introduction of Ant Design
  33. Vue actual combat notes (2) introduction of element plus
  34. Introduction to webpack
  35. Webpack construction process
  36. Vue notes
  37. The experience and lessons of moving from ruby megalith architecture to go microservice
  38. Using leancloud to add artitalk module to hexo blog
  39. Implementation of chrome request filtering extension
  40. Detailed introduction of beer import declaration elements and label quarantine [import knowledge]
  41. Gallop workflow engine design series 01 process element design
  42. VUE移动端音乐APP学习【十六】:播放器歌词显示开发
  43. Vue Mobile Music App learning [16]: player lyrics display development
  44. jquery cookie
  45. jquery cookie
  46. 体面编码之JavaScript
  47. JavaScript for decent coding
  48. React17 系统精讲 结合TS打造旅游电商平台
  49. React17 system combined with TS to build tourism e-commerce platform
  50. 2021-05-04 hot news
  51. HttpSession对象与Cooike的关系 以及 Cookie对象构造函数问题
  52. gRPC-Web:替代REST的gRPC的Javascript库包
  53. The relationship between httpsession object and cooike and the construction of cookie object
  54. Grpc Web: a JavaScript library package to replace rest grpc
  55. Building reactive rest API with Java - kalpa Senanayake
  56. PDF转HTML工具——用springboot包装pdf2htmlEX命令行工具
  57. Pdf to HTML tool -- Wrapping pdf2htmlex command line tool with springboot
  58. PDF转HTML工具——用springboot包装pdf2htmlEX命令行工具
  59. Pdf to HTML tool -- Wrapping pdf2htmlex command line tool with springboot
  60. Vue.js比jQuery更容易学习