RSocket 入門:Spring Boot 通道

工程 | Ben Wilcock | 2020年4月6日 | ...

閱讀時間:約 6 分鐘。練習時間:約 20 分鐘。

如果您和我一樣,仍處於 RSocket 旅程的起點,請查看 RSocket 協定背後的動機。這份簡短但富有洞察力的文件包含一則與我產生強烈共鳴的訊息——「不匹配的抽象概念會增加系統開發的成本。」

從軟體設計的角度來看,RSocket 的四種互動模型提供了顯著的好處。這表示我們可以使用適用於每個用例的正確互動模型來建模組件到組件之間的通訊。這種更有效率的模型可以為您節省大量的編碼時間和精力!

到目前為止,在本系列中,我們已經探索了 請求-回應單向發送請求-串流 訊息傳遞。今天,您將把通道新增到您的用戶端和伺服器程式碼中。

什麼是通道?

通道是雙向管道,允許資料流在任一方向流動。透過通道,從用戶端到伺服器的資料流可以與從伺服器到用戶端的資料流並存。通道在現實世界中有許多用途。通道可以傳輸視訊會議串流、發送和接收雙向聊天訊息、使用 delta 和 diff 同步資料,或提供報告、觀察和監控系統的方法。

RSocket 中的通道並不比串流或請求-回應更複雜。也就是說,您將在下面實作的場景比您之前嘗試過的稍微複雜一些,因此最好在開始之前先了解它。

在接下來的練習中,伺服器將訊息串流傳輸到用戶端。用戶端控制伺服器串流中訊息的頻率。它透過使用「延遲」設定串流來完成此操作。用戶端串流中的設定告訴伺服器,它發送的每條訊息之間應該暫停多久。可以將其視為訊息頻率撥盤。當頻率設定高時,暫停時間較短,因此您會看到大量伺服器發送的訊息。當頻率設定低時,暫停時間較長,因此您看到的伺服器發送的訊息會較少。考慮到這個結果,讓我們開始編碼。

完整的程式碼範例可在 GitHub 上取得。RSocketController 位於 rsocket-server 資料夾中的 io.pivotal.rsocketserver 套件中。RSocketShellClient 位於 rsocket-client 資料夾中的 io.pivotal.rsocketclient 套件中。

步驟 1:將通道方法新增至 RSocketController

在伺服器端的 RSocketController 類別中,新增一個名為 channel() 的方法,該方法接受 Flux<Duration> 並傳回 Flux<Message>。此方法簽章—flux-in、flux-out—將此方法識別為 RSocket 通道方法。使用值 "channel"@MessageMapping() 註釋該方法。此方法的程式碼如下。

    @MessageMapping("channel")
    Flux<Message> channel(final Flux<Duration> settings) {
        return settings
                    .doOnNext(setting -> log.info("\nFrequency setting is {} second(s).\n", setting.getSeconds()))
                    .switchMap(setting -> Flux.interval(setting)
                                                   .map(index -> new Message(SERVER, CHANNEL, index)))
                                                   .log();
    }

在程式碼中,.doOnNext() 正在監聽來自用戶端的設定串流。每次有新的延遲設定到達時,它都會將訊息寫入日誌。.switchMap() 為每個新設定建立一個新的 Flux。這個新的 flux 會根據延遲設定中包含的 .interval() 延遲發出一個新的 Message 物件。伺服器將這些新訊息在串流中傳送回用戶端。

步驟 2:將通道方法新增至 RSocketShellClient

在用戶端 RSocketShellClient 類別中,新增一個新的 channel() 方法,並使用 @ShellMethod() 註釋對其進行註釋。新增方法用途的描述作為註釋值,如下例所示。

    @ShellMethod("Stream some settings to the server. Stream of responses will be printed.")
    public void channel(){

// Code goes here

}

接下來,在方法中,新增建立設定串流的程式碼。程式碼如下所示

Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));

Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
                                        .doOnNext(d -> log.info("\nSending setting for {}-second interval.\n", d.getSeconds()));

每個 Mono 都包含一個單一的 Duration 設定。每個持續時間控制伺服器發送的每條訊息之間的暫停時間。總共有 3 個 mono。第一個包含 1 秒的短暫停設定。第二個具有較為寬鬆的 3 秒暫停設定,但此 mono 被告知使用 .delayElement() 方法將此持續時間的產生延遲 5 秒。第三個 mono 包含 5 秒的暫停設定,但在 15 秒過去之前不會發出其持續時間。這 3 個 mono 使用 .concat() 方法串連成一個單一的 Flux。使用 .doOnNext() 新增了日誌記錄語句,以便您可以看到程式碼執行時發生的情況。

