Flux 3 的旅程 - 跳躍執行緒與排程器

工程 | Simon Baslé | 2019 年 12 月 13 日 | ...

這篇部落格文章是系列文章中的第三篇,旨在深入探討 Reactor 更進階的概念和內部運作。

在這篇文章中,我們將探討執行緒模型、某些(大多數)運算子如何與並行無關、Scheduler 抽象化,以及如何使用 publishOn 等運算子在序列中途從一個執行緒跳到另一個執行緒。

本系列源自 Flight of the Flux 演講,我發現其內容更適合部落格文章的形式。

下表將在其他文章發佈時更新連結,但以下是計劃內容

  1. 組裝 vs 訂閱
  2. 偵錯注意事項
  3. 跳躍執行緒與排程器(本文)
  4. 內部運作:工作竊取
  5. 內部運作:運算子融合

如果您錯過了 Reactive Streams 和 Reactor 基本概念的介紹,請前往網站的 學習區塊參考指南

事不宜遲,讓我們開始吧

執行緒模型

Reactor 運算子通常是與並行無關的:它們不強制特定的執行緒模型,而只是在調用其 onNext 方法的 Thread 上執行。

正如我們在本系列的第一篇文章中看到的,執行訂閱調用的 Thread 也會產生影響:subscribe 調用會被鏈接起來,直到到達產生資料的 Publisher(運算子鏈的最左側部分),然後這個 Publisher 通過 onSubscribe 提供一個 Subscription,依序向下傳遞到鏈中,被請求等等... 再次強調,預設情況下,這個資料生產過程會在啟動訂閱的 Thread 上開始。

對於此情況有一個普遍的例外:處理時間概念的運算子。任何此類運算子都將預設在 Schedulers.parallel() 排程器上運行計時器/延遲/等等...

還有一些其他的例外情況,它們也在這個 parallel() Scheduler 上運行。它們可以通過至少有一個接受 Scheduler 參數的重載來識別。

但是 Scheduler 是什麼?我們為什麼需要它?

Scheduler 抽象化

在 Reactor 中,Scheduler 是一個抽象概念,它讓使用者可以控制執行緒。Scheduler 可以產生 WorkerWorker 在概念上是 Threads,但不一定由 Thread 支援(我們稍後會看到一個例子)。Scheduler 也包含時鐘的概念,而 Worker 純粹是關於排程任務。

interface Scheduler extends Disposable {
    
  Disposable schedule(Runnable task);
  Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
  Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  
  long now(TimeUnit unit);
  
  Worker createWorker();
  
  interface Worker extends Disposable {
    Disposable schedule(Runnable task);
    Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  }
}

Reactor 提供了幾個預設的 Scheduler 實作,每個實作都有其關於如何管理 Workers 的特性。它們可以通過 Schedulers 工廠方法實例化。以下是它們典型用法的經驗法則

  • 當 API 需要 Scheduler 但您不想更改執行緒時,Schedulers.immediate() 可以用作空物件
  • Schedulers.single() 適用於可以在唯一的 ExecutorService 上運行的單次性任務
  • Schedulers.parallel() 適用於 CPU 密集但生命週期短的任務。它可以並行執行 N 個此類任務(預設情況下 N == CPU 數量
  • Schedulers.elastic()Schedulers.boundedElastic() 適用於生命週期更長的任務(例如,阻塞 IO 任務)。elastic 會按需產生執行緒,沒有限制,而最近引入的 boundedElastic 則以創建的執行緒數量為上限執行相同的操作。

每種 Scheduler 類型都有一個由上述方法返回的預設全域實例,但可以使用 Schedulers.new*** 工廠方法創建新實例(例如,Schedulers.newParallel("myParallel", 10)) 來創建一個自訂的並行 Scheduler,其中 N = 10)。

parallel 類型由 N 個 worker 支援,每個 worker 都基於 ScheduledExecutorService。如果您向其提交 N 個長時間運行的任務,則無法執行更多工作,因此它適用於生命週期短的任務。

elastic 類型也由基於 ScheduledExecutorService 的 worker 支援,但它會按需創建這些 worker 並將它們池化。不再使用的 Workerdispose() 時返回池中,並將在此處保留配置的 TTL 持續時間,以便新的傳入任務可以重複使用閒置的 worker。但是,如果沒有可用的閒置 Worker,它會繼續創建新的 worker。

