領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多如果您在 Spring One Platform 2019 大會上聽過 Oleg Zhurakousky 關於 Spring Cloud Stream 和 Functions 的演講,或者閱讀了他最近關於 簡化 Spring Cloud Stream 和 函數式 Spring Cloud Stream 的部落格文章,您可能會想說:「等等!Spring Integration 支援發生了什麼事?現在我的 @ServiceActivator
或 IntegrationFlow
該怎麼辦?我過去習慣使用 Sink.input()
作為通道,以使用一些 Spring Integration 邏輯來消費綁定目的地!」正如 Oleg 在他的部落格文章中提到的,使用現有的 @EnableBinding
等仍然是可能的,但我們正在逐漸擺脫那種模型,那麼我們如何在函數式 Spring Cloud Stream 的世界中仍然受益於 Spring Integration 的所有功能呢?
在這篇部落格文章中,我將在 Spring Integration 的背景下擴展 Spring Cloud Stream 的函數式功能,以及它在現代基於函數的串流中的重要性!
是的,我們確實可以建立一個簡單的 Function
橋樑,它會呼叫 MessageChannel.send()
,但我們也可以使用 Spring Integration 中的 Messaging Gateway 抽象概念來做到,如下所示
@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
extends Function<Message<InputData>, Message<OutputData>> { }
假設產生的 bean 是 java.util.function.Function
的擴展,它是一個完全有效的 Spring Cloud Function 和 Spring Cloud Stream 綁定候選者。Spring Cloud Stream 使用其泛型輸入/輸出參數類型來在之前和之後執行正確的有效負載轉換。此外,標頭會從 binder 傳遞到下游整合流程並返回。這很好,但我們仍然需要注意通道,並提供一些 SI 特定的註解,以將此閘道與我們的流程連接起來(樣板程式碼)。
使用 Spring Integration 的 Java DSL,我們可以更進一步,減少更多樣板程式碼,同時獲得使用函數式 Spring Cloud Stream 的好處。我們需要的是相同的 gateway
方法,但採用 DSL 風格。Oleg 部落格文章中的 uppercase
範例在使用 Spring Integration 的情況下會如下所示
@SpringBootApplication
public class SampleApplication {
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(Function.class,
gateway -> gateway.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.get();
}
}
使用 Spring Integration 實作大寫轉換用例非常簡單,但想像一下,我們需要執行一些硬邏輯,例如 split
、scatter-gather
與外部服務的並行呼叫,然後 aggregate
,執行一些稽核,並且僅在最後,從我們的函數返回結果到輸出目的地。所有這些以及更多都可以使用 Spring Integration、其 EIP 支援、Java DSL 抽象概念以及當然還有上述函數包裝器來實作。
java.util.function.Consumer
和 java.util.function.Supplier
介面可以以類似的方式使用,並根據其合約在它們周圍的閘道代理中使用適當的邏輯。
您可以在 Spring Integration 參考手冊中查看有關函數支援的更多資訊。
我們之前展示的所有內容都是關於命令式函數,它們是根據每個事件觸發的。反應式函數僅觸發一次,方法是將整個事件串流作為 Flux
傳遞到函數中。Spring Integration 中的反應式串流支援可協助您編寫反應式 Spring Integration 流程,這些流程可以作為 Spring Cloud Stream 中的函數公開。
以下範例示範如何圍繞反應式 Spring Integration 呼叫建置反應式函數包裝器
public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows.from(FluxFunction.class,
gateway -> gateway.beanName("uppercase"))
.handle(RSockets.outboundGateway("/uppercase")
.command(RSocketOutboundGateway.Command.requestStreamOrChannel)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
雖然使用 RSocket 實作 uppercase
仍然很簡單,但此範例的目標是讓您了解如何使用 Spring Integration 解決更複雜的用例。
在這裡,我們取得傳遞到函數中的 Flux
,並將其傳播到 RSocket 請求者以進行 request channel
互動模型。結果 Flux
會透過 Spring Integration 內部的 replyChannel
標頭傳遞回函數傳回。
另一個反應式範例可能是將資料從推播模型傳輸到拉取模型。換句話說,將事件串流表示為 Supplier
@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
.toReactivePublisher();
}
@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
Publisher<Message<byte[]>> httpRequestPublisher) {
return () -> Flux.from(httpRequestPublisher);
}
透過這種方式,傳入的 HTTP 請求會落在來源 Flux
中,以便由輸出 binder 目的地拉取下游,同時遵守背壓和其他反應式串流需求。
有關 Spring Integration 中反應式串流支援的更多資訊,請參閱參考手冊。
Spring Integration 仍然是 Spring Cloud Stream 微服務開發的重要組成部分。其函數式支援允許將屬於企業整合模式類別的複雜用例公開為 Java 函數,從而在 Spring Cloud Stream 中提供一致的執行模型。實際上,透過使用此基礎,Spring Cloud Stream App Starters 最終將被函數實作取代。
請隨時提供任何意見回饋!
附註:對於那些對 Kotlin 不耐煩的人,我想分享一個最近啟動的 Spring Integration Kotlin DSL 專案。