Spring XD 1.0 Milestone 1 版本發佈

工程 | Mark Pollack | 2013 年 6 月 12 日 | ...

今天我們很高興地宣布 Spring XD 1.0 M1 版本的發佈  (下載)。Spring XD 是一個統一、分散式和可擴展的系統,用於資料擷取、即時分析、批次處理和資料匯出。  該專案的目標是簡化大數據應用程式的開發。

從較高的角度來看,大數據應用程式與企業整合和批次應用程式具有許多共同特徵。  Spring 透過 Spring Integration 和 Spring Batch 專案,已為構建整合和批次應用程式提供了超過 6 年的驗證解決方案。  Spring XD 以此為基礎,並提供了一個輕量級的執行時環境,可以透過簡單的 DSL 輕鬆配置和組裝。

在本部落格中,我們將介紹 Spring XD 的主要元件,即 Streams、Jobs、Taps、Analytics 和用於宣告它們的 DSL,以及執行時架構。  更多詳細資訊可以在 XD 指南中找到。

Streams

Stream 定義了如何收集、處理和儲存或轉發資料。  例如,stream 可以收集 syslog 資料、過濾它,並將其儲存在 HDFS 中。  Spring XD 提供了一個 DSL 來定義 stream。  DSL 允許您使用 UNIX pipes-and-filters 語法開始簡單的線性處理流程,但也可以使用擴展的語法來描述更複雜的流程。

Sources 和 Sinks

一個簡單的線性 stream 包含以下順序:輸入 Source、(可選) 處理步驟和輸出 Sink。  作為一個簡單的範例,考慮從 HTTP Source 收集資料並寫入 File Sink。 描述此 stream 的 DSL 是
http | file

您透過向預設在 8080 埠執行的 XD Admin Server 發出 HTTP 請求來告訴 Spring XD 建立一個 stream。  在 M2 版本中,我們將提供一個互動式 shell 來與 XD 通訊,但對於 M1 來說,與 XD 互動最簡單的方法是使用 'curl'。

curl -d "http | file" https://127.0.0.1:8080/streams/httptest

stream 的名稱是 httptest,要監聽的預設 HTTP 埠是 9000,預設檔案位置是 /tmp/xd/output/${streamname}

如果您使用 curl 在 9000 埠上發佈一些資料
curl -d "hello world" https://127.0.0.1:9000

您將在檔案 /tmp/xd/output/httptest 中看到字串 hello world

要變更預設值,您可以傳入選項引數

http --port=9090 | file --dir=/var/streams --name=data.txt

M1 中支援的 sources 包括 file、time、HTTP、Tail、Twitter Search、Gemfire (Continuous Queries)、Gemfire (Cache Event)、Syslog 和 TCP。  支援的 sinks 包括 Log、File、HDFS、 Gemfire Distributed Data Grid 和 TCP。  要將 syslog 資料擷取到 HDFS,DSL 非常簡單

syslog | hdfs --namenode="http://192.168.1.100:9000"

您也可以新增您自己的自訂 sources 和 sinks。  可以按照 簡單配方新增 Spring Integration 中現有的 Inbound 和 Outbound Channel Adapters。  未來的版本將新增對 MQTT、RabbitMQ、JMS 和 Kafka 的支援。  我們很樂意收到 pull request 來貢獻您偏好的 source 和 sink 模組。

Stream 的程式設計模型基於 Spring Integration。  Input Sources 將外部資料轉換為 Message,該 Message 包含包含鍵值對的標頭和可以是任何 Java 類型的 payload。 Message 透過 Message Channels 在 stream 中流動。 下圖顯示了具有 Input Source、Processing Step 和 Output Sink 的 stream。

Processors

下圖顯示了一個包含多個處理步驟的 stream。  處理步驟全部透過 Channels 連接在一起。

在 DSL 中,pipe 符號對應於將資料從每個處理步驟傳遞到下一個步驟的 channel。  Spring XD 中的 channels 可以是記憶體中的,也可以由中介軟體(例如 Redis、JMS、RabbitMQ 等)支援。  這允許一個簡單的分散式處理模型,我們將在稍後討論。

表示具有處理步驟的 stream 的 DSL 表達式採用以下形式

source | filter | transform | sink

M1 中支援的 processors 包括 filter、transformer、json-field-extractor、json-field-value-filter 和 script。  filter 和 transformer processors 支援使用 Spring Expression Language  (SpEL) 以及 Groovy。  要在先前的範例中使用 SpEL 將 HTTP 請求的 payload 轉換為大寫,

http | transform --expression=payload.toUpperCase() | file

script processor 還允許您執行自訂 Groovy 程式碼。

Taps

Tap 允許您「監聽」來自另一個 stream 的資料,並在單獨的 stream 中處理資料。 原始 stream 不受 tap 的影響,並且不知道它的存在,類似於電話竊聽。  WireTaps 是 EAI 模式標準目錄的一部分,並且是 Spring XD 使用的 Spring Integration framework 的一部分。

tap 可以從目標 stream 的處理管道中的任何點消耗資料。 例如,如果您有一個名為 mystream 的 stream,定義為
source | filter | transform | sink

您可以使用 DSL 建立 tap

tap mystream.filter | sink2

這將在套用 filter 之後但在 transformer 之前點擊 stream 的資料。 因此,未轉換的資料將發送到 sink2。

例如,如果您使用以下命令建立一個名為 httpstream 的 stream

