透過 Project Reactor 2 進行上下文傳播 - Spring Cloud Sleuth 的坎坷之路

工程 | Dariusz Jędrzejczyk | 2023 年 3 月 29 日 | ...

這篇文章是系列文章的一部分

  1. 基本概念
  2. Spring Cloud Sleuth 的坎坷之路
  3. 反應式與命令式之間的統一橋接

Spring Cloud Sleuth 最近成為 Micrometer Tracing,這是 Micrometer 專案的一部分。 大部分追蹤檢測都集中在 Micrometer 中的新 Observability API 之下。 這些專案的目標是啟用任何應用程式的可觀察性 - 以指標、追蹤和包含相關識別碼的日誌的形式。 為了實現此目標,程式庫需要一種傳輸上下文資訊的方法。 當應用程式以任何形式處理非同步時,這項任務就變得相當具有挑戰性。 在上一篇文章中,我們介紹了使用 ThreadLocal 和 Reactor Context 進行上下文傳播的基本概念。

Spring Cloud Sleuth 在非同步上下文傳播的方法中經歷了多次轉變。 由於 Sleuth 處理不需要具有反應式 API 的第三方檢測程式庫,因此建立一種使上下文可用的方法至關重要。 這些程式庫通常不假設非同步,而是依賴靜態 ThreadLocal 狀態。 多年來,ThreadLocal 為 JVM 應用程式提供了隱式上下文儲存,用於驅動可觀察性功能。 隨著時間的推移,Project Reactor 在底層原始元件之上引入了各種掛鉤和包裝機制,以便實現反應式和命令式之間的橋接。 在本文中,我們旨在探索將上下文傳播到 ThreadLocal 值的方法,並討論它們的潛在錯誤。 我們將探索 Sleuth 採用的方法,並總結我們發現的一個良好的折衷方案,該方案既高效又在語義上合理。

在我們描述 Sleuth 引入的方法之前,我們應該考慮在命令式和反應式世界之間架起橋樑所存在的危險。

隱藏並發情況下的副作用的陷阱

我們在上一篇文章中討論了 Thread 切換和相關副作用的一些潛在問題。 現在,我們將通過使用 Reactor 的外掛程式機制來解決我們可能遇到的問題,從而更多地探索反應式程式設計的特性。

總結 Spring Cloud Sleuth 遇到的所有問題是一個不斷變化的目標。 此外,許多組織都有自己的上下文傳播機制的實現,例如,用於填充 SLF4J 的 MDC。 本文並非旨在全面總結所有潛在的陷阱。 相反,它旨在建立一些直覺,幫助您理解最終的真相:你要么按照反應式程式設計規則行事,要么準備好在最意想不到的時刻失敗。

排程器掛鉤

正如我們所知,反應式鏈可以使用不同的 Thread 傳播訊號。 從我們在 上一篇文章 中了解到的,當在另一個 Thread 上繼續執行時,在執行任務時恢復上下文是有意義的。 Project Reactor 將管理 Thread 的任務委託給 Scheduler。 它還提供了一個專用掛鉤,允許攔截特定工作單位的排程和執行:Schedulers.onScheduleHook。 它的工作方式與上一篇文章中的 WrappedExecutor 類似。 讓我們看一個我們可能考慮使用它的場景。

清除

在第 1 部分中,我們了解到我們不能依賴 ThreadLocal 值在反應式鏈中始終可用。 如果我們嘗試在訂閱時初始化它,並在 doFinally 運算符中清除它呢? 我們的應用程式可以使用有限數量的 Thread 並發處理許多請求。 由於這些平台 Thread 可以重複使用,我們需要在處理另一個請求之前清除與一個請求關聯的任何 ThreadLocal 狀態,以便不同的請求不會使用剩餘的相關識別碼。

以下代碼範例是對我們在上一部分中編寫的代碼的修改,在上一部分中我們沒有使用 Reactor Context

handleRequest 方法的潛在實現可能如下所示

