Spring Integration Java DSL:逐行教學

工程 | Artem Bilan | 2014 年 11 月 25 日 | ...

親愛的 Spring 社群!

就在Spring Integration Java DSL 1.0 GA版本發布公告之後,我想向您介紹 Spring Integration Java DSL,這是一個基於經典 Cafe Demo 整合範例的逐行教學。我們在此描述 Spring Boot 支援、Spring Framework Java 和 Annotation 組態、IntegrationFlow 功能,並向 Java 8 Lambda 支援致敬,這也是 DSL 風格的靈感來源。當然,所有這些都由 Spring Integration Core 專案提供支援。

對於那些對 Java 8 尚不感興趣的人,我們提供類似的無 Lambda 教學:Spring Integration Java DSL (pre Java 8):逐行教學

但是,在我們開始描述 Cafe 示範應用程式之前,這裡有一個較短的範例可以開始使用...

@Configuration
@EnableAutoConfiguration
@IntegrationComponentScan
public class Start {

	public static void main(String[] args) throws InterruptedException {
		ConfigurableApplicationContext ctx = 
                                 SpringApplication.run(Start.class, args);

		List<String> strings = Arrays.asList("foo", "bar");
		System.out.println(ctx.getBean(Upcase.class).upcase(strings));

		ctx.close();
	}

	@MessagingGateway
	public interface Upcase {

		@Gateway(requestChannel = "upcase.input")
		Collection<String> upcase(Collection<String> strings);

	}

	@Bean
	public IntegrationFlow upcase() {
	     return f -> f
		 	.split()                                         // 1
			.<String, String>transform(String::toUpperCase)  // 2
			.aggregate();                                    // 3
	}

}

我們將基礎架構(註解等)的描述留給主要咖啡流程描述。在這裡,我們希望您專注於最後的 @BeanIntegrationFlow 以及將訊息傳送到該流程的閘道方法。

main 方法中,我們將字串集合傳送到閘道並將結果列印到 STDOUT。流程首先將集合拆分為個別的 String (1);然後將每個字串轉換為大寫 (2),最後我們將它們重新聚合回一個集合 (3)。由於這是流程的結尾,因此框架將聚合的結果返回給閘道,並且新的 payload 成為閘道方法的傳回值。

等效的 XML 組態可能是...

<int:gateway service interface="foo.Upcase" 
                 default-request-channel="upcase.input">

<int:splitter input-channel="upcase.input" output-channel="transform"/>

<int:transformer expression="payload.toUpperCase()"
    input-channel="transform"
    output-channel="aggregate" />

<int:aggregator input-channle="aggregate" />

或者...

<int:gateway service interface="foo.Upcase" 
                default-request-channel="upcase.input">

<int:chain input-channel="upcase.input">
    <int:splitter />
    <int:transformer expression="payload.toUpperCase()" />
    <int:aggregator />
</int:chain>

##Cafe Demo

Cafe Demo 應用程式的目的是示範如何使用企業整合模式 (EIP) 來反映真實咖啡廳中的 order-delivery 情況。透過此應用程式,我們處理多個飲料訂單 - 熱的和冰的。執行應用程式後,我們可以在標準輸出 (System.out.println) 中看到冷飲比熱飲準備得更快。但是,整個訂單的交付會延遲到熱飲準備好為止。

為了反映領域模型,我們有幾個類別:OrderOrderItemDrinkDelivery。它們都在整合案例中被提及,但我們不會在這裡分析它們,因為它們足夠簡單。

我們應用程式的原始碼僅放置在一個類別中;重要的行標記有數字,對應於後面的註解

@SpringBootApplication               // 1
@IntegrationComponentScan            // 2
public class Application {

