Flux 1 的旅程 - 組裝 vs 訂閱

工程 | Simon Baslé | 2019年03月06日 | ...

這篇部落格文章是系列文章中的第一篇,旨在深入探討 Reactor 更進階的概念和內部運作。

它改編自我的 Flux 的旅程 演講,我發現其內容更適合部落格文章的形式。

當其他文章發布時,我會更新下表中的連結,以下是計劃的內容

  1. 組裝 vs 訂閱 (本文)
  2. 偵錯注意事項
  3. 跳躍執行緒與排程器
  4. 內部運作:工作竊取
  5. 內部運作:運算子融合

如果您錯過了 Reactive Streams 的簡介和 Reactor 的基本概念,請前往網站的 學習區塊參考指南

事不宜遲,讓我們開始吧

組裝時間

當您第一次學習 JVM 上的 Reactive Streams反應式程式設計時,您首先學到的是 PublisherSubscriber 之間的高階關係:一個產生資料,另一個消費資料。很簡單,對吧?此外,Publisher 似乎會將資料推送Subscriber

但是當使用像 Reactor (或 RxJava2) 這樣的 Reactive Streams 程式庫時,您很快就會遇到以下口頭禪

在您訂閱之前,什麼都不會發生

有時,您可能會讀到這兩個程式庫都實作了「推-拉混合模型」。等一下!

我們稍後會回到這個問題,但要理解這句話,您首先需要意識到,預設情況下,Reactor 的反應式類型是惰性的

FluxMono (運算子) 上呼叫方法不會立即觸發行為。相反,會傳回一個新的 Flux (或 Mono) 實例,您可以在其上繼續組合更多運算子。因此,您建立了一個運算子鏈 (或一個運算子非循環圖),它代表您的非同步處理管線

這個宣告式階段稱為組裝時間

讓我們舉一個範例,其中用戶端應用程式向伺服器發出 HTTP 請求,期望得到 HttpResponse

Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made

可以使用流暢的 API 簡化此操作

Mono<String> quote = makeHttpRequest()
    .map(req -> parseJson(req))
    .map(json -> json.getString("quote"));

一旦您完成宣告管線,就會出現兩種情況:要麼您將代表處理管線的 Flux/Mono 傳遞到另一段程式碼,要麼您觸發管線。

前者表示您將 Mono 傳回的程式碼可能會應用其他運算子,從而產生新的衍生管線。由於運算子會建立新的實例 (就像洋蔥一樣),您自己的 Mono 不會被變更,因此它可以進一步裝飾多次,結果可能大相逕庭

//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);

//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

//by this point, none of the 3 "pipelines" have triggered an HTTP request

將其與 CompletableFuture 進行比較,後者本質上不是惰性的:一旦您引用了 CompletableFuture,就表示處理已經在進行中...

考慮到這一點,讓我們看看如何觸發反應式管線。

訂閱時間

到目前為止,我們已經組裝了一個非同步管線。也就是說,我們透過使用運算子來實例化 FluxMono 變數,這會產生其他 Flux/Mono,其行為像洋蔥一樣分層。

但是資料尚未開始流經這些宣告的管線。

那是因為資料流動的觸發器不是管線的宣告,而是對它的訂閱。記住

在您訂閱之前,什麼都不會發生

訂閱的行為是說「好的,這個管線代表資料的轉換,而我對該資料的最終形式感興趣」。最常見的方法是呼叫 Flux.subscribe(valueConsumer, errorConsumer)

這種感興趣的訊號會向後傳播通過運算子鏈,直到來源運算子,即實際產生初始資料的 Publisher

makeHttpRequest() //<5>
    .map(req -> parseJson(req)) //<4>
    .map(json -> json.getString("quote")) //<3>
    .flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
    .subscribe(System.out::println, Throwable::printStackTrace); //<1>
  1. 我們訂閱了文字 Flux,聲明我們要將每個單字列印到主控台 (並列印任何錯誤的堆疊追蹤)
  2. 這種興趣會傳訊到 flatMapMany 步驟...
  3. ...然後將其訊號向上傳遞到 json map 步驟...
  4. ...然後是請求 map 步驟...
  5. ...最後到達 makeHttpRequest() (我們將其視為我們的來源)