Mono<Void> handleRequest() {
  return Mono.fromSupplier(() -> {
    initRequest(); // <1>
    return "test-product";
  }).flatMap(product ->
    Flux.concat(
      addProduct(product),
      notifyShop(product)).then())
    .doOnSuccess(v -> log("Done."))
    .doFinally(signalType ->
      CORRELATION_ID.remove()); // <2>
}

<1> 中,我們設定 ThreadLocal 值,在 <2> 中,我們嘗試清除它。

我們還修改了我們執行的操作,以便能夠在 addProduct 方法中新增一個人為的延遲

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return Mono.<Void>empty()
      .delaySubscription(Duration.ofMillis(10),
        Schedulers.single()); // <1>
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return Mono.just(true);
  });
}

請注意,在 <1> 中,我們通過延遲訂閱並使用 Schedulers.single() 在 10 毫秒後啟動訂閱來引入非同步。 delaySubscription 將使用該 Scheduler 的底層 ScheduledExecutorService,並在延遲後在另一個 Thread 上啟動訂閱。

從上一篇文章中,我們知道在這種情況下我們需要恢復 ThreadLocals,因此我們使用前面提到的 Scheduler 外掛程式來實現這一點

Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);

在 Reactor 的 Scheduler 上執行的每個任務都將恢復 ThreadLocal 值,因此我們應該是安全的。

現在,讓我們模仿兩個連續的請求,並以記錄來驗證 CORRELATION_ID 是否已正確清除

log("Got first request, calling handler");
handleRequest().block();

log("Got second request, calling handler");
log("There should be no correlationId on this line!");

handleRequest().block();

日誌如下

[      main][                null] Got first request, calling handler // <1>
[      main][ 8658769170992364531] Adding product: test-product
[  single-1][ 8658769170992364531] Notifying shop about: test-product
[  single-1][ 8658769170992364531] Done.
[      main][ 8658769170992364531] Got second request, calling handler
[      main][ 8658769170992364531] There should be no correlationId on this line!
[      main][  711436174608061530] Adding product: test-product
[  single-1][  711436174608061530] Notifying shop about: test-product
[  single-1][  711436174608061530] Done.

“test-product” 處理相關的日誌具有正確的相關識別碼。 但是,請求之間發生了什麼? 我們期望在 doFinally 中清除 ThreadLocal。 不幸的是,請求之間的日誌仍然包含一個識別碼。 那時發生了什麼?

請注意,“Notifying shop about” 日誌發生在 Thread single-1 上。 訊號已在該 Thread 上傳遞,因此我們在那裡清除了 ThreadLocal,但使主 Thread 受到污染(在 <1> 中)。 現在,我們的處理程式之外的執行可以使用錯誤的相關識別碼用於不同的目的。 我們可以嘗試通過向伺服器層(分派請求)新增清除邏輯,並確保用於請求的每個 Thread 不受污染來緩解此問題。 如果我們的管道更複雜,這將不會保存所有其他潛在的 Scheduler Thread

這種方法在允許應用程式在反應式鏈中透明地使用 ThreadLocal 值方面取得了相當大的進展。 從效能的角度來看,這也是合理的,因為它不會在每個運算符周圍設定和重設 ThreadLocal,而只會在處理項目時發生 Thread 切換時才設定和重設 ThreadLocal。 但是,它也表明仍然存在未解決的副作用。 在下一個範例中,我們將體驗並嘗試處理不同的場景。

外部來源和接收器的困難

對於使用 ThreadLocal 作為上下文元數據傳輸機制的策略,另一個常見問題是當使用 Reactor 之外的其他非同步程式庫並且它自行切換 Thread 時。 當執行變更為不受包裝的 ExecutorService 控制的不同 Thread 時,上下文將會遺失。

讓我們看看實際效果。 我們將重複使用到目前為止我們看到的大部分代碼,並對 notifyShop 方法進行一項變更。 它現在通過使用以下方法來模擬遠端呼叫

Mono<Boolean> makeRequest(String productName) {
  return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
    CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}

因此 notifyShop 如下所示

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName);
  });
}

如果我們觸發一次處理程式

handleRequest().block();

我們會得到以下輸出