  public static void main(String[] args) throws Exception {
  	ConfigurableApplicationContext ctx =
  	              SpringApplication.run(Application.class, args);// 3

  	Cafe cafe = ctx.getBean(Cafe.class);                         // 4
  	for (int i = 1; i <= 100; i++) {                             // 5
       Order order = new Order(i);
       order.addItem(DrinkType.LATTE, 2, false); //hot
       order.addItem(DrinkType.MOCHA, 3, true);  //iced
       cafe.placeOrder(order);
  	}

  	System.out.println("Hit 'Enter' to terminate");              // 6
  	System.in.read();
  	ctx.close();
  }

  @MessagingGateway                                              // 7
  public interface Cafe {

  	@Gateway(requestChannel = "orders.input")                    // 8
  	void placeOrder(Order order);                                // 9

  }

  private AtomicInteger hotDrinkCounter = new AtomicInteger();

  private AtomicInteger coldDrinkCounter = new AtomicInteger();  // 10

  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata poller() {                               // 11
  	return Pollers.fixedDelay(1000).get();
  }

  @Bean
  public IntegrationFlow orders() {                             // 12
  	return f -> f                                               // 13
  	  .split(Order.class, Order::getItems)                      // 14
  	  .channel(c -> c.executor(Executors.newCachedThreadPool()))// 15
  	  .<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping // 16
  	    .subFlowMapping("true", sf -> sf                        // 17
  	      .channel(c -> c.queue(10))                            // 18
  	      .publishSubscribeChannel(c -> c                       // 19
  	        .subscribe(s ->                                     // 20
  	          s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))// 21
  	        .subscribe(sub -> sub                               // 22
  	          .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared cold drink #"
  	              + this.coldDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)                                 // 23
  	          .handle(m -> System.out.println(m.getPayload())))))// 24
  	    .subFlowMapping("false", sf -> sf                        // 25
  	      .channel(c -> c.queue(10))
  	      .publishSubscribeChannel(c -> c
  	        .subscribe(s ->
  	          s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))// 26
  	        .subscribe(sub -> sub
  	          .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared hot drink #"
  	              + this.hotDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)
  	          .handle(m -> System.out.println(m.getPayload()))))))
  	  .<OrderItem, Drink>transform(orderItem ->
  	    new Drink(orderItem.getOrderNumber(),
  	      orderItem.getDrinkType(),
  	      orderItem.isIced(),
  	      orderItem.getShots()))                                // 27
  	  .aggregate(aggregator -> aggregator                       // 28
  	    .outputProcessor(group ->                               // 29
  	      new Delivery(group.getMessages()
  	        .stream()
  	        .map(message -> (Drink) message.getPayload())
  	        .collect(Collectors.toList())))                     // 30
  	    .correlationStrategy(m ->
  	      ((Drink) m.getPayload()).getOrderNumber()), null)     // 31
  	  .handle(CharacterStreamWritingMessageHandler.stdout());   // 32
  }

}

逐行檢查程式碼...

1

@SpringBootApplication

來自 Spring Boot 1.2 的這個新的 Meta 註解。包括 @Configuration@EnableAutoConfiguration。由於我們在 Spring Integration 應用程式中,並且 Spring Boot 具有自動組態功能,因此會自動應用 @EnableIntegration,以初始化 Spring Integration 基礎架構,包括 Java DSL 的環境 - DslIntegrationConfigurationInitializer,它由 /META-INF/spring.factories 中的 IntegrationConfigurationBeanFactoryPostProcessor 拾取。

2

@IntegrationComponentScan

@ComponentScan 的 Spring Integration 類似物,用於根據介面掃描元件(Spring Framework 的 @ComponentScan 僅查看類別)。Spring Integration 支援探索使用 @MessagingGateway 註解的介面(請參閱下面的 #7)。

3

ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);

我們類別的 main 方法旨在透過來自此類別的組態啟動 Spring Boot 應用程式,並透過 Spring Boot 啟動 ApplicationContext。此外,它還將命令列引數委派給 Spring Boot。例如,您可以指定 --debug 以查看 boot 自動組態報告的日誌。

4

Cafe cafe = ctx.getBean(Cafe.class);

