Spring Cloud Data Flow 中的組合函式支援

工程 | Ilayaperumal Gopinathan | 2019 年 1 月 9 日 | ...

Spring Cloud Stream 最近新增了一個功能,可將函式定義組合成現有的 Spring Cloud Stream 應用程式。 在此部落格中,我們將了解 Spring Cloud Data Flow 如何利用此功能在串流管線中組合函式。

它有什麼不同?

在 Spring Cloud Data Flow 中,串流資料管線由 Spring Cloud Stream 應用程式組成。 開發人員可以挑選和選擇現成的 串流應用程式,涵蓋許多常見的用例。 開發人員也可以使用 Spring Cloud Stream 架構擴充這些現成的應用程式,或建立自訂應用程式。

Spring Cloud Stream 2.1.0 GA的發佈包含Spring Cloud Function的整合,基於程式設計模型,允許業務邏輯表示為java.util.Functionjava.util.Consumerjava.util.Supplier,分別代表 ProcessorSinkSource 的角色。 鑑於這種靈活性,Spring Cloud Stream 架構現在支援一種簡單但功能強大的函式組合方法。 在這種情況下,組合可以是將來源和處理器組合成單一應用程式:一個「新來源」。 否則,它可以是將處理器 + Sink 組合成單一應用程式:「一個新的 Sink」。這種靈活性為串流應用程式開發人員開闢了有趣的新機會。

讓我們考慮如何建立一個管線來使用三個應用程式執行簡單的轉換,然後看看如何使用使用函式組合的兩個應用程式將其實現為管線。

具有三個應用程式的串流管線

對於第一個串流,

我們將使用現成的http-sourcetransform-processorlog-sink應用程式。

第一步,啟動 Spring Cloud Data Flow local 伺服器

java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar

現在,啟動 Spring Cloud Data Flow shell

java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar

現在讓我們註冊 HTTP 來源、轉換器處理器和使用 RabbitMQ 繫結器的日誌 Sink 應用程式

dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar

現在我們可以建立一個沒有函式組合的簡單串流

dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"

然後我們可以部署串流

dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "https://127.0.0.1:9000"
POST (text/plain) https://127.0.0.1:9000 friend
202 ACCEPTED

您可以在 log 應用程式中看到以下日誌訊息

[sformer.hello-1] log-sink                                 : Hello FRIEND

在這個串流中,我們在目標平台上將 http (來源)、transformer (處理器) 和 log (Sink) 應用程式部署為獨立應用程式(在本例中為 local)。 對於某些用例,對於簡單的有效負載轉換邏輯,我們可能希望將 Processor 應用程式與 SourceSink 應用程式結合使用。 例如,在來源輸出資料中遮罩某些特定使用者特定欄位之類的轉換情境,不一定需要部署為單獨的獨立應用程式。 相反,它可以在來源或 Sink 應用程式中組成。

為了將 Processor 函式組合到 Source 或 Sink 應用程式中,我們使用 Spring Cloud Stream 的函式組合支援。

Spring Cloud Stream 中的函式組合支援基於 Spring Cloud Function 的能力,允許註冊 java.util.Supplierjava.util.Consumerjava.util.Function 作為 Spring @Bean 定義。 這些函式 @Bean 定義可用於在執行時間進行組合。

Spring Cloud Stream 引入了一個新屬性,稱為 spring.cloud.stream.function.definition,它對應於 Spring Cloud Function 中的函式定義 DSL。 設定此屬性後,所需的函式 Bean 會在執行時間自動連結。

函式組合以下列方式發生

當 Spring Cloud Stream 應用程式的類型為 Source 時,組合函式會在來源 output 之後套用。

當 Spring Cloud Stream 應用程式的類型為 Sink 時,組合函式會在 Sink input 之前套用。

這提供了一種將函式(在 Spring Cloud Function DSL 中定義)組合成現有 Spring Cloud Stream 應用程式的能力,隨後讓 Spring Cloud Data Flow 在串流資料管線中協調它。

將函式組合成串流應用程式

讓我們建立和部署一個串流,將先前範例的轉換器表示式組合成 Source 應用程式本身。 轉換器邏輯是透過使用兩個 java.util.Function 實作完成的。

我們將建立一個新的來源應用程式,我們將其稱為 http-transformer,它擴充了現成的 http 來源應用程式。 新來源應用程式的來源可以在這裡找到。