[      main][  683056557275963407] Adding product: test-product
[  single-1][  683056557275963407] Notifying shop about: test-product
[l-worker-1][                null] Done!

這些日誌縮短了 Thread 名稱以提高可見性,但 l-worker-1 實際上是 ForkJoinPool.commonPool-worker-1 的縮短版本。

正如我們所看到的,我們的執行在我們無法控制的通用 ForkJoinPool 上繼續進行。 一個問題是我們從該 Thread 切換開始不再看到我們的相關識別碼,但另一個問題是我們在實際上缺少相關資訊的 Thread 上執行清除。

我們可以使用在上一篇文章中介紹的 Executor 或任務包裝來潛在地(部分地)改善這種情況,但我們並不總是擁有這種控制權 - 例如,如果我們呼叫使用 CompletableFuture 的外部程式庫。

Operator Hooks (運算子 Hook)

我們幾乎準備好討論 Sleuth 的策略了。Schedulers.onScheduleHook 在反應式處理中可能發生的不明顯的 Thread 切換方面,提供了有限的功能。 我們需要對操作的執行有更多的控制。 我們將通過引入兩種外部服務通信方式來演示其局限性。

現在,addProduct 方法會發出遠端請求,並在我們控制的 Scheduler 上發布結果。 通常會將繁重的運算卸載到不同的 Thread。 為此,我們使用 publishOn 運算子。

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return makeRequest(productName)
      .publishOn(Schedulers.single())
      .then();
  });
}

notifyShop 方法模擬將結果對應到可能多個 Publisher。 如果回應是複合結果,這可能是一種典型的場景 - 例如,如果回應是 JSON 陣列,並且我們打算將每個項目作為對另一個服務的單獨調用進行處理,或豐富個別結果。 讓我們使用一個簡化的版本,只取單一結果。

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName)
      .flatMapMany(result ->
        Flux.just("result")
          .map(x -> result))
          .take(1)
          .single();
    });
}

現在讓我們跳過處理常式,手動啟動關聯識別符,然後訂閱這些鏈。

initRequest();
addProduct("test-product")
  .doOnSuccess(v -> log("Added."))
  .block();

initRequest();
notifyShop("test-product")
  .doOnSuccess(v -> log("Notified."))
  .block();

讓我們看看輸出。

[      main][ 6606077262934500649] Adding product: test-product
[  single-1][                null] Added.
[      main][  182687922231622589] Notifying shop about: test-product
[l-worker-1][                null] Notified.

這是預期的,因為 doOnSuccess 中發生的兩個日誌都是由於 CompletableFutureForkJoinPool Thread 上傳遞值而觸發的。 即使我們有 Scheduler 包裝,結果也會首先在我們不控制的 Thread 上傳遞,因此即使 addProduct 中使用的 publishOn 也無濟於事。

我們能做些什麼來改善這種情況嗎? Reactor 有一個細粒度的外掛程式系統,可讓我們裝飾任何管道中的任何運算子。 我們可以嘗試使用它來恢復關聯識別符。

外掛程式將使用自定義 Subscriber 實現,該實現會在訂閱時捕獲關聯識別符。

static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
  final CoreSubscriber<T> delegate;
  Long correlationId;

  public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void onSubscribe(Subscription s) {
    delegate.onSubscribe(s);
    this.correlationId = CORRELATION_ID.get();
  }

  @Override
  public void onNext(T t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onNext(t);
  }

  @Override
  public void onError(Throwable t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onError(t);
  }

  @Override
  public void onComplete() {
    CORRELATION_ID.set(this.correlationId);
    delegate.onComplete();
  }
}

要更改運算子以讓我們的實現委派對實際 Subscriber 實例的調用,我們可以使用 Operators.lift 方法。

Operators.lift((scannable, subscriber) ->
  new CorrelatingSubscriber<>(subscriber));

onEachOperator Hook

首先,我們將嘗試一個外掛程式,它可以讓我們更改鏈中的每個運算子。

Hooks.onEachOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

讓我們再次執行我們的範例並檢查輸出。

