領先一步
VMware 提供培訓和認證,以加速您的進度。
了解更多我很高興地宣布 Spring Integration 的 Java DSL 1.2 M2
現在可以使用了!
首先,我要感謝所有提出 issue、提出 Pull Requests、提供回饋意見或僅僅在 StackOverflow 上提出問題的人。 特別感謝自前一個Milestone 1以來的早期採用者。 在他們的幫助下,我們改進並修復了一些有關運行時流程註冊的問題。
構件 org.springframework.integration:spring-integration-java-dsl:1.2.0.M2
可在Milestone repo 中找到。 所以,試試看,不要猶豫,提出一個 GH issue 以獲得任何回饋!
當前迭代的一些重點
經過許多社群要求後,我們終於引入了 Jpa
Factory 和相應的 IntegrationComponentSpec
,以提供 Spring Integration JPA 元件的 Fluent API
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow pollingAdapterFlow() {
return IntegrationFlows
.from(Jpa.inboundAdapter(this.entityManagerFactory)
.entityClass(StudentDomain.class)
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
.channel(c -> c.queue("pollingResults"))
.get();
}
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
@Bean
public IntegrationFlow retrievingGatewayFlow() {
return f -> f
.handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
.channel(c -> c.queue("retrieveResults"));
}
由於 Spring Integration JPA 元件交易支援配置的複雜性(實際上是程式化的 TransactionalInterceptor
),我們引入了 TransactionInterceptorBuilder
。 此外,我們還提供了 TransactionHandleMessageAdvice
,它允許從任何端點為整個子流程啟動交易,而不僅僅是 handleRequestMessage
,就像常規 ConsumerEndpointSpec.advice()
的情況一樣。 實際上,主要的技巧是由最近在 Spring Integration Core 中引入的 HandleMessageAdvice
完成的,它是一個標記介面,用於區分僅適用於 handleRequestMessage
或從當前 MessageHandler
開始的流程的 advice。 為方便起見,一組 .transactional()
方法已添加到 ConsumerEndpointSpec
中。
The Scatter-Gather EI 模式現在有自己的 Java DSL API
@Bean
public IntegrationFlow scatterGatherFlow() {
return f -> f
.scatterGather(scatterer -> scatterer
.applySequence(true)
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10))
.recipientFlow(m -> true,
sf -> sf.handle((p, h) -> Math.random() * 10)),
gatherer -> gatherer
.releaseStrategy(group ->
group.size() == 3 ||
group.getMessages()
.stream()
.anyMatch(m -> (Double) m.getPayload() > 5)),
scatterGather -> scatterGather
.gatherTimeout(10_000));
}
其中 scatterer
僅是一個 RecipientListRouter
,gatherer
- 一個 AggregatingMessageHandler
,最後一個 Consumer
接受 ScatterGatherHandler
的選項。
現在 .routeToRecipients()
API 為收件者提供更多配置變體
.routeToRecipients(r -> r
.recipient("foo-channel", "'foo' == payload")
.recipient("bar-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("baz"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"bax".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
先前,.route()
運算符將 IntegrationFlow
中的下一個 .channel()
作為 Router
的 defaultOutputChannel
。 根據使用者體驗,做出這樣的決定並非總是合理。 我們重構了 .route()
,使其與標準 AbstractMessageRouter
行為保持一致。 已添加 .defaultOutputChannel()
和 .defaultSubFlowMapping()
,以利用 Router
的 default
邏輯。 要回滾到先前的行為,可以使用 .defaultOutputToParentFlow()
,就像您在上面的 .routeToRecipients()
範例中看到的那樣。
有關更多信息,請參閱 1.2.0.M2
版本的 commit 歷史記錄。 並且始終閱讀 JavaDocs 以了解您使用的 API!
我們期望在幾個星期後發布版本 1.2
的第一個(並希望是最後一個)發布候選版本,在 Spring Integration 4.3.2 和 Spring Boot 1.4.1 之後進行一些採用。 很快,spring-integration-java-dsl
將遷移到 Spring Integration Core 5.0
和 Java 8 代碼庫。 當前的 1.2
版本仍將受到支持,但僅用於錯誤修復。