Spring Cloud Stream - 函數式與反應式

工程 | Oleg Zhurakousky | 2019 年 10 月 17 日 | ...

先前的文章中,我試圖說明我們在 Spring Cloud Stream (SCSt) 中轉向函數式程式設計模型的原因。它程式碼更少配置更少。但最重要的是,您的程式碼完全與 SCSt 的內部結構解耦且獨立

在這篇文章中,我將更深入地探討,並總結我們函數式支援的核心功能,特別是圍繞其反應式功能。

重要提示:您可以使用 @StreamListener/@EnableBinding 做任何事情,也可以不用它。換句話說,函數式支援現在在功能上與基於註釋的支援相容。

雖然下面描述的所有功能都是 Spring Cloud Function (SCF) 的功能(SCSt 的依賴項),但必須注意某些細微差別,才能理解函數在 SCSt 環境中的附加意義。

Supplier、Function 和 Consumer

任何類型為 SupplierFunctionConsumer 的 bean,或任何可以映射到 SupplierFunctionConsumer 的 bean(例如 POJO 函數、Kotlin lambdas 等)都被 SCSt 視為訊息處理器FunctionConsumer)或訊息來源Supplier)。根據所使用的函數式策略類型,輸入和輸出綁定會自動產生,使用以下命名慣例 <function-name>-<in/out>-<index>

請考慮以下範例

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

前面的函數被視為一個訊息處理器,它從 uppercase-in-0 消費並發送到 uppercase-out-0 綁定。其餘的現有 stream 屬性可以像以前一樣使用。例如 --spring.cloud.stream.bindings.uppercase-in-0.content-type=text/plain

點擊這裡以獲取更多詳細資訊和配置選項。

###命令式或反應式函數可以是命令式反應式。 命令式函數在每個單獨的事件上觸發,而反應式函數觸發一次,傳遞對整個事件流抽象的引用(例如 FluxMono),這些抽象由 Project Reactor 提供。

考慮以下一對範例

命令式

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

反應式

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase());
    }
}

除了為您提供不同的(單子)程式設計風格來處理事件(這很容易被視為偏好問題)之外,反應式程式設計還為某些使用案例增加了額外的價值,否則這些使用案例的實作將相當複雜。雖然本篇文章的範圍不包括詳細討論這些使用案例或反應式模式,但狀態管理案例(例如視窗化、聚合和分組)仍然值得一提,以及分割串流或合併多個串流的案例,我將在下一節中討論這些案例。

關於反應式函數的綁定和命名規則,它們與前一節中解釋的相同。

注意:雖然前面的範例使用 Function 作為範例,但相同的規則適用於 SupplierConsumer。 使用者指南的Spring Cloud Function 支援部分以及 Reactor 文件提供了更多詳細資訊。

###函數元數 有時需要對資料串流進行分類和組織。 例如,考慮一個經典的大數據使用案例,處理包含(假設)「訂單」和「發票」的非結構化資料,並且您希望每個資料都進入單獨的資料儲存。 這就是函數元數(具有多個輸入和輸出的函數)支援發揮作用的地方。

讓我們看一個這樣的函數的範例(完整的實作細節可以在這裡找到),

@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
	return flux -> ...;
}

鑑於 Project Reactor 是 SCF 的核心依賴項,我們正在使用其 Tuple 程式庫。 Tuples 通過向我們傳達基數類型資訊,為我們帶來了獨特的優勢。 兩者在 SCSt 的環境中都極其重要。 基數讓我們知道需要建立多少輸入和輸出綁定,並將其綁定到函數的相應輸入和輸出。 了解類型資訊可確保正確的類型轉換。

此外,這也是綁定名稱命名慣例中的「index」部分發揮作用的地方,因為在此函數中,兩個輸出綁定名稱為 organise-out-0organise-out-1

重要提示:目前,函數元數僅支援以複雜事件處理為中心的反應式函數(Function<TupleN<Flux<?>...>, TupleN<Flux<?>...>>),其中對事件合流進行評估和計算通常需要檢視事件串流,而不是單個事件。

如需更多最新資訊,請閱讀使用者指南的具有多個輸入和輸出引數的函數部分。

獲取 Spring 電子報

訂閱 Spring 電子報以保持聯繫

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

檢視全部