boundedElastic 類型在概念上與 elastic 類型非常相似,只是它對它創建的基於 ScheduledExecutorServiceWorker 數量設置了上限。超過此點後,其 createWorker() 方法會返回一個 facade Worker,它將對任務進行排隊,而不是立即提交它們。一旦具體的 Worker 變得可用,它就會與 facade 交換,並開始實際提交任務(使其表現得好像您只是剛提交了任務,包括延遲的任務)。此外,可以對 Scheduler 實例的所有 facade worker 可以排隊的延遲任務總數設置上限。

排程器總是基於 ExecutorService 嗎?

正如我們上面所說,不是的。我們實際上已經看到了一個例子:immediate() Scheduler。這個排程器不會修改程式碼運行的 Thread

但在 reactor-test 程式庫中有一個更有用的例子:VirtualTimeScheduler。這個 Scheduler 在目前的 Thread 上執行,但會將提交給它的所有任務都蓋上它們應該運行的時間戳記。

然後它管理一個虛擬時鐘(感謝 Scheduler 也具有時鐘的職責),可以手動推進。這樣做時,將會執行排隊在新的虛擬時間戳記之前或在新的虛擬時間戳記執行的任務。

這在測試場景中非常有用,在這些場景中,您有一個具有長時間間隔/延遲的 FluxMono,並且您想要測試邏輯而不是時序。例如,類似於 Mono.delay(Duration.ofHours(4)) 的東西可以在 100ms 以下的時間內運行...

人們也可以想像圍繞 Actor 系統、ForkJoinPool、即將到來的 Loom fibers 等實作 Scheduler...

關於 Thread

通常,人們會詢問關於在 Scheduler 的執行緒和執行緒之間來回切換的問題。從主執行緒切換到排程器顯然是可能的,但是從任意執行緒切換到執行緒是不可能的。這完全是 Java 的限制,因為沒有辦法將任務提交到執行緒(例如,沒有 MainThreadExecutorService)。

將排程器應用於運算子

既然我們熟悉了 Reactor 中執行緒的建構區塊,讓我們看看這如何在運算子的世界中轉化。

我們已經確定,大多數運算子會繼續在發出訊號的 Thread 上工作,除了基於時間的運算子(例如 Mono.delaybufferTimeout() 等...)。

Reactor 的哲學是通過組合運算子來為您提供執行正確操作的工具。執行緒也不例外:認識一下 subscribeOnpublishOn

這兩個運算子只是簡單地取得一個 Scheduler,並將在該排程器的其中一個 Worker 上切換執行。當然,兩者之間存在重大差異:)

publishOn(Scheduler s) 運算子

當您想要跳躍執行緒時,這是您需要的基本運算子。來自其來源的傳入訊號會在給定的 Scheduler發佈,有效地將執行緒切換到該排程器的其中一個 worker。

這對於 onNextonCompleteonError 訊號有效。也就是說,從上游來源流向下游訂閱者的訊號。

因此,本質上,出現在此運算子之下的每個處理步驟都將在新的 Scheduler s 上執行,直到另一個運算子再次切換(例如,另一個 publishOn)。

讓我們舉一個刻意粗略的阻塞調用範例。但請記住,反應式鏈中的阻塞調用始終是粗略的!:)

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

在上面的範例中,假設此程式碼在執行緒上執行,則每個 Flux.fromIterable 都會在同一個 Thread 上發出其 List 的內容。然後,我們在 map 內部使用命令式阻塞 Web 用戶端來提取每個 url 的主體,這會「繼承」該執行緒(因此會阻塞它)。因此,每個 subscribe 中的資料消耗 lambda 也會在主執行緒上運行。

因此,所有這些 url 都會在主執行緒上循序處理

main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E

如果我們引入 publishOn,我們可以使此程式碼更高效,以便 Flux 不會互相阻塞

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

這可能會為我們提供類似於以下輸出的結果

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

現在第一個列表和第二個列表交錯了,太棒了!

subscribeOn(Scheduler s) 運算子

在前面的範例中,我們看到了如何使用 publishOn 通過在提供的 Scheduler 上切換該阻塞工作的觸發器(要提取的 url)的發佈,從而在單獨的 Thread 上偏移阻塞工作。

