領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多Spring Cloud Stream 最近新增了一個功能,可將函式定義組合成現有的 Spring Cloud Stream 應用程式。 在此部落格中,我們將了解 Spring Cloud Data Flow 如何利用此功能在串流管線中組合函式。
在 Spring Cloud Data Flow 中,串流資料管線由 Spring Cloud Stream 應用程式組成。 開發人員可以挑選和選擇現成的 串流應用程式,涵蓋許多常見的用例。 開發人員也可以使用 Spring Cloud Stream 架構擴充這些現成的應用程式,或建立自訂應用程式。
Spring Cloud Stream 2.1.0 GA的發佈包含Spring Cloud Function的整合,基於程式設計模型,允許業務邏輯表示為java.util.Function
、java.util.Consumer
和java.util.Supplier
,分別代表 Processor
、Sink
和 Source
的角色。 鑑於這種靈活性,Spring Cloud Stream 架構現在支援一種簡單但功能強大的函式組合方法。 在這種情況下,組合可以是將來源和處理器組合成單一應用程式:一個「新來源」。 否則,它可以是將處理器 + Sink 組合成單一應用程式:「一個新的 Sink」。這種靈活性為串流應用程式開發人員開闢了有趣的新機會。
讓我們考慮如何建立一個管線來使用三個應用程式執行簡單的轉換,然後看看如何使用使用函式組合的兩個應用程式將其實現為管線。
對於第一個串流,
我們將使用現成的http-source
、transform-processor
和log-sink
應用程式。
第一步,啟動 Spring Cloud Data Flow local
伺服器
java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar
現在,啟動 Spring Cloud Data Flow shell
java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar
現在讓我們註冊 HTTP 來源、轉換器處理器和使用 RabbitMQ 繫結器的日誌 Sink 應用程式
dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar
現在我們可以建立一個沒有函式組合的簡單串流
dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"
然後我們可以部署串流
dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "https://127.0.0.1:9000"
POST (text/plain) https://127.0.0.1:9000 friend
202 ACCEPTED
您可以在 log
應用程式中看到以下日誌訊息
[sformer.hello-1] log-sink : Hello FRIEND
在這個串流中,我們在目標平台上將 http (來源)、transformer (處理器) 和 log (Sink) 應用程式部署為獨立應用程式(在本例中為 local
)。 對於某些用例,對於簡單的有效負載轉換邏輯,我們可能希望將 Processor
應用程式與 Source
或 Sink
應用程式結合使用。 例如,在來源輸出資料中遮罩某些特定使用者特定欄位之類的轉換情境,不一定需要部署為單獨的獨立應用程式。 相反,它可以在來源或 Sink 應用程式中組成。
為了將 Processor 函式組合到 Source 或 Sink 應用程式中,我們使用 Spring Cloud Stream 的函式組合支援。
Spring Cloud Stream 中的函式組合支援基於 Spring Cloud Function 的能力,允許註冊 java.util.Supplier
、java.util.Consumer
和 java.util.Function
作為 Spring @Bean
定義。 這些函式 @Bean
定義可用於在執行時間進行組合。
Spring Cloud Stream 引入了一個新屬性,稱為 spring.cloud.stream.function.definition
,它對應於 Spring Cloud Function 中的函式定義 DSL。 設定此屬性後,所需的函式 Bean 會在執行時間自動連結。
函式組合以下列方式發生
當 Spring Cloud Stream 應用程式的類型為 Source
時,組合函式會在來源 output
之後套用。
當 Spring Cloud Stream 應用程式的類型為 Sink
時,組合函式會在 Sink input
之前套用。
這提供了一種將函式(在 Spring Cloud Function DSL 中定義)組合成現有 Spring Cloud Stream 應用程式的能力,隨後讓 Spring Cloud Data Flow 在串流資料管線中協調它。
讓我們建立和部署一個串流,將先前範例的轉換器表示式組合成 Source
應用程式本身。 轉換器邏輯是透過使用兩個 java.util.Function
實作完成的。
我們將建立一個新的來源應用程式,我們將其稱為 http-transformer
,它擴充了現成的 http 來源應用程式。 新來源應用程式的來源可以在這裡找到。
http-transformer
應用程式包含 upper
和 concat
函式 Bean,如下所述
@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
public static void main(String[] args) {
SpringApplication.run(HttpSourceRabbitApplication.class, args);
}
}
在複製 github repo 之後,您可以使用 maven 建置應用程式
cd function-composition/http-transformer ./mvnw clean package
現在使用 Data Flow Shell 註冊 http-transformer
應用程式。
注意
對於下面的應用程式註冊
--uri
選項,請將目錄名稱和構件的路徑替換為適合您系統的值。
dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
現在讓我們建立串流
dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"
在部署串流時,我們傳遞 spring.cloud.stream.function.definition
屬性來定義組成的函式 DSL(如 Spring Cloud Function 中定義)。 在這種情況下,它是
dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
上面的部署將 upper
和 concat
函式 Bean 組合成 http
來源應用程式。
然後我們可以將有效負載傳送到 http
應用程式
dataflow:>http post --data "friend" --target "https://127.0.0.1:9001"
> POST (text/plain) https://127.0.0.1:9001 friend
> 202 ACCEPTED
然後您可以在 log
應用程式中看到輸出,如下所示:
[helloComposed-1] log-sink : Hello FRIEND
注意
請注意,函式組合支援不適用於現成的 Spring Cloud Stream Processor
應用程式,因為函式是否需要在現有處理器應用程式邏輯之前或之後套用存在歧義。
但是,您可以建立自己的處理器應用程式,這些應用程式使用具有標準 java.util.Function API 的函式組合,如下面的範例所示
@Configuration
public static class FunctionProcessorConfiguration {
@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
}
然後您需要使用以下屬性部署:spring.cloud.stream.function.definition=upperAndConcat
另一個有趣的功能是 Spring Cloud Function 支援 Kotlin 函式的函式組合。 這讓我們可以將任何 Kotlin 函式 Bean 新增到 Source
或 Sink
應用程式的可組合函式中。
為了看到這個工作,讓我們使用我們範例 github 儲存庫中的 http-transformer-kotlin-processor
應用程式。
Kotlin 函式 Bean 被配置為處理器。 在這裡,Kotlin 函式 Bean 是 transform
函式,定義如下
@Bean
open fun transform(): (String) -> String {
return { "How are you ".plus(it) }
}
此外,這個專案有 spring-cloud-function-kotlin
作為依賴項,用於將函式配置支援應用於 Kotlin 函式,定義如下
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-kotlin</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
cd function-composition/http-transformer-kotlin ./mvnw clean package
注意
對於下面的應用程式註冊
--uri
選項,請將目錄名稱和構件的路徑替換為適合您系統的值。
dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar
要使用此應用程式作為 Source
建立串流
dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"
如同我們在 http-transformer
範例中所做的,我們可以利用 spring.cloud.stream.function.definition
屬性來指定任何有效的組合函式 DSL,以建構函式組合。在這個例子中,讓我們將透過 Java 設定註冊的函式 Bean 與來自 Kotlin 處理器設定的函式 Bean 結合起來。
dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"
這裡,函式名稱 transform
對應到 Kotlin 函式。
注意:我們可以執行 Kotlin 函式和 Java 函式之間的組合,因為 Kotlin 函式在內部會轉換成 java.util.Function
。
dataflow:>http post --data "friend" --target "https://127.0.0.1:9002"
> POST (text/plain) https://127.0.0.1:9002 friend
> 202 ACCEPTED
而且,您可以在 log
應用程式中看到輸出如下:
[omposedKotlin-1] log-sink : Hello How are you FRIEND
在這個範例中,http-transformer
也包含了函式的原始碼。然而,您可以透過在一個獨立的 artifact 中定義函式 Bean 來使應用程式更加模組化。然後,您可以透過僅在專案中新增一個 Maven 依賴項並設定 spring.cloud.stream.function.definition
屬性來建置應用程式。透過這種方式,您可以將您的大部分商業邏輯編碼為函式,並且在必要時,可以將它與 Source 或 Sink 組合。