建立用於消耗資料並產生 Spring Cloud Stream Sink 應用程式的函數

工程 | Soby Chacko | 2020 年 8 月 3 日 | ...

這是部落格系列的第 4 部分,我們將在其中介紹用於 Spring Cloud Stream 應用程式的 Java 函數。

本系列的其他部分。

第 1 部分 - 一般介紹

第 2 部分 - 函數組合

第 3 部分 - Supplier 函數和 Source 應用程式

在本系列的上一篇部落格中,我們看到了如何使用 java.util.function.Supplier 來產生 Spring Cloud Stream source。在這個新版本中,我們將看到如何使用 java.util.function.Consumerjava.util.function.Function 開發和測試消耗函數。稍後,我們將簡要說明如何從這個 consumer 產生 Spring Cloud Stream sink 應用程式。

撰寫 Consumer

撰寫 consumer 背後的概念相對簡單。我們從某些外部來源消耗資料,並將其交給 consumer 中的業務邏輯。正如我們在先前的部落格中看到的 Supplier 案例一樣,動作發生在業務邏輯實作內部。如果我們使用程式庫來幫助我們完成所有繁重的工作,例如 Spring Integration,那麼這就變成簡單地將收到的資料透過適當的 API 委派給程式庫的問題。但是,如果沒有可用的此類程式庫,我們需要自己編寫所有程式碼。讓我們以一個具體的例子來說明這一點。

撰寫 Apache Pulsar 的 Consumer

Apache Pulsar 是一個流行的訊息中介軟體系統。讓我們假設一下,我們想要撰寫一個通用的 Java Consumer,它從某處接收資料,然後將其轉發到 Pulsar。在不深入細節的情況下,這是一個簡單的 Consumer,可以完成此任務。基本的實作程式碼取自此處

@Bean
public org.apache.pulsar.client.api.Producer producer() {
  String pulsarBrokerRootUrl = "pulsar://127.0.0.1:6650";
  PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
  String topic = "persistent://sample/standalone/ns1/my-topic";
  return client.createProducer(topic);
}

@Bean
public Consumer<byte[]> pulsarConsumer(Producer producer) {
  return payload -> {
     producer.send(payload);
  };
}

再次強調,這裡展示僅用於說明目的,可能不是將資料發送到 Apache Pulsar 的完整實作。儘管如此,這演示了我們想要傳達的概念。查看 consumer,我們可以看到程式碼很簡單;我們在 lambda 運算式內部所做的只是在 Apache Pulsar Producer 上呼叫 send 方法。

我們可以將上述 consumer 注入到應用程式中,並以程式設計方式調用它的 accept 方法,提供資料。正如我們在先前的部落格中看到的,下圖表達了在諸如 Spring Cloud Data Flow 之類的平台上,獨立運行函數或作為資料協調管道一部分運行的想法。

Stream Applications Layered Architecture for Functions

好的,這個 consumer 非常簡單,我們可能會這樣想。如果我們想做一些更複雜的事情呢?下面,我們將完全做到這一點。

撰寫 RSocket 的消耗函數

RSocket 是一個雙向二進制協議,Spring Framework 為其提供了出色的支援。RSocket 提供了一種單向發送 (fire and forget) 模型,允許我們向 RSocket 伺服器發送訊息而無需接收回應。我們想要使用 TCP 為此模型撰寫一個 consumer,其中 consumer 接收外部資料,然後將其推送到 RSocket 伺服器。RSocket 的 Java 實作基於 Project Reactor。因此,當我們撰寫 consumer 時,我們需要使用反應式類型和模式(類似於先前的部落格中的反應式 feed supplier)。

當使用單向發送策略時,RSocket 會傳回 Mono<Void>,我們的 consumer 需要從函數傳回它。但是,在 java.util.function.Consumer 的情況下,我們無法傳回任何內容。因此,我們必須撰寫一個簽名為 Function<String, Mono<Void>> rsocketConsumer() 的函數。由於該函數傳回 Mono<Void>,這在語義上等同於撰寫 consumer。函數的使用者需要取得對 Mono 的引用並訂閱它。在我們已經為 MongoDBCassandra 提供的現成 consumer 中也使用了類似的模式。

設定專案時,請包含以下 Maven 依賴項。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

來自 Spring Boot 的這個 starter 依賴項將過渡性地將所有 RSocket 依賴項引入我們的專案。

在我們編寫函數程式碼之前,讓我們先編寫一個 ConfigurationProperties 類別來定義函數需要的一些核心屬性。

@ConfigurationProperties("rsocket.consumer")
public class RsocketConsumerProperties {

  private String host = "localhost";

  private int port = 7000;

  private String route;
…
}

正如我們所看到的,使用前綴 rsocket.consumer,我們定義了三個屬性 - hostport 用於 RSocket 伺服器,而 route 是伺服器上的端點。