注意:有很多方法可以建立基於 Flux 的串流,但對於本教學課程,這只是一些簡單的東西。

現在您已經在 flux 內部有了設定串流,將與伺服器通訊所需的程式碼新增到方法中

disposable = this.rsocketRequester
                    .route("channel")
                    .data(settings)
                    .retrieveFlux(Message.class)
                    .subscribe(message -> log.info("Received: {} \n(Type 's' to stop.)", message));

如果您一直在關注本系列,那麼您現在應該對這段程式碼很熟悉了。rsocketRequester 是您在建構函式中建立的全域變數。它提供您與伺服器的 RSocket 通訊連結。.route() 設定為 "channel" 以符合伺服器端程式碼中的訊息映射。.data() 是您在上面建立的 mono 串流。.retrieveFlux() 指定您期望收到 Message 物件串流,而 .subscribe() 開始您的訂閱並確保每個收到的訊息都列印到日誌中,以便您可以看到發生的情況。訂閱建立的 Disposable 物件會被保留並用於控制通道。

您可以在 這裡 看到該方法的完整程式碼。這就是我們需要的所有程式碼。讓我們執行它!

步驟 3:建置並執行 RSocket 伺服器

開啟終端機視窗並移動到 rsocket-server 目錄,然後依照以下步驟執行伺服器

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

伺服器在 localhost7000 上啟動。

步驟 4:建置並執行 RSocket 用戶端

開啟第二個終端機視窗並移動到 rsocket-client 目錄。從那裡,依照以下步驟建置並執行用戶端

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

啟動後,Spring Shell 會顯示新的提示符號

shell:>

您可以在提示符號處輸入 channel 來向伺服器請求通道。

用戶端建立其延遲計時器設定串流,並開始將其傳送到伺服器。用戶端和伺服器都會列印輸出串流中的每個持續時間。伺服器會傳回訊息串流,用戶端會將其列印到日誌中。用戶端終端機看起來像這樣

shell:>channel
i.p.rsocketclient.RSocketShellClient :

Sending setting for 1-second interval.

i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304561)
(Type 's' to stop.)

# removed log-lines

i.p.rsocketclient.RSocketShellClient :

Sending setting for 3-second interval.

i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304568)
(Type 's' to stop.)

# removed log-lines

i.p.rsocketclient.RSocketShellClient :

Sending setting for 5-second interval.

2020-03-27 10:23:00.243 INFO 5680 --- [tor-tcp-epoll-1] i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=4, created=1585304580)
(Type 's' to stop.)

# removed log-lines

若要停止通道傳輸,請輸入 s,然後按下 Enter 鍵。

步驟 5:整理

您可以像這樣在 shell:> 提示符號處輸入 exit 來退出 rsocket-client

shell:>exit

您可以在 rsocket-server 的終端機視窗中按下 Ctrl-C 來停止該程序。

運作方式

用戶端建立 3 個持續時間元素的序列。第一個持續時間設定會立即發出,第二個在 5 秒後發出,第三個在 15 秒後發出。每次發出新的持續時間時,都會記錄到主控台。此設定串流會傳送到伺服器,並控制伺服器產生的訊息串流。

在伺服器端,每次從用戶端串流中提取新的持續時間設定時,都會建立並傳回新的訊息串流。從用戶端傳送的最新設定控制此伺服器發送串流中每條訊息之間的時間延遲。

當訂閱的可拋棄物件被用戶端處置時,通道傳輸就會停止。

最終想法

如果您已關注整個系列,那麼您現在已經看到了 RSocket 的所有四種互動模型的實際應用:請求-回應單向發送請求-串流,以及現在的通道。

透過這四種可供您使用的通訊樣式,您不太可能遇到我們在開頭討論的那些惱人的「不匹配的抽象概念」場景。有了 RSocket 在您的工具箱中,您將擁有一個靈活、低摩擦、高效能的訊息傳輸協定,您可以在您的軟體中使用它——一個專為微服務架構而建的協定。

取得 Spring 電子報

隨時掌握 Spring 電子報的最新資訊

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部