Reactor-Core 3.2 里程碑 1 的搶先看

發佈 | Simon Baslé | 2018 年 1 月 16 日 | ...

各位 Reactive Spring 社群的朋友們,您好!

雖然團隊仍在努力開發 3.1 版本,但我們也希望讓社群搶先了解未來 3.2 版本的內容。

特別是,3.2.0.RELEASE 版本中最大的亮點是新增了我們一直以來稱為「錯誤模式」、「繼續模式」,或最近更正式的「錯誤策略」的功能。

概念

概念其實很簡單:如果可以從運算子內執行的使用者程式碼中的例外狀況中恢復,並允許序列繼續執行,會怎麼樣呢?

讓我們舉個例子,假設您有以下方法

public Flux<Integer> divide100By(Flux<Integer> dividers) {
  return dividers.map(div -> 100 / div);
}

萬一 dividers 來源在某個時間點發出 0,產生的 Flux 會立即終止,並發出 ArithmeticExceptiononError 訊號。

如果來源剛好是,例如 `Flux.range(0, 10)`,那麼仍然有 9 個完全有效的值可以被映射。

您要如何做才能讓這種暫時性的例外狀況(而且只有這種例外狀況)被忽略,並讓後續有效的值有機會被處理呢?

目前的變通方案

在 Reactor 3.1 中,有一個變通方案,您可以透過使用 `flatMap` 為每個元素建立內部序列來應用,進而將錯誤恢復運算子應用於這些細微的序列。

public Flux<Integer> divide100By(Flux<Integer> dividers) {
  return dividers.flatMap(div -> (1)
      Mono.just(100 / div) (2)
          .doOnError(e -> { (3)
              if (e instanceof ArithmeticException) process(e); (4)
          })
          .onErrorResume(ArithmeticException.class, e -> Mono.empty()) (5)
      );
}
  1. 我們使用 `flatMap` 而不是 `map`,為每個值產生一個小的內部 `Mono`。

  2. 這個 `Mono` 基本上就是舊的 `map` 運算…​

  3. …​並加入了錯誤恢復。

  4. 首先,我們要確保在「恢復」之前處理(例如記錄)`ArithmeticException`(而且僅限於這些)。

  5. 然後我們使用 `onErrorResume` 和 `Mono.empty()` 來有效地忽略結果序列中的例外狀況。

這樣做是可行的,但寫起來有點麻煩(雖然 `compose` 和 `transform` 可以幫助共用這類程式碼)。而且,我們也從單一的 `map` 運算子轉變為具有多個內部運算子的 `flatMap`。

由於需要協調多個來源,`flatMap` 比 `map` 有更多的 overhead。即使像是運算子融合之類的東西減少了這種成本,但它仍然存在。

介紹錯誤策略

如果我們想要進一步減少這種處理的 overhead,困難之處在於我們現在必須在每個運算子的實作層級上工作。

鏈中的每個運算子都必須以某種方式知道,例外狀況應該被捕獲,但不應透過 `onError` 傳播,而是以不同的方式處理。這是一個相當大的改變,而且是橫向的!

請注意這在概念上聽起來像是篩選器,但用於錯誤。 就像篩選器一樣,這意味著在其中一個 `onNext` 拋出例外狀況後繼續處理其來源的運算子,也應該從其來源請求至少一個以上的元素。

即使它可以隔離到特殊的執行路徑,對於運算子來說,這仍然是一個複雜的核心實作變更。

然後還有 API 的問題:作為建構子參數,或在 Flux 中使用「錯誤恢復」布林值的額外多載來實作,將會非常繁瑣… 我們真的需要將 Flux API 中的方法數量加倍才能支援該功能嗎?

幸運的是,不需要:自 3.1 以來,我們有了 Context,這是一種將此類資訊傳播到鏈中每個(Reactor)運算子的好方法。

這就是我們為錯誤策略功能採取的途徑

  • 僅針對特定運算子(`map`、`filter`、`flatMap`、`handle` 等等)新增支援。 這些運算子具有特殊的 javadoc 標籤來記錄這一點。

  • 此功能透過在其 Context 中放入特殊金鑰,針對給定的 Flux 啟用。

  • 每個受支援的運算子在其 `onNext` 實作中都有一個特殊的路徑,用於檢查該金鑰,如果找到該金鑰,將會改變其處理錯誤的方式。

  • 此功能透過 `errorStrategyContinue()` API 向使用者公開。

  • 它可以更細緻:可以篩選哪些例外狀況可以恢復,也可以為此類恢復的例外狀況設定自訂處理常式。

重要事項

務必記住一件事,由於此功能是透過 Context 啟用的,因此它遵循與 Context 相同的傳播規則。 例如,它將在 `flatMap` 中的內部序列上啟用。 如果不希望這樣,請在 `flatMap` 內使用 `errorStrategyStop()` 回到預設行為(這不會超出 `flatMap` 的範圍)。 它也會向後傳播,在 `errorStrategyXXX` 之前的運算子上啟用。

以下是我們先前的範例在使用錯誤策略後變成的樣子

public Flux<Integer> divide100By(Flux<Integer> dividers) {
  return dividers.map(div -> 100 / div) (1)
      .errorStrategyContinue(ArithmeticException.class, (2)
          (error, value) -> process(error)); (3)
}
  1. 回到簡單的 `map`

  2. 我們僅從 `ArithmeticException` 中恢復。

  3. 我們將此類例外狀況傳遞給我們的內部處理常式(請注意,我們也可以存取導致例外狀況的原始值,如果有的話)。

使用里程碑 1 進行測試

我們剛剛發佈了 3.2.0.M1 里程碑[1],其中主要包含錯誤策略功能,我們希望您能測試一下?

注意

這是一個如此橫向的變更,即使您不打算使用它,使用該 artifact 執行測試也很有價值,以驗證如果您沒有明確使用 `errorStrategyContinue()`,您應該不會看到任何行為上的改變(因為該功能包含在特定的執行路徑中)。

為了取得此里程碑,請將 repo.spring.io/milestone 儲存庫新增至您的 Maven 或 Gradle 建置組態,並提取 reactor-core 3.2.0.M1 artifact。

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
  <version>3.2.0.M1</version>
</dependency>

特別是對於該功能的 API,目前尚未定案。 因此,如果您有任何意見回饋,請透過在 GitHub 上開啟 issue 或在 Gitter 上討論該功能來告訴我們。

在此期間,祝您編碼愉快!


1. 附註:此里程碑發佈較早,當時 3.1.3.RELEASE 仍在開發中,因此請注意,它不包含 3.1.3 和後續 3.1.x 版本中的所有修正。

訂閱 Spring 電子報

隨時掌握 Spring 電子報的最新資訊

訂閱

領先一步

VMware 提供訓練和認證,加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

檢視全部