領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多這篇文章是系列文章的一部分
Spring Cloud Sleuth 最近成為 Micrometer Tracing,這是 Micrometer 專案的一部分。 大部分追蹤檢測都集中在 Micrometer 中的新 Observability API 之下。 這些專案的目標是啟用任何應用程式的可觀察性 - 以指標、追蹤和包含相關識別碼的日誌的形式。 為了實現此目標,程式庫需要一種傳輸上下文資訊的方法。 當應用程式以任何形式處理非同步時,這項任務就變得相當具有挑戰性。 在上一篇文章中,我們介紹了使用 ThreadLocal
和 Reactor Context
進行上下文傳播的基本概念。
Spring Cloud Sleuth 在非同步上下文傳播的方法中經歷了多次轉變。 由於 Sleuth 處理不需要具有反應式 API 的第三方檢測程式庫,因此建立一種使上下文可用的方法至關重要。 這些程式庫通常不假設非同步,而是依賴靜態 ThreadLocal
狀態。 多年來,ThreadLocal
為 JVM 應用程式提供了隱式上下文儲存,用於驅動可觀察性功能。 隨著時間的推移,Project Reactor 在底層原始元件之上引入了各種掛鉤和包裝機制,以便實現反應式和命令式之間的橋接。 在本文中,我們旨在探索將上下文傳播到 ThreadLocal
值的方法,並討論它們的潛在錯誤。 我們將探索 Sleuth 採用的方法,並總結我們發現的一個良好的折衷方案,該方案既高效又在語義上合理。
在我們描述 Sleuth 引入的方法之前,我們應該考慮在命令式和反應式世界之間架起橋樑所存在的危險。
我們在上一篇文章中討論了 Thread
切換和相關副作用的一些潛在問題。 現在,我們將通過使用 Reactor 的外掛程式機制來解決我們可能遇到的問題,從而更多地探索反應式程式設計的特性。
總結 Spring Cloud Sleuth 遇到的所有問題是一個不斷變化的目標。 此外,許多組織都有自己的上下文傳播機制的實現,例如,用於填充 SLF4J 的 MDC
。 本文並非旨在全面總結所有潛在的陷阱。 相反,它旨在建立一些直覺,幫助您理解最終的真相:你要么按照反應式程式設計規則行事,要么準備好在最意想不到的時刻失敗。
正如我們所知,反應式鏈可以使用不同的 Thread
傳播訊號。 從我們在 上一篇文章 中了解到的,當在另一個 Thread
上繼續執行時,在執行任務時恢復上下文是有意義的。 Project Reactor 將管理 Thread
的任務委託給 Scheduler
。 它還提供了一個專用掛鉤,允許攔截特定工作單位的排程和執行:Schedulers.onScheduleHook
。 它的工作方式與上一篇文章中的 WrappedExecutor
類似。 讓我們看一個我們可能考慮使用它的場景。
在第 1 部分中,我們了解到我們不能依賴 ThreadLocal
值在反應式鏈中始終可用。 如果我們嘗試在訂閱時初始化它,並在 doFinally
運算符中清除它呢? 我們的應用程式可以使用有限數量的 Thread
並發處理許多請求。 由於這些平台 Thread
可以重複使用,我們需要在處理另一個請求之前清除與一個請求關聯的任何 ThreadLocal
狀態,以便不同的請求不會使用剩餘的相關識別碼。
以下代碼範例是對我們在上一部分中編寫的代碼的修改,在上一部分中我們沒有使用 Reactor Context
。
handleRequest
方法的潛在實現可能如下所示
Mono<Void> handleRequest() {
return Mono.fromSupplier(() -> {
initRequest(); // <1>
return "test-product";
}).flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.doOnSuccess(v -> log("Done."))
.doFinally(signalType ->
CORRELATION_ID.remove()); // <2>
}
在 <1>
中,我們設定 ThreadLocal
值,在 <2>
中,我們嘗試清除它。
我們還修改了我們執行的操作,以便能夠在 addProduct
方法中新增一個人為的延遲
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return Mono.<Void>empty()
.delaySubscription(Duration.ofMillis(10),
Schedulers.single()); // <1>
});
}
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return Mono.just(true);
});
}
請注意,在 <1>
中,我們通過延遲訂閱並使用 Schedulers.single()
在 10 毫秒後啟動訂閱來引入非同步。 delaySubscription
將使用該 Scheduler
的底層 ScheduledExecutorService
,並在延遲後在另一個 Thread
上啟動訂閱。
從上一篇文章中,我們知道在這種情況下我們需要恢復 ThreadLocals
,因此我們使用前面提到的 Scheduler
外掛程式來實現這一點
Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);
在 Reactor 的 Scheduler
上執行的每個任務都將恢復 ThreadLocal
值,因此我們應該是安全的。
現在,讓我們模仿兩個連續的請求,並以記錄來驗證 CORRELATION_ID
是否已正確清除
log("Got first request, calling handler");
handleRequest().block();
log("Got second request, calling handler");
log("There should be no correlationId on this line!");
handleRequest().block();
日誌如下
[ main][ null] Got first request, calling handler // <1>
[ main][ 8658769170992364531] Adding product: test-product
[ single-1][ 8658769170992364531] Notifying shop about: test-product
[ single-1][ 8658769170992364531] Done.
[ main][ 8658769170992364531] Got second request, calling handler
[ main][ 8658769170992364531] There should be no correlationId on this line!
[ main][ 711436174608061530] Adding product: test-product
[ single-1][ 711436174608061530] Notifying shop about: test-product
[ single-1][ 711436174608061530] Done.
與 “test-product”
處理相關的日誌具有正確的相關識別碼。 但是,請求之間發生了什麼? 我們期望在 doFinally
中清除 ThreadLocal
。 不幸的是,請求之間的日誌仍然包含一個識別碼。 那時發生了什麼?
請注意,“Notifying shop about”
日誌發生在 Thread
single-1
上。 訊號已在該 Thread
上傳遞,因此我們在那裡清除了 ThreadLocal
,但使主 Thread
受到污染(在 <1>
中)。 現在,我們的處理程式之外的執行可以使用錯誤的相關識別碼用於不同的目的。 我們可以嘗試通過向伺服器層(分派請求)新增清除邏輯,並確保用於請求的每個 Thread
不受污染來緩解此問題。 如果我們的管道更複雜,這將不會保存所有其他潛在的 Scheduler
Thread
。
這種方法在允許應用程式在反應式鏈中透明地使用 ThreadLocal
值方面取得了相當大的進展。 從效能的角度來看,這也是合理的,因為它不會在每個運算符周圍設定和重設 ThreadLocal
,而只會在處理項目時發生 Thread
切換時才設定和重設 ThreadLocal
。 但是,它也表明仍然存在未解決的副作用。 在下一個範例中,我們將體驗並嘗試處理不同的場景。
對於使用 ThreadLocal
作為上下文元數據傳輸機制的策略,另一個常見問題是當使用 Reactor 之外的其他非同步程式庫並且它自行切換 Thread
時。 當執行變更為不受包裝的 ExecutorService
控制的不同 Thread
時,上下文將會遺失。
讓我們看看實際效果。 我們將重複使用到目前為止我們看到的大部分代碼,並對 notifyShop
方法進行一項變更。 它現在通過使用以下方法來模擬遠端呼叫
Mono<Boolean> makeRequest(String productName) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}
因此 notifyShop
如下所示
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName);
});
}
如果我們觸發一次處理程式
handleRequest().block();
我們會得到以下輸出
[ main][ 683056557275963407] Adding product: test-product
[ single-1][ 683056557275963407] Notifying shop about: test-product
[l-worker-1][ null] Done!
這些日誌縮短了 Thread
名稱以提高可見性,但 l-worker-1
實際上是 ForkJoinPool.commonPool-worker-1
的縮短版本。
正如我們所看到的,我們的執行在我們無法控制的通用 ForkJoinPool
上繼續進行。 一個問題是我們從該 Thread
切換開始不再看到我們的相關識別碼,但另一個問題是我們在實際上缺少相關資訊的 Thread
上執行清除。
我們可以使用在上一篇文章中介紹的 Executor
或任務包裝來潛在地(部分地)改善這種情況,但我們並不總是擁有這種控制權 - 例如,如果我們呼叫使用 CompletableFuture
的外部程式庫。
我們幾乎準備好討論 Sleuth 的策略了。Schedulers.onScheduleHook
在反應式處理中可能發生的不明顯的 Thread
切換方面,提供了有限的功能。 我們需要對操作的執行有更多的控制。 我們將通過引入兩種外部服務通信方式來演示其局限性。
現在,addProduct
方法會發出遠端請求,並在我們控制的 Scheduler
上發布結果。 通常會將繁重的運算卸載到不同的 Thread
。 為此,我們使用 publishOn
運算子。
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return makeRequest(productName)
.publishOn(Schedulers.single())
.then();
});
}
notifyShop
方法模擬將結果對應到可能多個 Publisher
。 如果回應是複合結果,這可能是一種典型的場景 - 例如,如果回應是 JSON 陣列,並且我們打算將每個項目作為對另一個服務的單獨調用進行處理,或豐富個別結果。 讓我們使用一個簡化的版本,只取單一結果。
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName)
.flatMapMany(result ->
Flux.just("result")
.map(x -> result))
.take(1)
.single();
});
}
現在讓我們跳過處理常式,手動啟動關聯識別符,然後訂閱這些鏈。
initRequest();
addProduct("test-product")
.doOnSuccess(v -> log("Added."))
.block();
initRequest();
notifyShop("test-product")
.doOnSuccess(v -> log("Notified."))
.block();
讓我們看看輸出。
[ main][ 6606077262934500649] Adding product: test-product
[ single-1][ null] Added.
[ main][ 182687922231622589] Notifying shop about: test-product
[l-worker-1][ null] Notified.
這是預期的,因為 doOnSuccess
中發生的兩個日誌都是由於 CompletableFuture
在 ForkJoinPool
Thread
上傳遞值而觸發的。 即使我們有 Scheduler
包裝,結果也會首先在我們不控制的 Thread
上傳遞,因此即使 addProduct
中使用的 publishOn
也無濟於事。
我們能做些什麼來改善這種情況嗎? Reactor 有一個細粒度的外掛程式系統,可讓我們裝飾任何管道中的任何運算子。 我們可以嘗試使用它來恢復關聯識別符。
外掛程式將使用自定義 Subscriber
實現,該實現會在訂閱時捕獲關聯識別符。
static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
final CoreSubscriber<T> delegate;
Long correlationId;
public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
this.delegate = delegate;
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
this.correlationId = CORRELATION_ID.get();
}
@Override
public void onNext(T t) {
CORRELATION_ID.set(this.correlationId);
delegate.onNext(t);
}
@Override
public void onError(Throwable t) {
CORRELATION_ID.set(this.correlationId);
delegate.onError(t);
}
@Override
public void onComplete() {
CORRELATION_ID.set(this.correlationId);
delegate.onComplete();
}
}
要更改運算子以讓我們的實現委派對實際 Subscriber
實例的調用,我們可以使用 Operators.lift
方法。
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber));
首先,我們將嘗試一個外掛程式,它可以讓我們更改鏈中的每個運算子。
Hooks.onEachOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
讓我們再次執行我們的範例並檢查輸出。
[ main][ 7295088917002526647] Adding product: test-product
[ single-1][ 7295088917002526647] Added.
[ main][ 383851863754448684] Notifying shop about: test-product
[l-worker-1][ 383851863754448684] Notified.
哇! 我們甚至在如此複雜的情況下都設法取得關聯識別符。 初始的訂閱行為捕獲了 ThreadLocal
值,並在每個步驟中恢復它。 甚至 notifyShop
方法中使用的 flatMap
(它會自行訂閱)也能正常工作,因為在訂閱另一個 Thread
之前,ThreadLocal
會從先前的捕獲中填入! 這聽起來確實很棒,但是這種方法也有缺點。 第一個也是最明顯的一個是效能。 傳播發生在每個運算子的每一個步驟中。 使用該技術,我們先裝飾每個物件,並且在每個步驟中都進行 ThreadLocal
存取。 所有這些都很昂貴。 若要了解更多資訊,請觀看 Oleh 關於反應式效能的演講。
因此,讓我們嘗試一種不同的方法。 這次,我們將使用一個外掛程式,它會附加到鏈中被視為最後一個的每個運算子 - 直接在 subscribe()
呼叫之前的運算子。
可以對反應式鏈進行一項觀察:在同步運算子的情況下,我們不需要在每個單獨的操作(例如,filter
或 map
)中恢復最初捕獲的內容,而只需在訂閱鏈中的最後一個運算子時恢復。 只要不涉及 Thread
邊界交叉,此機制就可以正常工作。 為了支援可能跨越這些邊界的運算子(例如,涉及訂閱新的 Publisher
的 flatMap
),涉及一種特殊的技巧。 它將對應的結果視為內部 Publisher
s 操作的最後一個運算子。
讓我們嘗試這種方法
Hooks.onLastOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
然後執行
[ main][ 2122332013640150746] Adding product: test-product
[ single-1][ 2122332013640150746] Added.
[ main][ 459477771449275997] Notifying shop about: test-product
[l-worker-1][ null] Notified.
它在 addProduct
中使用 publishOn
正常工作,但在 notifyShop
中使用 flatMap
失敗。
讓我們分析為什麼 notifyShop
會失敗。 我們對 block()
的呼叫會捕獲 ThreadLocal
並將其恢復到下游傳輸的每個訊號。 在 flatMapMany
中完成的對應,我們正在處理先前提到過的非同步邊界。 實際上,我們的外掛程式應用於內部來源 (Flux.just().map().single()
)。
但是,儘管在 flatMapMany
中內部呼叫了自定義 Subscriber
並嘗試恢復 ThreadLocal
值,但這些努力仍然沒有幫助。 觸發內部訂閱的訊號是在我們不控制的 Thread
上啟動的,因此我們首先沒有 ThreadLocal
可以捕獲。
在 publishOn
運算子的情況下,情況有所不同。 對它的訂閱在我們控制的 Thread
中開始。 因此,當訊號作為 makeRequest()
方法的結果進行處理時,它僅在我們控制的 Thread
上傳遞。 在與 flatMapMany
的情況不同的 Thread
邊界之後,會發生 .doOnSuccess(v -> log("Added."))
的執行。
這就是為什麼 onEachOperator
涵蓋更多案例的原因 - 它在每個步驟中恢復初始值,而與非同步邊界無關。 不過,onLastOperator
的效能比 onEachOperator
略好。
如果我們將它與先前的 hook 結合使用,我們可以再使用一個外掛程式來完全控制反應式傳遞。 Spring Cloud Sleuth 也使用它。 我們正在考慮一個最近推出的外掛程式 Hooks.addQueueWrapper
。 不過,我們不會詳細探討它。 它可以解決 Reactor 中工作竊取機制引入的問題。 非同步運算子(例如 flatMap
)可以在將訊號傳遞給運算子的各種 Thread
上取得進展。 想像一個背壓場景,其中處理暫時停滯。 在某些時候,一個新的 Thread
可以接管並發出 Subscription.request(n)
呼叫,這會導致累積的值立即傳遞。 現在您可以問自己:「什麼是累積的值?」 這是一個好問題。 Reactor 中的許多運算子都使用內部 Queue
使背壓成為可能或保留序列傳遞語意。 由於這些 Queue
的排空可以在任何 Thread
上發生,因此上下文資訊應附加到儲存在 Queue
中的每個訊號 - 也就是說,用於關聯目的的 ThreadLocal
值。 這就是我們需要 Queue
包裝器的原因 - 在將值提交到 Queue
中時,我們會捕獲 ThreadLocal
狀態。 從 Queue
檢索值時,狀態將恢復。
在展示了在反應式流條款之外運作的風險以及我們可以使用的傳播 ThreadLocal
上下文的機制之後,讓我們總結一下 Spring Cloud Sleuth 使用的四種策略
DECORATE_ON_EACH
DECORATE_ON_LAST
DECORATE_QUEUES
MANUAL
前三種策略嘗試使用反應式運算子的某些屬性,以及 Reactor 的 外掛程式機制,並使用 ThreadLocal
作為內部傳輸機制以及與檢測程式庫共用上下文資料的方法。 前三種策略也假設使用 Schedulers.onScheduleHook 進行 Scheduler
包裝。 另一方面,最後一種策略利用 Reactor 的 Subscriber
綁定的 Context
。
這個策略使用了我們之前見過的 Hooks.onEachOperator
外掛程式。 即使 Sleuth 增加了許多最佳化以避免在不必要時進行還原,效能影響仍然非常顯著。 通常,此方法非常有效。 儘管如此,它非常激進,如果 operator 需要更改上下文,則處理起來可能會很麻煩。 下游的 operators 看不到任何更改,因為來自初始訂閱的上下文會在每個步驟中還原。
Hooks.onLastOperator
用於提高效能。 這種方法可能會失敗,因為它提供了靈活性。 如果上游 operator 修改了上下文,下游操作將會看到變更。 這帶來了風險,如果一個 operator 清除了該上下文,則該上下文將會遺失,直到另一個信號被排程到封裝的 Scheduler
。 另一個風險是我們在之前的範例中看到的,訂閱發生在某些 Thread
上,但請求資料發生在另一個不在 Reactor 控制範圍內的 Thread
上。
DECORATE_QUEUES
是先前策略的演進,它修正了一些錯誤的場景(請求資料在頻外發生或多個 Threads
發布資料),但並非所有場景都修正了。 Hooks.addQueueWrapper
外掛程式以我們之前描述的方式使用。 Queue
包裝的一個已知問題是,在處理完一個項目後,沒有可靠的方法可以進行清理。 從 Queue
檢索項目時,上下文會還原。 沒有任何範圍圍繞通過下游 operators 的項目處理。 因此,這種方法也容易汙染 ThreadLocal
儲存空間。 最近在排水程序中進行了一些改進,以限制影響。
在這個策略中,Sleuth 唯一做的事情就是在訂閱時將 ThreadLocal
中的值捕獲到 Reactor 的 Context
中作為快照。 由使用者在相關位置提取該快照並填充 ThreadLocal
,以使其可用於儀表庫。 對於受支援的追蹤儀器(例如 Zipkin 和 Brave),Sleuth 使用範圍的概念來還原 ThreadLocal
- ThreadLocal
會針對儀器還原,並在快照關閉後立即消失。 這是效能最佳的方法,儘管它需要使用者手動(顧名思義)處理。
在本地範圍內使用 Reactor Context 填充 ThreadLocal
被證明既高效能又符合反應式鏈的工作方式。 將上下文與 Subscriber
關聯是一種經過驗證的方法,不會意外地導致上下文資料遺失。 在下一篇文章中,我們將展示 Reactor 3.5 和 Micrometer 1.10 如何將手動方法提升到一個新的水平,並提供一個結構化的方法來實現跨反應式和命令式邊界的上下文傳播。