案例研究:彙總函式與處理器

工程 | Artem Bilan | 2020 年 10 月 26 日 | ...

本文是一系列部落格文章的一部分,探討基於 Java Function 重新設計的 Spring Cloud Stream 應用程式。 在這篇文章中,我們將研究 Aggregator 函式及其與 Splitter 函式的關係。 我們將了解如何自訂預設行為。 我們還將了解為 aggregator 配置共享訊息儲存的重要性。

以下是本部落格系列的所有先前部分。

Aggregator 函式

Aggregator 函式是 Spring Integration 中 [AggregatingMessageHandler](https://spring-docs.dev.org.tw/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的基礎,它繼承了其大部分功能,並將常用的 aggregator 選項公開為配置屬性。 有關更多資訊,請參閱 AggregatorFunctionProperties(或下一節)。 aggregator 函式是完全反應式的,並定義為 Function<Flux<Message<?>>, Flux<Message<?>>。 這是因為 aggregator 邏輯不需要它立即產生回覆。 相反,它將目前的訊息儲存在訊息儲存中,與其他訊息分組以進行收集或簡化為某些結果,直到滿足釋放結果所需的條件。 這樣,將輸入訊息作為串流 (Flux) 並讓它們透過 aggregator 組合為輸出串流(也是 Flux)感覺很自然。 因此,我們只需要 subscribe() 到 Aggregator 函式的結果即可啟動流程。 實際上,當我們在 Spring Cloud Stream 應用程式中使用這種反應式函式時,這正是自動發生的:當我們使用時,框架會為我們建構一個來自輸入目標的訊息 Flux,並在輸出目標上處理產生的 Flux

用法

通常,aggregator 與 Splitter 結合使用,後者將單一輸入訊息轉換為多個輸出訊息,包括一些序列詳細資訊標頭。 在經過一些個別項目處理(轉換、豐富等)之後,我們新增一個 aggregator 以將這些項目組合回單一訊息。 所提及的序列詳細資訊標頭用作預設關聯和釋放策略,以將訊息儲存在群組中,並決定何時以及如何組合和產生單一訊息。 使用函式組合建置這種處理邏輯感覺很自然,我們稍後將討論它。 但現在,讓我們想像一下(為了簡單起見)我們有一些想要組合到單一訊息中的資料!

首先,我們需要在我們的 Spring Boot 專案中新增 aggregator 函式的依賴項

<dependency>
    <groupId>org.springframework.cloud.fn</groupId>
    <artifactId>aggregator-function</artifactId>
</dependency>

就是這樣! aggregator 函式 bean 將被自動配置,足以讓我們將函式自動裝配到我們的程式碼中並使用它

@Autowired
Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction;
...
Flux<Message<?>> input =
        Flux.just(MessageBuilder.withPayload("2")
                .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                .build(),
         MessageBuilder.withPayload("1")
                .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1)
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                .build());

Flux<Message<?>> output = this.aggregatorFunction.apply(input);

output.as(StepVerifier::create)
            .assertNext((message) ->
                    assertThat(message)
                            .extracting(Message::getPayload)
                            .isInstanceOf(List.class)
                            .asList()
                            .hasSize(2)
                            .contains("1", "2"))
            .thenCancel()
            .verify();

在此程式碼片段中,我們示範了如何將具有預定義序列詳細資訊的兩個簡單訊息組合為單一 List<String>。 所有繁重的工作都在 aggregatorFunction 及其預設關聯和釋放策略中完成。 這也包括預設群組組合器選項,該選項從已發布群組中的訊息建置有效負載清單。

我們將在下一節中查看更複雜的使用案例和配置選項。

持久性狀態儲存

我們在應用程式中處理和操作的資料和資訊實際上是應用程式最重要的部分。 我們需要三思而後行,何時將資料保存在記憶體中,而不是某些外部儲存中。 在大多數情況下,我們將使用某些資料庫作為狀態儲存和/或訊息傳遞中介軟體,以防止生產者和消費者之間的資料遺失。 作為額外的好處,這使叢集中的不同實例可以存取共享儲存,以實現流暢的分散式運算。

