領先一步
VMware 提供訓練和認證,以加速您的進展。
了解更多Reactor 3.6.0 即將推出,並將於 11 月 14 日 GA。這篇部落格文章描述了即將發佈的版本中包含的新功能!
今天,每個人都在談論 Java 21 和 Project Loom。Project Reactor 團隊聽到了這些聲音,並在我們的生態系統中看到了該專案的價值。在這個即將發佈的版本中,我們引入了對 VirtualThread
實作的支援。
讓我們考慮以下程式碼範例
package io.projectreactor.samples;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class LoomSample {
public static void main(String[] args) {
Flux.using(
() -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
Flux::fromStream,
Stream::close
)
.subscribeOn(Schedulers.boundedElastic())
.map(v -> Thread.currentThread() + " " + v)
.log()
.blockLast();
}
}
上面的程式碼以反應式方式從文字檔中讀取所有行。不幸的是,Files.lines
方法是一個已知的系統 I/O 呼叫,會造成阻塞。因此,我們將所有這些操作排程在共用的 Schedulers.boundedElastic()
排程器上。Schedulers.boundedElastic()
是主要的共用排程器,用於 卸載系統中可能執行的所有阻塞呼叫,這已不是秘密。它被用於簡單的 HTTP 阻塞呼叫,以及用於包裝一些不可避免的阻塞系統互動,例如產生隨機 UUID
。但是,它預設使用 平台 Thread
實例,這可能會增加系統的競爭。
現在,透過 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler
實作可以取代預設的實作,以使用虛擬執行緒而不是平台執行緒搭配 Schedulers.boundedElastic()
。您只需要在 Java 21+ 上執行您的應用程式,並設定 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true
系統屬性即可。
您可能已經注意到,您將擁有一個 VirtualThread
實例來執行排程的工作。
您可能從我們之前的部落格聽說過,從 Reactor 3.5.0 開始,我們引入了一種機制,用於從 Reactor Context
在諸如 handle
和 tap
等運算子中自動還原 ThreadLocal
。稍後,在 reactor 3.5.3 中,我們在 Project Reactor 中可用的整組運算子中新增了自動還原 ThreadLocal
值的功能。
static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1>
}
public static void main(String[] args) {
logger.info("Setting Trace ID test-123-567-890");
TRACE_ID.set("test-123-567-890"); <1>
Hooks.enableAutomaticContextPropagation(); <2>
Mono.fromCallable(() -> {
logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
return UUID.randomUUID();
})
.subscribeOn(Schedulers.boundedElastic()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
.block();
}
上面的程式碼產生一個隨機 UUID
,它將 <3> 阻塞產生程序卸載到專用的工作執行緒。為了啟用自動 ThreadLocal
傳播魔法,您需要讓 Micrometer Context Propagation 程式庫在執行階段可用,註冊 <1> 必要的 ThreadLocal
實例,然後呼叫 Hooks
API <2> 以啟用這種特定的 傳播模式。如果我們檢查上面的程式碼輸出,我們會看到指定的 <1> TRACE_ID
ThreadLocal
在所有位置 <3> <4> 都一致可用,無論 Thread
切換與否。
[ INFO] (main) Setting Trace ID test-123-567-890 <1>
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>
Thread main
上設定的追蹤 IDThread boundedElastic-1
上可用雖然這種機制非常接近每個人想要的功能,但它受到 Reactor 擁有的生產者和轉換器的限制。為了了解它可能無法完美運作的地方,讓我們修改上面的範例,並新增與外部基於 Reactive Streams 的程式庫(例如 JDK11 HttpClient
)的整合。
static HttpClient jdkHttpClient = HttpClient.newHttpClient();
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}
public static void main(String[] args) {
logger.info("Setting Trace ID");
TRACE_ID.set("test-123-567-890");
Hooks.enableAutomaticContextPropagation();
Mono.fromFuture(() -> {
logger.info("[" + TRACE_ID.get() + "] Preparing request");
return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
.uri(URI.create("https://httpbin.org/drip"))
.GET()
.build(),
HttpResponse.BodyHandlers.ofPublisher());
})
.flatMapMany(r -> {
logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
return FlowAdapters.toPublisher(r.body()); <2>
})
.collect(new ByteBufferToStringCollector()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
.block();
}
在修改後的範例中,我們進行網路呼叫 <1>,然後讀回回應。回應本文表示為 Flow.Publisher
<2>,我們將其展平並轉換為字串表示 <3>。一旦此程式碼執行,可能的輸出之一可能如下所示
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>
我們可以從輸出中觀察到,使用 reactor 3.5.3+,使用外部 Publisher
可能會導致內容遺失 <1>,因為我們不知道是否需要進行額外的提升來還原遺失的 ThreadLocal
實例。
使用 reactor 3.6.x,此輸出始終一致
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>
在此版本中,我們強化了 ThreadLocal
值的還原機制,並新增了額外的邏輯來偵測任何外部 Publisher
實作。一旦偵測到這些實作,我們會裝飾它們,以確保您在我們的管道中操作時永遠不會遺失 ThreadLocal
值。
在 reactor 3.6.x 中,我們擁抱了 多版本 jar (MRJ) 支援,並且已經新增了 改進,盡可能消除反射。我們 計劃 擴展 MRJ 的使用,並在即將發佈的版本中使用所有 JDK9+ 功能!
敬請期待!所有來源都可以在 Github 上找到