使用 Project Reactor 進行上下文傳播 1 - 基礎篇

工程 | Dariusz Jędrzejczyk | 2023 年 3 月 28 日 | ...

這篇文章是一個系列的一部分

  1. 基礎篇
  2. Spring Cloud Sleuth 的坎坷之路
  3. 反應式與命令式之間的統一橋接

Spring Boot 3 和 Spring Framework 6 為我們帶來了一種統一且一致的方式,可以在使用 Micrometer 的應用程式中啟用可觀察性。從 Spring Cloud Sleuth 到 Micrometer 的演變,以及 Observation API 和 Micrometer Tracing,使我們能夠整合各種上下文傳播方法。在本部落格文章系列中,我們旨在解釋我們如何開始支援 Project Reactor 中的上下文傳播,以滿足命令式函式庫的需求。透過從頭開始建立您的理解,您將能夠使用這些結構並了解幕後發生了什麼。我們假設您對反應式程式設計概念有基本的了解。如果您是新手或想要複習您的知識,請查看 Project Reactor 文件中的 反應式程式設計簡介

在本文中,我們開發一個簡單的電子商務應用程式。我們考慮一個新增產品並通知商店已將新產品新增到庫存的請求。作為負責任的開發人員,我們希望記錄特定請求所採取的所有步驟,以便在我們調查問題時,可以查看日誌並了解發生了什麼。我們將探討如何在命令式風格中實現為日誌記錄實用程式提供有關請求的上下文元數據的目標,並將其與 Project Reactor 更具功能性、聲明性的風格進行比較。下一篇文章將更詳細地探討為什麼以及我們如何需要這兩種程式設計風格之間的橋樑。

ThreadLocal

為了識別屬於特定請求的日誌,我們需要一種方法將它們關聯起來。 我們可以產生一個像這樣的簡化的隨機標識符

static long correlationId() {
  return Math.abs(ThreadLocalRandom.current().nextLong());
}

我們需要一種方法使相關標識符在日誌記錄實用程式中可用。 我們可以使相關成為我們業務邏輯中每個方法呼叫的一部分,但這會非常具有侵入性且冗長。

通常,第三方函式庫會使用 JDK 的 ThreadLocal 來傳達隱含的資訊,而這些資訊並非我們應用程式業務邏輯的主要考量。

讓我們為我們的相關標識符宣告一個靜態欄位

static final ThreadLocal<Long> CORRELATION_ID = new ThreadLocal<>();

這是我們的日誌方法。 它列印當前 Thread 名稱並格式化輸出

static void log(String message) {
  String threadName = Thread.currentThread().getName();
  String threadNameTail = threadName.substring(
    Math.max(0, threadName.length() - 10));
  System.out.printf("[%10s][%20s] %s%n",
    threadNameTail, CORRELATION_ID.get(), message);
}

現在我們擁有處理請求並使用隱式相關標識符記錄每個步驟所需的一切。

在每個請求的開始,應用程式會呼叫以下方法來啟動相關

static void initRequest() {
  CORRELATION_ID.set(correlationId()));
}

我們簡化的請求處理程式執行以下操作

void handleRequest() {
  initRequest();

  addProduct("test-product");
  notifyShop("test-product");
}

業務邏輯中的日誌記錄如下所示

void addProduct(String productName) {
  log("Adding product: " + productName);
  // ...
}

void notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  // ...
}

我們可以期望我們的應用程式記錄這些行

[      main][ 8592000019542134146] Adding product: test-product
[      main][ 8592000019542134146] Notifying shop about: test-product

只要特定請求的執行發生在同一個 Thread 上,且沒有與其他考量因素交錯,ThreadLocal 就可以讓我們將業務邏輯與用於日誌記錄的元數據分開。

非同步處理

讓我們想像一下,這個應用程式開始具有更高的負載,並且需要處理許多並行請求。 想像一下,我們可以採用非同步和非阻塞的伺服器實作,這需要我們提供非同步宣告,而不是命令式和阻塞步驟。

我們的請求處理程式可以傳回一個 CompletableFuture 以非同步和非阻塞的方式處理請求

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(() -> addProduct("test-product"))
    .thenRunAsync(() -> notifyShop("test-product"))
}