[      main][ 7295088917002526647] Adding product: test-product
[  single-1][ 7295088917002526647] Added.
[      main][  383851863754448684] Notifying shop about: test-product
[l-worker-1][  383851863754448684] Notified.

哇! 我們甚至在如此複雜的情況下都設法取得關聯識別符。 初始的訂閱行為捕獲了 ThreadLocal 值,並在每個步驟中恢復它。 甚至 notifyShop 方法中使用的 flatMap(它會自行訂閱)也能正常工作,因為在訂閱另一個 Thread 之前,ThreadLocal 會從先前的捕獲中填入! 這聽起來確實很棒,但是這種方法也有缺點。 第一個也是最明顯的一個是效能。 傳播發生在每個運算子的每一個步驟中。 使用該技術,我們先裝飾每個物件,並且在每個步驟中都進行 ThreadLocal 存取。 所有這些都很昂貴。 若要了解更多資訊,請觀看 Oleh 關於反應式效能的演講

onLastOperator Hook

因此,讓我們嘗試一種不同的方法。 這次,我們將使用一個外掛程式,它會附加到鏈中被視為最後一個的每個運算子 - 直接在 subscribe() 呼叫之前的運算子。

可以對反應式鏈進行一項觀察:在同步運算子的情況下,我們不需要在每個單獨的操作(例如,filtermap)中恢復最初捕獲的內容,而只需在訂閱鏈中的最後一個運算子時恢復。 只要不涉及 Thread 邊界交叉,此機制就可以正常工作。 為了支援可能跨越這些邊界的運算子(例如,涉及訂閱新的 PublisherflatMap),涉及一種特殊的技巧。 它將對應的結果視為內部 Publishers 操作的最後一個運算子。

讓我們嘗試這種方法

Hooks.onLastOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

然後執行

[      main][ 2122332013640150746] Adding product: test-product
[  single-1][ 2122332013640150746] Added.
[      main][  459477771449275997] Notifying shop about: test-product
[l-worker-1][                null] Notified.

它在 addProduct 中使用 publishOn 正常工作,但在 notifyShop 中使用 flatMap 失敗。

讓我們分析為什麼 notifyShop 會失敗。 我們對 block() 的呼叫會捕獲 ThreadLocal 並將其恢復到下游傳輸的每個訊號。 在 flatMapMany 中完成的對應,我們正在處理先前提到過的非同步邊界。 實際上,我們的外掛程式應用於內部來源 (Flux.just().map().single())。

但是,儘管在 flatMapMany 中內部呼叫了自定義 Subscriber 並嘗試恢復 ThreadLocal 值,但這些努力仍然沒有幫助。 觸發內部訂閱的訊號是在我們不控制的 Thread 上啟動的,因此我們首先沒有 ThreadLocal 可以捕獲。

publishOn 運算子的情況下,情況有所不同。 對它的訂閱在我們控制的 Thread 中開始。 因此,當訊號作為 makeRequest() 方法的結果進行處理時,它僅在我們控制的 Thread 上傳遞。 在與 flatMapMany 的情況不同的 Thread 邊界之後,會發生 .doOnSuccess(v -> log("Added.")) 的執行。

這就是為什麼 onEachOperator 涵蓋更多案例的原因 - 它在每個步驟中恢復初始值,而與非同步邊界無關。 不過,onLastOperator 的效能比 onEachOperator 略好。

addQueueWrapper Hook

如果我們將它與先前的 hook 結合使用,我們可以再使用一個外掛程式來完全控制反應式傳遞。 Spring Cloud Sleuth 也使用它。 我們正在考慮一個最近推出的外掛程式 Hooks.addQueueWrapper。 不過,我們不會詳細探討它。 它可以解決 Reactor 中工作竊取機制引入的問題。 非同步運算子(例如 flatMap)可以在將訊號傳遞給運算子的各種 Thread 上取得進展。 想像一個背壓場景,其中處理暫時停滯。 在某些時候,一個新的 Thread 可以接管並發出 Subscription.request(n) 呼叫,這會導致累積的值立即傳遞。 現在您可以問自己:「什麼是累積的值?」 這是一個好問題。 Reactor 中的許多運算子都使用內部 Queue 使背壓成為可能或保留序列傳遞語意。 由於這些 Queue 的排空可以在任何 Thread 上發生,因此上下文資訊應附加到儲存在 Queue 中的每個訊號 - 也就是說,用於關聯目的的 ThreadLocal 值。 這就是我們需要 Queue 包裝器的原因 - 在將值提交到 Queue 中時,我們會捕獲 ThreadLocal 狀態。 從 Queue 檢索值時,狀態將恢復。

