1 個串流、2 個應用程式和 3 個 Spring Cloud Data Flow 的依賴項目

工程 | Josh Long | 2016 年 4 月 5 日 | ...

我只想在這裡記錄一個昨天讓我微笑的經驗:讓快速改進的 Spring Cloud Data Flow 在幾分鐘內從 (Spring Boot) 啟動器變成服務!

唯一的前提是您需要有一個正在運行的 Redis 實例。我的 Redis 實例在 127.0.0.1 上運行,並且不需要進一步的配置,Spring Boot 就能找到並與之協同工作。

我們將使用強大的 Spring Initializr 來快速生成我們的應用程式。還記得那些愚蠢的 Apple 廣告嗎?「總有一個 App 適合你?」別管那個,總有一個核取方塊適合你! 讓我們看看你是否和我一樣喜歡這種體驗!

本地 Data Flow 伺服器

前往 Spring Initializr 並選擇 Local Data Flow Server 並命名構件為 df-server。這將用於啟動本地 Data Flow 服務 - 一個 REST API 和一些持久性邏輯 - 來協調和儲存有關串流和任務的資訊。在舊的 Spring XD 世界中,這被稱為 Spring XD *Admin Server*。

在您選擇的 IDE 中打開專案,並將 @EnableDataFlowServer 添加到 DfServerApplication 類別

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;

@EnableDataFlowServer
@SpringBootApplication
public class DfServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(DfServerApplication.class, args);
	}
}

df-server 專案的根目錄中運行 mvn spring-boot:run,應用程式將在端口 9393 上啟動。

提示:當您看到歡迎的 ASCII 藝術作品時,您就知道您(可能)已成功!

關於提示的提示:所以,這可能不是真的。它可能會因為各種原因而失敗(例如服務的端口衝突或嵌入式 H2 數據庫的端口衝突),但優質的 ASCII 藝術作品已被證明在(我的)研究中具有治療作用......(對...我)。

Data Flow Shell

前往 Spring Initializr 並選擇 Data Flow Shell 並將構件命名為 df-shell。這將用於啟動由 Spring Shell 提供支持的 Data Flow Shell。

Data Flow Shell 在任何作業系統上運行。它是我們剛才啟動的 Data Flow 服務的客戶端。它讓我們可以使用熟悉的管道和過濾器 DSL 和命令來操作服務。我喜歡好的橫幅 ASCII 藝術作品,但像下一個開發人員一樣,有*太多*好東西。默認情況下,Spring Shell *和* Spring Boot 都會嘗試發出 ASCII 橫幅,因此我們將 Spring Boot 排除在外(這次!)。在您選擇的 IDE 中打開專案,並將 @EnableDataFlowShell 添加到 DfShellApplication 類別,然後配置如何建立 SpringApplication 以隱藏 Spring Boot 橫幅

package com.example;

import org.springframework.boot.Banner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;

@EnableDataFlowShell
@SpringBootApplication
public class DfShellApplication {

	public static void main(String[] args) {
		new SpringApplicationBuilder(DfShellApplication.class)
				.bannerMode(Banner.Mode.OFF)
				.run(args);
	}
}

df-shell 專案的根目錄中運行 mvn spring-boot:run。您應該能夠與默認情況下在本地運行的 Data Flow 伺服器進行交互。通過發布 module list 命令來嘗試它。您應該看到一個表格,其中包含 Spring Cloud Data Flow 已經知道的所有內置組件。

Logging Sink 模組

前往 Spring Initializr 並選擇 Stream Redis 並命名構件為 logging-sink。我們將使用 Spring Cloud Stream,它建立在 Spring 的 MessageChannel 抽象和 Spring Integration 中的組件模型之上,以簡潔地描述和整合基於消息的微服務,以構建一個*自定義*模組,用於記錄傳入的消息。然後,我們將使用 Spring Cloud Data Flow 部署和協調此模組。

Spring Cloud Data Flow 是一種強大的方式,可以使用小型 Spring Boot 驅動的模組來描述複雜的整合、批處理和串流處理工作負載。有幾種類型的 modulesource 產生數據,通常按照固定的時間表,下游組件可以消費和處理。processor 接收數據,對其進行處理,然後寫出數據。sink 僅僅接收數據,但不產生任何要發送的數據。這些組件可以很好地組合在一起,以描述任何種類的潛在的持續工作負載(物聯網傳感器數據、24/7 事件處理、在線事務數據接收和整合場景等)。最終,source *通常*是一個 Spring Integration inbound adapter。processor *通常*是任何種類的 Spring Integration 組件(例如轉換器),它接收數據並產生數據。sink *通常*是一個 Spring Integration outbound adapter。

task 描述了任何最終將停止的工作負載。它可能是一個簡單的 Spring Boot Command Line Runner 或一個 Spring Batch Job

Spring Cloud Data Flow 並不特別了解 Spring Integration。它只知道 Spring Cloud Stream 和眾所周知的 Spring MessageChannels,例如 inputoutput。它不關心這些通道的終端是什麼。Spring Cloud Data Flow 也不特別了解 Spring Batch。它只知道 Spring Cloud Task。