不幸的是,當我們執行非同步版本時,日誌不再包含相關標識符

[l-worker-1][                null] Adding product: test-product
[l-worker-1][                null] Notifying shop about: test-product

任務包裝

此問題的一個已知解決方案是包裝非同步 API 執行的任務。 透過包裝,我們指的是執行 ThreadLocal 上下文還原的實作。 建立任務時,會捕獲當前上下文。 當 worker Thread 實際執行任務時,會還原該上下文。 讓我們看看這對於我們的 Runnable 範例案例如何運作

class WrappedRunnable implements Runnable {

  private final Long correlationId;
  private final Runnable wrapped;

  public WrappedRunnable(Runnable wrapped) {
    this.correlationId = CORRELATION_ID.get();
    this.wrapped = wrapped;
  }

  @Override
  public void run() {
    Long old = CORRELATION_ID.get();
    CORRELATION_ID.set(this.correlationId);
    try {
      wrapped.run();
    } finally {
      CORRELATION_ID.set(old);
    }
  }
}

我們可以重新實作我們的處理程式,如下所示

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(new WrappedRunnable(
      () -> addProduct("test-product")))
    .thenRunAsync(new WrappedRunnable(
      () -> notifyShop("test-product")));
}

不幸的是,這會產生很多額外負荷。 幸運的是,JDK 具有一個用於執行非同步任務的 API:Executor 介面。 在真實場景中,我們會想要使用更全面的 API ExecutorService。 但是,就我們的解釋目的而言,Executor 應該足夠了。

讓我們看一下

static class WrappedExecutor implements Executor {

  private final Executor actual;

  WrappedExecutor(Executor actual) {
    this.actual = actual;
  }

  @Override
  public void execute(Runnable command) {
    actual.execute(new WrappedRunnable(command));
  }
}

讓我們重用 CompletableFuture 框架預設使用的通用 ForkJoinPool,但使用我們的實作來包裝它。 現在我們的程式碼如下所示

static Executor executor = new WrappedExecutor(ForkJoinPool.commonPool());

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(() -> addProduct("test-product"), executor)
    .thenRunAsync(() -> notifyShop("test-product"), executor);
}

我們的日誌再次正常運作

[l-worker-1][ 7321040639376081961] Adding product: test-product
[l-worker-2][ 7321040639376081961] Notifying shop about: test-product

在某些情況下,CompletableFuture 框架可以提供以非阻塞方式處理非同步任務的方法。 但是,在許多情況下,有限的 API 表面及其行為特徵可能會受到限制。 例如,我們可能想要延遲處理,並在我們的系統達到其容量時稍後恢復。 使用 CompletableFuture,所有建立的實例都會在建立後立即開始計算。 我們可能還想要在資料流之上應用更精細的操作,而不是對單個計算單元進行操作。 出於某些原因,我們可能會考慮使用反應式程式設計函式庫。 我們將考慮 Project Reactor,它是 Spring 產品組合中的預設反應式實作。

Project Reactor

為了提供一個彈性的非同步處理框架,Java 社群提出了 Reactive Streams 規範。 它有助於建立一個 JDK 之前遺漏的通用詞彙 - 信號傳播、錯誤處理、終止和生命週期管理的明確語義。 它還允許內建反壓力。 Spring 透過引入 WebFlux 採用了這種方法,使 Project Reactor 及其反應式類型成為 API 的一等公民。

Reactive Streams 為非同步流處理帶來了優雅且簡約的解決方案。 但是,上下文傳播不是規範的一部分。 反應式函式庫的非阻塞和非同步性質,以及潛在的複雜實作,使得使用 ThreadLocal 極為困難。 原因在於無法保證哪個 Thread 可以執行使用者的程式碼。 允許實作執行各種優化,只要它們保證串行傳遞,從而使使用者的程式碼與並行無關,從而將處理並發的負擔轉移到函式庫內部。