此時,來源被觸發。它以適當的方式產生資料:在這裡,它會向產生 JSON 的端點發出 HTTP 請求,然後發出 HTTP 回應。

從那時起,我們就進入了執行時間。資料已開始流經管線 (以更自然的由上而下順序,或上游下游)

  1. HttpResponse 會發送到 parseJson map
  2. 它會提取 JSON 主體並將其發送到 getString map
  3. 它會提取引言並將其傳遞給 flatMapMany
  4. flatMapMany 將引言拆分為單字並個別發出每個單字
  5. subscribe 中的值處理常式會收到每個單字的通知,並將這些單字列印到主控台,每行一個單字

希望這有助於您理解組裝時間和訂閱/執行時間之間的差異!

冷 (Cold) vs 熱 (Hot)

在解釋完差異並介紹這個口頭禪之後,現在可能是介紹一個例外的好時機:笑

在您訂閱之前,什麼都不會發生... 直到發生某些事情

冷 (Cold)

到目前為止,我們一直在處理一種稱為 冷發布者 (Cold Publisher)FluxMono 來源。正如我們所解釋的,這些 Publisher 是惰性的,並且僅在有 Subscription 時才產生資料。此外,它們會為每個個別的 Subscription 重新產生資料。

在我們的 HTTP 回應 Mono 範例中,HTTP 請求將針對每個訂閱執行

Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request

順帶一提,某些運算子的行為暗示了多個訂閱。例如,retry 在發生錯誤 (onError 訊號) 時重新訂閱其來源,而 repeat 對於 onComplete 訊號執行相同的操作。

因此,對於像 HTTP 請求這樣的冷來源,像 retry 這樣的東西會重新執行請求,從而允許從暫時性的伺服器端錯誤中恢復,例如。

熱 (Hot)

另一方面,熱發布者 (Hot Publisher) 並不明確:它不一定需要 Subscriber 才能開始泵送資料。它也不一定為每個新的 Subscriber 重新產生專用資料。

為了說明這一點,讓我們介紹一個新的冷發布者範例,然後我們將展示如何將發布者變成發布者

Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

這會印出

clock1 1s
clock1 2s
clock1 3s
    clock2 1s
clock1 4s
    clock2 2s
clock1 5s
    clock2 3s
clock1 6s
    clock2 4s

我們可以透過呼叫 share()clockTicks 來源變成熱來源

Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

它產生以下結果

clock1 1s
clock1 2s
clock1 3s
    clock2 3s
clock1 4s
    clock2 4s
clock1 5s
    clock2 5s
clock1 6s
    clock2 6s

您會看到現在兩個訂閱共用相同的時鐘刻度。share() 透過讓來源將元素多播到新的 Subscribers 來將冷轉換為熱,但僅限於這些新訂閱之後發出的元素。由於 clock2 在 2 秒後訂閱,因此它錯過了早期的發射 1s2s

因此,熱發布者可能不那麼惰性,即使它們通常至少需要一個初始 Subscription 才能觸發資料流。

結論

在本文中,我們了解了實例化 Flux / 鏈接運算子 (又名 組裝時間)、觸發它 (又名 訂閱時間) 和執行它 (又名 執行時間) 之間的差異。

因此,我們了解到 FluxMono 大多數情況下是惰性的 (又名 冷發布者 (Cold Publisher)):在您訂閱它們之前,什麼都不會發生

最後,我們了解了 FluxMono 的另一種風味,稱為 熱發布者 (Hot Publisher),它的行為略有不同,並且不那麼惰性。

在下一篇文章中,我們將了解為什麼這三個階段在您作為開發人員如何偵錯基於 reactor 的程式碼方面產生重大差異。

同時,祝您反應式程式設計愉快!

取得 Spring 電子報

隨時接收 Spring 電子報的最新資訊

訂閱

領先一步

VMware 提供訓練與認證,加速您的進展。

了解更多

取得支援

Tanzu Spring 以單一訂閱方式,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援與二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看所有