領先一步
VMware 提供訓練和認證,以加速您的進展。
了解更多閱讀時間:約 7 分鐘。 編碼時間:約 20 分鐘。
如果您一直在關注我在 RSocket Spring Boot 伺服器入門 系列文章,您一定聽過我多次提到「用戶端和伺服器」。但是,使用 RSocket,用戶端和伺服器之間的界線是模糊的。使用 Rsocket,伺服器可以向用戶端發送訊息,而用戶端可以像伺服器一樣回應這些請求。
事實上,RSocket 文件 並未使用「用戶端」或「伺服器」這些術語。文件改用「請求者」和「回應者」這些術語。在 RSocket 中,任何組件都可以作為請求者,任何組件都可以作為回應者,甚至可以同時作為兩者。在 RSocket 中,請求者和回應者之間的所有這種來回通訊都發生在單一「雙向」連線上。
今天,您將利用這些功能,透過程式設計您的 rsocket-client 來回應來自伺服器的請求。在伺服器端程式碼中,您將監聽用戶端連線事件,當連線事件發生時,您將把新的用戶端儲存在已連線用戶端的列表中。您還將要求每個用戶端將遙測訊息串流回伺服器,只要他們的連線處於活動狀態。
首先,進行一些內務處理。Spring Boot 和 RSocket Java 程式庫最近都已更新。這些更新包含常見的錯誤修正、增強功能和棄用,因此升級符合我們的利益。
在 Maven pom.xml
的 <parent>
區段中,將 spring-boot-starter-parent
變更為版本 2.3.0.RELEASE
,如下所示。您需要執行兩次,因為您有兩個不同的專案 — rsocket-client
和 rsocket-server
。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
若要重新整理您的程式碼,請在兩個專案的根資料夾中執行以下命令
./mvnw clean compile
現在您可以繼續進行編碼任務。
用戶端需要能夠回應來自伺服器的訊息。在 rsocket-client
專案的 RSocketShellClient.java
中,新增一個名為 ClientHandler
的新內部類別,如下所示
@Slf4j
class ClientHandler {
@MessageMapping("client-status")
public Flux<String> statusUpdate(String status) {
log.info("Connection {}", status);
return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
}
}
此類別包含一個名為 statusUpdate()
的單一方法,並使用 @MessageMapping
註釋進行裝飾,就像 rsocket-server
專案中的那些一樣。用戶端使用此類別和此方法來擷取和回應來自伺服器的請求。回應本身是訊息串流。每 5 秒,用戶端會告訴伺服器其目前的可用記憶體。將其視為用戶端遙測資料。
為了使其運作,您必須向 RSocket 連線「註冊」此類別。您將在下一節中執行此操作。
在用戶端可以回應來自伺服器的訊息之前,它必須向 RSocket 連線註冊 ClientHandler
。修訂後的建構子程式碼如下所示。請注意建構子的方法簽章變更,以新增 RSocketStrategies
變數。Spring 會將此變數提供給您的建構子。將您舊的建構子程式碼替換為下面列出的新程式碼。
public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
// (1)
String client = UUID.randomUUID().toString();
log.info("Connecting using client ID: {}", client);
// (2)
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
// (3)
this.rsocketRequester = rsocketRequesterBuilder
.setupRoute("shell-client")
.setupData(client)
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", 7000)
.block();
// (4)
this.rsocketRequester.rsocket()
.onClose()
.doOnError(error -> log.warn("Connection CLOSED"))
.doFinally(consumer -> log.info("Client DISCONNECTED"))
.subscribe();
}
在上面的程式碼中,您首先產生並儲存一個唯一的 ID,用於識別此用戶端實例 (1)。接下來,您使用 RSocket strategies
和新的 ClientHandler
實例 (2) 建立新的 SocketAcceptor
。然後使用 RSocketRequesterBuilder
註冊新的 SocketAcceptor
(3)。最後,透過處理 RSocket onClose()
事件 (4) 來確保妥善處理斷線。
用戶端程式碼到此為止。讓我們繼續進行伺服器端的變更。
在 rsocket-server
專案中要做的第一件事是建立 RSocketRequester
用戶端的集合,方法是在 RSocketController.java
類別中新增一個類別層級變數,如下所示
private final List<RSocketRequester> CLIENTS = new ArrayList<>();
接下來,新增一個連線處理常式,方法是新增一個類似於此的新方法
@ConnectMapping("shell-client")
void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
// The code for the method will go HERE
}
@ConnectMapping
註釋可讓您監聽用戶端連線事件的發生。使用此事件,您可以排程兩項工作。第一項是將每個新用戶端新增到 CLIENTS
列表。第二項是呼叫每個用戶端並啟動其遙測串流。
將以下程式碼新增到您剛建立的方法中
requester.rsocket()
.onClose() // (1)
.doFirst(() -> {
log.info("Client: {} CONNECTED.", client);
CLIENTS.add(requester); // (2)
})
.doOnError(error -> {
log.warn("Channel to client {} CLOSED", client); // (3)
})
.doFinally(consumer -> {
CLIENTS.remove(requester);
log.info("Client {} DISCONNECTED", client); // (4)
})
.subscribe();
首先要注意的是呼叫 requester.rsocket().onClose()
方法 (1)。此方法會傳回反應式 Mono
物件,其中包含您需要的所有回呼。
mono 的 doFirst()
方法在任何呼叫 subscribe()
之前,但在 mono 的初始建立之後呼叫。使用此方法將用戶端的 requester
物件新增到 CLIENTS
列表 (2)。
此程式碼可能會讓人感覺違反直覺 — 在用戶端連線時呼叫
onClose()
,然後使用產生的 mono 來儲存對新用戶端的參考。有時,事件驅動的 API 會感覺有點奇怪。但可以將其視為此 RSocket 連線的 mono 向您發送「我活著」事件。您正在使用該建立事件來觸發將每個用戶端的參考儲存在列表中。
當連線出現問題時,RSocket 會呼叫 mono 的 doOnError()
方法。這包括用戶端已選擇關閉連線的情況。您可以使用提供的 error
變數來決定要採取的動作。在上面的程式碼中,錯誤只是記錄為警告 (3)。
當 RSocket 連線已關閉時,會觸發 mono 的 doFinally()
方法。此方法是執行從列表 CLIENTS
中移除用戶端的程式碼的理想位置 (4)。
最後,subscribe()
會啟動您已新增到 mono 的反應式程式碼,並表示您已準備好處理事件。
當每個用戶端連線時,請求遙測讀數串流。為此,再次在 connectShellClientAndAskForTelemetry()
方法中工作,您需要向您稍早新增的 client-status
訊息處理常式發送請求。此程式碼如下所示
requester.route("client-status")
.data("OPEN")
.retrieveFlux(String.class)
.doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
.subscribe();
使用 requester
,以路由 "client-status"
為目標。發送字串 "OPEN"
作為訊息資料,並擷取 String
類型的 Flux
。每個到達的字串都包含用戶端目前的可用記憶體讀數。將此讀數記錄到主控台。最後,別忘了 subscribe()
以啟動您的反應式程式碼。
現在是測試您的程式碼的時候了。開啟終端機視窗並移至 rsocket-server
目錄,然後執行伺服器,如下所示
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
伺服器在 localhost
連接埠 7000
上啟動。
開啟第二個終端機視窗並移至 rsocket-client
目錄。從那裡,建置並執行用戶端,如下所示
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
啟動後,您會注意到用戶端和伺服器組件的主控台中都有新的記錄項目。在 rsocket-client
視窗中,您會看到記錄項目顯示用戶端的唯一 ID — UUID 格式 — 以及 "OPEN"
連線狀態,如下所示
Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>
至少等待 10 秒,然後在 shell:>
提示符號下輸入 exit
。rsocket-client 現在會與伺服器斷線並關閉。
現在切換到 rsocket-server
視窗。記錄項目看起來像這樣
Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED
當用戶端連線時,它會新增到用戶端列表中,並且主控台會列印其用戶端 ID 和狀態「CONNECTED」。然後,每 5 秒,記錄會顯示用戶端目前的「可用記憶體」讀數。最後,當用戶端斷線時,其 RSocket 通道的狀態會變成「CLOSED」,並且用戶端會「DISCONNECTED」。
您可以按下終端機視窗中的 Ctrl-C
來停止 rsocket-server
程序。
呼叫用戶端的能力非常強大。它非常適合各種情況,包括行動裝置、物聯網或微服務。而且由於所有這些都可以透過 TCP 或 WebSockets 發生,因此您已經擁有您需要的所有基礎架構,而無需訴諸像訊息代理程式這樣的重量級解決方案。
您在這裡涵蓋了很多內容。您學習了如何將伺服器變成「請求者」並將用戶端變成「回應者」。您發現了如何監聽連線事件。您還了解了一些關於如何處理來自 rsocket 連線的錯誤和事件的資訊。而且,儘管在本練習中,您選擇了「request-stream」作為您的伺服器-用戶端通訊,但您可以根據您的需求使用四種 RSocket 互動模式中的任何一種。