由於 map 運算子在其來源執行緒上運行,因此通過在 map 之前放置 publishOn 來切換該來源執行緒可以按預期工作。

但是,如果該 url 提取方法是由其他人編寫的,並且他們遺憾地忘記添加 publishOn 呢?有沒有辦法影響上游Thread

在某種程度上,是有的。這就是 subscribeOn 可以派上用場的地方。

此運算子會更改執行 subscribe 方法的位置。由於 subscribe 訊號向上流動,因此它會直接影響來源 Flux 訂閱和開始產生資料的位置。

因此,它可以看起來像是在向上和向下作用於運算子的反應式鏈的部分(只要混合中沒有拋入 publishOn

//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .map(url -> blockingWebClient.get(url)); //oops!
}

//your code:
fetchUrls(A, B, C)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

fetchUrls(D, E)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

就像我們的第二個 publishOn 範例一樣,該程式碼將正確輸出類似於以下的內容

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

那麼發生了什麼事?

subscribe 調用仍然在執行緒上運行,但它們將 subscribe 訊號傳播到其來源 subscribeOn。反過來,subscribeOn 會將相同的訊號從 fetchUrls 傳播到其自身的來源,但會在 boundedElastic Worker

fetchUrls 返回的 Flux 序列中,map 會在 boundedElastic worker 執行緒上訂閱,range 也是如此。range 開始產生資料,仍然在 boundedElastic worker 執行緒上。

這會沿著資料路徑繼續向下,每個訂閱者都在其來源執行緒(即 boundedElastic 執行緒)上執行 onNext

最後,在 subscribe(...) 調用中配置的 lambda 也會在 boundedElastic 執行緒上執行。

重要

重要的是要區分訂閱的動作和傳遞給 subscribe() 方法的 lambda。此方法會訂閱其來源 Flux,但 lambda 會在處理結束時執行,當資料已流經所有步驟(包括跳躍到另一個執行緒的步驟)時。

因此,執行 lambda 的 Thread 可能與訂閱 Thread 不同,即調用 subscribe 方法的執行緒。

如果我們是 fetchUrls 程式庫的作者,我們可以通過利用稍微不同的方式使用 subscribeOn,讓每次提取都在其自己的 Worker 上運行,從而使程式碼更加高效

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in an boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}

如果我將兩者混合使用會怎麼樣?

subscribeOn 將在整個訂閱階段(從下到上)以及資料路徑上起作用,直到遇到 publishOn(或基於時間的運算子)。

讓我們考慮以下範例

Flux.just("hello")
    .doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
    .publishOn(Scheduler.boundedElastic())
    .doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));

這將印出

just elastic-1
publish boundedElastic-1
hello delayed parallel-1

我們應該逐步解開發生的事情

  • 在這裡,subscribe執行緒上調用,但由於正上方的 subscribeOn,訂閱會迅速切換到 elastic 排程器。
  • 它上面的所有運算子也在 elastic 上訂閱,從下到上。
  • justelastic 排程器上發出其值。
  • 第一個 doOnNext 在同一個執行緒上接收該值並將其列印出來:just elastic-1
  • 然後在從上到下的資料路徑上,我們遇到 publishOn:來自 doOnNext 的資料在 boundedElastic 排程器上向下游傳播。
  • 第二個 doOnNextboundedElastic 上接收其資料,並相應地列印 publish bounderElastic-1
  • delayElements 是一個時間運算子,因此預設情況下它會在 Schedulers.parallel() 排程器上發佈資料。
  • 在資料路徑上,subscribeOn 除了在同一個執行緒上傳播訊號之外,什麼也不做。
  • 在資料路徑上,傳遞給 subscribe(...) 的 lambda 會在接收到資料訊號的執行緒中執行,因此 lambda 會列印 hello delayed parallel-1

結論

在本文中,我們了解了 Scheduler 抽象化以及它如何實現進階用法,例如 VirtualTimeScheduler

然後,我們學習了如何在反應式序列的中間切換執行緒(或更確切地說是 Scheduler worker),以及 publishOnsubscribeOn 之間的區別。

在下一篇文章中,我們將更深入地研究程式庫的內部結構,以描述為確保 Reactor 的效能而進行的一些最佳化。

同時,祝您反應式編碼愉快!

取得 Spring 電子報

隨時關注 Spring 電子報

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部