由於我們已經有一個 ApplicationContext,我們可以開始與應用程式互動。而 Cafe 是該進入點 - 在 EIP 術語中是 gateway。閘道只是介面,應用程式不與訊息 API 互動;它只處理領域(請參閱下面的 #7)。

5

for (int i = 1; i <= 100; i++) {

為了示範咖啡廳的「工作」,我們啟動 100 個訂單,其中包含兩種飲料 - 一種熱的和一種冰的。並將 Order 傳送到 Cafe 閘道。

6

System.out.println("Hit 'Enter' to terminate");

通常 Spring Integration 應用程式是異步的,因此為了避免從 main 執行緒提前退出,我們阻塞 main 方法,直到透過命令列進行一些最終使用者互動。非守護程式執行緒將保持應用程式開啟,但 System.read() 為我們提供了一種乾淨地關閉應用程式的機制。

7

@MessagingGateway

用於標記業務介面的註解,以指示它是終端應用程式和整合層之間的 gateway。它是 Spring Integration XML 組態中 <gateway /> 元件的類似物。Spring Integration 為此介面建立一個 Proxy,並將其作為 bean 填入應用程式上下文中。此 Proxy 的目的是將參數包裝在 Message<?> 物件中,並根據提供的選項將其傳送到 MessageChannel

8

@Gateway(requestChannel = "orders.input")

方法層級註解,可透過方法以及目標整合流程區分業務邏輯。在此範例中,我們使用 orders.inputrequestChannel 參考,它是我們 IntegrationFlow 輸入通道的 MessageChannel bean 名稱(請參閱下面的 #13)。

9

void placeOrder(Order order);

介面方法是從終端應用程式與整合層互動的中心點。此方法具有 void 傳回類型。這意味著我們的整合流程是 one-way,我們只是將訊息傳送到整合流程,但不等待回覆。

10

private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();

兩個計數器,用於收集我們的咖啡廳如何處理飲料的資訊。

11

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {

default poller bean。它是 Spring Integration XML 組態中 <poller default="true"> 元件的類似物。inputChannelPollableChannel 的端點所必需。在這種情況下,對於兩個 Cafe queues(熱的和冰的)是必要的(請參閱下面的 #18)。在這裡,我們使用 DSL 專案中的 Pollers 工廠,並使用其方法鏈流暢 API 來建立 poller 元資料。請注意,如果端點需要特定的 poller(而不是預設 poller),則可以直接從 IntegrationFlow 定義中使用 Pollers

12

@Bean
public IntegrationFlow orders() {

IntegrationFlow bean 定義。它是 Spring Integration Java DSL 的核心元件,儘管它在執行階段沒有任何作用,僅在 bean 註冊階段才有用。下面的所有其他程式碼都在 IntegrationFlow 物件中註冊 Spring Integration 元件(MessageChannelMessageHandlerEventDrivenConsumerMessageProducerMessageSource 等),該物件由 IntegrationFlowBeanPostProcessor 解析,以處理這些元件並將它們註冊為應用程式上下文中的 bean(如果需要)(某些元素(例如通道)可能已經存在)。

13

return f -> f

IntegrationFlow 是一個 Consumer 函數介面,因此我們可以最小化我們的程式碼,並且只專注於整合案例需求。它的 Lambda 接受 IntegrationFlowDefinition 作為引數。此類別提供了一組全面的方法,可以將這些方法組成 chain。我們將這些稱為 EIP-methods,因為它們為 EI 模式提供實作,並從 Spring Integration Core 填入元件。在 bean 註冊階段,IntegrationFlowBeanPostProcessor 將此內嵌 (Lambda) IntegrationFlow 轉換為 StandardIntegrationFlow 並處理其元件。我們可以使用 IntegrationFlows 工廠(例如 IntegrationFlow.from("channelX"). ... .get())實現相同的效果,但我們發現 Lambda 定義更優雅。使用 Lambda 的 IntegrationFlow 定義會填入 DirectChannel 作為流程的 inputChannel,並將其註冊到應用程式上下文中,作為 bean,在此範例中名稱為 orders.input (flow bean name + ".input")。這就是我們將該名稱用於 Cafe 閘道的原因。

14

.split(Order.class, Order::getItems)

由於我們的整合流程透過 orders.input 通道接受訊息,因此我們已準備好使用和處理它們。我們案例中的第一個 EIP 方法是 .split()。我們知道來自 orders.input 通道的訊息 payload 是一個 Order 領域物件,因此我們可以簡單地在此處使用其類型,並使用 Java 8 method-reference 功能。第一個參數是我們預期的訊息 payload 類型,第二個參數是對 getItems() 方法的方法參考,該方法會傳回 Collection<OrderItem>。因此,這會執行 split EI 模式,當我們將每個集合條目作為單獨的訊息傳送到下一個通道時。在後台,.split() 方法會註冊 MethodInvokingSplitter MessageHandler 實作和該 MessageHandlerEventDrivenConsumer,並將 orders.input 通道連接為 inputChannel

15

.channel(c -> c.executor(Executors.newCachedThreadPool()))

.channel() EIP 方法允許指定端點之間的具體 MessageChannel,就像使用 Spring Integration XML 組態透過 output-channel/input-channel 屬性對完成的一樣。預設情況下,DSL 整合流程定義中的端點與 DirectChannel 連接,這些通道會根據流程鏈中的 IntegrationFlow bean 名稱和 index 取得 bean 名稱。在這種情況下,我們使用另一個 Lambda 運算式,它會從其 Channels 工廠中選擇特定的 MessageChannel 實作,並使用流暢 API 進行組態。目前通道是 ExecutorChannel,允許將訊息從 splitter 分配到單獨的 Thread,以便在下游流程中並行處理它們。

16

.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping

我們案例中的下一個 EIP 方法是 .route(),將 hot/iced 訂單項目傳送到不同的 Cafe 廚房。我們再次在此處使用方法參考 (isIced()) 以從傳入訊息取得 routingKey。第二個 Lambda 參數表示 router mapping - 類似於 Spring Integration XML 組態中 <router> 元件的 <mapping> 子元素。但是,由於我們使用的是 Java,因此我們可以透過其 Lambda 支援更進一步!Spring Integration Java DSL 為 router 引入了 subflow 定義,除了傳統的 channel mapping 之外。每個子流程都根據路由執行,如果子流程產生結果,則在路由器之後將其傳遞到流程定義中的下一個元素。

17

.subFlowMapping("true", sf -> sf 

指定目前路由器 mappingKey 的整合流程。在這些範例中,我們有兩個子流程 - hoticed。子流程與 IntegrationFlow 功能介面相同,因此我們可以像在頂層 IntegrationFlow 定義中一樣,完全相同地使用其 Lambda。子流程與其父流程沒有任何執行階段相依性,它只是一種邏輯關係。

18

.channel(c -> c.queue(10))

我們已經知道 IntegrationFlow 的 Lambda 定義是從 [FLOW_BEAN_NAME].input DirectChannel 開始的,因此可能會有人問「如果我們再次指定 .channel(),它是如何運作的?」。DSL 會處理這種情況,並使用 BridgeHandler 和端點連接這兩個通道。在我們的範例中,我們在這裡使用受限制的 QueueChannel 來反映真實生活中咖啡廳廚房的繁忙狀態。這裡需要 global poller 用於監聽此通道的下一個端點。

19

.publishSubscribeChannel(c -> c

.publishSubscribeChannel() EIP 方法是 MessageChannels.publishSubscribe().channel() 變體,但帶有 .subscribe() 選項,我們可以指定子流程作為通道的訂閱者。沒錯,再次強調子流程!因此,可以指定任何深度的子流程。無論是否存在 .subscribe() 子流程,父流程中的下一個端點也是此 .publishSubscribeChannel() 的訂閱者。由於我們已經在 .route() 子流程中,因此最後一個訂閱者是一個隱式的 BridgeHandler,它只是將訊息彈出到頂層 - 到一個類似的隱式 BridgeHandler,以將訊息彈出到主流程中的下一個 .transform() 端點。關於我們目前流程位置的另一點說明:先前的 EIP 方法是 .channel(c -> c.queue(10)),而這個方法也適用於 MessageChannel。因此,它們再次使用隱式的 BridgeHandler 綁定在一起。在實際應用中,我們可以只使用單個 .handle() 來處理咖啡廳廚房,但我們的目標是在這裡盡可能涵蓋 DSL 功能。這就是為什麼我們將廚房工作分配到幾個相同 PublishSubscribeChannel 的子流程中。

20

.subscribe(s ->

.subscribe() 方法接受 IntegrationFlow 作為參數,可以將其指定為 Lambda 來配置訂閱者作為 subflow。我們在這裡使用幾個子流程訂閱者來避免多行 Lambda,並涵蓋一些 DSL 以及 Spring Integration 功能。

21

s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))

在這裡,我們使用一個簡單的 .handle() EIP 方法來阻塞目前的執行緒一段時間,以示範咖啡廳廚房準備飲料的速度。我們在這裡使用 Google Guava 的 Uninterruptibles.sleepUninterruptibly,以避免在 Lambda 表達式中使用 try...catch 區塊,儘管您可以這樣做,並且您的 Lambda 將會是多行的。或者您可以將該程式碼移到一個單獨的方法中,並在此處將其用作 method reference

由於我們沒有在 .publishSubscribeChannel() 上使用任何 Executor,因此所有訂閱者將在同一個執行緒上按順序執行;在我們的例子中,它是前一個 QueueChannelpollerTaskScheduler 的其中一個執行緒。這就是為什麼這個 sleep 會阻塞所有下游進程,並允許展示限制為 10 的 QueueChannelbusy state

22

.subscribe(sub -> sub

只有在 iced 飲料的 1 秒 sleep 之後,才會執行下一個子流程訂閱者。我們在這裡再次使用一個子流程,因為前一個 .handle() 對於 MessageHandler 的 Lambda 本質來說是 one-way 的。因此,為了繼續我們整個流程的進程,我們有幾個訂閱者:一些子流程在完成它們的工作後結束,並且不會將任何內容返回到父流程。

23

 .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared cold drink #"
  	              + this.coldDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)         

目前訂閱者子流程中的 transformer 是將 OrderItem 轉換為友好的 STDOUT 訊息,以供下一個 .handle 使用。在這裡,我們看到了 Lambda 表達式中泛型的使用。這是使用 GenericTransformer 功能介面實現的。

24

.handle(m -> System.out.println(m.getPayload())))))

這裡的 .handle() 只是為了示範如何使用 Lambda 表達式將 payload 列印到 STDOUT。這是一個訊號,表示我們的飲料已準備好。在此之後,PublishSubscribeChannel 的最終(隱式)訂閱者只是將帶有 OrderItem 的訊息傳送到主流程中的 .transform()

25

.subFlowMapping("false", sf -> sf

.subFlowMapping() 適用於 hot 飲料。實際上,它與先前的 iced 飲料子流程類似,但具有特定的 hot 業務邏輯。

26

s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))

sleepUninterruptibly 適用於 hot 飲料。沒錯,我們需要更多時間來煮沸水!

27

 .<OrderItem, Drink>transform(orderItem ->
  	    new Drink(orderItem.getOrderNumber(),
  	      orderItem.getDrinkType(),
  	      orderItem.isIced(),
  	      orderItem.getShots()))      

主要的 OrderItemDrink transformer,當 .route() 子流程在咖啡廳廚房訂閱者完成飲料準備後返回其結果時,將執行此轉換器。

28

.aggregate(aggregator -> aggregator

.aggregate() EIP 方法提供了類似的選項來配置 AggregatingMessageHandler 及其端點,就像我們在使用 Spring Integration XML 配置時可以使用 <aggregator> 元件一樣。當然,使用 Java DSL,我們可以更強大地直接配置聚合器,而無需任何其他額外的 bean。Lambda 再次來救援!從咖啡廳業務邏輯的角度來看,我們為初始 Order 組合 Delivery,因為我們在接近開始時將原始訂單 .split()OrderItems。

29

.outputProcessor(group -> 

AggregatorSpec.outputProcessor() 允許我們在聚合器完成群組後發出一個自訂結果。它是 <aggregator> 元件或 POJO 方法上的 @Aggregator 註釋的 ref/method 的類比。我們的目標是在這裡為所有 Drinks 組合一個 Delivery

30

new Delivery(group.getMessages()
  	        .stream()
  	        .map(message -> (Drink) message.getPayload())
  	        .collect(Collectors.toList())))    

正如您所見,我們在這裡使用了 Java 8 Stream 功能來處理 Collection。我們從已發布的 MessageGroup 迭代訊息,並將每個訊息轉換 (map) 為其 Drink payloadStream (.collect()) 的結果 (Drinks 的列表) 傳遞給 Delivery 建構函式。帶有此新 Delivery payload 的 Message 將被傳送到我們的咖啡廳情境中的下一個端點。

31

.correlationStrategy(m ->
  	      ((Drink) m.getPayload()).getOrderNumber()), null)

.correlationStrategy() Lambda 示範了我們如何自訂聚合器的行為。當然,我們可以只依賴 Spring Integration 中內建的 SequenceDetails,它預設從我們流程開始的 .split() 填充到每個拆分的訊息中,但是包含 CorrelationStrategy 的 Lambda 範例只是為了說明。(使用 XML,我們可以使用了 correlation-expression 或自訂的 CorrelationStrategy)。此行中 .aggregate() EIP 方法的第二個參數是用於 endpointConfigurer 以自訂諸如 autoStartuprequiresReplyadviceChain 等選項。我們在這裡使用 null 來表明我們依賴於端點的預設選項。許多 EIP 方法都提供了帶有和不帶有 endpointConfigurer 的重載版本,但 .aggregate() 需要一個端點參數,以避免 AggregatorSpec Lambda 參數的顯式轉換。

32

.handle(CharacterStreamWritingMessageHandler.stdout());

這是我們流程的結束 - Delivery 已交付給客戶!我們只是使用 Spring Integration Core 中現成的 CharacterStreamWritingMessageHandler 將訊息 payload 列印到 STDOUT。這是一個展示如何從 Java DSL 使用 Spring Integration Core(及其模組)的現有元件的例子。

好的,我們已經完成了基於 Spring Integration Java DSL 的 Cafe Demo 範例的描述。將它與 XML 範例 進行比較,以獲得有關 Spring Integration 的更多資訊。

這不是對 DSL 東西的總體教程。我們在這裡不評論 endpointConfigurer 選項、Transformers 工廠、IntegrationComponentSpec 階層、NamespaceFactories、我們如何指定幾個 IntegrationFlow bean 並將它們連接到單個應用程式等,請參閱 參考手冊 以獲取更多資訊。

至少這個逐行教程應該向您展示 Spring Integration Java DSL 的基礎知識,以及它在 Spring Framework Java & Annotation 配置、Spring Integration 基礎和 Java 8 Lambda 支援之間的無縫融合!

另請參閱 si4demo,以查看包括 Java DSL 在內的 Spring Integration 的演進,如 2014 年 SpringOne/2GX 大會上所示。(影片應該很快就可以觀看)。

與往常一樣,我們期待您的評論和回饋(StackOverflowspring-integration 標籤)、Spring JIRAGitHub),並且我們非常歡迎貢獻

附註:即使本教程完全基於 Java 8 Lambda 支援,我們也不希望錯過 Java 8 之前的用戶,我們將提供類似的非 Lambda 部落格文章。敬請關注!

取得 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部