領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多這篇部落格文章是系列文章中的第三篇,旨在深入探討 Reactor 更進階的概念和內部運作。
在這篇文章中,我們將探討執行緒模型、某些(大多數)運算子如何與並行無關、Scheduler
抽象化,以及如何使用 publishOn
等運算子在序列中途從一個執行緒跳到另一個執行緒。
本系列源自 Flight of the Flux
演講,我發現其內容更適合部落格文章的形式。
下表將在其他文章發佈時更新連結,但以下是計劃內容
如果您錯過了 Reactive Streams 和 Reactor 基本概念的介紹,請前往網站的 學習區塊 和 參考指南。
事不宜遲,讓我們開始吧
Reactor 運算子通常是與並行無關的:它們不強制特定的執行緒模型,而只是在調用其 onNext
方法的 Thread
上執行。
正如我們在本系列的第一篇文章中看到的,執行訂閱調用的 Thread
也會產生影響:subscribe
調用會被鏈接起來,直到到達產生資料的 Publisher
(運算子鏈的最左側部分),然後這個 Publisher
通過 onSubscribe
提供一個 Subscription
,依序向下傳遞到鏈中,被請求等等... 再次強調,預設情況下,這個資料生產過程會在啟動訂閱的 Thread
上開始。
對於此情況有一個普遍的例外:處理時間概念的運算子。任何此類運算子都將預設在 Schedulers.parallel()
排程器上運行計時器/延遲/等等...
還有一些其他的例外情況,它們也在這個 parallel()
Scheduler
上運行。它們可以通過至少有一個接受 Scheduler
參數的重載來識別。
但是 Scheduler
是什麼?我們為什麼需要它?
Scheduler
抽象化在 Reactor 中,Scheduler
是一個抽象概念,它讓使用者可以控制執行緒。Scheduler
可以產生 Worker
,Worker
在概念上是 Threads
,但不一定由 Thread
支援(我們稍後會看到一個例子)。Scheduler
也包含時鐘的概念,而 Worker
純粹是關於排程任務。
interface Scheduler extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
long now(TimeUnit unit);
Worker createWorker();
interface Worker extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
}
}
Reactor 提供了幾個預設的 Scheduler
實作,每個實作都有其關於如何管理 Workers
的特性。它們可以通過 Schedulers
工廠方法實例化。以下是它們典型用法的經驗法則
Scheduler
但您不想更改執行緒時,Schedulers.immediate()
可以用作空物件Schedulers.single()
適用於可以在唯一的 ExecutorService
上運行的單次性任務Schedulers.parallel()
適用於 CPU 密集但生命週期短的任務。它可以並行執行 N
個此類任務(預設情況下 N == CPU 數量
)Schedulers.elastic()
和 Schedulers.boundedElastic()
適用於生命週期更長的任務(例如,阻塞 IO 任務)。elastic
會按需產生執行緒,沒有限制,而最近引入的 boundedElastic
則以創建的執行緒數量為上限執行相同的操作。每種 Scheduler
類型都有一個由上述方法返回的預設全域實例,但可以使用 Schedulers.new***
工廠方法創建新實例(例如,Schedulers.newParallel("myParallel", 10))
來創建一個自訂的並行 Scheduler
,其中 N
= 10
)。
parallel
類型由 N
個 worker 支援,每個 worker 都基於 ScheduledExecutorService
。如果您向其提交 N
個長時間運行的任務,則無法執行更多工作,因此它適用於生命週期短的任務。
elastic
類型也由基於 ScheduledExecutorService
的 worker 支援,但它會按需創建這些 worker 並將它們池化。不再使用的 Worker
在 dispose()
時返回池中,並將在此處保留配置的 TTL 持續時間,以便新的傳入任務可以重複使用閒置的 worker。但是,如果沒有可用的閒置 Worker
,它會繼續創建新的 worker。
boundedElastic
類型在概念上與 elastic
類型非常相似,只是它對它創建的基於 ScheduledExecutorService
的 Worker
數量設置了上限。超過此點後,其 createWorker()
方法會返回一個 facade Worker
,它將對任務進行排隊,而不是立即提交它們。一旦具體的 Worker
變得可用,它就會與 facade 交換,並開始實際提交任務(使其表現得好像您只是剛提交了任務,包括延遲的任務)。此外,可以對 Scheduler
實例的所有 facade worker 可以排隊的延遲任務總數設置上限。
正如我們上面所說,不是的。我們實際上已經看到了一個例子:immediate() Scheduler
。這個排程器不會修改程式碼運行的 Thread
。
但在 reactor-test
程式庫中有一個更有用的例子:VirtualTimeScheduler
。這個 Scheduler
在目前的 Thread
上執行,但會將提交給它的所有任務都蓋上它們應該運行的時間戳記。
然後它管理一個虛擬時鐘(感謝 Scheduler
也具有時鐘的職責),可以手動推進。這樣做時,將會執行排隊在新的虛擬時間戳記之前或在新的虛擬時間戳記執行的任務。
這在測試場景中非常有用,在這些場景中,您有一個具有長時間間隔/延遲的 Flux
或 Mono
,並且您想要測試邏輯而不是時序。例如,類似於 Mono.delay(Duration.ofHours(4))
的東西可以在 100ms
以下的時間內運行...
人們也可以想像圍繞 Actor 系統、ForkJoinPool
、即將到來的 Loom fibers 等實作 Scheduler
...
關於主
Thread
通常,人們會詢問關於在
Scheduler
的執行緒和主執行緒之間來回切換的問題。從主執行緒切換到排程器顯然是可能的,但是從任意執行緒切換到主執行緒是不可能的。這完全是 Java 的限制,因為沒有辦法將任務提交到主執行緒(例如,沒有 MainThreadExecutorService)。
既然我們熟悉了 Reactor 中執行緒的建構區塊,讓我們看看這如何在運算子的世界中轉化。
我們已經確定,大多數運算子會繼續在發出訊號的 Thread
上工作,除了基於時間的運算子(例如 Mono.delay
、bufferTimeout()
等...)。
Reactor 的哲學是通過組合運算子來為您提供執行正確操作的工具。執行緒也不例外:認識一下 subscribeOn
和 publishOn
。
這兩個運算子只是簡單地取得一個 Scheduler
,並將在該排程器的其中一個 Worker
上切換執行。當然,兩者之間存在重大差異:)
publishOn(Scheduler s)
運算子當您想要跳躍執行緒時,這是您需要的基本運算子。來自其來源的傳入訊號會在給定的 Scheduler
上發佈,有效地將執行緒切換到該排程器的其中一個 worker。
這對於 onNext
、onComplete
和 onError
訊號有效。也就是說,從上游來源流向下游訂閱者的訊號。
因此,本質上,出現在此運算子之下的每個處理步驟都將在新的 Scheduler
s
上執行,直到另一個運算子再次切換(例如,另一個 publishOn
)。
讓我們舉一個刻意粗略的阻塞調用範例。但請記住,反應式鏈中的阻塞調用始終是粗略的!:)
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
在上面的範例中,假設此程式碼在主執行緒上執行,則每個 Flux.fromIterable
都會在同一個 Thread
上發出其 List
的內容。然後,我們在 map
內部使用命令式阻塞 Web 用戶端來提取每個 url
的主體,這會「繼承」該執行緒(因此會阻塞它)。因此,每個 subscribe
中的資料消耗 lambda 也會在主執行緒上運行。
因此,所有這些 url 都會在主執行緒上循序處理
main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E
如果我們引入 publishOn
,我們可以使此程式碼更高效,以便 Flux
不會互相阻塞
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
這可能會為我們提供類似於以下輸出的結果
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
現在第一個列表和第二個列表交錯了,太棒了!
subscribeOn(Scheduler s)
運算子在前面的範例中,我們看到了如何使用 publishOn
通過在提供的 Scheduler
上切換該阻塞工作的觸發器(要提取的 url)的發佈,從而在單獨的 Thread 上偏移阻塞工作。
由於 map
運算子在其來源執行緒上運行,因此通過在 map
之前放置 publishOn
來切換該來源執行緒可以按預期工作。
但是,如果該 url 提取方法是由其他人編寫的,並且他們遺憾地忘記添加 publishOn
呢?有沒有辦法影響上游的 Thread
?
在某種程度上,是有的。這就是 subscribeOn
可以派上用場的地方。
此運算子會更改執行 subscribe
方法的位置。由於 subscribe 訊號向上流動,因此它會直接影響來源 Flux
訂閱和開始產生資料的位置。
因此,它可以看起來像是在向上和向下作用於運算子的反應式鏈的部分(只要混合中沒有拋入 publishOn
)
//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.map(url -> blockingWebClient.get(url)); //oops!
}
//your code:
fetchUrls(A, B, C)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
fetchUrls(D, E)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
就像我們的第二個 publishOn
範例一樣,該程式碼將正確輸出類似於以下的內容
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
那麼發生了什麼事?
subscribe
調用仍然在主執行緒上運行,但它們將 subscribe
訊號傳播到其來源 subscribeOn
。反過來,subscribeOn
會將相同的訊號從 fetchUrls
傳播到其自身的來源,但會在 boundedElastic Worker
上。
在 fetchUrls
返回的 Flux
序列中,map 會在 boundedElastic worker 執行緒上訂閱,range
也是如此。range
開始產生資料,仍然在 boundedElastic worker 執行緒上。
這會沿著資料路徑繼續向下,每個訂閱者都在其來源執行緒(即 boundedElastic
執行緒)上執行 onNext
。
最後,在 subscribe(...)
調用中配置的 lambda 也會在 boundedElastic
執行緒上執行。
重要
重要的是要區分訂閱的動作和傳遞給
subscribe()
方法的 lambda。此方法會訂閱其來源Flux
,但 lambda 會在處理結束時執行,當資料已流經所有步驟(包括跳躍到另一個執行緒的步驟)時。因此,執行 lambda 的
Thread
可能與訂閱Thread
不同,即調用subscribe
方法的執行緒。
如果我們是 fetchUrls
程式庫的作者,我們可以通過利用稍微不同的方式使用 subscribeOn
,讓每次提取都在其自己的 Worker
上運行,從而使程式碼更加高效
final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(url ->
//wrap the blocking call in a Mono
Mono.fromCallable(() -> blockingWebClient.get(url))
//ensure that Mono is subscribed in an boundedElastic Worker
.subscribeOn(Schedulers.boundedElastic())
); //each individual URL fetch runs in its own thread!
}
subscribeOn
將在整個訂閱階段(從下到上)以及資料路徑上起作用,直到遇到 publishOn
(或基於時間的運算子)。
讓我們考慮以下範例
Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));
這將印出
just elastic-1
publish boundedElastic-1
hello delayed parallel-1
我們應該逐步解開發生的事情
subscribe
在主執行緒上調用,但由於正上方的 subscribeOn
,訂閱會迅速切換到 elastic
排程器。elastic
上訂閱,從下到上。just
在 elastic
排程器上發出其值。doOnNext
在同一個執行緒上接收該值並將其列印出來:just elastic-1
publishOn
:來自 doOnNext
的資料在 boundedElastic
排程器上向下游傳播。doOnNext
在 boundedElastic
上接收其資料,並相應地列印 publish bounderElastic-1
。delayElements
是一個時間運算子,因此預設情況下它會在 Schedulers.parallel()
排程器上發佈資料。subscribeOn
除了在同一個執行緒上傳播訊號之外,什麼也不做。subscribe(...)
的 lambda 會在接收到資料訊號的執行緒中執行,因此 lambda 會列印 hello delayed parallel-1
在本文中,我們了解了 Scheduler
抽象化以及它如何實現進階用法,例如 VirtualTimeScheduler
。
然後,我們學習了如何在反應式序列的中間切換執行緒(或更確切地說是 Scheduler
worker),以及 publishOn
和 subscribeOn
之間的區別。
在下一篇文章中,我們將更深入地研究程式庫的內部結構,以描述為確保 Reactor 的效能而進行的一些最佳化。
同時,祝您反應式編碼愉快!