Aggregator 函式不需要持久性狀態儲存即可工作,但在生產環境中是避免資料遺失並確保故障轉移所必需的。

設定

Aggregator 函式的配置選項 (AggregatorFunctionProperties) 非常簡單,並且與 [AggregatingMessageHandler](https://spring-docs.dev.org.tw/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的高階選項完全一致。 它們是

  • correlation - 一個 SpEL 運算式,用於從傳入的訊息中確定關聯鍵(群組 ID)。 此類運算式(如果提供)會為基礎 AggregatingMessageHandler 建置一個 ExpressionEvaluatingCorrelationStrategy。 依預設(未提供時),AggregatingMessageHandler 使用 HeaderAttributeCorrelationStrategy,該策略基於 IntegrationMessageHeaderAccessor.CORRELATION_ID - 一個標頭,可以由 splitter、PublishSubscribeChannel 或收件者清單路由器在上游填充。

  • release - 一個 SpEL 運算式,用於確定是否應釋放已儲存的訊息群組並將其作為輸出訊息發出。 此類運算式(如果提供)會為基礎 AggregatingMessageHandler 建置一個 ExpressionEvaluatingReleaseStrategy。 依預設(未提供時),AggregatingMessageHandler 使用 SimpleSequenceSizeReleaseStrategy,該策略基於儲存的群組大小和 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE - 一個標頭,可以由 splitter、PublishSubscribeChannel 或收件者清單路由器在上游填充。

  • aggregation - 一個 SpEL 運算式,用於從已發布的訊息群組建置輸出結果。 此運算式(如果提供)有助於基礎 AggregatingMessageHandlerExpressionEvaluatingMessageGroupProcessor。 依預設(未提供時),AggregatingMessageHandler 使用 DefaultAggregatingMessageGroupProcessor,該處理器僅將群組中訊息的有效負載組合到 List 中並合併其標頭。

  • groupTimeout - 一個 SpEL 運算式,用於排程一個背景工作,以便在沒有更多訊息到達時使群組過期。 有關此選項的更多資訊,請參閱 Spring Integration

  • messageStoreType - 來自 AggregatorFunctionProperties.MessageStoreType 常數類別的值,指示要使用哪個 MessageGroupStore 實作來儲存訊息,直到釋放其群組。 支援的 MessageGroupStore 有:ConfigurableMongoDbMessageStoreRedisMessageStoreGemfireMessageStoreJdbcMessageStoreSimpleMessageStore,它是預設的,並將訊息儲存在記憶體中。 這是最重要的選項,應根據目標環境和可用的持久性儲存來選擇。 當 Aggregator 函式作為叢集實例部署時(例如,透過 Spring Cloud Data Flow,當用作 aggregator-processor 的一部分時),它具有更大的價值,因此共享狀態時,您可能會在一個實例上將訊息產生到 aggregator 中,但它們可以在不同的實例上發布。 這樣,您就不會在應用程式崩潰時遺失訊息。 MessageGroupStore 實作的依賴項被打包到最終函式 uber jar 中,並根據此選項自動配置。 唯一的區別是 JDBC,我們必須根據目標環境要求提供適當的驅動程式。 有關 MessageGroupStore 抽象的更多資訊,請參閱 Spring Integration 系統管理 和上一篇部落格文章,了解如何 提供 JDBC 驅動程式。 那些持久性儲存的所有配置選項與 Spring Boot 為我們自動配置它們提供的選項相同。

  • messageStoreEntity - 此選項僅特定於某些 MessageGroupStore 實作:它指的是 Gemfire/Geode 的用戶端區域; JDBC 的表格前綴; MongoDB 的集合名稱。 對於其餘實作,它會被忽略。

有關這些元件的更多資訊,請參閱 Spring IntegrationStream Applications 專案中相應的函式實作(如果有的話)。

所以,如果我們想要執行一個 Aggregator 函數 (作為獨立程式、作為 Spring Cloud Stream 處理器 或作為 Spring Cloud Data Flow 串流定義的一部分),並帶有一些自定義屬性,且針對共享的 MongoDB 儲存,我們可以這樣宣告它:

java -jar aggregator-processor-kafka-3.0.0-SNAPSHOT.jar --aggregator.correlation=T(Thread).currentThread().id --aggregator.release=!messages.?[payload == 'bar'].empty --aggregator.aggregation=#this.?[payload == 'foo'].![payload] --aggregator.messageStoreType=mongodb --aggregator.message-store-entity=aggregatorTest --spring.data.mongodb.uri=mongodb://127.0.0.1/test

其中這些屬性的值如下:

  • aggregator.correlation - 消費者執行緒 ID 作為訊息分組的鍵值;

  • aggregator.release - 一個針對訊息群組的 SpEL 表達式,只有在 bar payload 到達時才釋放它;

  • aggregator.aggregation - 一個 SpEL 表達式,用於選擇和投射訊息群組集合,其中只有具有 foo payload 的訊息會被組合到最終結果中;

  • aggregator.messageStoreType - 使用 MongoDb MessageGroupStore 實作;

  • aggregator.message-store-entity - MongoDb 資料庫中的集合名稱;

  • spring.data.mongodb.uri - MongoDb 資料庫連線。

即使我們在自定義 Spring Boot 應用程式中將此函數與其他函數組合,這些配置屬性設定仍然不受影響。 請參閱下一節以獲取更多信息。

組合

Aggregator 函數本身在生產解決方案中可能沒有意義。 當然,在大多數情況下,它與其他上游和下游函數結合使用。 如前所述,通常將 aggregator 與 splitter 預處理組合在一起。 可以使用 Java API 透過 Function.andThan()Function.compose() 以程式方式組合這些函數,但是,由於這些方法的類型非常嚴格,我們需要在之間執行一些中間轉換以滿足函數的輸入和輸出。 借助 Spring Cloud Function 函式庫,我們可以繞過編寫各種轉換的繁瑣程式設計工作,同時保留所需的序列細節訊息標頭。 我們寧願依賴框架中的類型推斷和開箱即用的轉換能力,並儘可能保持我們的組合簡單。

假設我們有如下的 JSON 輸入:

{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ]
  }
}

