Spring Integration Java DSL 1.1 M1 版本已釋出

發布 | Artem Bilan | 2015 年 4 月 15 日 | ...

親愛的 Spring 社群:

我們很高興宣布 Spring Integration Java DSL 1.1 Milestone 1 版本現已推出。請使用 Milestone Repository 與 Maven 或 Gradle,以搶先體驗。

compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"

老實說,`1.1` 版本中許多計畫的功能尚未實作,但感謝 鼓勵 來自我們的好友 Josh Long,以及最近關於 Apache Kafka 支援的公告 (Spring Integration Kafka Support 1.1 Release, Spring XD 1.1.1 Release),我們發布此 Milestone 1 主要目的是展示 Java Configuration DSL 中的 Apache Kafka 支援。

我們將在本篇文章中探討該版本和其他功能。

Apache Kafka 支援

讓我們從 Spring Integration Java DSL 中 KafkaTests 類別的一些「瑣碎」範例開始

@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
        return new DefaultConnectionFactory(
                new ZookeeperConfiguration(zookeeper.connectString()));
}

@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
        MetadataStoreOffsetManager offsetManager =
                           new MetadataStoreOffsetManager(connectionFactory);
        // start reading at the end of the
       offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
       return offsetManager;
}

@Bean
public IntegrationFlow listeningFromKafkaFlow(
                    ConnectionFactory connectionFactory,
                    OffsetManager offsetManager) {
     return IntegrationFlows
         .from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
                  .autoCommitOffset(false)
		  .payloadDecoder(String::new)
		  .keyDecoder(b -> Integer.valueOf(new String(b)))
		  .configureListenerContainer(c ->
				c.offsetManager(offsetManager)
						.maxFetch(100)))
         .<String, String>transform(String::toUpperCase)
         .channel(c -> c.queue("listeningFromKafkaResults"))
         .get();
}
  • EmbeddedZookeeper 是 Apache Kafka test artifact 的一部分 (在我們的例子中是 testCompile 'org.apache.kafka:kafka_2.10:0.8.1.1:test'),並且與許多其他功能 (例如 kafka.utils.TestUtils) 一起,對於單元測試非常有用。
  • 有關 ConnectionFactoryOffsetManager 的更多資訊,請參閱 Spring Integration Kafka 專案。
  • 上述設定中最重要的部分是 IntegrationFlow bean 定義。 Spring Integration Java DSL 提供了一個命名空間 factory - Kafka - 它利用 IntegrationComponentSpec 實現 Spring Integration Kafka 配接器,例如用於 KafkaMessageDrivenChannelAdapterKafkaMessageDrivenChannelAdapterSpec
  • 作為建構者模式的一個範例,spec 僅將來自 method-chain 的選項委派給底層的 KafkaMessageDrivenChannelAdapter 實例。
  • 對於那些像我一樣不熟悉 Scala (Apache Kafka 是用 Scala 撰寫的) 的人,請注意 .payloadDecoder(String::new) 行。 kafka.serializer.Decoder 是一個 Scala trait,它被編譯成 Java 介面 (不是類別!),因此我們可以在這裡將它表示為 Java 8 lambda 方法。
  • .configureListenerContainer() 是一個 lambda 感知方法,用於分離 KafkaMessageListenerContainer 特定選項的考量。

來自 Kafka 命名空間 factory 的其他自我解釋的 factory-methods 是用於 KafkaHighLevelConsumerMessageSource 輪詢配接器的 .inboundChannelAdapter(...) 和用於 KafkaProducerMessageHandler.outboundChannelAdapter(...)。 有關更多資訊,請參閱它們的 JavaDoc。

如需更多資訊,請查看 Josh Long 關於 使用 Apache Kafka 進行 Spring 整合和資料處理管道 的文章!

POJO 方法調用

來自社群的大量意見反應 (網路研討會重播:Spring Integration Java DSL 簡介) 是關於 bean 方法調用元件 (services、transformers、routers 等),並且我們清楚地聽到了您的聲音:元件方法選擇已獲得改進。 這是一個範例,它類似於 XML 設定中的 <int:service-activator input-channel="greetingChannel" ref="greetingService"/>


@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {

	@Autowired
	private GreetingService greetingService;

	@Bean
	public IntegrationFlow greetingFlow() {
		return IntegrationFlows.from("greetingChannel")
				.handle(this.greetingService)
				.get();
	}

}

@Component
public class GreetingService {

   public void greeting(String payload) {
        System.out.println("Hello " + payload);
   }
}

在此,greeting 方法將由框架自動選取。 另一種選擇是採用 methodName 引數,以在不明確的情況下指定方法。 類似的 POJO 方法調用 EIP-methods 已被引入到許多其他 EIP 實作中,例如 transform(Object service, String methodName), split(Object service) 等。

Spring Integration Java DSL 也尊重 Spring Integration messaging annotations,例如 @ServiceActivator, @Router, @Filter 等,甚至還有 @Payload, @Header。 有關更多資訊,請參閱 IntegrationFlowDefinition JavaDoc。

IntegrationFlowAdapter

