Reactor 2020.0 (代號 Europium) 的第一個里程碑

工程 | Simon Baslé | 2020 年 7 月 10 日 | ...

本月稍早,我們發布了 Reactor 2020.0 的第一個里程碑。此週期代號為 Europium,接續 Dysprosium 週期(包含 reactor-core 3.3.x 和 reactor-netty 0.9.x)。

它包含 reactor-core 3.4.0 和 reactor-netty 1.0.0

在這篇部落格文章中,我們將介紹 reactor-core 里程碑的一些重點,並簡要提及 M2 的內容。

關於 reactor-netty,我們將在此處連結到另一篇部落格文章,一旦發布就會提供。

另請注意,已採用新的版本控制方案,該方案已在整個 Spring 系列產品中採用:請參閱參考指南這篇部落格文章

關於 Processor 的變更

核心中的主要變更是長期以來針對 Reactor 中 Processor 實作及其公開方式的努力。

這是 reactor-core 3.4.0-M1 的主要重點,目標是逐步淘汰 FluxProcessor(以及在某種程度上 MonoProcessor)的具體形式的使用。

Processor 是一個來自 Reactive Streams 的介面,最初旨在表示反應式管線中的「步驟」,可以在程式庫之間共享。但如今,運算子主要直接實作為 Publisher/Subscriber 對,因此在 Reactor 中,處理器最終涵蓋了不同的用例(最常見的是,從一個 Publisher 多播到多個 Subscriber)。

因此,大多數時候,使用者將處理器視為「手動建立 Flux」的一種方式:他們不是將 Processor 連接到父發布者(也就是將其用作 Subscriber),而是直接呼叫其 onNext/onComplete/onError 方法。不幸的是,這是一種有問題的方法,因為此類呼叫必須以符合 Reactive Streams 規範的方式進行,這意味著它們需要外部同步。

從歷史上看,這已透過在 FluxProcessor 上引入 sink() 方法來緩解。其想法是,如果您想以這種手動方式使用 FluxProcessor,您需要實例化您想要的處理器形式,然後呼叫其 sink() 方法一次,並從此處使用產生的 FluxSink 來觸發訊號給訂閱者。在下游,FluxProcessor 本身會被公開(作為可以組合運算子的 Flux)。

從可發現性的角度來看,這仍然是有問題的,因為滿足最常見用例的「正確方法」是最難想出來的。

在 3.4.0 中,我們打算扭轉局面,將 Sink 使用模式置於聚光燈下,作為第一等公民,並使 Processor 使用模式更難以意外發現或誤用。

這個第一個里程碑朝著這個方向邁出了第一步:

  • 棄用所有 FluxProcessor 的具體實作,這些實作現在預定在 3.5.0 中移除
  • 公開一個 Sinks 實用程式類別,其中包含旨在手動觸發的 sink 的工廠方法

在 M1 中,處理器的形式仍然存在,但工廠方法已複製到 Processors 類別,但這已在 M2 中重新設計。我們打算在 M2 中將形式的選擇移到 Sinks 上。屆時將有一種方法可以從那裡將 Sink 轉換為 FluxProcessor,從而消除 M2 中對 Processors 的需求。

在 M1 中遷移遠離具體處理器

M1 中,所有具體 xxxProcessor(例如 UnicastProcessor.create())上的工廠方法都已移動到 Processors 以處理基本情況,或移動到 Processors.more() 以處理允許更精細調整的重載。這些方法透過前綴區分形式

  • UnicastProcessor -> Processors.unicast()Processors.more().unicast(...)
  • EmitterProcessor -> Processors.multicast()Processors.more().multicast(...)
  • DirectProcessor -> Processors.more().multicastNoBackpressure()
  • ReplayProcessor -> Processors.replayAll()/replay(int)/replayTimeout(Duration)/replaySizeAndTimeout(int, Duration) 以及 Processors.more() 上的類似方法

所有這些處理器在概念上都具有相同的輸入和輸出類型 <T>,因此它們是 FluxProcessor<T,T>。M1 中引入了一個方便的介面 FluxIdentityProcessor<T>,但除了減少泛型的數量之外,它沒有帶來太多好處,因此它可能會在 M2 中移除。

但是,我們說應該優先使用 Sinks 而不是從 Processors 使用 FluxProcessor。在這種情況下,人們首先會獲得一個 sink,並將其轉換為 FluxMono,以便應用程式的其餘部分在其上組合,如下例所示

//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();

//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();

//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();

請注意,該類別目前提供的變體少於 Processors,但這正在為 M2 重新考慮。

棄用和移除

3.3.0 版中已棄用的幾個類別已被移除

  • TopicProcessor
  • WorkQueueProcessor

Schedulers.boundedElastic() 自 3.3.0 版以來已推出,我們認為現在可以棄用其祖先 elastic(),而不是僅僅建議使用 boundedElastic 而不是 elastic。

在 3.5.0 版的更遠期程中,elastic Scheduler 將被移除。

Reactor-Netty 即將達到 1.0

這裡有很多內容要介紹,我們將在另一篇部落格文章中進行介紹。

結論

請試用 M1

我們已經在 M2 中對 sinks 和 processors 進行進一步的變更,以及其他主題,例如 Context 運算子、避免在 subscribe 中拋出例外,以及改進關於指標的說明。

一如既往,非常歡迎針對 M1 和目前的 M2 快照提供意見回饋。

同時,祝您反應式編碼愉快! Reactor 團隊。

取得 Spring 電子報

隨時掌握 Spring 電子報的最新資訊

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

深入瞭解

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

深入瞭解

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

檢視全部