我們的任務是提供以逗號分隔的單一字串作為書名。

我們只需要將三個開箱即用的函數組合到一個 Spring Cloud Function (或 Stream) 應用程式中。 這些函數的依賴項是:splitter-functionspel-function 和我們的 aggregator-function。 此類應用程式的配置屬性可能如下:

spring.cloud.function.definition=splitterFunction|spelFunction|aggregatorFunction splitter.expression=#jsonPath(payload,'$.store.book') spel.function.expression=title aggregator.aggregation=T(org.springframework.util.StringUtils).collectionToCommaDelimitedString(#this.![payload])

我們可以將類似的串流定義和配置用於 Spring Cloud Data Flow。 唯一的區別是,來自/到函數的訊息將透過 binder 傳輸,使用預先建置的處理器應用程式。 您實際上可以在類似 Mongo DB source 中使用此組合。 使用 Spring Cloud Data Flow 時要記住的另一點是,Aggregator 函數與類型無關,並且使用具有 byte[] payload 的訊息。 如果您計劃針對 payload 執行一些複雜的邏輯,如上面的表達式中所示,您可能需要將此函數與一個上游函數組合,以將 byte[] payload 轉換為域物件或其他相容類型,例如 HashMap。 如果 payload 是 JSON 表示形式,則始終可以使用我們上面為 splitter 表達式顯示的 #jsonPath() SpEL 函數來存取它。

請參閱本系列之前的一篇部落格文章,以獲取有關 函數組合 的更多資訊。

結論

此部落格文章詳細介紹了 Aggregator Function 及其在 Spring Cloud Stream Aggregator Processor 中的使用方式。 我們還研究了如何使用此函數的配置屬性。 然後,我們深入研究了在獨立應用程式中使用 aggregator 的幾種變體,一路探索了各種功能。 最後,我們看到了如何在訊息之間輕鬆切換 aggregator 中持久狀態的 MessageGroupStore 實現。

敬請關注

本系列將繼續。 在接下來的幾週內,我們將研究更多函數和應用程式。

獲取 Spring 新聞通訊

隨時關注 Spring 新聞通訊

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

獲取支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部