瞭解反應式類型 (Understanding Reactive types)

工程 (Engineering) | Sébastien Deleuze | 2016 年 4 月 19 日 (April 19, 2016) | ...

繼之前的 Reactive SpringReactor Core 3.0 部落格文章之後,我想解釋為什麼反應式類型是有用的,以及它們與其他非同步類型的比較,這是基於我們在 Spring Framework 5 即將推出的反應式支援中所學到的。

為什麼要使用反應式類型? (Why using Reactive types?)

反應式類型的目的不是讓你更快地處理請求或資料,事實上,與常規的阻塞式處理相比,它們會引入一些額外的開銷。 它們的優勢在於它們能夠同時處理更多的請求,並更有效地處理具有延遲的操作,例如從遠端伺服器請求資料。 它們允許你提供更好的服務品質和可預測的容量規劃,方法是原生處理時間和延遲,而無需消耗更多資源。 與傳統的處理方式在等待結果時會阻塞當前線程不同,等待的反應式 API 不會產生任何成本,只請求它能夠處理的資料量,並帶來新的功能,因為它處理的是資料流,而不僅僅是一個接一個的單獨元素。

Java 8 之前 (Before Java 8)

在 Java 8 之前,非同步非阻塞行為並不容易實現,至少有兩個原因。 第一個原因是基於回呼的 API 需要冗長的匿名類別,並且不容易鏈結。 第二個原因是 Future 類型是非同步的,**但是**當你嘗試使用 get() 方法獲取結果時,它會阻塞當前線程,直到計算完成。 這就是 Spring Framework 4.0 引入 ListenableFuture 的原因,它是一個 Future 實現,增加了非阻塞的基於回呼的功能。

Lambdas、CompletableFuture 和 Stream

然後 Java 8 引入了 lambdas 和 CompletableFuture。 Lambdas 允許編寫簡潔的回呼,而 CompletionStage 介面和 CompletableFuture 類別最終允許以非阻塞方式和基於推送的方式處理 future,同時提供鏈結此類延遲結果處理的功能。

Java 8 還引入了 Stream,它的設計目的是有效地處理可以以零或極少延遲存取的資料流(包括原始類型)。 它是基於拉取的,只能使用一次,缺乏與時間相關的操作,並且可以執行並行計算,但無法指定要使用的線程池。 正如 Brian Goetz 所解釋的,它並非設計用於處理具有延遲的操作,例如 I/O 操作。 而這就是像 Reactor 或 RxJava 這樣的反應式 API 發揮作用的地方。

反應式 API (Reactive APIs)

Reactor 這樣的反應式 API 也提供了像 Java 8 Stream 這樣的運算子,但它們更普遍地適用於任何流序列(不僅僅是集合),並允許定義轉換操作的管道,這些管道將應用於通過它傳遞的資料,這要歸功於方便的 fluent API 和 lambdas 的使用。 它們的設計目的是處理同步或非同步操作,並允許你緩衝、合併、串聯或將各種轉換應用於你的資料。

最初,反應式 API 僅設計用於處理資料流,即 N 個元素,例如,使用 Reactor 的 Flux

reactiveService.getResults()
    .mergeWith(Flux.interval(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .consume(System.out::println);

但在我們在 Spring Framework 5 上的工作過程中,很明顯需要區分 1 個或 N 個元素的流,這就是 Reactor 提供 Mono 類型的原因。 MonoCompletableFuture 類型的反應式等效類型,並且允許提供一致的 API,以反應式方式處理單個和多個元素。

Mono.any(reactiveServiceA.findRecent(time), reactiveServiceB.findRecent(time)
    .timeout(Duration.ofSeconds(3), errorHandler::fallback)
    .doOnSuccess(r -> reactiveServiceC.incrementSuccess())
    .consume(System.out::println);

如果你更深入地查看 FluxMono,你會注意到這些類型實現了 Reactive Streams 規範中的 Publisher 介面。

Reactive Streams

Reactor 建立在 Reactive Streams 規範之上。 Reactive Streams 由 4 個簡單的 Java 介面PublisherSubscriberSubscriptionProcessor)、文字規範TCK 組成。 它是每個現代反應式程式庫的基石,並且是互通性目的的必備條件。

Reactive Streams 的核心關注點是處理背壓 (backpressure)。 簡而言之,背壓是一種機制,允許接收器詢問它想從發送器接收多少資料。 它允許

  • 接收器僅在其準備好處理資料時才開始接收資料
  • 控制正在處理的資料量
  • 有效處理慢速發送器/快速接收器或快速發送器/慢速接收器用例
  • 如果你請求 Long.MAX_VALUE 個元素,則從動態推拉策略切換到僅基於推送的策略

乍一看,Publisher 介面似乎很容易實現; 但以完全符合規範的方式實現它非常困難,並且用戶除了訂閱原始 Publisher 之外無法對其執行任何操作! 這就是為什麼通常最好依賴於像 Reactor 這樣的 Reactive Streams 實現來幫助你處理這個問題。

請注意,Java 9 將包含 java.util.concurrent.Flow 容器類中的 Reactive Streams 介面,進一步顯示了 Reactive Streams 在 JDK 中的相關性。

同樣重要的是要注意,趨向 Reactive Streams 和 Reactor 轉換功能 的趨勢允許在運行時輕鬆有效地從一個反應式類型轉換為另一個反應式類型。

結論 (Conclusion)

我希望這篇部落格文章能幫助你更好地理解反應式類型。

我們正在 Spring Framework、Spring Boot、Spring Data、Spring Security 和 Spring Cloud 等各種 Spring 專案中使用像 Reactor MonoFlux 這樣的類型來進行反應式支援。

但你即將推出的反應式應用程式也將直接使用這些類型,例如在 @Repository@Service@Controller 方法層級,因為構建反應式應用程式意味著在使用需要處理延遲或流的反應式語意(我們還將提供一些整合阻塞式 API 的指南)。

我們將在未來幾個月發布更多反應式部落格文章。 隨時熟悉 這個測試驅動的 Lite Rx API Hands-On,它將教你如何使用 FluxMono,並且像往常一樣,歡迎你的回饋!

如果你碰巧在五月中旬在巴塞隆納(無論如何,在巴塞隆納總是不錯的!),請不要錯過參加 Spring I/O 會議的機會。 此外,SpringOne Platform(八月初,拉斯維加斯)的註冊最近已經開始,如果你想從早鳥票價中受益。

取得 Spring 電子報 (Get the Spring newsletter)

與 Spring 電子報保持聯繫 (Stay connected with the Spring newsletter)

訂閱 (Subscribe)

搶先一步 (Get ahead)

VMware 提供培訓和認證,以加速你的進展 (VMware offers training and certification to turbo-charge your progress)。

瞭解更多 (Learn more)

取得支援 (Get support)

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案 (Tanzu Spring offers support and binaries for OpenJDK™, Spring, and Apache Tomcat® in one simple subscription)。

瞭解更多 (Learn more)

即將舉行的活動 (Upcoming events)

查看 Spring 社群中所有即將舉行的活動。

查看全部