為了實現其保證,Java 中的反應式程式設計假設使用函數式程式設計範例來形成聲明式和可組合的流程,該流程與不同的 Thread 可以執行使用者提供的程式碼的事實無關。 只要使用者程式碼中沒有假設在特定 Thread 中執行的副作用,反應式函式庫就可以提供極高性能的運行時,同時符合規範。 ThreadLocal 明顯違反了此要求。

讓我們嘗試重寫我們的處理程式以使用 Project Reactor。 個別操作變為

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty(); // Assume we’re actually storing the product
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true); // Assume we’re actually notifying the shop
}

讓我們嘗試使用上面的程式碼

Mono<Void> handleRequest() {
  initRequest();
  log("Assembling the chain");

  return Mono.just("test-product")
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product))
      .then())
}

我們簡單的實作會產生所需的輸出

[      main][ 7224499961623309444] Assembling the chain
[      main][ 7224499961623309444] Adding product: test-product
[      main][ 7224499961623309444] Notifying shop about: test-product

上面的實作是在 main Thread 中呼叫的,並且執行僅限於該 Thread。 但是,我們不應該做這樣的假設。

在處理程式中,我們在傳播處理結果之前引入了輕微的延遲。 我們這樣做是為了展示幕後發生的隱式 Thread 切換。

Mono<Void> handleRequest() {
  initRequest(); <1>
  log("Assembling the chain"); // <2>

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1)) // <3>
    .flatMap(product ->
      Flux.concat(
        addProduct(product), // <4>
        notifyShop(product))
      .then())
}

運行時,會列印以下內容

[      main][ 6265915299594887150] Assembling the chain
[parallel-1][                null] Adding product: test-product
[parallel-1][                null] Notifying shop about: test-product

發生了什麼事? 為什麼一個日誌具有相關標識符,而其他日誌沒有?

當伺服器呼叫我們的處理器時,<1> 的初始化會設定 ThreadLocal 關聯識別符,而 <2> 的日誌就能夠使用它。 有反應式程式設計經驗的人可以告訴你問題在於執行發生在不同的階段。 ThreadLocal 是在組裝時設定的。 “你也應該在訂閱時恢復它”會是一個建議。 我們稍後會再回到這一點。 如果「組裝」、「訂閱」和「執行時間」這些術語讓你感到困惑,請查看 Simon 的部落格文章中的精彩解釋,或觀看 同名演講

雖然方法會立即返回,但它不保證執行已開始。 這是因為返回的 Mono 必須被訂閱才能觸發處理。 它可能會在不同的 Thread 中發生。 <3>delayElement 運算符隱式地使用 Reactor 中共享的 Scheduler(一個 Thread 池的抽象)在指定的延遲後在另一個 Thread 上傳遞信號。 該信號傳播到下游運算符,這讓我們可以先對產品進行排序添加,然後再通知商店。 我們組裝的管道還有更多令人驚訝的方面,但不要太混淆了。

問題是,在 <4> 中,如果我們記錄,我們真的無法判斷呼叫將在哪個 Thread 上發生。 諸如 flatMap 之類的運算符可以引入它們自己的異步性。

在正常情況下,當鏈被訂閱時,值開始被傳遞。 因此,我們可以在每次訂閱時恢復 ThreadLocal 值。 但這並不總是最好的主意。 Subscription 可以異步地在不同的 Thread 上傳遞。 此外,值可以在不同的 Thread 上傳遞。 在背壓的情況下,信號可以作為在執行請求的 Thread 上請求更多數據的結果而傳遞,而不是由數據的 Publisher 使用的 Thread。 很多移動部件和需要考慮的怪癖! 要了解有關 Reactor 中線程和異步執行的更多信息,請查看 我們之前部落格文章系列的另一部分

Reactor Context (Reactor 內容)

Project Reactor 引入了一種機制,該機制與函數式程式設計非常吻合,可以提供傳輸上下文元數據的方法。 它簡稱為 Context(內容)。 儘管幕後發生了線程切換,但它仍然附加到反應式鏈。

正如我們所見,Project Reactor 允許宣告式地指定意圖,同時保持對並發性的不可知性。 當必要時,它確實提供了控制並發性的方法,通過使用專用的運算符或配置參數(例如 publishOnsubscribeOnflatMap 的高級參數),但該級別的控制從核心處理邏輯中抽象出來。