作為 IntegrationFlow 是一個介面,我們可以只提供它的直接實作作為自訂元件,並且它可以在 Spring Integration Java DSL 環境中按原樣工作,這並不奇怪

@Component
public class MyFlow implements IntegrationFlow {

	@Override
	public void configure(IntegrationFlowDefinition<?> f) {
		f.<String, String>transform(String::toUpperCase);
	}

}

這類似於 @Bean 定義,但此方法有助於我們的元件保持更鬆散的耦合。

但是,等等,還有更多! IntegrationFlow 實作 (如 @Bean 定義案例中的 lambdas) 僅限於 DirectChannel 輸入通道。 我們在此更進一步,並引入了 IntegrationFlowAdapter。 這是我的最愛範例,用於示範如何使用它

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

        private final AtomicBoolean invoked = new tomicBoolean();

        public Date nextExecutionTime(TriggerContext triggerContext) {
              return this.invoked.getAndSet(true) ? null : new Date();
       }

       @Override
       protected IntegrationFlowDefinition<?> buildFlow() {
          return from(this, "messageSource",
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                   .split(this)
		   .transform(this)
		   .aggregate(a -> a.processor(this, null), null)
		   .enrichHeaders(Collections.singletonMap("foo", "FOO"))
		   .filter(this)
		   .handle(this)
		   .channel(c -> c.queue("myFlowAdapterOutput"));
      }

      public String messageSource() {
	       return "B,A,R";
      }

      @Splitter
      public String[] split(String payload) {
           return StringUtils.commaDelimitedListToStringArray(payload);
      }

      @Transformer
      public String transform(String payload) {
           return payload.toLowerCase();
      }

      @Aggregator
      public String aggregate(List<String> payloads) {
             return payloads.stream().collect(Collectors.joining());
      }

      @Filter
      public boolean filter(@Header Optional<String> foo) {
              return foo.isPresent();
      }

      @ServiceActivator
      public String handle(String payload, @Header String foo) {
             return payload + ":" + foo;
      }

}

當然,透過 POJO 方法調用支援 (請參閱上文),將無法如此輕易地建構流程。

動態語言 (腳本) 支援

Spring FrameworkSpring Integration 長期以來一直支援動態語言,並且它主要與 XML Spring 設定連結。 從 Java 程式碼處理腳本 (例如 Groovy、Ruby、JavaScript 等) 可能看起來很奇怪,但我們發現它是用於在執行階段重新載入功能的一個有用工具,並且當 Java lambas 不夠動態時。 讓我們看看 Spring Integration Java DSL 中的 Scripts 命名空間 factory

@Configuration
@EnableIntegration
public class ScriptsConfiguration {

	@Value("com/my/project/integration/scripts/splitterScript.groovy")
	private Resource splitterScript;

	@Bean
	public PollableChannel results() {
		return new QueueChannel();
	}

	@Bean
	public IntegrationFlow scriptSplitter() {
		return f -> f
        	             .split(Scripts.script(this.splitterScript)
                                              .refreshCheckDelay(10000)
                                              .variable("foo", "bar"))
                             .channel(results());
	}

}

此腳本支援允許我們僅處理外部資源,這些資源可以在執行階段變更和重新載入。 Spring Integration Scripting 模組支援的 inline 腳本沒有意義,因為我們有 Java 8 lambdas 可用於這些情況。

Inline WireTap

Wire Tap EI PatternSpring Integration 中實作為 ChannelInterceptor,並且可以像這樣注入到任何 MessageChannel 中作為攔截器

@Bean
public MessageChannel myChannel() {
     return MessageChannels.direct()
                .interceptor(new WireTap(loggerChannel()))
                .get();
}

IntegrationFlow 定義允許我們省略 EIP 元件之間的 MessageChannel 宣告,因此我們引入了一個 inline .wireTap() EIP-method,以允許對這些匿名通道進行 WireTap 注入。 以下是一些範例

@Bean
public IntegrationFlow wireTapFlow1() {
	return IntegrationFlows.from("tappedChannel1")
		.wireTap("tapChannel",
                         wt -> wt.selector(m -> m.getPayload().equals("foo")))
		.channel("nullChannel")
		.get();
}

@Bean
public IntegrationFlow wireTapFlow2() {
	return f -> f
		.wireTap(sf -> sf
			.<String, String>transform(String::toUpperCase)
			.channel(c -> c.queue("wireTapSubflowResult")))
		.channel("nullChannel");
}

有關更多資訊,請參閱 IntegrationFlowDefinition.wireTap() 方法 JavaDoc,並且不要錯過 GitHub 專案頁面上的我們的測試案例。

總結

針對 1.1 版本,我們還有很多工作要做,例如進一步簡化 .aggregate()、等設定,新增注入外部子流程的能力,能夠將 IntegrationComponentSpec 實作設定為獨立的 @Bean 以簡化目標流程定義,以及更多特定協議的命名空間工廠。請隨時透過 StackOverflow、JIRA 和 GitHub issues 與我們聯繫,分享您的想法和建議!

專案頁面 | JIRA | Issues | [貢獻] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow (spring-integration 標籤)

獲取 Spring 電子報

持續關注 Spring 電子報

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進制檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部