領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多這篇文章是系列文章的一部分
在上一篇文章中,我們以 Spring Cloud Sleuth 的 MANUAL
上下文傳播策略既高效又能提供正確語義的想法作為結論。 根據眾多經驗,Spring、Micrometer 和 Reactor 團隊創建了一個新的上下文傳播函式庫。 其目標是封裝在 ThreadLocal
值和類似 Map 的結構之間傳輸上下文資料的考量。 Micrometer 1.10 和 Reactor 3.5 都以此為基礎,在 Reactor 和命令式程式碼之間提供一流的體驗。 透過使用 Reactor Context
,我們隱式地公開了 ThreadLocal
值,Micrometer 使用這些值來檢測追蹤函式庫,並填充 SLF4J 的 MDC
,以提供包含追蹤識別符的日誌。
在本文中,我們將採取與以往不同的方法。 我們不會從頭開始建構知識,而是從您可用的現成頂層 API 開始,然後解釋幕後發生的事情。 最後,您將能夠
讓我們回顧第一篇文章中的範例,其中我們展示了 delayElement
運算符如何導致響應式鏈失去關聯識別符。 讓我們回顧一下程式碼,從我們的操作開始
Mono<Void> addProduct(String productName) {
log("Adding product: " + productName);
return Mono.empty(); // Assume we’re actually storing the product
}
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return Mono.just(true); // Assume we’re actually notifying the shop
}
接著我們需要回顧綁定請求處理程式
Mono<Void> handleRequest() {
initRequest(); <1>
log("Assembling the chain"); // <2>
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1)) // <3>
.flatMap(product ->
Flux.concat(
addProduct(product), // <4>
notifyShop(product)).then())
}
從 Reactor 3.5.0 開始,Reactor Context
能夠與 Micrometer 傘下的新函式庫 context-propagation
整合。 我們將在本篇文章的結尾更詳細地描述這種整合。 在 Reactor 3.5+ 中,當 context-propagation
函式庫位於類別路徑中時,我們可以期望當我們在 handle
運算符以及新的 tap
運算符中記錄日誌時,我們的 ThreadLocal
值會存在。
為了傳播我們的自訂 ThreadLocal
,我們需要註冊一個 ThreadLocalContextAccessor
ContextRegistry.getInstance()
.registerThreadLocalAccessor("CORRELATION_ID",
CORRELATION_ID::get,
CORRELATION_ID::set,
CORRELATION_ID::remove);
目前,context-propagation
函式庫的詳細資訊對於實現我們需要的內容並非必要。 我們唯一需要知道的是,我們使用了金鑰 CORRELATION_ID
,它將與 Reactor Context
一起使用,以在我們的特殊運算符中還原 ThreadLocal
。 讓我們修改其餘程式碼以使用它們並在專用位置記錄日誌。
我們只需要對請求處理程式進行一項修改
Mono<Void> handleRequest() {
initRequest(); // <1>
log("Assembling the chain");
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.contextCapture(); // <2>
}
我們所做的唯一修改是在我們傳回給呼叫者的鏈末端的 <2>
contextCapture
運算符。 此運算符的工作是捕獲目前的 ThreadLocal
值,ThreadLocalAccessor
實例已在 ContextRegistry
中註冊這些值,並將它們以相同的金鑰儲存在 Reactor Context
中。 在此特定實作中,我們唯一的希望是訂閱會在組裝階段之後立即發生,如 <1>
中所示,我們在其中設定 ThreadLocal
值。
接下來,我們將使用 tap
運算符新增日誌記錄
Mono<Void> addProduct(String productName) {
return Mono.<Void>empty()
.tap(() -> new DefaultSignalListener<>() {
@Override
public void doOnComplete() throws Throwable {
log("Adding product: " + productName);
}
});
}
在這裡,我們正在擴展 reactor-core
的 reactor.core.observability
套件中的 DefaultSignalListener
。 我們只對點擊完成訊號感興趣,我們在其中執行日誌操作。
對於 handle
運算符,我們將更改 notifyShop
方法
Mono<Boolean> notifyShop(String productName) {
return Mono.just(true)
.handle((result, sink) -> {
log("Notifying shop about: " + productName);
sink.next(result);
});
}
讓我們看看現在,當我們呼叫我們的處理程式時,是否會得到正確的輸出
handleRequest().block();
結果如下
[ main][ 643805344761543048] Assembling the chain
[parallel-1][ 643805344761543048] Adding product: test-product
[parallel-1][ 643805344761543048] Notifying shop about: test-product
太棒了! 事實上,這與 Spring Cloud Sleuth 的 MANUAL
策略方法相同,但已整合到 Reactor 的內部,因此您無需手動還原 ThreadLocal
值。 我們選擇 tap
和 handle
是因為這些運算符可以存取 Subscriber 綁定的 Context
,並允許對具體的 Reactive Streams 訊號採取行動。
請記住:Reactor Context
用於寫入,ThreadLocals
用於讀取。
事實上,我們的請求處理程式有點危險。 如果我們延遲訂閱行為,我們將遺失關聯識別符。 請考慮
Mono<Void> requestHandler = handleRequest(); // <1>
Thread subscriberThread = new Thread(requestHandler::block); // <2>
subscriberThread.start();
subscriberThread.join();
輸出如下
[ main][ 1388809065574796038] Assembling the chain
[parallel-1][ null] Adding product: test-product
[parallel-1][ null] Notifying shop about: test-product
組裝發生在 <1>
中,ThreadLocal
在 main
中設定。 然而,訂閱發生在 <2>
中的新 Thread
上,它沒有要捕獲的 ThreadLocal
值。 因此,我們的日誌沒有關聯識別符。 我們可以使用 Mono.defer()
包裝處理程式的主體來解決此問題。 然而,與其這樣做,不如考慮我們是否首先需要實際設定 ThreadLocal
。
在調用 Reactor 鏈的命令式應用程式中,例如調用 WebClient
的 Spring MVC 控制器方法,ThreadLocal
值已建立,contextCapture
將會選取它們並將它們儲存在 Context
中。
另一方面,在像 WebFlux 這樣的響應式堆疊中,直接使用 contextWrite
更有意義。
我們知道 Reactor 將使用其 Context
的內容來還原 ThreadLocal
值。 如果我們直接將預期值儲存在 Context
中,而不是從目前狀態捕獲它們,我們將稍微提高效能,但我們也將提高與函數式程式設計範式的相容性。 讓我們試試看
Mono<Void> handleRequest() {
// initRequest(); -- no write to ThreadLocal
log("Assembling the chain");
return Mono.just("test-product")
// <1>
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.contextWrite(
Context.of("CORRELATION_ID", correlationId())); // <2>
}
讓我們執行它
[ main][ null] Assembling the chain
[parallel-1][ 7059587638538899074] Adding product: test-product
[parallel-1][ 7059587638538899074] Notifying shop about: test-product
太棒了! 我們的實際響應式鏈包含正確的關聯識別符。
不幸的是,我們在組裝階段遺失了一個。 其中一個原因是日誌不是在 handle
或 tap
運算符內發生的。 如果我們在 <1>
中使用 tap
運算符新增初始日誌,我們就會沒事。 具有關聯識別符的 Context
綁定到來自 <2>
的鏈上游。 如果我們在 contextWrite
呼叫之後新增一個日誌記錄 tap
運算符,我們將看不到正確的關聯識別符 – 該階段附加的 Context
是另一個,並且沒有我們的識別符。 我們稍後會再回到這個問題,但首先,讓我們考慮一下我們是否可以簡化我們的程式碼並避免使用特殊的運算符。
當 reactor-core 3.5.0 發佈時,它被包含在 Spring Framework 6.0 和 Spring Boot 3.0 中。 使用 Spring Cloud Sleuth 進行追蹤的現有 Spring 使用者習慣於讓日誌填充 trace-id
和 span-id
值(類似於我們的關聯識別符)。 切換到新的範例,其中可觀察性是 Spring 產品核心套件的一部分,將需要現有的應用程式重寫其日誌記錄以使用 handle
和 tap
運算符。 我們繼續思考如何使更多運算符能夠還原 ThreadLocal
值。
正如我們在上一篇文章中看到的,還原可以跨越多個運算符的 ThreadLocal
值並非易事。 選擇 handle
和 tap
是因為它們不會讓 ThreadLocal
值洩漏。 沒有訊號因執行使用者程式碼而傳播。 當使用者程式碼執行時,ThreadLocal
值會存在。 然後捕獲結果。 最後,清除 ThreadLocal
上下文。 僅在此之後,訊號的響應式傳播才會發生到下游運算符。 此外,我們希望更具選擇性,因為在每個運算符中執行還原操作會產生大量額外負荷,如第 2 部分所述。
我們仔細地重新思考了一切,並提出了一個想法,可以將其組合到以下呼叫中(從 reactor-core 3.5.3 開始)
Hooks.enableAutomaticContextPropagation();
我們可以將其新增到應用程式的 main
方法中。
我們現在可以還原我們操作方法的初始實作
Mono<Void> addProduct(String productName) {
log("Adding product: " + productName);
return Mono.empty();
}
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return Mono.just(true);
}
我們保持 handleRequest
方法和新 Thread
上的訂閱不變。 讓我們執行它
[ main][ null] Assembling the chain
[parallel-1][ 8362586195225273235] Adding product: test-product
[parallel-1][ 8362586195225273235] Notifying shop about: test-product
成功!
透過此功能,我們可以將使用 Spring Cloud Sleuth 的現有程式碼庫遷移到新的 Spring Framework,而無需對我們的日誌記錄方式進行任何更改。 透過上述掛鉤,如果您將 Spring Boot Actuator 與 Micrometer Tracing 一起使用,則 SLF4J 日誌會填充追蹤資訊,而無需執行任何操作。 很快,Spring Boot 將自動為您呼叫掛鉤。
我們提到過我們會回到組裝時日誌的問題。 到目前為止,我們一直在我們的請求處理邏輯中啟動關聯識別符產生過程。 理想情況下,我們的處理程式應由伺服器呼叫,並且從呼叫程式碼訂閱產生的 Publisher (Flux 或 Mono)。 我們的處理程式恢復到初始形狀
Mono<Void> handleRequest() {
log("Assembling the chain");
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then());
}
讓我們透過將上下文附加到傳回的 Mono 來模仿伺服器程式碼
Mono<Void> requestHandler = handleRequest()
.contextWrite(Context.of("CORRELATION_ID", correlationId()));
接著我們需要執行它
requestHandler.block();
組裝時間仍然缺少關聯識別符
[ main][ null] Assembling the chain
[parallel-1][ 5508113792645841519] Adding product: test-product
[parallel-1][ 5508113792645841519] Notifying shop about: test-product
contextWrite
運算符在訂閱時間(以及其他生命週期事件)期間還原 ThreadLocal
值。 為了讓使用者程式碼在組裝時間期間具有日誌,對該程式碼的整個呼叫需要成為響應式鏈的一部分。 這樣,使用者程式碼會在外部 Mono 的訂閱期間執行,並且傳回的內部 Mono 會立即被訂閱。 對於整個執行過程,如果我們在我們的「框架」程式碼中執行此操作,則外部 Mono 的 Context
可在 ThreadLocals 中使用
Mono<Void> requestHandler = Mono.defer(() -> handleRequest())
.contextWrite(Context.of("CORRELATION_ID", correlationId()));
我們需要做的就是使用 Mono.defer()
並將 Context
附加到它。
幸運的是,Spring Framework 運作良好,並且在訂閱階段也處理我們的組裝。
這種新方法看起來非常令人鼓舞。 有人可能會想,使用過去採用的方法,這種新機制會如何崩潰? 我們對這種方法感到更有信心,因為它更符合 Reactive Streams 的本質。 並非基於 Reactor Context
的方法在其中嵌入了一個主要的誤解 – 它們向下游傳播 ThreadLocal
值 – 希望在某些時候嘗試清理。 然而,沒有語義邊界可以阻止傳播。
依賴 ThreadLocal
值的下游傳播也可能是錯誤的來源。 響應式函式庫向上游和下游傳播訊號。 一個訊號可能會觸發另一個訊號,但並非必須如此。 不同的 Thread
可以繼續處理。 由類似 flatMap
的運算符完成的某些最佳化(例如預取)可以從上游請求和排隊值,而無需我們的下游傳播機制參與。 如果我們希望即使在我們記錄回壓或取消時也能擁有上下文資訊,我們需要考慮所有可能的訊號。
一個重要的觀察來自 Context
指示邏輯邊界的方式。 當您呼叫 contextWrite
並在 Context
中儲存一個值時,所有上游運算符都可以存取修改後的版本。 所有下游運算符都看不到修改,但會看到您的修改所基於的狀態。
Subscriber 綁定的 Context
的性質是我們新方法的基礎。 我們修改了 contextWrite
運算符,以在訊號在訂閱時以及取消和請求時向上游傳播時,設定 ThreadLocal
值以反映目前的 Context
。 但是,每當訊號向下游傳播時,它都會將這些 ThreadLocal
值重設為下游 Context
中表示的值。
我們仍然需要使用 Scheduler
包裝方法。 我們還需要 Queue 包裝方法(為此,我們需要改進生命週期語義)。
但我們可能會考慮透過在這些情況下傳輸 Reactor Context
而不是捕獲 ThreadLocal
值來改善情況。 這可以提高效能。
此外,當我們使用 Reactor 無法控制的 Publishers 或使用我們無法控制的 Threads 的來源(例如使用 Mono.fromFuture()
範例來模擬遠端呼叫)時,我們仍然會遺失 ThreadLocal
值。 目前的緩解措施是引入 contextWrite
運算符的語義邊界,它實際上不會更改 Context
,如 notifyShop
方法的這種變體中所示
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return makeRequest(productName) // <1>
.contextWrite(Function.identity()) // <2>
.doOnNext(r -> log("Request done.")); // <3>
}
makeRequest
方法在系列文章的上一篇文章中定義。 如果我們假設 makeRequest
是第三方函式庫呼叫,它使用我們無法控制的 Threads,我們也無法將它執行的程式碼包裝在 <1>
中以及完成其操作的異步程式碼中。 該鏈的該部分完成的任何日誌都不會填充關聯識別符。 傳播此類上下文將是函式庫作者的責任。 然而,由於我們在 <2>
中使用了邊界,因此我們在 <3>
中的日誌包含關聯識別符。
我們打算在 reactor-core
中新增必要的功能,以便為可以以 Reactor 無法控制的方式更改 Threads 的來源提供這樣的邊界。
在命令式情境中,呼叫響應式程式碼僅使用阻塞訂閱(例如透過使用 block()
),我們計劃自動執行 contextCapture
以將目前的 ThreadLocal
值透明地傳播到響應式鏈中。 這在 Spring MVC 應用程式中與 WebClient 互動時很有用。
捕獲 ThreadLocal
狀態並在不同位置還原它的任務本身就是一個有趣的主題。 通常,我們會想到多個 ThreadLocal
值,它們彼此之間具有邏輯連接,或者一個類似 Map 的結構對應於各種考量。 我們建立了一個專用的函式庫,允許透過捕獲 ThreadLocal
的狀態並將其還原到各自的目標中,在 ThreadLocals 和任意物件之間進行轉換。 在前面的範例中,我們使用了 context-propagation 函式庫的一些 API。 它在 Micrometer 傘下開源,如果您想在您的程式碼中使用它,它也有帶有範例的參考文件。
Project Reactor 註冊了一個 ContextAccessor
,它使用 ServiceLoader JDK 機制處理 Reactor Context
。 另一方面,Micrometer 註冊了一個 ObservationThreadLocalAccessor
,它處理 Micrometer Tracing 和其他儀器機制使用單一 Observation 概念運作所需的 ThreadLocal
狀態。
我們強烈建議嘗試將 Spring Boot 與 Spring Boot Actuator 一起使用,以啟用追蹤功能,並親身體驗這種體驗有多麼具有凝聚力。
在這一系列的部落格文章中,我們介紹了上下文傳播的基礎知識,並介紹了命令式和響應式程式設計範式之間橋樑的歷史和現狀。 我們確實希望您現在可以自信地使用我們實作的功能。 在最佳情況下,如果您使用自動上下文傳播功能,則您無需做太多工作。 此外,在這個有趣的情境中,我們希望您的自訂傳播邏輯可以利用我們在本文中描述的原語。 如果您有疑問或想在 GitHub 上報告問題,可以與我們聯繫。
如果沒有我的同事審閱每一部分的幫助,這個系列文章就不會發佈。 我要感謝(按字母順序排列):Simon Baslé、Jay Bryant、Pierre De Rop、Oleh Dokuka、Marcin Grzejszczak、Robert McNees、Rossen Stoyanchev 和 Tadaya Tsuyukubo。
若要使用範例,請隨時使用我 GitHub 儲存庫中的相關套件。