正如 UNIX sh Shell 環境允許我們通過在 stdinstdout 之間傳輸數據,從單一焦點的命令行實用程序組成任意數量和任意複雜的解決方案一樣,Spring Cloud Data Flow 也允許我們從單一焦點的消息傳輸組件組成任意數量和任意複雜的解決方案。

Spring Cloud Data Flow 已經包含了很多功能。我們將開發和安裝一個簡單的模組來記錄事物 - 在我們的例子中是時間。值得注意的是,我們將這樣做是為了我們自己的教益,但我們*不需要*這樣做;Spring Cloud Data Flow 已經提供了一個 log 模組!(和一個 time 模組!)

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.Map;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

	@MessageEndpoint
	public static class LoggingMessageEndpoint {

		@ServiceActivator(inputChannel = Sink.INPUT)
		public void logIncomingMessages(
				@Payload String msg,
				@Headers Map<String, Object> headers) {

			System.out.println(msg);
			headers.entrySet().forEach(e ->
					System.out.println(e.getKey() + '=' + e.getValue()));

		}
	}

	public static void main(String[] args) {
		SpringApplication.run(LoggingSinkApplication.class, args);
	}
}

這是一個簡單的 Spring Cloud Stream 綁定。Sink.class 是一個定義 MessageChannel input() 的介面。Spring Cloud Stream 將把它變成一個與消息代理(在這種情況下是 Redis,儘管 Spring Cloud Data Flow 的默認設置可能會在未來幾個月內更改為 RabbitMQ)的實時、命名的管道,我們的任何消息傳輸代碼都可以使用它。該示例使用 Spring Integration 來打印傳入的消息數據。讓我們首先向 Data Flow 註冊我們的自定義模組,然後構成一個從 time 組件獲取包含時間的傳入消息,然後記錄結果的串流。

首先,mvn clean install logging-sink 專案,以便它可以在本地 Maven 存儲庫中解析。Spring Cloud Data Flow 使用可插拔策略來解析自定義模組的實例。在我們的示例中,它將嘗試在我們的系統本地 Maven 存儲庫中解析它們。

返回 Data Flow Shell 並輸入以下內容

dataflow:>module register --name custom-log --type sink --uri maven://com.example:logging-sink:jar:0.0.1-SNAPSHOT
Successfully registered module 'sink:custom-log'

dataflow:>module list
╔══════════════╤════════════════╤═══════════════════╤═════════╗
║    source    │   processor    │       sink        │  task   ║
╠══════════════╪════════════════╪═══════════════════╪═════════╣
║file          │bridge          │aggregate-counter  │timestamp║
║ftp           │filter          │cassandra          │         ║
║http          │groovy-filter   │counter            │         ║
║jdbc          │groovy-transform│custom-log         │         ║
║jms           │httpclient      │field-value-counter│         ║
║load-generator│pmml            │file               │         ║
║rabbit        │splitter        │ftp                │         ║
║sftp          │transform       │gemfire            │         ║
║tcp           │                │gpfdist            │         ║
║time          │                │hdfs               │         ║
║trigger       │                │jdbc               │         ║
║twitterstream │                │log                │         ║
║              │                │rabbit             │         ║
║              │                │redis              │         ║
║              │                │router             │         ║
║              │                │tcp                │         ║
║              │                │throughput         │         ║
║              │                │websocket          │         ║
╚══════════════╧════════════════╧═══════════════════╧═════════╝

dataflow:>stream create --name time-to-log --definition 'time | custom-log'
Created new stream 'time-to-log'

dataflow:>stream list
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│  Status  ║
╠═══════════╪═════════════════╪══════════╣
║time-to-log│time | custom-log│undeployed║
╚═══════════╧═════════════════╧══════════╝

dataflow:>stream deploy --name time-to-log
Deployed stream 'time-to-log'

您將在 Data Flow 服務日誌中看到模組已啟動並連接在一起。在我的特定日誌中,我觀察到

2016-04-05 09:09:18.067  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.custom-log instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log
2016-04-05 09:09:30.838  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.time

追蹤日誌以確認您已經在內心深處知道的事情:我們的自定義 logging-sink 正在工作!

tail -f /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log/std*

下一步

前往雲端!我們正在使用本地 Data Flow 伺服器。還有其他可用于處理結構(如 Cloud Foundry)的實現。Cloud Foundry Data Flow Server 啟動應用程式實例,而不是本地 Java 進程。現在,構建可擴展的數據接收和處理串流就像 cf push ..cf scale -i $MOAR 一樣容易!

我們僅使用了 Spring Cloud Data Flow 的一小部分功能!使用 Spring Cloud Data Flow 來協調由 Spring Cloud Stream 提供支持的任意數量的基於消息的微服務。我建議看看 內置的 Spring Cloud Stream 模組 以獲取靈感。

獲取 Spring 新聞通訊

與 Spring 新聞通訊保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

獲取支持

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二進制文件。

了解更多

即將舉行的活動

查看 Spring 社區中所有即將舉行的活動。

查看全部