領先一步
VMware 提供訓練與認證,加速您的進展。
了解更多這篇部落格文章是系列文章中的第一篇,旨在深入探討 Reactor 更進階的概念和內部運作。
它改編自我的 Flux 的旅程
演講,我發現其內容更適合部落格文章的形式。
當其他文章發布時,我會更新下表中的連結,以下是計劃的內容
如果您錯過了 Reactive Streams 的簡介和 Reactor 的基本概念,請前往網站的 學習區塊 和 參考指南。
事不宜遲,讓我們開始吧
當您第一次學習 JVM 上的 Reactive Streams 和反應式程式設計時,您首先學到的是 Publisher
和 Subscriber
之間的高階關係:一個產生資料,另一個消費資料。很簡單,對吧?此外,Publisher
似乎會將資料推送到 Subscriber
。
但是當使用像 Reactor (或 RxJava2) 這樣的 Reactive Streams 程式庫時,您很快就會遇到以下口頭禪
在您訂閱之前,什麼都不會發生
有時,您可能會讀到這兩個程式庫都實作了「推-拉混合模型」。等一下!拉?
我們稍後會回到這個問題,但要理解這句話,您首先需要意識到,預設情況下,Reactor 的反應式類型是惰性的。
在 Flux
或 Mono
(運算子) 上呼叫方法不會立即觸發行為。相反,會傳回一個新的 Flux
(或 Mono
) 實例,您可以在其上繼續組合更多運算子。因此,您建立了一個運算子鏈 (或一個運算子非循環圖),它代表您的非同步處理管線。
這個宣告式階段稱為組裝時間。
讓我們舉一個範例,其中用戶端應用程式向伺服器發出 HTTP 請求,期望得到 HttpResponse
Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made
可以使用流暢的 API 簡化此操作
Mono<String> quote = makeHttpRequest()
.map(req -> parseJson(req))
.map(json -> json.getString("quote"));
一旦您完成宣告管線,就會出現兩種情況:要麼您將代表處理管線的 Flux
/Mono
傳遞到另一段程式碼,要麼您觸發管線。
前者表示您將 Mono
傳回的程式碼可能會應用其他運算子,從而產生新的衍生管線。由於運算子會建立新的實例 (就像洋蔥一樣),您自己的 Mono
不會被變更,因此它可以進一步裝飾多次,結果可能大相逕庭
//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
//by this point, none of the 3 "pipelines" have triggered an HTTP request
將其與 CompletableFuture
進行比較,後者本質上不是惰性的:一旦您引用了 CompletableFuture
,就表示處理已經在進行中...
考慮到這一點,讓我們看看如何觸發反應式管線。
到目前為止,我們已經組裝了一個非同步管線。也就是說,我們透過使用運算子來實例化 Flux
和 Mono
變數,這會產生其他 Flux
/Mono
,其行為像洋蔥一樣分層。
但是資料尚未開始流經這些宣告的管線。
那是因為資料流動的觸發器不是管線的宣告,而是對它的訂閱。記住
在您訂閱之前,什麼都不會發生
訂閱的行為是說「好的,這個管線代表資料的轉換,而我對該資料的最終形式感興趣」。最常見的方法是呼叫 Flux.subscribe(valueConsumer, errorConsumer)
。
這種感興趣的訊號會向後傳播通過運算子鏈,直到來源運算子,即實際產生初始資料的 Publisher
makeHttpRequest() //<5>
.map(req -> parseJson(req)) //<4>
.map(json -> json.getString("quote")) //<3>
.flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
.subscribe(System.out::println, Throwable::printStackTrace); //<1>
Flux
,聲明我們要將每個單字列印到主控台 (並列印任何錯誤的堆疊追蹤)flatMapMany
步驟...map
步驟...map
步驟...makeHttpRequest()
(我們將其視為我們的來源)此時,來源被觸發。它以適當的方式產生資料:在這裡,它會向產生 JSON 的端點發出 HTTP 請求,然後發出 HTTP 回應。
從那時起,我們就進入了執行時間。資料已開始流經管線 (以更自然的由上而下順序,或上游到下游)
HttpResponse
會發送到 parseJson
map
getString
map
flatMapMany
flatMapMany
將引言拆分為單字並個別發出每個單字subscribe
中的值處理常式會收到每個單字的通知,並將這些單字列印到主控台,每行一個單字希望這有助於您理解組裝時間和訂閱/執行時間之間的差異!
在解釋完差異並介紹這個口頭禪之後,現在可能是介紹一個例外的好時機:笑
在您訂閱之前,什麼都不會發生... 直到發生某些事情
到目前為止,我們一直在處理一種稱為 冷發布者 (Cold Publisher) 的 Flux
和 Mono
來源。正如我們所解釋的,這些 Publisher
是惰性的,並且僅在有 Subscription
時才產生資料。此外,它們會為每個個別的 Subscription
重新產生資料。
在我們的 HTTP 回應 Mono
範例中,HTTP 請求將針對每個訂閱執行
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request
順帶一提,某些運算子的行為暗示了多個訂閱。例如,retry
在發生錯誤 (onError
訊號) 時重新訂閱其來源,而 repeat
對於 onComplete
訊號執行相同的操作。
因此,對於像 HTTP 請求這樣的冷來源,像 retry
這樣的東西會重新執行請求,從而允許從暫時性的伺服器端錯誤中恢復,例如。
另一方面,熱發布者 (Hot Publisher) 並不明確:它不一定需要 Subscriber
才能開始泵送資料。它也不一定為每個新的 Subscriber
重新產生專用資料。
為了說明這一點,讓我們介紹一個新的冷發布者範例,然後我們將展示如何將冷發布者變成熱發布者
Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
這會印出
clock1 1s
clock1 2s
clock1 3s
clock2 1s
clock1 4s
clock2 2s
clock1 5s
clock2 3s
clock1 6s
clock2 4s
我們可以透過呼叫 share()
將 clockTicks
來源變成熱來源
Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
它產生以下結果
clock1 1s
clock1 2s
clock1 3s
clock2 3s
clock1 4s
clock2 4s
clock1 5s
clock2 5s
clock1 6s
clock2 6s
您會看到現在兩個訂閱共用相同的時鐘刻度。share()
透過讓來源將元素多播到新的 Subscribers
來將冷轉換為熱,但僅限於這些新訂閱之後發出的元素。由於 clock2
在 2 秒後訂閱,因此它錯過了早期的發射 1s
和 2s
。
因此,熱發布者可能不那麼惰性,即使它們通常至少需要一個初始 Subscription
才能觸發資料流。
在本文中,我們了解了實例化 Flux
/ 鏈接運算子 (又名 組裝時間)、觸發它 (又名 訂閱時間) 和執行它 (又名 執行時間) 之間的差異。
因此,我們了解到 Flux
和 Mono
大多數情況下是惰性的 (又名 冷發布者 (Cold Publisher)):在您訂閱它們之前,什麼都不會發生。
最後,我們了解了 Flux
和 Mono
的另一種風味,稱為 熱發布者 (Hot Publisher),它的行為略有不同,並且不那麼惰性。
在下一篇文章中,我們將了解為什麼這三個階段在您作為開發人員如何偵錯基於 reactor 的程式碼方面產生重大差異。
同時,祝您反應式程式設計愉快!