http-transformer 應用程式包含 upperconcat 函式 Bean,如下所述

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {

	@Bean
	public Function<String, String> upper() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> concat() {
		return value -> "Hello "+ value;
	}


	public static void main(String[] args) {
		SpringApplication.run(HttpSourceRabbitApplication.class, args);
	}
}

在複製 github repo 之後,您可以使用 maven 建置應用程式

cd function-composition/http-transformer ./mvnw clean package

現在使用 Data Flow Shell 註冊 http-transformer 應用程式。

注意

對於下面的應用程式註冊 --uri 選項,請將目錄名稱和構件的路徑替換為適合您系統的值。

dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar

現在讓我們建立串流

dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"

在部署串流時,我們傳遞 spring.cloud.stream.function.definition 屬性來定義組成的函式 DSL(如 Spring Cloud Function 中定義)。 在這種情況下,它是

dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"

上面的部署將 upperconcat 函式 Bean 組合成 http 來源應用程式。

然後我們可以將有效負載傳送到 http 應用程式

dataflow:>http post --data "friend" --target "https://127.0.0.1:9001"
> POST (text/plain) https://127.0.0.1:9001 friend
> 202 ACCEPTED

然後您可以在 log 應用程式中看到輸出,如下所示:

[helloComposed-1] log-sink                                 : Hello FRIEND

注意

請注意,函式組合支援不適用於現成的 Spring Cloud Stream Processor 應用程式,因為函式是否需要在現有處理器應用程式邏輯之前或之後套用存在歧義。

但是,您可以建立自己的處理器應用程式,這些應用程式使用具有標準 java.util.Function API 的函式組合,如下面的範例所示

@Configuration
public static class FunctionProcessorConfiguration {

@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}

  @Bean
  public Function<String, String> upper() {
     return value -> value.toUpperCase();
  }

  @Bean
  public Function<String, String> concat() {
     return value -> "Hello "+ value;
  }
}

然後您需要使用以下屬性部署:spring.cloud.stream.function.definition=upperAndConcat

Kotlin 支援

另一個有趣的功能是 Spring Cloud Function 支援 Kotlin 函式的函式組合。 這讓我們可以將任何 Kotlin 函式 Bean 新增到 SourceSink 應用程式的可組合函式中。

為了看到這個工作,讓我們使用我們範例 github 儲存庫中的 http-transformer-kotlin-processor 應用程式。

Kotlin 函式 Bean 被配置為處理器。 在這裡,Kotlin 函式 Bean 是 transform 函式,定義如下

@Bean
open fun transform(): (String) -> String {
   return { "How are you ".plus(it) }
}

此外,這個專案有 spring-cloud-function-kotlin 作為依賴項,用於將函式配置支援應用於 Kotlin 函式,定義如下

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-function-kotlin</artifactId>
      <version>2.0.0.RELEASE</version>
    </dependency>

cd function-composition/http-transformer-kotlin ./mvnw clean package

注意

對於下面的應用程式註冊 --uri 選項,請將目錄名稱和構件的路徑替換為適合您系統的值。

dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar

要使用此應用程式作為 Source 建立串流

dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"

如同我們在 http-transformer 範例中所做的,我們可以利用 spring.cloud.stream.function.definition 屬性來指定任何有效的組合函式 DSL,以建構函式組合。在這個例子中,讓我們將透過 Java 設定註冊的函式 Bean 與來自 Kotlin 處理器設定的函式 Bean 結合起來。

dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"

這裡,函式名稱 transform 對應到 Kotlin 函式。

注意:我們可以執行 Kotlin 函式和 Java 函式之間的組合,因為 Kotlin 函式在內部會轉換成 java.util.Function

dataflow:>http post --data "friend" --target "https://127.0.0.1:9002"
> POST (text/plain) https://127.0.0.1:9002 friend
> 202 ACCEPTED

而且,您可以在 log 應用程式中看到輸出如下:

[omposedKotlin-1] log-sink               : Hello How are you FRIEND

在這個範例中,http-transformer 也包含了函式的原始碼。然而,您可以透過在一個獨立的 artifact 中定義函式 Bean 來使應用程式更加模組化。然後,您可以透過僅在專案中新增一個 Maven 依賴項並設定 spring.cloud.stream.function.definition 屬性來建置應用程式。透過這種方式,您可以將您的大部分商業邏輯編碼為函式,並且在必要時,可以將它與 Source 或 Sink 組合。

取得 Spring 電子報

隨時掌握 Spring 電子報的最新消息

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部