Spring Integration 1.2 Milestone 2 的 Java DSL 已發布

發布 | Artem Bilan | 2016 年 9 月 15 日 | ...

我很高興地宣布 Spring Integration 的 Java DSL 1.2 M2 現在可以使用了!

首先,我要感謝所有提出 issue、提出 Pull Requests、提供回饋意見或僅僅在 StackOverflow 上提出問題的人。 特別感謝自前一個Milestone 1以來的早期採用者。 在他們的幫助下,我們改進並修復了一些有關運行時流程註冊的問題。

構件 org.springframework.integration:spring-integration-java-dsl:1.2.0.M2 可在Milestone repo 中找到。 所以,試試看,不要猶豫,提出一個 GH issue 以獲得任何回饋!

當前迭代的一些重點

JPA 支援

經過許多社群要求後,我們終於引入了 Jpa Factory 和相應的 IntegrationComponentSpec,以提供 Spring Integration JPA 元件的 Fluent API

@Autowired
private EntityManagerFactory entityManagerFactory;

@Bean
public IntegrationFlow pollingAdapterFlow() {
    return IntegrationFlows
            .from(Jpa.inboundAdapter(this.entityManagerFactory)
                    .entityClass(StudentDomain.class)
                    .maxResults(1)
                    .expectSingleResult(true),
                e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
            .channel(c -> c.queue("pollingResults"))
            .get();
}

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
            .handle(Jpa.updatingGateway(this.entityManagerFactory),
                    e -> e.transactional(true))
            .channel(c -> c.queue("persistResults"));
}

@Bean
public IntegrationFlow retrievingGatewayFlow() {
    return f -> f
            .handle(Jpa.retrievingGateway(this.entityManagerFactory)
                    .jpaQuery("from Student s where s.id = :id")
                    .expectSingleResult(true)
                    .parameterExpression("id", "payload"))
            .channel(c -> c.queue("retrieveResults"));
}

流程中的交易支援

由於 Spring Integration JPA 元件交易支援配置的複雜性(實際上是程式化的 TransactionalInterceptor),我們引入了 TransactionInterceptorBuilder。 此外,我們還提供了 TransactionHandleMessageAdvice,它允許從任何端點為整個子流程啟動交易,而不僅僅是 handleRequestMessage,就像常規 ConsumerEndpointSpec.advice() 的情況一樣。 實際上,主要的技巧是由最近在 Spring Integration Core 中引入的 HandleMessageAdvice 完成的,它是一個標記介面,用於區分僅適用於 handleRequestMessage 或從當前 MessageHandler 開始的流程的 advice。 為方便起見,一組 .transactional() 方法已添加到 ConsumerEndpointSpec 中。

Scatter-Gather 支援

The Scatter-Gather EI 模式現在有自己的 Java DSL API

@Bean
public IntegrationFlow scatterGatherFlow() {
    return f -> f
      .scatterGather(scatterer -> scatterer
         .applySequence(true)
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10)),
      gatherer -> gatherer
         .releaseStrategy(group ->
                group.size() == 3 ||
                      group.getMessages()
                          .stream()
                          .anyMatch(m -> (Double) m.getPayload() > 5)),
      scatterGather -> scatterGather
        	 .gatherTimeout(10_000));
}

其中 scatterer 僅是一個 RecipientListRoutergatherer - 一個 AggregatingMessageHandler,最後一個 Consumer 接受 ScatterGatherHandler 的選項。

更多路由器改進

現在 .routeToRecipients() API 為收件者提供更多配置變體

.routeToRecipients(r -> r
    .recipient("foo-channel", "'foo' == payload")
    .recipient("bar-channel", m ->
        m.getHeaders().containsKey("recipient")
            && (boolean) m.getHeaders().get("recipient"))
    .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
        f -> f.<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("recipientListSubFlow1Result")))
    .recipientFlow((String p) -> p.startsWith("baz"),
        f -> f.transform("Hello "::concat)
            .channel(c -> c.queue("recipientListSubFlow2Result")))
    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                   "bax".equals(m.getPayload())),
        f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
    .defaultOutputToParentFlow())

先前,.route() 運算符將 IntegrationFlow 中的下一個 .channel() 作為 RouterdefaultOutputChannel。 根據使用者體驗,做出這樣的決定並非總是合理。 我們重構了 .route(),使其與標準 AbstractMessageRouter 行為保持一致。 已添加 .defaultOutputChannel().defaultSubFlowMapping(),以利用 Routerdefault 邏輯。 要回滾到先前的行為,可以使用 .defaultOutputToParentFlow(),就像您在上面的 .routeToRecipients() 範例中看到的那樣。

有關更多信息,請參閱 1.2.0.M2 版本的 commit 歷史記錄。 並且始終閱讀 JavaDocs 以了解您使用的 API!

後續步驟

我們期望在幾個星期後發布版本 1.2 的第一個(並希望是最後一個)發布候選版本,在 Spring Integration 4.3.2 和 Spring Boot 1.4.1 之後進行一些採用。 很快,spring-integration-java-dsl 將遷移到 Spring Integration Core 5.0 和 Java 8 代碼庫。 當前的 1.2 版本仍將受到支持,但僅用於錯誤修復。

專案頁面 | 文件 | Issues | Help

獲取 Spring 電子報

隨時關注 Spring 電子報

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看所有