我們之前提到了副作用。 我們如何擺脫這些副作用,並且仍然能夠傳輸上下文元數據?

為了更好地與函數式程式設計配合使用,Context 被綁定到 Subscriber(訂閱者),即 Publisher(發布者)發出的信號的消費者。 訂閱後,Subscriber 對於組裝管道中的所有先前運算符都是可見的。 當我們將不可變的類似 Map 的數據結構與 Subscriber 實例相關聯時,它允許在反應式管道的部分中附加和檢索上下文信息。

通過控制影響以及提供反應式鏈中步驟之間繼承的方法,Reactor Context 是一個無副作用的概念,可用於為處理提供元信息。 “這正是我們關聯請求所需要的!”。

讓我們重寫我們的應用程式以使用 Reactor Context 而不是 ThreadLocal

首先,我們需要使關聯識別符成為日誌方法的顯式參數

static void log(String message, long correlationId) {
  String threadName = Thread.currentThread().getName();
  String threadNameTail = threadName.substring(
    Math.max(0, threadName.length() - 10));
  System.out.printf("[%10s][%20s] %s%n",
    threadNameTail, correlationId, message);
}

我們的行動如下

Mono<Void> addProduct(String productName) {
  return Mono.deferContextual(ctx -> {
    log("Adding product: " + productName, ctx.get("CORRELATION_ID"));
    return Mono.empty(); // Assume we’re actually storing the product
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.deferContextual(ctx -> {
    log("Notifying shop about: " + productName,
      ctx.get("CORRELATION_ID"));
    return Mono.just(true);
  });
}

有趣的是我們如何提供關聯識別符。 我們使用一個特殊的運算符 Mono.deferContextual,它可以訪問 Context。 從 ContextView(一個簡化的、只讀的 Context 版本)中,我們提取關聯識別符,然後返回一個實際的 Mono 供調用者訂閱。

我們的處理器如下所示

Mono<Void> handleRequest() {
  long correlationId = correlationId();
  log("Assembling the chain", correlationId);

  Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(addProduct(product), notifyShop(product))
          .then())
    .contextWrite(Context.of("CORRELATION_ID", correlationId));

訂閱後,輸出如預期

[      main][ 6328001264807824115] Assembling the chain
[parallel-1][ 6328001264807824115] Adding product: test-product
[parallel-1][ 6328001264807824115] Notifying shop about: test-product

信息的反轉流是顯而易見的。 與任何反應式鏈一樣,我們通過組裝運算符鏈來定義處理流程。 一旦我們(或實際上是伺服器)訂閱了這個鏈,信息就會從下游運算符流向上游運算符以啟動處理。 之後,實際的數據信號從上游傳遞到下游 – 例如,“test-product” 值傳輸到 flatMap 運算符,然後傳輸到 concat 運算符,後者又將該值提供給 addProductnotifyShop。 由於這種邏輯流,我們在最後(使用 contextWrite 方法)編寫 Context,就在任何 Subscriber 訂閱鏈之前。 我們可以想像 Context 然後與 Subscriber 一起對上游運算符中的所有階段都可用。

無論反應式管道在執行用戶的業務邏輯的過程中進行了多少線程跳躍,上下文都不會丟失。

您可以在 我們的文檔中閱讀有關 Reactor Context 的更多信息。

第三方函式庫

不幸的是,我們不能期望第三方函式庫使用 Reactor Context 來提供可觀察性功能。 傳播隱式元信息的既定貨幣是 ThreadLocal。 像 SLF4J 這樣的函式庫使用命令式風格,並且在 Java 社群中具有穩定的地位。 如果我們可以讓它們與反應式範例一起使用,而不是期望它們適應它,那將是一個明顯的勝利。 在下一部分中,我們將討論在 Spring Cloud Sleuth(一個可以與 Reactor 一起使用的追蹤函式庫)中傳播反應式鏈中 ThreadLocal 值的歷史和挑戰。

取得 Spring 電子報

透過 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

瞭解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進制文件。

瞭解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部