領先一步
VMware 提供培訓和認證,以加速您的進度。
瞭解更多在本文中,我們將繼續探討反應式程式設計系列,重點將不再是學習基本的 API,而是更具體的用例以及編寫實際執行某些有用操作的程式碼。 我們將看到反應式是並行程式設計的有用抽象,但它也具有一些非常底層的功能,我們應該學會尊重和謹慎地對待它們。 如果我們開始充分利用這些功能,我們就可以控制應用程式中以前不可見的層,這些層被容器、平台和框架隱藏起來。
成為反應式會迫使您以不同的方式看待世界。 不是要求某件事然後得到它(或沒有得到它),而是所有東西都以序列 (Publisher
) 的形式傳遞,並且您必須訂閱它。 您不必等待答案,而是必須註冊一個回調。 當您習慣它時,這並不難,但是除非整個世界都顛倒過來並變成反應式的,否則您會發現您需要與舊式阻塞 API 互動
假設我們有一個阻塞方法,它返回一個 HttpStatus
private RestTemplate restTemplate = new RestTemplate();
private HttpStatus block(int value) {
return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
.getStatusCode();
}
並且我們想要使用不同的參數重複調用它並匯總結果。 這是一個經典的「分散-收集」用例,例如,如果您有一個分頁後端需要總結多個頁面中的「前 N 名」項目。 由於非反應式(阻塞)操作的細節與分散-收集模式無關,因此我們可以將它們推送到一個名為 block()
的方法中,並稍後實現它。 這是一個(糟糕的)範例,它調用後端並匯總到類型為 Result
的物件中
Flux.range(1, 10) // (1)
.log()
.map(this::block) // (2)
.collect(Result::new, Result::add) // (3)
.doOnSuccess(Result::stop) // (4)
進行 10 次調用
此處為阻塞程式碼
收集結果並匯總到單個物件中
最後停止計時(結果為 Mono<Result>
)
不要在家裡做這個。 這是一個「糟糕的」範例,因為雖然 API 在技術上被正確使用,但我們知道它會阻塞調用執行緒; 此程式碼或多或少等效於一個 for 迴圈,其中呼叫到 block()
位於迴圈主體中。 更好的實現方式是將對 block()
的呼叫推送到背景執行緒。 我們可以透過將它包裝在一個返回 Mono<HttpStatus>
的方法中來做到這一點
private Mono<HttpStatus> fetch(int value) {
return Mono.fromCallable(() -> block(value)) // (1)
.subscribeOn(this.scheduler); // (2)
}
阻塞程式碼位於 Callable 內部,以延遲執行
在背景執行緒上訂閱慢速發布者
scheduler
被宣告為一個共享欄位:Scheduler scheduler = Schedulers.parallel()
。 然後我們可以宣告我們想要 flatMap()
這個序列而不是使用 map()
Flux.range(1, 10)
.log()
.flatMap( // (1)
this::fetch, 4) // (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop)
下降到一個新的發布者以並行處理
flatMap 中的並行提示
如果我們想在非反應式環境中執行上面的分散-收集程式碼,我們可以像這樣使用 Spring MVC
@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
return Flux.range(1, 10)
...
.doOnSuccess(Result::stop)
.toFuture();
}
如果您閱讀 @RequestMapping
的 Javadoc,您會發現一個方法可以返回一個 CompletableFuture
「應用程式使用它在自己選擇的單獨執行緒中產生傳回值」。 在這種情況下,單獨的執行緒由「scheduler」提供,它是一個執行緒池,因此處理發生在多個執行緒上,每次 4 個,因為 flatMap()
的呼叫方式。
具有背景執行緒的分散-收集是一種有用的模式,但它並不完美 — 它沒有阻塞呼叫者,但它阻塞了某些東西,因此它只是轉移了問題。 有一些實際的含義。 我們有一個 HTTP 伺服器,它可能具有非阻塞 IO 處理常式,將工作傳回給一個執行緒池,每個 HTTP 請求一個執行緒 — 所有這些都發生在 servlet 容器(例如 Tomcat)內部。 請求以非同步方式處理,因此 Tomcat 中的 worker 執行緒不會被阻塞,並且我們在「scheduler」中建立的執行緒池最多以 4 個並行執行緒進行處理。 我們正在處理 10 個後端請求(呼叫 block()
),因此使用 scheduler 的最大理論好處是延遲降低 4 倍。 換句話說,如果單個執行緒中一個接一個地處理所有 10 個請求需要 1000 毫秒,我們可能會看到 HTTP 服務的單個傳入請求的處理時間為 250 毫秒。 我們應該強調「可能」:只有在處理執行緒沒有爭用(在 Tomcat worker 和應用程式 scheduler 兩個階段)的情況下,它才會這麼快。 如果您有一台具有大量核心、非常低的並發性(即少量用戶端連接到您的應用程式),並且幾乎沒有兩個用戶端同時發出請求的機會的伺服器,那麼您可能會看到接近理論上的改進。 一旦有多個用戶端嘗試連接,他們都將爭奪相同的 4 個執行緒,並且延遲會漂移上升,甚至可能比單個用戶端在沒有背景處理的情況下體驗到的延遲更糟。 我們可以透過使用更大的執行緒池建立 scheduler 來改善並行用戶端的延遲,例如
private Scheduler scheduler = Schedulers.newParallel("sub", 16);
(16 個執行緒。)現在我們使用更多記憶體來儲存執行緒及其堆疊,並且我們可以預期在低並發性下會看到更低的延遲,但如果我們的硬體少於 16 個核心,則在高並發性下不一定會看到。 我們也不期望在高負載下獲得更高的吞吐量:如果執行緒存在爭用,則管理這些資源的成本很高,並且必須以某種方式反映在重要的指標中。 如果您對這種權衡的更詳細分析感興趣,可以在 Rob Harrop 的部落格系列中找到關於效能指標如何在負載下擴展的一些詳細分析。
提示
預設情況下,Tomcat 會分配 100 個執行緒來處理 HTTP 請求。 如果我們知道所有處理都將在我們的 scheduler 執行緒池上進行,那麼這是過多的。 存在阻抗不匹配:scheduler 執行緒池可能成為瓶頸,因為它的執行緒少於上游 Tomcat 執行緒池。 這突顯了一個事實,即效能調整可能非常困難,而且,雖然您可以控制所有配置,但這是一個微妙的平衡。
如果我們使用一個根據需求調整其容量的 scheduler,我們可以做得比固定的執行緒池更好。 Reactor 為此提供了一個便利,因此如果您使用 Schedulers.elastic()
嘗試相同的程式碼(您可以隨時呼叫它;只有一個實例),您會看到在負載下會建立更多執行緒。
從阻塞式(blocking)轉向反應式(reactive)是一種很有用的模式,而且使用 Spring MVC 中現有的技術很容易實現(如上所示)。反應式旅程的下一個階段是完全擺脫應用程式線程中的阻塞,而這需要新的 API 和新的工具。最終,我們必須在堆疊的各個層面都是反應式的,包括伺服器和客戶端。這是 Spring Reactive 的目標,它是一個新的框架,與 Spring MVC 正交,但滿足相同的需求,並使用類似的程式設計模型。
注意
Spring Reactive 最初是一個獨立的專案,但在 5.0 版本(第一個里程碑是 2016 年 6 月)中被整合到 Spring Framework 中。
在我們的 scatter-gather 範例中,要實現完全的反應式,第一步是將 classpath 上的 spring-boot-starter-web
替換為 spring-boot-starter-webflux
。在 Maven 中:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
或在 Gradle 中:
dependencies {
compile('org.springframework.boot:spring-boot-starter-webflux')
...
}
然後在 controller 中,我們可以簡單地捨棄到 CompletableFuture
的橋樑,並返回一個 Mono
類型的物件:
@RequestMapping("/parallel")
public Mono<Result> parallel() {
return Flux.range(1, 10)
.log()
.flatMap(this::fetch, 4)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
將此程式碼放入 Spring Boot 應用程式中,它將在 Tomcat、Jetty 或 Netty 中運行,具體取決於它在 classpath 上找到的內容。 Tomcat 是該 starter 中的預設伺服器,因此如果要切換,必須排除它並提供另一個伺服器。三者在啟動時間、記憶體使用量和運行時資源使用量方面都具有非常相似的特性。
我們仍然在 block()
中有阻塞式的後端呼叫,因此我們仍然需要在線程池上進行訂閱,以避免阻塞呼叫者。如果我們有一個非阻塞的客戶端,例如,不用 RestTemplate
,而是使用新的 WebClient
,我們可以這樣做來使用非阻塞的客戶端:
private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
private Mono<HttpStatus> fetch(int value) {
return this.client.perform(HttpRequestBuilders.get("http://example.com"))
.extract(WebResponseExtractors.response(String.class))
.map(response -> response.getStatusCode());
}
請注意,WebClient.perform()
(或更準確地說是 WebResponseExtractor
)具有反應式返回類型,我們已將其轉換為 Mono<HttpStatus>
,但我們尚未訂閱它。我們希望框架來完成所有的訂閱,所以現在我們在各個層面都是反應式的。
警告
Spring Reactive 中返回 Publisher
的方法**是**非阻塞的,但通常返回 Publisher
(或 Flux
、Mono
或 Observable
)的方法只是一種提示,它可能不是阻塞的。如果您正在編寫這類方法,請務必分析(最好是測試)它們是否會阻塞,並明確地告知呼叫者它們是否可能會阻塞。
注意
我們剛才使用的使用非阻塞客戶端來簡化 HTTP 堆疊的技巧在常規 Spring MVC 中也同樣有效。上述 fetch()
方法的結果可以轉換為 CompletableFuture
,並從常規的 @RequestMapping
方法中傳遞出去(例如在 Spring Boot 1.3 中)。
現在我們可以移除 HTTP 請求處理程序中對 fetch()
的呼叫之後的並發提示:
@RequestMapping("/netty")
public Mono<Result> netty() {
return Flux.range(1, 10) // (1)
.log() //
.flatMap(this::fetch) // (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
進行 10 次調用
下降到一個新的發布者以並行處理
考虑到我们根本不需要额外的可调用和订阅者线程,这段代码比我们必须桥接到阻塞式客户端时要简洁得多,这可以归因于代码在各个层面都是反应式的。反应式的 WebClient
返回一个 Mono
,它立即驱动我们选择转换链中的 flatMap()
,然后我们需要的代码就直接出现了。编写起来是一种更棒的体验,而且更易于阅读,因此更容易维护。此外,由于没有线程池化和没有并发提示,因此我们无需将 4 这个神奇因子插入到我们的性能预期中。某个地方存在一个限制,但它不再是由我们在应用层中的选择强加的,也不是由服务器“容器”中的任何东西限制的。这不是魔法,而且仍然有物理定律,因此后端呼叫仍然需要大约 100 毫秒左右,但在低争用情况下,我们甚至可能会看到所有 10 个请求在大致相同的时间内完成,这与一个请求所需的时间大致相同。随着服务器上的负载增加,延迟和吞吐量自然会降低,但这种降低的方式是由缓冲区争用和内核网络控制的,而不是由应用程序线程管理控制的。这是一种控制反轉,反轉到應用程式碼下方的堆疊的較低層級。
請記住,相同的應用程式碼在 Tomcat、Jetty 或 Netty 上運行。目前,Tomcat 和 Jetty 支援是在 Servlet 3.1 非同步處理之上提供的,因此不再僅限於每個線程一個請求。它建立在反應式橋樑之上,該橋樑將 Servlet 3.1 概念適應於反應式範例。對於 Reactor Netty 而言,背壓和反應式支援是內建的。根據您對 HTTP 客戶端庫的選擇,伺服器和客戶端可能會共享相同的 HTTP 資源,並進一步最佳化。我們將在本系列的另一篇文章中稍後再回顧這一點。
提示
在範例程式碼中,「reactive」範例具有 Maven profiles「tomcat」、「tomcatNext」(適用於 Tomcat 8.5)、「jetty」和「netty」,因此您可以輕鬆試用所有不同的伺服器選項,而無需更改任何程式碼。
注意
許多應用程式中的阻塞程式碼不是 HTTP 後端呼叫,而是資料庫互動。目前,很少有資料庫支援非阻塞客戶端(MongoDB 和 Couchbase 是值得注意的例外,但即使是這些也不如 HTTP 客戶端成熟)。在所有資料庫供應商在客戶端方面趕上之前,線程池和阻塞到反應式模式將會有很長的壽命。
我們已經縮減了基本的 scatter-gather 用例,直到程式碼非常簡潔,並且非常符合其運行的硬體。我們編寫了一些簡單的程式碼,並且使用 Spring 將其很好地堆疊和協調成一個可用的 HTTP 服務。在晴朗的日子裡,每個人都對結果感到非常滿意。但是,一旦出現錯誤,例如行為不端的網路連線,或者延遲較差的後端服務,我們就會遭受損失。
遭受損失的第一個,也是最明顯的方式是,我們編寫的程式碼是聲明性的,因此很難進行除錯。發生錯誤時,診斷可能非常不透明。使用原始的、底層的 API,例如沒有 Spring 的 Reactor,甚至降到沒有 Reactor 的 Netty 的層級,可能會使情況變得更糟,因為那樣我們將不得不自己建立大量的錯誤處理,每次與網路互動時都要重複樣板程式碼。至少在 Spring 和 Reactor 混合使用的情況下,我們可以期望看到針對迷路、未捕獲異常記錄的堆疊追蹤。儘管它們可能不容易理解,因為它們發生在我們無法控制的線程上,並且它們有時會顯示為非常底層的問題,來自堆疊中不熟悉的部分。
另一個痛苦的來源是,如果我們犯了一個錯誤並在我們的反應式回呼函數中阻塞,我們將會阻礙**所有**在同一線程上的請求。使用基於 servlet 的容器,每個請求都與一個線程隔離,並且阻塞不會阻礙其他請求,因為它們是在不同的線程上處理的。阻塞所有請求仍然是一個麻煩的起因,但它只會顯示為每個請求的延遲增加大約一個常數因子。在反應式世界中,阻塞單個請求可能會導致所有請求的延遲增加,而阻塞所有請求可能會使伺服器癱瘓,因為沒有額外的緩衝區和線程來承擔鬆弛。
能夠控制非同步處理中的所有移動部件是很棒的:每個層都有一個線程池大小和一個佇列。我們可以使其中的一些層具有彈性,並嘗試根據它們所做的工作量來調整它們。但是在某個時候,它變成了一種負擔,我們開始尋找更簡單或更精簡的東西。對可擴展性的分析得出的結論是,通常最好減少額外的線程,並使用物理硬體施加的約束。這是一個「機械同情」的例子,正如 LMAX 在 Disruptor Pattern 中著名地利用它來取得巨大成效。
我們已經開始看到反應式方法的威力,但請記住,權力伴隨著責任。它是激進的,也是根本的。它是「撕毀它並重新開始」的領域。因此,您也希望理解反應式並不能解決所有問題。事實上,它並不是任何問題的解決方案,它僅僅有助於解決一類特定的問題。從使用它獲得的好處可能超過學習它、修改您的 API 以使其在各個層面都是反應式的,以及之後維護程式碼的成本,因此請謹慎行事。