現在我們有了組態屬性,讓我們建立一個 Configuration 類別來配置我們的函數 Bean。

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

  @Bean
  public Function<String, Mono<Void>> rsocketConsumer(RSocketRequester.Builder builder,
                                            RsocketConsumerProperties rsocketConsumerProperties) {
     final Mono<RSocketRequester> rSocketRequester = builder.connectTcp(rsocketConsumerProperties.getHost(),
           rsocketConsumerProperties.getPort());

     return input -> rSocketRequester
                 .flatMap(requester -> requester.route(rsocketConsumerProperties.getRoute())
                       .data(input)
                       .send());
  }
}

我們將來自 Spring Boot 自動組態的 builder 注入到函數中,這有助於我們建立 RSocketRequester。使用這個 builder,我們建立一個帶有 TCP 連接的 Mono<RSocketRequester>connectTcp API 方法採用 RSocket 主機和埠資訊。一旦我們獲得了對 RSocketRequester 的處理,我們就可以在函數中提供的 lambda 內部使用它。

我們在 Mono<RSocketRequester> 上呼叫 flatMap,對於每個傳入的訊息,我們指定 route 和需要發送的資料,然後再呼叫最終將資料推送到 RSocket 伺服器的 send 方法。

這就是撰寫一個消耗資料然後使用單向發送互動模型將其發送到 RSocket 伺服器的函數所需的一切。請記住,由於 Spring Framework 在底層提供的各種 RSocket 支援和抽象,此程式碼看起來非常簡單。

讓我們撰寫一個快速測試來驗證該函數是否按預期工作。

正如我們在先前的部落格中對反應式 supplier 所做的那樣,將以下依賴項添加到專案中。這有助於我們測試反應式元件。

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <scope>test</scope>
</dependency>

以下是包含其他必要元件的測試。

@SpringBootTest(properties = {"spring.rsocket.server.port=7000", "rsocket.consumer.route=test-route"})
public class RsocketConsumerTests {

  @Autowired
  Function<Message<?>, Mono<Void>> rsocketConsumer;

  @Autowired
  TestController controller;

  @Test
  void testRsocketConsumer() {

     rsocketConsumer.apply(new GenericMessage<>("Hello RSocket"))
           .subscribe();

     StepVerifier.create(this.controller.fireForgetPayloads)
           .expectNext("Hello RSocket")
           .thenCancel()
           .verify();
  }

  @SpringBootApplication
  @ComponentScan
  static class RSocketConsumerTestApplication{}

  @Controller
  static class TestController {
     final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();

     @MessageMapping("test-route")
     void someMethod(String payload) {
        this.fireForgetPayloads.onNext(payload);
     }
  }
}

測試元件的快速說明。

  • 我們在 SpringBootApplication 上提供屬性 spring.rsocket.server.port。這允許 Spring Boot 自動組態預設的 RSocket 伺服器以進行測試。在這裡將埠硬編碼為 7000,因為這是 Spring Boot 在自動組態元件時使用的預設埠。這與我們在上面的屬性中使用的預設值相同。我們還指定了我們想要在測試中使用的 route

  • 提供了一個 Controller,其中包含一個使用 MessageMapping 註釋的方法,它攔截到達我們在測試中指定的 route 的訊息。伺服器上 route 的每個傳入記錄都傳遞到 Flux 中,稍後可以在測試期間重新播放以進行斷言。

  • 在測試中,我們在注入的 RSocket consumer 上呼叫 apply 方法,並為其提供測試訊息。

  • 最後,我們使用 StepVerifier 來驗證訊息是否已成功發送到 RSocket 伺服器。

從 RSocket Consumer 產生 Spring Cloud Stream Sink 應用程式

上一篇部落格中,我們詳細介紹了如何從 Supplier 函數產生 Spring Cloud Stream source 應用程式。您可以遵循我們在那裡使用的相同模式,從我們上面撰寫的 RSocket 函數產生 sink 應用程式。我們不會在這裡重新討論所有相關的細節。使用這裡提供的許多不同的 sink 應用程式作為範本。當我們使用 Spring Cloud Stream 中的測試 binder 測試函數時,將訊息發送到 InputDestination。Spring Cloud Stream 會將其下游發送到 RSocket 伺服器。然後,我們可以使用與上述單元測試中使用的相同的驗證策略。請參閱此處以獲取有關使用測試 binder 測試 Spring Cloud Stream 元件的更多資訊。

結論

在這篇部落格文章中,我們看到了如何撰寫一個普通的 consumer,它使用 Apache Pulsar 作為範例來消耗資料並對其執行操作。然後,我們探討了如何以 Function<String, Mono<Void>> 的形式開發反應式 consumer,並以 RSocket 單向發送策略來指導我們。我們還示範了如何單元測試這個反應式 consumer。請遵循本文中列出的步驟來撰寫您自己的資料 consumer,如果您這樣做,請考慮貢獻 pull request。

敬請期待…​

請在未來幾週內關注更多深入探討和重點主題。在該系列的下一篇部落格中,我們將開始一系列案例研究,在其中我們將探索現有的函數和應用程式。

取得 Spring 電子報

隨時掌握 Spring 電子報的最新資訊

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

瞭解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進制檔案。

瞭解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

檢視全部