RSocket 入門指南:Spring Boot 請求-串流

工程 | Ben Wilcock | 2020 年 3 月 23 日 | ...

時間:約 15 分鐘。

在本系列的先前文章中,您已在 Spring Boot 中使用 RSocket 體驗了 請求-回應fire-and-forget 訊息傳遞。這次您將嘗試 RSocket 另一個全新的訊息傳遞模型 — 請求-串流。

在本練習中,您將學習如何使用傳統的「用戶端請求伺服器串流」方法來串流資料。

我之前沒有提到的一點是,RSocket 允許您在任一方向使用其訊息傳遞模型。因此,如果您想使用較不常見的「伺服器請求用戶端串流」模型,對於 RSocket 來說也沒問題。此外,還有許多非 Java 的 RSocket 實作可供選擇,包括 Go、Javascript 和 .Net — 如果您的架構包含 Java 可能不是最佳選擇的平台,這將非常理想。

釐清了這一點後,請按照以下步驟將串流資料功能添加到您先前文章中現有的 RSocket 用戶端和伺服器程式碼中。

如果您還沒閱讀先前關於 伺服器端用戶端 請求-回應訊息傳遞,或 fire-and-forget 的文章,現在正是時候!程式碼範例在 GitHub 上。

步驟 1:新增伺服器端串流方法

再次於您的伺服器端 RSocketController 類別中,新增一個名為 .stream() 的新方法,其簽章為 — 「接受物件,返回 Flux」— 這是 RSocket 對此訊息傳遞模型的期望。使用 @MessageMapping 註解標註這個新方法,並指定適當的對應名稱 — 例如 "stream"。此方法的範例程式碼如下。

    @MessageMapping("stream")
    Flux<Message> stream(Message request) {
        log.info("Received stream request: {}", request);
        return Flux
                .interval(Duration.ofSeconds(1))
                .map(index -> new Message(SERVER, STREAM, index))
                .log();
    }

RSocketController 位於 rsocket-server 資料夾中的 io.pivotal.rsocketserver 套件中。

.stream() 方法的唯一參數 Message 來自先前討論的 io.pivotal.rsocketserver.data 套件。此訊息構成用戶端請求資料串流的基礎。上面的程式碼會在收到用戶端請求後立即將其記錄到主控台。

此方法傳回的 Flux 物件是 Project Reactor 的一部分,也用於 Spring Framework 的 反應式支援 中。

RSocket 使用 Flux 是因為它可以大幅簡化反應式資料串流的處理。Flux 是資料的「發布者」。它描述了 0 到 N 個元素的串流,並提供了許多用於處理串流資料的運算子 — 類似於 Java 8 的串流 API

在上面的程式碼中,每秒會將一個新的 Long 元素添加到 Flux 中 — 透過 .interval() 呼叫設定 — 基本上提供了一個恆定的資料串流。.map() 函數使用 Long 作為索引值建立一個新的訊息物件,而在最後一行,呼叫 .log() 方法會將所有流經 Flux 的元素(包括錯誤等)列印到主控台。

步驟 2:新增用戶端串流方法

在用戶端專案的 RSocketShellClient 類別中,首先,新增一個對 Disposable 物件的全域參考,如下所示

private static Disposable disposable;

RSocketShellClient 位於 rsocket-client 資料夾中的 io.pivotal.rsocketclient 套件中。

這個 Disposable 物件可讓您在資料串流開始後控制它。

接下來,在您的 RSocketShellClient 中新增一個 .stream() 方法。使用 @ShellMethod 註解標註此方法。此方法的範例程式碼如下。

    @ShellMethod("Send one request. Many responses (stream) will be printed.")
    public void stream() {
        log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
        this.disposable = this.rsocketRequester
                .route("stream")
                .data(new Message(CLIENT, STREAM))
                .retrieveFlux(Message.class)
                .subscribe(er -> log.info("Response received: {}", er));
    }

在上面的程式碼中,rsocketRequester 被告知將請求路由到伺服器的 .stream() 方法,方法是指定 "stream" 作為 .route()。新的訊息物件為您的請求提供 .data()。因為您希望伺服器返回串流,所以您在 rsocketRequester 上使用了 .requestFlux() 方法,指定返回的 Flux 包含 Message 類型的元素。最後,您在 .subscribe() 方法中設定了一個日誌函數作為您串流的訂閱者。

請注意如何保留 rsocketRequester 產生的 Disposable。您需要它來停止串流。

步驟 3:新增用戶端停止串流方法

透過保留對串流的參考,您可以在想要停止串流時處置它。若要將您的串流取消功能新增至您的 RSocketShellClient,請新增一個名為 .s() 的新方法,並使用 @ShellMethod 進行註解,如下所示

    @ShellMethod("Stop streaming messages from the server.")
    public void s(){
        if(null != disposable){
            disposable.dispose();
        }
    }

在此方法中,呼叫 disposable.dispose() 會取消串流。有了這個方法,若要停止串流,請在 shell:> 提示符號中輸入 s,然後按下 Enter 鍵。串流隨即停止。您的編碼任務現在已完成。接下來,測試用戶端和伺服器是否能協同運作。

步驟 4:建置並執行 RSocket 伺服器

開啟終端機視窗並移動到 rsocket-server 目錄。使用 Maven 和 Spring Boot 外掛程式執行伺服器,如下所示

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

伺服器在 localhost 連接埠 7000 上啟動。

步驟 5:建置並執行 RSocket 用戶端

開啟第二個終端機視窗並移動到 rsocket-client 目錄。從那裡,建置並執行 RSocket 用戶端應用程式,如下所示

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

啟動後,Spring Shell 會向您顯示新的提示符號

shell:>

您在提示符號中輸入 stream 以從伺服器請求串流。用戶端會傳送一個 Message 作為其串流請求。串流中的每個 Message 都會在伺服器傳送時和用戶端接收時印出。用戶端上的主控台日誌看起來像這樣

shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)

若要停止串流,請在 shell:> 提示符號中輸入 s,然後按下 Enter 鍵。

步驟 5:整理

您可以透過在 shell:> 提示符號中輸入 exit 來停止 rsocket-client,如下所示。

shell:>exit

您可以透過在其終端機視窗中按下 Ctrl-C 來停止 rsocket-server 程序。

運作方式

RSocketShellClient 中的 .stream() 方法使用 RSocketRequester 向伺服器傳送單一請求訊息。此請求啟動從伺服器到用戶端的資料串流。然後,用戶端會將接收到的每則訊息記錄到主控台。

伺服器端的 RSocketController 會檢查請求訊息的中繼資料以取得 route。此訊息的路由設定為 "stream",因此伺服器會將訊息傳遞至對應的 .stream(Message request) 方法。然後,伺服器會繼續每秒向用戶端傳送訊息串流,直到用戶端要求停止。

用戶端可以隨時停止串流。在 .s() 方法中,這是透過在原始串流訂閱傳回的 Disposable 上呼叫 .dispose() 來完成的。

最後的想法

在這篇文章中,您學習了如何在 Spring Boot 中使用 RSocket 建構請求-串流功能。在下一篇文章中,我們將新增通道訊息傳遞。下次見!

取得 Spring 電子報

隨時掌握 Spring 電子報的最新資訊

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部