reactor-core 3.6.0 版本有哪些新功能?

工程 | Oleh Dokuka | 2023 年 10 月 31 日 | ...

Reactor 3.6.0 即將推出,並將於 11 月 14 日 GA。這篇部落格文章描述了即將發佈的版本中包含的新功能!

虛擬執行緒支援

今天,每個人都在談論 Java 21Project Loom。Project Reactor 團隊聽到了這些聲音,並在我們的生態系統中看到了該專案的價值。在這個即將發佈的版本中,我們引入了對 VirtualThread 實作的支援。

為何它很方便?

讓我們考慮以下程式碼範例

package io.projectreactor.samples;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LoomSample {

   public static void main(String[] args) {
      Flux.using(
                () -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
                Flux::fromStream,
                Stream::close
           )
          .subscribeOn(Schedulers.boundedElastic())
          .map(v -> Thread.currentThread() + " " + v)
          .log()
          .blockLast();
   }
}

上面的程式碼以反應式方式從文字檔中讀取所有行。不幸的是,Files.lines 方法是一個已知的系統 I/O 呼叫,會造成阻塞。因此,我們將所有這些操作排程在共用的 Schedulers.boundedElastic() 排程器上。Schedulers.boundedElastic() 是主要的共用排程器,用於 卸載系統中可能執行的所有阻塞呼叫,這已不是秘密。它被用於簡單的 HTTP 阻塞呼叫,以及用於包裝一些不可避免的阻塞系統互動,例如產生隨機 UUID。但是,它預設使用 平台 Thread 實例,這可能會增加系統的競爭。

現在,透過 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler 實作可以取代預設的實作,以使用虛擬執行緒而不是平台執行緒搭配 Schedulers.boundedElastic()。您只需要在 Java 21+ 上執行您的應用程式,並設定 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true 系統屬性即可。

Reactive Bounded Elastic on VirtualThreads

您可能已經注意到,您將擁有一個 VirtualThread 實例來執行排程的工作。

更佳的自動內容傳播

您可能從我們之前的部落格聽說過,從 Reactor 3.5.0 開始,我們引入了一種機制,用於從 Reactor Context 在諸如 handletap 等運算子中自動還原 ThreadLocal。稍後,在 reactor 3.5.3 中,我們在 Project Reactor 中可用的整組運算子中新增了自動還原 ThreadLocal 值的功能。

static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1> 
}

public static void main(String[] args) {
   logger.info("Setting Trace ID test-123-567-890");
   TRACE_ID.set("test-123-567-890"); <1>

   Hooks.enableAutomaticContextPropagation(); <2> 

   Mono.fromCallable(() -> {
          logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
          return UUID.randomUUID();
       })
       .subscribeOn(Schedulers.boundedElastic()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
       .block();
}

上面的程式碼產生一個隨機 UUID,它將 <3> 阻塞產生程序卸載到專用的工作執行緒。為了啟用自動 ThreadLocal 傳播魔法,您需要讓 Micrometer Context Propagation 程式庫在執行階段可用,註冊 <1> 必要的 ThreadLocal 實例,然後呼叫 Hooks API <2> 以啟用這種特定的 傳播模式。如果我們檢查上面的程式碼輸出,我們會看到指定的 <1> TRACE_ID ThreadLocal 在所有位置 <3> <4> 都一致可用,無論 Thread 切換與否。

[ INFO] (main) Setting Trace ID test-123-567-890 <1> 
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>

  1. Thread main 上設定的追蹤 ID
  2. 相同的追蹤 ID 在 Thread boundedElastic-1 上可用

雖然這種機制非常接近每個人想要的功能,但它受到 Reactor 擁有的生產者和轉換器的限制。為了了解它可能無法完美運作的地方,讓我們修改上面的範例,並新增與外部基於 Reactive Streams 的程式庫(例如 JDK11 HttpClient)的整合。

static HttpClient jdkHttpClient = HttpClient.newHttpClient();

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}

public static void main(String[] args) {
   logger.info("Setting Trace ID");
   TRACE_ID.set("test-123-567-890");

   Hooks.enableAutomaticContextPropagation();

   Mono.fromFuture(() -> {
          logger.info("[" + TRACE_ID.get() + "] Preparing request");
          return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
                                             .uri(URI.create("https://httpbin.org/drip"))
                                             .GET()
                                             .build(),
                HttpResponse.BodyHandlers.ofPublisher());
       })
       .flatMapMany(r -> {
          logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
          return FlowAdapters.toPublisher(r.body()); <2>
       })
       .collect(new ByteBufferToStringCollector()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
       .block();
}

在修改後的範例中,我們進行網路呼叫 <1>,然後讀回回應。回應本文表示為 Flow.Publisher <2>,我們將其展平並轉換為字串表示 <3>。一旦此程式碼執行,可能的輸出之一可能如下所示

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>

我們可以從輸出中觀察到,使用 reactor 3.5.3+,使用外部 Publisher 可能會導致內容遺失 <1>,因為我們不知道是否需要進行額外的提升來還原遺失的 ThreadLocal 實例。

使用 reactor 3.6.x,此輸出始終一致

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>

在此版本中,我們強化了 ThreadLocal 值的還原機制,並新增了額外的邏輯來偵測任何外部 Publisher 實作。一旦偵測到這些實作,我們會裝飾它們,以確保您在我們的管道中操作時永遠不會遺失 ThreadLocal 值。

還有什麼?多版本 Jar 支援!

在 reactor 3.6.x 中,我們擁抱了 多版本 jar (MRJ) 支援,並且已經新增了 改進,盡可能消除反射。我們 計劃 擴展 MRJ 的使用,並在即將發佈的版本中使用所有 JDK9+ 功能!

敬請期待!所有來源都可以在 Github 上找到

取得 Spring 電子報

隨時掌握 Spring 電子報的最新資訊

訂閱

領先一步

VMware 提供訓練和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部