領先一步
VMware 提供培訓和認證,以加速您的進展。
深入瞭解本月稍早,我們發布了 Reactor 2020.0 的第一個里程碑。此週期代號為 Europium
,接續 Dysprosium 週期(包含 reactor-core 3.3.x 和 reactor-netty 0.9.x)。
它包含 reactor-core 3.4.0
和 reactor-netty 1.0.0
。
在這篇部落格文章中,我們將介紹 reactor-core 里程碑的一些重點,並簡要提及 M2 的內容。
關於 reactor-netty,我們將在此處連結到另一篇部落格文章,一旦發布就會提供。
另請注意,已採用新的版本控制方案,該方案已在整個 Spring 系列產品中採用:請參閱參考指南和這篇部落格文章。
Processor
的變更核心中的主要變更是長期以來針對 Reactor 中 Processor
實作及其公開方式的努力。
這是 reactor-core 3.4.0-M1
的主要重點,目標是逐步淘汰 FluxProcessor
(以及在某種程度上 MonoProcessor
)的具體形式的使用。
Processor
是一個來自 Reactive Streams 的介面,最初旨在表示反應式管線中的「步驟」,可以在程式庫之間共享。但如今,運算子主要直接實作為 Publisher/Subscriber
對,因此在 Reactor 中,處理器最終涵蓋了不同的用例(最常見的是,從一個 Publisher
多播到多個 Subscriber
)。
因此,大多數時候,使用者將處理器視為「手動建立 Flux
」的一種方式:他們不是將 Processor
連接到父發布者(也就是將其用作 Subscriber
),而是直接呼叫其 onNext
/onComplete
/onError
方法。不幸的是,這是一種有問題的方法,因為此類呼叫必須以符合 Reactive Streams 規範的方式進行,這意味著它們需要外部同步。
從歷史上看,這已透過在 FluxProcessor
上引入 sink()
方法來緩解。其想法是,如果您想以這種手動方式使用 FluxProcessor
,您需要實例化您想要的處理器形式,然後呼叫其 sink()
方法一次,並從此處使用產生的 FluxSink
來觸發訊號給訂閱者。在下游,FluxProcessor
本身會被公開(作為可以組合運算子的 Flux
)。
從可發現性的角度來看,這仍然是有問題的,因為滿足最常見用例的「正確方法」是最難想出來的。
在 3.4.0 中,我們打算扭轉局面,將 Sink
使用模式置於聚光燈下,作為第一等公民,並使 Processor
使用模式更難以意外發現或誤用。
這個第一個里程碑朝著這個方向邁出了第一步:
FluxProcessor
的具體實作,這些實作現在預定在 3.5.0
中移除Sinks
實用程式類別,其中包含旨在手動觸發的 sink 的工廠方法在 M1 中,處理器的形式仍然存在,但工廠方法已複製到 Processors
類別,但這已在 M2 中重新設計。我們打算在 M2 中將形式的選擇移到 Sinks
上。屆時將有一種方法可以從那裡將 Sink
轉換為 FluxProcessor
,從而消除 M2 中對 Processors
的需求。
在 M1
中,所有具體 xxxProcessor
(例如 UnicastProcessor.create()
)上的工廠方法都已移動到 Processors
以處理基本情況,或移動到 Processors.more()
以處理允許更精細調整的重載。這些方法透過前綴區分形式
UnicastProcessor
-> Processors.unicast()
和 Processors.more().unicast(...)
EmitterProcessor
-> Processors.multicast()
和 Processors.more().multicast(...)
DirectProcessor
-> Processors.more().multicastNoBackpressure()
ReplayProcessor
-> Processors.replayAll()
/replay(int)
/replayTimeout(Duration)
/replaySizeAndTimeout(int, Duration)
以及 Processors.more()
上的類似方法所有這些處理器在概念上都具有相同的輸入和輸出類型 <T>
,因此它們是 FluxProcessor<T,T>
。M1 中引入了一個方便的介面 FluxIdentityProcessor<T>
,但除了減少泛型的數量之外,它沒有帶來太多好處,因此它可能會在 M2 中移除。
但是,我們說應該優先使用 Sinks
而不是從 Processors
使用 FluxProcessor
。在這種情況下,人們首先會獲得一個 sink,並將其轉換為 Flux
或 Mono
,以便應用程式的其餘部分在其上組合,如下例所示
//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();
//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();
//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();
請注意,該類別目前提供的變體少於 Processors
,但這正在為 M2 重新考慮。
3.3.0 版中已棄用的幾個類別已被移除
TopicProcessor
WorkQueueProcessor
Schedulers.boundedElastic()
自 3.3.0 版以來已推出,我們認為現在可以棄用其祖先 elastic()
,而不是僅僅建議使用 boundedElastic 而不是 elastic。
在 3.5.0 版的更遠期程中,elastic
Scheduler
將被移除。
這裡有很多內容要介紹,我們將在另一篇部落格文章中進行介紹。
請試用 M1!
我們已經在 M2 中對 sinks 和 processors 進行進一步的變更,以及其他主題,例如 Context
運算子、避免在 subscribe
中拋出例外,以及改進關於指標的說明。
一如既往,非常歡迎針對 M1 和目前的 M2 快照提供意見回饋。
同時,祝您反應式編碼愉快! Reactor 團隊。