領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多親愛的 Spring 社群:
我們很高興宣布 Spring Integration Java DSL 1.1 Milestone 1 版本現已推出。請使用 Milestone Repository 與 Maven 或 Gradle,以搶先體驗。
compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"
老實說,`1.1` 版本中許多計畫的功能尚未實作,但感謝 鼓勵 來自我們的好友 Josh Long,以及最近關於 Apache Kafka 支援的公告 (Spring Integration Kafka Support 1.1 Release, Spring XD 1.1.1 Release),我們發布此 Milestone 1 主要目的是展示 Java Configuration DSL 中的 Apache Kafka 支援。
我們將在本篇文章中探討該版本和其他功能。
讓我們從 Spring Integration Java DSL 中 KafkaTests
類別的一些「瑣碎」範例開始
@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
return new DefaultConnectionFactory(
new ZookeeperConfiguration(zookeeper.connectString()));
}
@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
MetadataStoreOffsetManager offsetManager =
new MetadataStoreOffsetManager(connectionFactory);
// start reading at the end of the
offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
return offsetManager;
}
@Bean
public IntegrationFlow listeningFromKafkaFlow(
ConnectionFactory connectionFactory,
OffsetManager offsetManager) {
return IntegrationFlows
.from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
.autoCommitOffset(false)
.payloadDecoder(String::new)
.keyDecoder(b -> Integer.valueOf(new String(b)))
.configureListenerContainer(c ->
c.offsetManager(offsetManager)
.maxFetch(100)))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults"))
.get();
}
EmbeddedZookeeper
是 Apache Kafka test
artifact 的一部分 (在我們的例子中是 testCompile 'org.apache.kafka:kafka_2.10:0.8.1.1:test'
),並且與許多其他功能 (例如 kafka.utils.TestUtils
) 一起,對於單元測試非常有用。ConnectionFactory
和 OffsetManager
的更多資訊,請參閱 Spring Integration Kafka 專案。IntegrationFlow
bean 定義。 Spring Integration Java DSL 提供了一個命名空間 factory - Kafka
- 它利用 IntegrationComponentSpec
實現 Spring Integration Kafka 配接器,例如用於 KafkaMessageDrivenChannelAdapter
的 KafkaMessageDrivenChannelAdapterSpec
。method-chain
的選項委派給底層的 KafkaMessageDrivenChannelAdapter
實例。.payloadDecoder(String::new)
行。 kafka.serializer.Decoder
是一個 Scala trait
,它被編譯成 Java 介面 (不是類別!),因此我們可以在這裡將它表示為 Java 8 lambda 方法。.configureListenerContainer()
是一個 lambda 感知方法,用於分離 KafkaMessageListenerContainer
特定選項的考量。來自 Kafka
命名空間 factory 的其他自我解釋的 factory-methods 是用於 KafkaHighLevelConsumerMessageSource
輪詢配接器的 .inboundChannelAdapter(...)
和用於 KafkaProducerMessageHandler
的 .outboundChannelAdapter(...)
。 有關更多資訊,請參閱它們的 JavaDoc。
如需更多資訊,請查看 Josh Long 關於 使用 Apache Kafka 進行 Spring 整合和資料處理管道 的文章!
來自社群的大量意見反應 (網路研討會重播:Spring Integration Java DSL 簡介) 是關於 bean 方法調用元件 (services、transformers、routers 等),並且我們清楚地聽到了您的聲音:元件方法選擇已獲得改進。 這是一個範例,它類似於 XML 設定中的 <int:service-activator input-channel="greetingChannel" ref="greetingService"/>
@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {
@Autowired
private GreetingService greetingService;
@Bean
public IntegrationFlow greetingFlow() {
return IntegrationFlows.from("greetingChannel")
.handle(this.greetingService)
.get();
}
}
@Component
public class GreetingService {
public void greeting(String payload) {
System.out.println("Hello " + payload);
}
}
在此,greeting
方法將由框架自動選取。 另一種選擇是採用 methodName
引數,以在不明確的情況下指定方法。 類似的 POJO 方法調用 EIP-methods 已被引入到許多其他 EIP 實作中,例如 transform(Object service, String methodName)
, split(Object service)
等。
Spring Integration Java DSL 也尊重 Spring Integration messaging annotations,例如 @ServiceActivator
, @Router
, @Filter
等,甚至還有 @Payload
, @Header
。 有關更多資訊,請參閱 IntegrationFlowDefinition
JavaDoc。
作為 IntegrationFlow
是一個介面,我們可以只提供它的直接實作作為自訂元件,並且它可以在 Spring Integration Java DSL 環境中按原樣工作,這並不奇怪
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
這類似於 @Bean
定義,但此方法有助於我們的元件保持更鬆散的耦合。
但是,等等,還有更多! IntegrationFlow
實作 (如 @Bean
定義案例中的 lambdas) 僅限於 DirectChannel
輸入通道。 我們在此更進一步,並引入了 IntegrationFlowAdapter
。 這是我的最愛範例,用於示範如何使用它
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new tomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this, "messageSource",
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("foo", "FOO"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "B,A,R";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> foo) {
return foo.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String foo) {
return payload + ":" + foo;
}
}
當然,透過 POJO 方法調用支援 (請參閱上文),將無法如此輕易地建構流程。
Spring Framework 和 Spring Integration 長期以來一直支援動態語言,並且它主要與 XML Spring 設定連結。 從 Java 程式碼處理腳本 (例如 Groovy、Ruby、JavaScript 等) 可能看起來很奇怪,但我們發現它是用於在執行階段重新載入功能的一個有用工具,並且當 Java lambas 不夠動態時。 讓我們看看 Spring Integration Java DSL 中的 Scripts
命名空間 factory
@Configuration
@EnableIntegration
public class ScriptsConfiguration {
@Value("com/my/project/integration/scripts/splitterScript.groovy")
private Resource splitterScript;
@Bean
public PollableChannel results() {
return new QueueChannel();
}
@Bean
public IntegrationFlow scriptSplitter() {
return f -> f
.split(Scripts.script(this.splitterScript)
.refreshCheckDelay(10000)
.variable("foo", "bar"))
.channel(results());
}
}
此腳本支援允許我們僅處理外部資源,這些資源可以在執行階段變更和重新載入。 Spring Integration Scripting 模組支援的 inline
腳本沒有意義,因為我們有 Java 8 lambdas 可用於這些情況。
Wire Tap EI Pattern 在 Spring Integration 中實作為 ChannelInterceptor
,並且可以像這樣注入到任何 MessageChannel
中作為攔截器
@Bean
public MessageChannel myChannel() {
return MessageChannels.direct()
.interceptor(new WireTap(loggerChannel()))
.get();
}
IntegrationFlow
定義允許我們省略 EIP 元件之間的 MessageChannel
宣告,因此我們引入了一個 inline .wireTap()
EIP-method,以允許對這些匿名通道進行 WireTap
注入。 以下是一些範例
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlows.from("tappedChannel1")
.wireTap("tapChannel",
wt -> wt.selector(m -> m.getPayload().equals("foo")))
.channel("nullChannel")
.get();
}
@Bean
public IntegrationFlow wireTapFlow2() {
return f -> f
.wireTap(sf -> sf
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("wireTapSubflowResult")))
.channel("nullChannel");
}
有關更多資訊,請參閱 IntegrationFlowDefinition.wireTap()
方法 JavaDoc,並且不要錯過 GitHub 專案頁面上的我們的測試案例。
針對 1.1 版本,我們還有很多工作要做,例如進一步簡化 .aggregate()
、等設定,新增注入外部子流程的能力,能夠將 IntegrationComponentSpec
實作設定為獨立的 @Bean
以簡化目標流程定義,以及更多特定協議的命名空間工廠。請隨時透過 StackOverflow、JIRA 和 GitHub issues 與我們聯繫,分享您的想法和建議!
專案頁面 | JIRA | Issues | [貢獻] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow (spring-integration
標籤)