Spring Cloud Stream 應用程式的 Java 函數簡介 - 第 1 部分

工程 | David Turanski | 2020 年 7 月 20 日 | ...

Spring Cloud Stream 應用程式的 Java 函數簡介 - 第 1 部分

上週我們發布了Spring Cloud Stream 應用程式的 Java 函數簡介 - 第 0 部分
以宣布 Spring Cloud Stream 應用程式 2020.0.0-M2 的發布。
在此,我們將探討函數組合,這是第 0 部分中介紹的函數導向架構所啟用的更強大功能之一。如果您還沒有機會閱讀第 0 部分,現在是絕佳時機!

函數組合

函數組合在數學和電腦科學中具有堅實的理論基礎。
實際上,它是一種連接一系列函數以創建更複雜函數的方法。

讓我們看看一個使用 Java 函數的簡單範例。我們有兩個函數,reverseupper
每個函數都接受 String 作為輸入並產生 String 作為輸出。我們可以使用內建的 andThen 方法組合它們。組合函數本身就是一個 Function<String, String>
如果您執行此程式碼,它將印出 ESREVER

Function<String, String> reverse = s -> new StringBuilder(s).reverse().toString(); Function<String, String> upper = String::toUpperCase; Function<String, String> reverseUpper = reverse.andThen(upper); System.out.println(reverseUpper.apply("reverse"));

提示

除了 andThen 之外,java.util.Function 還包含 compose,它首先應用引數 (b),然後將 a 應用於結果。
因此,a.compose(b).apply(s) 等效於 a.apply(b.apply(s))

Spring Cloud Function 中的函數組合

Spring Cloud Function 包含一些很棒的功能,可將組合函數提升到另一個層次。

宣告式組合

如果我們將上述範例中的函數定義為 Spring bean,

@Bean Function<String, String> reverse() { return s -> new StringBuilder(s).reverse().toString(); }

@Bean Function<String, String> upper() { return String::toUpperCase;

}

我們可以使用 spring.cloud.function.definition 屬性 spring.cloud.function.definition=upper|reverse 來組合這些函數

此處 | 是一個組合運算子,它會產生一個自動配置的 bean,實作組合函數,以及相關資源,讓您可以無縫地調用組合函數。

與 Supplier 和 Consumer 的組合

Spring Cloud Function 擴展了原生 Java 函數組合,以支援與 Supplier 和 Consumer 的組合。