curl  -d "http --port=9898 | filter --expression='payload.length() > 5'
                           | transform --expression=payload.toUpperCase()
                           | file"  https://127.0.0.1:8080/streams/httpstream

然後,若要在 stream 上建立一個名為 httptap 的 tap,該 tap 將過濾的資料 stream 寫入單獨的檔案,請使用以下命令

curl -d "tap httpstream.filter | file --dir=/tmp --name=filtered.txt" https://127.0.0.1:8080/streams/httptap

發佈諸如

curl -d "hello world" https://127.0.0.1:9898
curl -d "he" https://127.0.0.1:9898
curl -d "hello world 2" https://127.0.0.1:9898

將在檔案 /tmp/xd/output/httpstream 中產生 HELLO WORLD 和 HELLO WORLD 2,並在 /tmp/filtered.txt 中產生小寫等效項。 文字 'he' 不會出現在任何一個檔案中。

一個主要的使用案例是在透過其主要 stream 擷取資料的同時執行即時分析。 例如,考慮一個消耗 Twitter 搜尋結果並將其寫入 HDFS 的資料 Stream。 可以在將資料寫入 HDFS 之前建立一個 tap,並將資料從 tap 傳輸到一個計數器,該計數器對應於在推文中提及特定 hashtag 的次數。

分析

詢問 10 位開發人員「即時分析」是什麼,您將得到 20 個答案。  答案範圍從非常簡單(但非常有用)的計數器,到移動平均值,到彙總計數器,到直方圖,到時間序列,到機器學習演算法,再到嵌入式 CEP 引擎。  Spring XD 旨在支援廣泛的這些指標和分析資料結構,作為一個通用的類別庫,可與多種後端儲存技術協同工作。  它們也作為 Sink 的一種型別暴露給 XD,以用於 DSL 表達式中。

在 M1 版本中,支援 Counter、Field Value Counter、Gauge 和 Rich Gauge。 這些指標可以儲存在記憶體中或 Redis 中。  請參閱 JavaDocs 和  使用者指南的分析部分,以取得更多詳細資訊,以及未來版本中將實作的內容的清單。

作為一個範例,考慮收集推文 stream 中 hashtag 頻率的即時計數的情況。  若要使用 SpringXD 執行此操作,請建立一個使用 twitter search source 模組的新 stream 定義,並將其命名為 'spring'

curl -d "twittersearch --query='spring' --consumerKey=<consumer-key> --consumerSecret=<consumer-secret>
           | file" https://127.0.0.1:8080/streams/spring

這會將推文儲存在本機檔案系統中。  注意,要獲得consumerKeyconsumerSecret您需要註冊一個 twitter 應用程式。 如果您尚未設定,可以在 Twitter 開發人員網站上建立一個應用程式來取得這些認證。

接下來,在 twittersearch source 的輸出上建立一個名為 'springtap' 的 tap,以計算推文中 hashtag 的頻率。

curl -d "tap spring.twittersearch | field-value-counter
                                     --fieldName=entities.hashTags.text
                                     --counterName=hashTagFrequency" https://127.0.0.1:8080/streams/springtap

欄位 entities.hashTags.text 是 JSON 表示中 Spring Social Tweet 物件中 hashtag 的路徑,該物件用於底層實作中。  若要檢視前 5 個 hashtag,請使用 redis-cli 來檢視名為 fieldvaluecounters.hashTagFrequency. 的排序集合的內容。請注意,通常需要幾分鐘的時間才能收集到足夠的具有 hashtag 實體的推文。

> redis-cli
redis 127.0.0.1:6379>ZREVRANGEBYSCORE fieldvaluecounters.hashTagFrequency +inf -inf WITHSCORES LIMIT 0 5

1] "spring"
2] "6"
3] "Turkey"
4] "6"
5] "Arab"
6] "6"
7] "summer"
8] "3"
9] "fashion"
10] "3"

架構

Spring XD 有兩種操作模式 - 單節點和分散式。 第一個是處理所有處理和管理的單一程序。 這種模式可協助您輕鬆入門,並簡化應用程式的開發和測試。 分散式模式允許將處理任務分散到機器叢集上,並且管理伺服器會傳送命令來控制在叢集上執行的處理任務。

M1 版本中的分散式架構很簡單。  stream 的每個部分(稱為模組)都可以在其自己的容器實例中執行。  資料使用 Redis 佇列在模組之間傳遞。  請參閱 架構部分以取得更多詳細資訊。  此版本的重點是正確抽象,例如使 DSL 中的 pipe 符號可以跨各種傳輸方式進行外掛。  其他傳輸方式和效能改進將在未來的版本中提供,並在 Hadoop 叢集中執行。

更多內容即將推出

這篇文章未涵蓋的其他主題包括 Tuple 資料結構的介紹,以及如何建立自訂處理器。 下一個版本的重要部分將是支援 XDContainer 執行 Spring Batch 工作。 這些工作可用於協助將資料從 HDFS 匯出到關聯式資料庫,以及協調 Hadoop 工作的執行,無論是 MapReduce、Pig、Hive 或 Cascading 工作,都在叢集上進行。 我們還將提供額外的指標庫,例如彙總計數器、基於 HTTP/JMX 的管理,以及一些基於 Reactor 專案的高效能來源,敬請期待!

在我們繼續努力完成最終 Spring XD 1.0.0 版本之際,我們很樂意聽取您的回饋意見。 如果您有任何疑問,請使用 Stackoverflow(標籤:springxd),要報告任何錯誤或改進,請使用 Jira Issue Tracker 或提交 GitHub issue

訂閱 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進制檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部