RSocket 入門指南:伺服器呼叫用戶端

工程 | Ben Wilcock | 2020 年 5 月 12 日 | ...

閱讀時間:約 7 分鐘。 編碼時間:約 20 分鐘。

如果您一直在關注我在 RSocket Spring Boot 伺服器入門 系列文章,您一定聽過我多次提到「用戶端和伺服器」。但是,使用 RSocket,用戶端和伺服器之間的界線是模糊的。使用 Rsocket,伺服器可以向用戶端發送訊息,而用戶端可以像伺服器一樣回應這些請求。

事實上,RSocket 文件 並未使用「用戶端」或「伺服器」這些術語。文件改用「請求者」和「回應者」這些術語。在 RSocket 中,任何組件都可以作為請求者,任何組件都可以作為回應者,甚至可以同時作為兩者。在 RSocket 中,請求者和回應者之間的所有這種來回通訊都發生在單一「雙向」連線上。

今天,您將利用這些功能,透過程式設計您的 rsocket-client 來回應來自伺服器的請求。在伺服器端程式碼中,您將監聽用戶端連線事件,當連線事件發生時,您將把新的用戶端儲存在已連線用戶端的列表中。您還將要求每個用戶端將遙測訊息串流回伺服器,只要他們的連線處於活動狀態。

如果您一直在關注 系列,您可以按照以下指示進行編碼。程式碼也可以從 GitHub 下載。

步驟 1:更新 Spring Boot 和 RSocket

首先,進行一些內務處理。Spring BootRSocket Java 程式庫最近都已更新。這些更新包含常見的錯誤修正、增強功能和棄用,因此升級符合我們的利益。

在 Maven pom.xml<parent> 區段中,將 spring-boot-starter-parent 變更為版本 2.3.0.RELEASE,如下所示。您需要執行兩次,因為您有兩個不同的專案 — rsocket-clientrsocket-server

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.0.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

若要重新整理您的程式碼,請在兩個專案的根資料夾中執行以下命令

./mvnw clean compile

現在您可以繼續進行編碼任務。

步驟 2:為用戶端新增請求-回應訊息處理常式

用戶端需要能夠回應來自伺服器的訊息。在 rsocket-client 專案的 RSocketShellClient.java 中,新增一個名為 ClientHandler 的新內部類別,如下所示

@Slf4j
class ClientHandler {
 
 @MessageMapping("client-status")
 public Flux<String> statusUpdate(String status) {
   log.info("Connection {}", status);
   return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
 }
}

此類別包含一個名為 statusUpdate() 的單一方法,並使用 @MessageMapping 註釋進行裝飾,就像 rsocket-server 專案中的那些一樣。用戶端使用此類別和此方法來擷取和回應來自伺服器的請求。回應本身是訊息串流。每 5 秒,用戶端會告訴伺服器其目前的可用記憶體。將其視為用戶端遙測資料。

為了使其運作,您必須向 RSocket 連線「註冊」此類別。您將在下一節中執行此操作。

步驟 3:在用戶端的建構子中註冊 ClientHandler

在用戶端可以回應來自伺服器的訊息之前,它必須向 RSocket 連線註冊 ClientHandler。修訂後的建構子程式碼如下所示。請注意建構子的方法簽章變更,以新增 RSocketStrategies 變數。Spring 會將此變數提供給您的建構子。將您舊的建構子程式碼替換為下面列出的新程式碼。

public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
 
 // (1)
 String client = UUID.randomUUID().toString();
 log.info("Connecting using client ID: {}", client);
  
 // (2)
 SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
  
 // (3) 
 this.rsocketRequester = rsocketRequesterBuilder
 .setupRoute("shell-client")
 .setupData(client)
 .rsocketStrategies(strategies)
 .rsocketConnector(connector -> connector.acceptor(responder))
 .connectTcp("localhost", 7000)
 .block();
  
 // (4)
 this.rsocketRequester.rsocket()
 .onClose()
 .doOnError(error -> log.warn("Connection CLOSED"))
 .doFinally(consumer -> log.info("Client DISCONNECTED"))
 .subscribe();
 }

在上面的程式碼中,您首先產生並儲存一個唯一的 ID,用於識別此用戶端實例 (1)。接下來,您使用 RSocket strategies 和新的 ClientHandler 實例 (2) 建立新的 SocketAcceptor。然後使用 RSocketRequesterBuilder 註冊新的 SocketAcceptor (3)。最後,透過處理 RSocket onClose() 事件 (4) 來確保妥善處理斷線。

用戶端程式碼到此為止。讓我們繼續進行伺服器端的變更。

步驟 4:在伺服器上記住用戶端

rsocket-server 專案中要做的第一件事是建立 RSocketRequester 用戶端的集合,方法是在 RSocketController.java 類別中新增一個類別層級變數,如下所示

private final List<RSocketRequester> CLIENTS = new ArrayList<>();

接下來,新增一個連線處理常式,方法是新增一個類似於此的新方法

  @ConnectMapping("shell-client")
 void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
 // The code for the method will go HERE
 }

