領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多時間:約 15 分鐘。
在本系列的先前文章中,您已在 Spring Boot 中使用 RSocket 體驗了 請求-回應 和 fire-and-forget 訊息傳遞。這次您將嘗試 RSocket 另一個全新的訊息傳遞模型 — 請求-串流。
在本練習中,您將學習如何使用傳統的「用戶端請求伺服器串流」方法來串流資料。
我之前沒有提到的一點是,RSocket 允許您在任一方向使用其訊息傳遞模型。因此,如果您想使用較不常見的「伺服器請求用戶端串流」模型,對於 RSocket 來說也沒問題。此外,還有許多非 Java 的 RSocket 實作可供選擇,包括 Go、Javascript 和 .Net — 如果您的架構包含 Java 可能不是最佳選擇的平台,這將非常理想。
釐清了這一點後,請按照以下步驟將串流資料功能添加到您先前文章中現有的 RSocket 用戶端和伺服器程式碼中。
如果您還沒閱讀先前關於 伺服器端、用戶端 請求-回應訊息傳遞,或 fire-and-forget 的文章,現在正是時候!程式碼範例在 GitHub 上。
再次於您的伺服器端 RSocketController
類別中,新增一個名為 .stream()
的新方法,其簽章為 — 「接受物件,返回 Flux」— 這是 RSocket 對此訊息傳遞模型的期望。使用 @MessageMapping
註解標註這個新方法,並指定適當的對應名稱 — 例如 "stream"
。此方法的範例程式碼如下。
@MessageMapping("stream")
Flux<Message> stream(Message request) {
log.info("Received stream request: {}", request);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Message(SERVER, STREAM, index))
.log();
}
RSocketController
位於rsocket-server
資料夾中的io.pivotal.rsocketserver
套件中。
.stream()
方法的唯一參數 Message
來自先前討論的 io.pivotal.rsocketserver.data
套件。此訊息構成用戶端請求資料串流的基礎。上面的程式碼會在收到用戶端請求後立即將其記錄到主控台。
此方法傳回的 Flux
物件是 Project Reactor 的一部分,也用於 Spring Framework 的 反應式支援 中。
RSocket 使用 Flux
是因為它可以大幅簡化反應式資料串流的處理。Flux 是資料的「發布者」。它描述了 0 到 N 個元素的串流,並提供了許多用於處理串流資料的運算子 — 類似於 Java 8 的串流 API。
在上面的程式碼中,每秒會將一個新的 Long
元素添加到 Flux 中 — 透過 .interval()
呼叫設定 — 基本上提供了一個恆定的資料串流。.map()
函數使用 Long
作為索引值建立一個新的訊息物件,而在最後一行,呼叫 .log()
方法會將所有流經 Flux 的元素(包括錯誤等)列印到主控台。
在用戶端專案的 RSocketShellClient
類別中,首先,新增一個對 Disposable
物件的全域參考,如下所示
private static Disposable disposable;
RSocketShellClient
位於rsocket-client
資料夾中的io.pivotal.rsocketclient
套件中。
這個 Disposable
物件可讓您在資料串流開始後控制它。
接下來,在您的 RSocketShellClient
中新增一個 .stream()
方法。使用 @ShellMethod
註解標註此方法。此方法的範例程式碼如下。
@ShellMethod("Send one request. Many responses (stream) will be printed.")
public void stream() {
log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
this.disposable = this.rsocketRequester
.route("stream")
.data(new Message(CLIENT, STREAM))
.retrieveFlux(Message.class)
.subscribe(er -> log.info("Response received: {}", er));
}
在上面的程式碼中,rsocketRequester
被告知將請求路由到伺服器的 .stream()
方法,方法是指定 "stream"
作為 .route()
。新的訊息物件為您的請求提供 .data()
。因為您希望伺服器返回串流,所以您在 rsocketRequester
上使用了 .requestFlux()
方法,指定返回的 Flux
包含 Message
類型的元素。最後,您在 .subscribe()
方法中設定了一個日誌函數作為您串流的訂閱者。
請注意如何保留
rsocketRequester
產生的Disposable
。您需要它來停止串流。
透過保留對串流的參考,您可以在想要停止串流時處置它。若要將您的串流取消功能新增至您的 RSocketShellClient
,請新增一個名為 .s()
的新方法,並使用 @ShellMethod
進行註解,如下所示
@ShellMethod("Stop streaming messages from the server.")
public void s(){
if(null != disposable){
disposable.dispose();
}
}
在此方法中,呼叫 disposable.dispose()
會取消串流。有了這個方法,若要停止串流,請在 shell:>
提示符號中輸入 s
,然後按下 Enter
鍵。串流隨即停止。您的編碼任務現在已完成。接下來,測試用戶端和伺服器是否能協同運作。
開啟終端機視窗並移動到 rsocket-server
目錄。使用 Maven 和 Spring Boot 外掛程式執行伺服器,如下所示
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
伺服器在 localhost
連接埠 7000
上啟動。
開啟第二個終端機視窗並移動到 rsocket-client
目錄。從那裡,建置並執行 RSocket 用戶端應用程式,如下所示
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
啟動後,Spring Shell 會向您顯示新的提示符號
shell:>
您在提示符號中輸入 stream
以從伺服器請求串流。用戶端會傳送一個 Message
作為其串流請求。串流中的每個 Message
都會在伺服器傳送時和用戶端接收時印出。用戶端上的主控台日誌看起來像這樣
shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)
若要停止串流,請在 shell:>
提示符號中輸入 s
,然後按下 Enter
鍵。
您可以透過在 shell:>
提示符號中輸入 exit
來停止 rsocket-client
,如下所示。
shell:>exit
您可以透過在其終端機視窗中按下 Ctrl-C
來停止 rsocket-server
程序。
RSocketShellClient
中的 .stream()
方法使用 RSocketRequester
向伺服器傳送單一請求訊息。此請求啟動從伺服器到用戶端的資料串流。然後,用戶端會將接收到的每則訊息記錄到主控台。
伺服器端的 RSocketController
會檢查請求訊息的中繼資料以取得 route
。此訊息的路由設定為 "stream"
,因此伺服器會將訊息傳遞至對應的 .stream(Message request)
方法。然後,伺服器會繼續每秒向用戶端傳送訊息串流,直到用戶端要求停止。
用戶端可以隨時停止串流。在 .s()
方法中,這是透過在原始串流訂閱傳回的 Disposable
上呼叫 .dispose()
來完成的。
在這篇文章中,您學習了如何在 Spring Boot 中使用 RSocket 建構請求-串流功能。在下一篇文章中,我們將新增通道訊息傳遞。下次見!