Context Propagation in Spring Cloud Sleuth (Spring Cloud Sleuth 中的上下文傳播)

在展示了在反應式流條款之外運作的風險以及我們可以使用的傳播 ThreadLocal 上下文的機制之後,讓我們總結一下 Spring Cloud Sleuth 使用的四種策略

  1. DECORATE_ON_EACH
  2. DECORATE_ON_LAST
  3. DECORATE_QUEUES
  4. MANUAL

前三種策略嘗試使用反應式運算子的某些屬性,以及 Reactor 的 外掛程式機制,並使用 ThreadLocal 作為內部傳輸機制以及與檢測程式庫共用上下文資料的方法。 前三種策略也假設使用 Schedulers.onScheduleHook 進行 Scheduler 包裝。 另一方面,最後一種策略利用 Reactor 的 Subscriber 綁定的 Context

DECORATE_ON_EACH

這個策略使用了我們之前見過的 Hooks.onEachOperator 外掛程式。 即使 Sleuth 增加了許多最佳化以避免在不必要時進行還原,效能影響仍然非常顯著。 通常,此方法非常有效。 儘管如此,它非常激進,如果 operator 需要更改上下文,則處理起來可能會很麻煩。 下游的 operators 看不到任何更改,因為來自初始訂閱的上下文會在每個步驟中還原。

DECORATE_ON_LAST

Hooks.onLastOperator 用於提高效能。 這種方法可能會失敗,因為它提供了靈活性。 如果上游 operator 修改了上下文,下游操作將會看到變更。 這帶來了風險,如果一個 operator 清除了該上下文,則該上下文將會遺失,直到另一個信號被排程到封裝的 Scheduler。 另一個風險是我們在之前的範例中看到的,訂閱發生在某些 Thread 上,但請求資料發生在另一個不在 Reactor 控制範圍內的 Thread 上。

DECORATE_QUEUES

DECORATE_QUEUES 是先前策略的演進,它修正了一些錯誤的場景(請求資料在頻外發生或多個 Threads 發布資料),但並非所有場景都修正了。 Hooks.addQueueWrapper 外掛程式以我們之前描述的方式使用。 Queue 包裝的一個已知問題是,在處理完一個項目後,沒有可靠的方法可以進行清理。 從 Queue 檢索項目時,上下文會還原。 沒有任何範圍圍繞通過下游 operators 的項目處理。 因此,這種方法也容易汙染 ThreadLocal 儲存空間。 最近在排水程序中進行了一些改進,以限制影響。

MANUAL

在這個策略中,Sleuth 唯一做的事情就是在訂閱時將 ThreadLocal 中的值捕獲到 Reactor 的 Context 中作為快照。 由使用者在相關位置提取該快照並填充 ThreadLocal,以使其可用於儀表庫。 對於受支援的追蹤儀器(例如 Zipkin 和 Brave),Sleuth 使用範圍的概念來還原 ThreadLocal - ThreadLocal 會針對儀器還原,並在快照關閉後立即消失。 這是效能最佳的方法,儘管它需要使用者手動(顧名思義)處理。

Evolution

在本地範圍內使用 Reactor Context 填充 ThreadLocal 被證明既高效能又符合反應式鏈的工作方式。 將上下文與 Subscriber 關聯是一種經過驗證的方法,不會意外地導致上下文資料遺失。 在下一篇文章中,我們將展示 Reactor 3.5 和 Micrometer 1.10 如何將手動方法提升到一個新的水平,並提供一個結構化的方法來實現跨反應式和命令式邊界的上下文傳播。

取得 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部