@ConnectMapping 註釋可讓您監聽用戶端連線事件的發生。使用此事件,您可以排程兩項工作。第一項是將每個新用戶端新增到 CLIENTS 列表。第二項是呼叫每個用戶端並啟動其遙測串流。

將以下程式碼新增到您剛建立的方法中

requester.rsocket()
        .onClose() // (1)
        .doFirst(() -> { 
            log.info("Client: {} CONNECTED.", client);
            CLIENTS.add(requester); // (2)
        })
        .doOnError(error -> { 
            log.warn("Channel to client {} CLOSED", client); // (3)
        })
        .doFinally(consumer -> { 
            CLIENTS.remove(requester);
            log.info("Client {} DISCONNECTED", client); // (4)
        })
        .subscribe();

首先要注意的是呼叫 requester.rsocket().onClose() 方法 (1)。此方法會傳回反應式 Mono 物件,其中包含您需要的所有回呼。

mono 的 doFirst() 方法在任何呼叫 subscribe() 之前,但在 mono 的初始建立之後呼叫。使用此方法將用戶端的 requester 物件新增到 CLIENTS 列表 (2)。

此程式碼可能會讓人感覺違反直覺 — 在用戶端連線時呼叫 onClose(),然後使用產生的 mono 來儲存對新用戶端的參考。有時,事件驅動的 API 會感覺有點奇怪。但可以將其視為此 RSocket 連線的 mono 向您發送「我活著」事件。您正在使用該建立事件來觸發將每個用戶端的參考儲存在列表中。

當連線出現問題時,RSocket 會呼叫 mono 的 doOnError() 方法。這包括用戶端已選擇關閉連線的情況。您可以使用提供的 error 變數來決定要採取的動作。在上面的程式碼中,錯誤只是記錄為警告 (3)。

當 RSocket 連線已關閉時,會觸發 mono 的 doFinally() 方法。此方法是執行從列表 CLIENTS 中移除用戶端的程式碼的理想位置 (4)。

最後,subscribe() 會啟動您已新增到 mono 的反應式程式碼,並表示您已準備好處理事件。

步驟 5:要求用戶端提供可用記憶體讀數

當每個用戶端連線時,請求遙測讀數串流。為此,再次在 connectShellClientAndAskForTelemetry() 方法中工作,您需要向您稍早新增的 client-status 訊息處理常式發送請求。此程式碼如下所示

requester.route("client-status")
        .data("OPEN")
        .retrieveFlux(String.class)
        .doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
        .subscribe();

使用 requester,以路由 "client-status" 為目標。發送字串 "OPEN" 作為訊息資料,並擷取 String 類型的 Flux。每個到達的字串都包含用戶端目前的可用記憶體讀數。將此讀數記錄到主控台。最後,別忘了 subscribe() 以啟動您的反應式程式碼。

步驟 6:建置並執行 RSocket 伺服器

現在是測試您的程式碼的時候了。開啟終端機視窗並移至 rsocket-server 目錄,然後執行伺服器,如下所示

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

伺服器在 localhost 連接埠 7000 上啟動。

步驟 7:建置並執行 RSocket 用戶端

開啟第二個終端機視窗並移至 rsocket-client 目錄。從那裡,建置並執行用戶端,如下所示

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

運作方式

啟動後,您會注意到用戶端和伺服器組件的主控台中都有新的記錄項目。在 rsocket-client 視窗中,您會看到記錄項目顯示用戶端的唯一 ID — UUID 格式 — 以及 "OPEN" 連線狀態,如下所示

Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>

至少等待 10 秒,然後在 shell:> 提示符號下輸入 exit。rsocket-client 現在會與伺服器斷線並關閉。

現在切換到 rsocket-server 視窗。記錄項目看起來像這樣

Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED

當用戶端連線時,它會新增到用戶端列表中,並且主控台會列印其用戶端 ID 和狀態「CONNECTED」。然後,每 5 秒,記錄會顯示用戶端目前的「可用記憶體」讀數。最後,當用戶端斷線時,其 RSocket 通道的狀態會變成「CLOSED」,並且用戶端會「DISCONNECTED」。

您可以按下終端機視窗中的 Ctrl-C 來停止 rsocket-server 程序。

最終想法

呼叫用戶端的能力非常強大。它非常適合各種情況,包括行動裝置、物聯網或微服務。而且由於所有這些都可以透過 TCP 或 WebSockets 發生,因此您已經擁有您需要的所有基礎架構,而無需訴諸像訊息代理程式這樣的重量級解決方案。

您在這裡涵蓋了很多內容。您學習了如何將伺服器變成「請求者」並將用戶端變成「回應者」。您發現了如何監聽連線事件。您還了解了一些關於如何處理來自 rsocket 連線的錯誤和事件的資訊。而且,儘管在本練習中,您選擇了「request-stream」作為您的伺服器-用戶端通訊,但您可以根據您的需求使用四種 RSocket 互動模式中的任何一種。

取得 Spring 電子報

隨時關注 Spring 電子報

訂閱

領先一步

VMware 提供訓練和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看所有