這遵循了隱含為真的概念

  • 與 Consumer 組合的函數是一個 Consumer

  • 與函數組合的 Supplier 是一個 Supplier

  • 與 Consumer 組合的 Supplier 是一個有效的處理模型(沒有輸入或輸出,這種形式的組合不會對應到函數介面,但類似於 Runnable

正如我們將看到的,Spring Cloud Stream 應用程式有效地運用了這些概念。

類型轉換

使用函數組合時,我們必須考慮相容的引數類型。
使用原生 Java 組合,我們可以將 Function<Integer,String> 與 Function<String, Integer> 組合成 Function<Integer, Integer>

Function<Integer, String> intToStr = String::valueOf; Function<String, Integer> doubleit = i -> Integer.parseInt(i) * 2; Function<Integer, Integer> composite = intToStr.andThen(doubleit); composite.apply(10);

執行 Spring 應用程式時,Spring Cloud Function 會使用 Spring 的標準類型轉換支援,根據需要強制轉換函數引數。
給定以下 Function bean 定義,函數定義 intToStr|doubleit 如預期運作,將 String 轉換為 Integer。

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, String> intToStr() { return String::valueOf;

}

除了轉換基本類型之外,Spring 函數還可以在 Message 和 POJO、JSON String 和 POJO 等之間進行轉換。
例如,以下函數可以依任意順序組合

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, Message> convertIntMessage() { return i -> MessageBuilder.withPayload(String.valueOf(i)).build();

}

Spring Cloud Stream 中的函數組合

Spring Cloud Stream 3.x 以 Spring Cloud Function 為基礎,完全支援函數式程式設計模型。Spring Cloud Stream 的基本前提是它使函數能夠在分散式環境中執行。Binder 將 Spring Boot 應用程式中封裝的函數的輸入和輸出繫結到配置的消息代理目的地,以便一個函數產生的輸出被另一個遠端執行的函數消耗作為輸入。我們可以將資料串流管道視為函數元件的分散式組合。

為了說明這一點,典型的 Spring Cloud Stream 管道,例如

source | processor1 | processor2 | processor3 | sink

在邏輯上等效於

supplier | function1 | function2 | function3 | sink

這個想法引出了一些有趣的架構選擇,因為我們可以使用函數組合將部分或全部這些元件組合到單一應用程式中。

例如,我們可以將三個處理器的序列實作為單一應用程式,我們稱之為 composed-processor,封裝 function1function2function3,並透過 spring.cloud.function.definition=function1|function2|function3 組合。現在,管道可以部署為

source | composed-processor | sink

更簡單的是,我們可以建立一個 composed-source 以在來源中完成所有處理

composed-source | sink

與往常一樣,這裡沒有正確的答案。總是有需要權衡的地方

  • 函數組合減少了部署次數。這降低了成本、延遲、操作複雜性等等。

  • 個別部署鬆散耦合,可以獨立擴展。

  • 消息代理提供保證交付。當一個簡單的無狀態應用程式當機並重新啟動時,它可以從上次停止的地方繼續,處理上一個處理步驟的待處理結果。

  • 執行複雜處理的單一應用程式更難以理解,並且將中間處理結果保留在記憶體中,或可能在臨時資料儲存區中。當有狀態應用程式失敗時,可能會導致狀態不一致,使復原更加困難。

如果這些權衡看起來很熟悉,那是因為它們幾乎與任何微服務與單體架構的辯論相同。最終,做最適合您的事情。

與預先封裝的來源應用程式的函數組合

在某些情況下,函數組合是顯而易見的。從一開始,我們就提供了預先封裝的處理器來使用SpEL執行簡單的轉換或篩選。當使用預先封裝的來源或接收器時,舊版架構需要一個單獨的處理器。使用者常見的抱怨是「為什麼我需要部署一個單獨的應用程式才能評估 SpEL 表達式?」為了解決這個問題,我們最初在早期版本中引入了一種函數組合的支援形式。要將此功能與預先封裝的應用程式一起使用,需要 Fork 它們以修改程式碼或建置依賴項以提供函數。

目前的版本為所有預先封裝的來源提供了開箱即用的函數組合。具體來說,來源現在可以與預先封裝的函數組合,以在本機執行以下任何操作

  • 執行 SpEL 轉換

  • 豐富訊息標頭

  • 篩選事件

  • 產生任務啟動請求

例如,我們可以將 time 來源與標頭豐富器和篩選器與組態屬性組合,並將其作為獨立的 Spring Boot 應用程式執行

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=even --spring.cloud.function.definition=timeSupplier|headerEnricherFunction|filterFunction --header.enricher.headers=seconds=T(java.lang.Integer).valueOf(payload .substring(payload.length() - 2)) --filter.function.expression=headers[seconds]%2==0

這將每隔一秒發布時間,例如 `07/16/20 16:43:48,只要秒數為偶數,就發布到配置的目的地 even

在這裡,我們使用預先封裝的 RabbitMQ 時間來源,將輸出繫結到名為 even 的主題交換器。如果交換器不存在,Binder 將會建立它。函數定義擴展了 Supplier 以提取秒數,將其轉換為整數並將其儲存在 seconds 訊息標頭中,然後根據標頭的值進行篩選。只有偶數值才能通過篩選。

任務啟動請求

在 2018 年,我們引入了使用 Spring Cloud Data Flow 和 Spring Batch 執行檔案擷取的參考架構。為此,我們 Fork 了 sftp 來源作為 sftp-dataflow,專門用於實作產生任務啟動請求的預先封裝來源。任務啟動請求是一個簡單的值物件,以 JSON 呈現,並由 tasklauncher-sink 使用。接收器充當 Data Flow 的用戶端,以根據請求啟動批次應用程式。我們最初選擇 sftp,因為它是最常用的檔案處理協定。但是,我們意識到相同的模式可以應用於任何來源。我們現在可以使用函數組合來做到這一點。除了標準的 sftp 來源 之外,我們還可以從 ftpfiles3 等觸發任務啟動。甚至可以使用時間來源以規律的間隔啟動任務。

這個有些牽強的範例產生了任務啟動請求

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=time-test --spring.cloud.stream.function.definition=timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction --spel.function.expression=payload.length() --header.enricher.headers=task-id=payload*2 --task.launch.request.task-name-expression="'task-'+headers['task-id']

作為 JSON 的有效負載是 {"args":[],"deploymentProps":{},"name":"task-34"}

與使用者編寫的程式碼的函數組合

實際上,當使用者開發 Spring Cloud Stream 管道時,他們可能會從我們預先封裝的 Spring Cloud Stream 應用程式 中選擇來源和接收器。處理器通常是使用者編寫的程式碼,實作特定的業務邏輯。如果您正在編寫處理器,或想要擴展來源或接收器,則可以使用任何 函數。由於我們將函數作為單獨的成品發布,因此您可以簡單地將它們包含在您的依賴項中。您可以如上所示使用宣告式組合,也可以將它們注入到您的程式碼中並以程式設計方式調用它們。當然,您也可以輕鬆地整合您自己的函數。

我該如何貢獻新的函數或應用程式?

如果您在現有的函數和應用程式目錄中找不到您要尋找的內容,請考慮貢獻。這樣,整個開放原始碼社群都將受益。在後續的文章中,我們將逐步介紹開發函數和串流應用程式的真實範例。

我們鼓勵社群參與此專案。除了程式碼貢獻外,我們非常感謝文件改進和建立問題。

敬請關注…​

這篇部落格是系列文章中的第二篇,將涵蓋許多相關主題。敬請期待未來幾週更深入的探討和重點主題。我們將帶您了解此儲存庫中包含的所有元件以及周邊流程的完整概況。

取得 Spring 電子報

訂閱 Spring 電子報,隨時掌握最新資訊

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進展。

瞭解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

瞭解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部