搶先一步
VMware 提供培訓和認證,以加速您的進度。
了解更多親愛的 Spring 社群!
就在Spring Integration Java DSL 1.0 GA版本發布公告之後,我想向您介紹 Spring Integration Java DSL,這是一個基於經典 Cafe Demo 整合範例的逐行教學。我們在此描述 Spring Boot 支援、Spring Framework Java 和 Annotation 組態、IntegrationFlow
功能,並向 Java 8 Lambda 支援致敬,這也是 DSL 風格的靈感來源。當然,所有這些都由 Spring Integration Core 專案提供支援。
對於那些對 Java 8 尚不感興趣的人,我們提供類似的無 Lambda 教學:Spring Integration Java DSL (pre Java 8):逐行教學。
但是,在我們開始描述 Cafe 示範應用程式之前,這裡有一個較短的範例可以開始使用...
@Configuration
@EnableAutoConfiguration
@IntegrationComponentScan
public class Start {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext ctx =
SpringApplication.run(Start.class, args);
List<String> strings = Arrays.asList("foo", "bar");
System.out.println(ctx.getBean(Upcase.class).upcase(strings));
ctx.close();
}
@MessagingGateway
public interface Upcase {
@Gateway(requestChannel = "upcase.input")
Collection<String> upcase(Collection<String> strings);
}
@Bean
public IntegrationFlow upcase() {
return f -> f
.split() // 1
.<String, String>transform(String::toUpperCase) // 2
.aggregate(); // 3
}
}
我們將基礎架構(註解等)的描述留給主要咖啡流程描述。在這裡,我們希望您專注於最後的 @Bean
、IntegrationFlow
以及將訊息傳送到該流程的閘道方法。
在 main
方法中,我們將字串集合傳送到閘道並將結果列印到 STDOUT。流程首先將集合拆分為個別的 String
(1);然後將每個字串轉換為大寫 (2),最後我們將它們重新聚合回一個集合 (3)。由於這是流程的結尾,因此框架將聚合的結果返回給閘道,並且新的 payload 成為閘道方法的傳回值。
等效的 XML 組態可能是...
<int:gateway service interface="foo.Upcase"
default-request-channel="upcase.input">
<int:splitter input-channel="upcase.input" output-channel="transform"/>
<int:transformer expression="payload.toUpperCase()"
input-channel="transform"
output-channel="aggregate" />
<int:aggregator input-channle="aggregate" />
或者...
<int:gateway service interface="foo.Upcase"
default-request-channel="upcase.input">
<int:chain input-channel="upcase.input">
<int:splitter />
<int:transformer expression="payload.toUpperCase()" />
<int:aggregator />
</int:chain>
##Cafe Demo
Cafe Demo
應用程式的目的是示範如何使用企業整合模式 (EIP) 來反映真實咖啡廳中的 order-delivery
情況。透過此應用程式,我們處理多個飲料訂單 - 熱的和冰的。執行應用程式後,我們可以在標準輸出 (System.out.println
) 中看到冷飲比熱飲準備得更快。但是,整個訂單的交付會延遲到熱飲準備好為止。
為了反映領域模型,我們有幾個類別:Order
、OrderItem
、Drink
和 Delivery
。它們都在整合案例中被提及,但我們不會在這裡分析它們,因為它們足夠簡單。
我們應用程式的原始碼僅放置在一個類別中;重要的行標記有數字,對應於後面的註解
@SpringBootApplication // 1
@IntegrationComponentScan // 2
public class Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx =
SpringApplication.run(Application.class, args);// 3
Cafe cafe = ctx.getBean(Cafe.class); // 4
for (int i = 1; i <= 100; i++) { // 5
Order order = new Order(i);
order.addItem(DrinkType.LATTE, 2, false); //hot
order.addItem(DrinkType.MOCHA, 3, true); //iced
cafe.placeOrder(order);
}
System.out.println("Hit 'Enter' to terminate"); // 6
System.in.read();
ctx.close();
}
@MessagingGateway // 7
public interface Cafe {
@Gateway(requestChannel = "orders.input") // 8
void placeOrder(Order order); // 9
}
private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger(); // 10
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() { // 11
return Pollers.fixedDelay(1000).get();
}
@Bean
public IntegrationFlow orders() { // 12
return f -> f // 13
.split(Order.class, Order::getItems) // 14
.channel(c -> c.executor(Executors.newCachedThreadPool()))// 15
.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping // 16
.subFlowMapping("true", sf -> sf // 17
.channel(c -> c.queue(10)) // 18
.publishSubscribeChannel(c -> c // 19
.subscribe(s -> // 20
s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))// 21
.subscribe(sub -> sub // 22
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared cold drink #"
+ this.coldDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item) // 23
.handle(m -> System.out.println(m.getPayload())))))// 24
.subFlowMapping("false", sf -> sf // 25
.channel(c -> c.queue(10))
.publishSubscribeChannel(c -> c
.subscribe(s ->
s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))// 26
.subscribe(sub -> sub
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared hot drink #"
+ this.hotDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item)
.handle(m -> System.out.println(m.getPayload()))))))
.<OrderItem, Drink>transform(orderItem ->
new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots())) // 27
.aggregate(aggregator -> aggregator // 28
.outputProcessor(group -> // 29
new Delivery(group.getMessages()
.stream()
.map(message -> (Drink) message.getPayload())
.collect(Collectors.toList()))) // 30
.correlationStrategy(m ->
((Drink) m.getPayload()).getOrderNumber()), null) // 31
.handle(CharacterStreamWritingMessageHandler.stdout()); // 32
}
}
逐行檢查程式碼...
1
@SpringBootApplication
來自 Spring Boot 1.2 的這個新的 Meta 註解。包括 @Configuration
和 @EnableAutoConfiguration
。由於我們在 Spring Integration 應用程式中,並且 Spring Boot 具有自動組態功能,因此會自動應用 @EnableIntegration
,以初始化 Spring Integration 基礎架構,包括 Java DSL 的環境 - DslIntegrationConfigurationInitializer
,它由 /META-INF/spring.factories
中的 IntegrationConfigurationBeanFactoryPostProcessor
拾取。
2
@IntegrationComponentScan
@ComponentScan
的 Spring Integration 類似物,用於根據介面掃描元件(Spring Framework 的 @ComponentScan
僅查看類別)。Spring Integration 支援探索使用 @MessagingGateway
註解的介面(請參閱下面的 #7)。
3
ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);
我們類別的 main
方法旨在透過來自此類別的組態啟動 Spring Boot 應用程式,並透過 Spring Boot 啟動 ApplicationContext
。此外,它還將命令列引數委派給 Spring Boot。例如,您可以指定 --debug
以查看 boot 自動組態報告的日誌。
4
Cafe cafe = ctx.getBean(Cafe.class);
由於我們已經有一個 ApplicationContext
,我們可以開始與應用程式互動。而 Cafe
是該進入點 - 在 EIP 術語中是 gateway
。閘道只是介面,應用程式不與訊息 API 互動;它只處理領域(請參閱下面的 #7)。
5
for (int i = 1; i <= 100; i++) {
為了示範咖啡廳的「工作」,我們啟動 100 個訂單,其中包含兩種飲料 - 一種熱的和一種冰的。並將 Order
傳送到 Cafe
閘道。
6
System.out.println("Hit 'Enter' to terminate");
通常 Spring Integration 應用程式是異步的,因此為了避免從 main
執行緒提前退出,我們阻塞 main
方法,直到透過命令列進行一些最終使用者互動。非守護程式執行緒將保持應用程式開啟,但 System.read()
為我們提供了一種乾淨地關閉應用程式的機制。
7
@MessagingGateway
用於標記業務介面的註解,以指示它是終端應用程式和整合層之間的 gateway
。它是 Spring Integration XML 組態中 <gateway />
元件的類似物。Spring Integration 為此介面建立一個 Proxy
,並將其作為 bean 填入應用程式上下文中。此 Proxy
的目的是將參數包裝在 Message<?>
物件中,並根據提供的選項將其傳送到 MessageChannel
。
8
@Gateway(requestChannel = "orders.input")
方法層級註解,可透過方法以及目標整合流程區分業務邏輯。在此範例中,我們使用 orders.input
的 requestChannel
參考,它是我們 IntegrationFlow
輸入通道的 MessageChannel
bean 名稱(請參閱下面的 #13)。
9
void placeOrder(Order order);
介面方法是從終端應用程式與整合層互動的中心點。此方法具有 void
傳回類型。這意味著我們的整合流程是 one-way
,我們只是將訊息傳送到整合流程,但不等待回覆。
10
private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();
兩個計數器,用於收集我們的咖啡廳如何處理飲料的資訊。
11
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
default
poller
bean。它是 Spring Integration XML 組態中 <poller default="true">
元件的類似物。inputChannel
為 PollableChannel
的端點所必需。在這種情況下,對於兩個 Cafe queues
(熱的和冰的)是必要的(請參閱下面的 #18)。在這裡,我們使用 DSL 專案中的 Pollers
工廠,並使用其方法鏈流暢 API 來建立 poller 元資料。請注意,如果端點需要特定的 poller
(而不是預設 poller),則可以直接從 IntegrationFlow
定義中使用 Pollers
。
12
@Bean
public IntegrationFlow orders() {
IntegrationFlow
bean 定義。它是 Spring Integration Java DSL 的核心元件,儘管它在執行階段沒有任何作用,僅在 bean 註冊階段才有用。下面的所有其他程式碼都在 IntegrationFlow
物件中註冊 Spring Integration 元件(MessageChannel
、MessageHandler
、EventDrivenConsumer
、MessageProducer
、MessageSource
等),該物件由 IntegrationFlowBeanPostProcessor
解析,以處理這些元件並將它們註冊為應用程式上下文中的 bean(如果需要)(某些元素(例如通道)可能已經存在)。
13
return f -> f
IntegrationFlow
是一個 Consumer
函數介面,因此我們可以最小化我們的程式碼,並且只專注於整合案例需求。它的 Lambda
接受 IntegrationFlowDefinition
作為引數。此類別提供了一組全面的方法,可以將這些方法組成 chain
。我們將這些稱為 EIP-methods
,因為它們為 EI 模式提供實作,並從 Spring Integration Core 填入元件。在 bean 註冊階段,IntegrationFlowBeanPostProcessor
將此內嵌 (Lambda) IntegrationFlow
轉換為 StandardIntegrationFlow
並處理其元件。我們可以使用 IntegrationFlows
工廠(例如 IntegrationFlow.from("channelX"). ... .get()
)實現相同的效果,但我們發現 Lambda 定義更優雅。使用 Lambda 的 IntegrationFlow
定義會填入 DirectChannel
作為流程的 inputChannel
,並將其註冊到應用程式上下文中,作為 bean,在此範例中名稱為 orders.input
(flow bean name + ".input"
)。這就是我們將該名稱用於 Cafe
閘道的原因。
14
.split(Order.class, Order::getItems)
由於我們的整合流程透過 orders.input
通道接受訊息,因此我們已準備好使用和處理它們。我們案例中的第一個 EIP 方法是 .split()
。我們知道來自 orders.input
通道的訊息 payload
是一個 Order
領域物件,因此我們可以簡單地在此處使用其類型,並使用 Java 8 method-reference
功能。第一個參數是我們預期的訊息 payload
類型,第二個參數是對 getItems()
方法的方法參考,該方法會傳回 Collection<OrderItem>
。因此,這會執行 split
EI 模式,當我們將每個集合條目作為單獨的訊息傳送到下一個通道時。在後台,.split()
方法會註冊 MethodInvokingSplitter
MessageHandler
實作和該 MessageHandler
的 EventDrivenConsumer
,並將 orders.input
通道連接為 inputChannel
。
15
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.channel()
EIP 方法允許指定端點之間的具體 MessageChannel
,就像使用 Spring Integration XML 組態透過 output-channel
/input-channel
屬性對完成的一樣。預設情況下,DSL 整合流程定義中的端點與 DirectChannel
連接,這些通道會根據流程鏈中的 IntegrationFlow
bean 名稱和 index
取得 bean 名稱。在這種情況下,我們使用另一個 Lambda
運算式,它會從其 Channels
工廠中選擇特定的 MessageChannel
實作,並使用流暢 API 進行組態。目前通道是 ExecutorChannel
,允許將訊息從 splitter
分配到單獨的 Thread
,以便在下游流程中並行處理它們。
16
.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping
我們案例中的下一個 EIP 方法是 .route()
,將 hot/iced
訂單項目傳送到不同的 Cafe 廚房。我們再次在此處使用方法參考 (isIced()
) 以從傳入訊息取得 routingKey
。第二個 Lambda 參數表示 router mapping
- 類似於 Spring Integration XML 組態中 <router>
元件的 <mapping>
子元素。但是,由於我們使用的是 Java,因此我們可以透過其 Lambda 支援更進一步!Spring Integration Java DSL 為 router
引入了 subflow
定義,除了傳統的 channel mapping
之外。每個子流程都根據路由執行,如果子流程產生結果,則在路由器之後將其傳遞到流程定義中的下一個元素。
17
.subFlowMapping("true", sf -> sf
指定目前路由器 mappingKey
的整合流程。在這些範例中,我們有兩個子流程 - hot
和 iced
。子流程與 IntegrationFlow
功能介面相同,因此我們可以像在頂層 IntegrationFlow
定義中一樣,完全相同地使用其 Lambda。子流程與其父流程沒有任何執行階段相依性,它只是一種邏輯關係。
18
.channel(c -> c.queue(10))
我們已經知道 IntegrationFlow
的 Lambda 定義是從 [FLOW_BEAN_NAME].input
DirectChannel
開始的,因此可能會有人問「如果我們再次指定 .channel()
,它是如何運作的?」。DSL 會處理這種情況,並使用 BridgeHandler
和端點連接這兩個通道。在我們的範例中,我們在這裡使用受限制的 QueueChannel
來反映真實生活中咖啡廳廚房的繁忙狀態。這裡需要 global poller
用於監聽此通道的下一個端點。
19
.publishSubscribeChannel(c -> c
.publishSubscribeChannel()
EIP 方法是 MessageChannels.publishSubscribe()
的 .channel()
變體,但帶有 .subscribe()
選項,我們可以指定子流程作為通道的訂閱者。沒錯,再次強調子流程!因此,可以指定任何深度的子流程。無論是否存在 .subscribe()
子流程,父流程中的下一個端點也是此 .publishSubscribeChannel()
的訂閱者。由於我們已經在 .route()
子流程中,因此最後一個訂閱者是一個隱式的 BridgeHandler
,它只是將訊息彈出到頂層 - 到一個類似的隱式 BridgeHandler
,以將訊息彈出到主流程中的下一個 .transform()
端點。關於我們目前流程位置的另一點說明:先前的 EIP 方法是 .channel(c -> c.queue(10))
,而這個方法也適用於 MessageChannel
。因此,它們再次使用隱式的 BridgeHandler
綁定在一起。在實際應用中,我們可以只使用單個 .handle()
來處理咖啡廳廚房,但我們的目標是在這裡盡可能涵蓋 DSL 功能。這就是為什麼我們將廚房工作分配到幾個相同 PublishSubscribeChannel
的子流程中。
20
.subscribe(s ->
.subscribe()
方法接受 IntegrationFlow
作為參數,可以將其指定為 Lambda 來配置訂閱者作為 subflow
。我們在這裡使用幾個子流程訂閱者來避免多行 Lambda,並涵蓋一些 DSL 以及 Spring Integration 功能。
21
s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))
在這裡,我們使用一個簡單的 .handle()
EIP 方法來阻塞目前的執行緒一段時間,以示範咖啡廳廚房準備飲料的速度。我們在這裡使用 Google Guava 的 Uninterruptibles.sleepUninterruptibly
,以避免在 Lambda 表達式中使用 try...catch
區塊,儘管您可以這樣做,並且您的 Lambda 將會是多行的。或者您可以將該程式碼移到一個單獨的方法中,並在此處將其用作 method reference
。
由於我們沒有在 .publishSubscribeChannel()
上使用任何 Executor
,因此所有訂閱者將在同一個執行緒上按順序執行;在我們的例子中,它是前一個 QueueChannel
上 poller
的 TaskScheduler
的其中一個執行緒。這就是為什麼這個 sleep
會阻塞所有下游進程,並允許展示限制為 10 的 QueueChannel
的 busy state
。
22
.subscribe(sub -> sub
只有在 iced
飲料的 1 秒 sleep
之後,才會執行下一個子流程訂閱者。我們在這裡再次使用一個子流程,因為前一個 .handle()
對於 MessageHandler
的 Lambda 本質來說是 one-way
的。因此,為了繼續我們整個流程的進程,我們有幾個訂閱者:一些子流程在完成它們的工作後結束,並且不會將任何內容返回到父流程。
23
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared cold drink #"
+ this.coldDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item)
目前訂閱者子流程中的 transformer
是將 OrderItem
轉換為友好的 STDOUT 訊息,以供下一個 .handle
使用。在這裡,我們看到了 Lambda 表達式中泛型的使用。這是使用 GenericTransformer
功能介面實現的。
24
.handle(m -> System.out.println(m.getPayload())))))
這裡的 .handle()
只是為了示範如何使用 Lambda 表達式將 payload
列印到 STDOUT。這是一個訊號,表示我們的飲料已準備好。在此之後,PublishSubscribeChannel
的最終(隱式)訂閱者只是將帶有 OrderItem
的訊息傳送到主流程中的 .transform()
。
25
.subFlowMapping("false", sf -> sf
.subFlowMapping()
適用於 hot
飲料。實際上,它與先前的 iced
飲料子流程類似,但具有特定的 hot
業務邏輯。
26
s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))
sleepUninterruptibly
適用於 hot
飲料。沒錯,我們需要更多時間來煮沸水!
27
.<OrderItem, Drink>transform(orderItem ->
new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots()))
主要的 OrderItem
到 Drink
transformer
,當 .route()
子流程在咖啡廳廚房訂閱者完成飲料準備後返回其結果時,將執行此轉換器。
28
.aggregate(aggregator -> aggregator
.aggregate()
EIP 方法提供了類似的選項來配置 AggregatingMessageHandler
及其端點,就像我們在使用 Spring Integration XML 配置時可以使用 <aggregator>
元件一樣。當然,使用 Java DSL,我們可以更強大地直接配置聚合器,而無需任何其他額外的 bean。Lambda 再次來救援!從咖啡廳業務邏輯的角度來看,我們為初始 Order
組合 Delivery
,因為我們在接近開始時將原始訂單 .split()
到 OrderItem
s。
29
.outputProcessor(group ->
AggregatorSpec
的 .outputProcessor()
允許我們在聚合器完成群組後發出一個自訂結果。它是 <aggregator>
元件或 POJO 方法上的 @Aggregator
註釋的 ref
/method
的類比。我們的目標是在這裡為所有 Drink
s 組合一個 Delivery
。
30
new Delivery(group.getMessages()
.stream()
.map(message -> (Drink) message.getPayload())
.collect(Collectors.toList())))
正如您所見,我們在這裡使用了 Java 8 Stream
功能來處理 Collection
。我們從已發布的 MessageGroup
迭代訊息,並將每個訊息轉換 (map
) 為其 Drink
payload
。Stream
(.collect()
) 的結果 (Drink
s 的列表) 傳遞給 Delivery
建構函式。帶有此新 Delivery
payload 的 Message
將被傳送到我們的咖啡廳情境中的下一個端點。
31
.correlationStrategy(m ->
((Drink) m.getPayload()).getOrderNumber()), null)
.correlationStrategy()
Lambda 示範了我們如何自訂聚合器的行為。當然,我們可以只依賴 Spring Integration 中內建的 SequenceDetails
,它預設從我們流程開始的 .split()
填充到每個拆分的訊息中,但是包含 CorrelationStrategy
的 Lambda 範例只是為了說明。(使用 XML,我們可以使用了 correlation-expression
或自訂的 CorrelationStrategy
)。此行中 .aggregate()
EIP 方法的第二個參數是用於 endpointConfigurer
以自訂諸如 autoStartup
、requiresReply
、adviceChain
等選項。我們在這裡使用 null
來表明我們依賴於端點的預設選項。許多 EIP 方法都提供了帶有和不帶有 endpointConfigurer
的重載版本,但 .aggregate()
需要一個端點參數,以避免 AggregatorSpec
Lambda 參數的顯式轉換。
32
.handle(CharacterStreamWritingMessageHandler.stdout());
這是我們流程的結束 - Delivery
已交付給客戶!我們只是使用 Spring Integration Core 中現成的 CharacterStreamWritingMessageHandler
將訊息 payload
列印到 STDOUT。這是一個展示如何從 Java DSL 使用 Spring Integration Core(及其模組)的現有元件的例子。
好的,我們已經完成了基於 Spring Integration Java DSL 的 Cafe Demo 範例的描述。將它與 XML 範例 進行比較,以獲得有關 Spring Integration 的更多資訊。
這不是對 DSL 東西的總體教程。我們在這裡不評論 endpointConfigurer
選項、Transformers
工廠、IntegrationComponentSpec
階層、NamespaceFactories
、我們如何指定幾個 IntegrationFlow
bean 並將它們連接到單個應用程式等,請參閱 參考手冊 以獲取更多資訊。
至少這個逐行教程應該向您展示 Spring Integration Java DSL 的基礎知識,以及它在 Spring Framework Java & Annotation 配置、Spring Integration 基礎和 Java 8 Lambda 支援之間的無縫融合!
另請參閱 si4demo,以查看包括 Java DSL 在內的 Spring Integration 的演進,如 2014 年 SpringOne/2GX 大會上所示。(影片應該很快就可以觀看)。
與往常一樣,我們期待您的評論和回饋(StackOverflow(spring-integration
標籤)、Spring JIRA、GitHub),並且我們非常歡迎貢獻!
附註:即使本教程完全基於 Java 8 Lambda 支援,我們也不希望錯過 Java 8 之前的用戶,我們將提供類似的非 Lambda 部落格文章。敬請關注!