領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多今天我們很高興地宣布 Spring XD 1.0 M1 版本的發佈 (下載)。Spring XD 是一個統一、分散式和可擴展的系統,用於資料擷取、即時分析、批次處理和資料匯出。 該專案的目標是簡化大數據應用程式的開發。
從較高的角度來看,大數據應用程式與企業整合和批次應用程式具有許多共同特徵。 Spring 透過 Spring Integration 和 Spring Batch 專案,已為構建整合和批次應用程式提供了超過 6 年的驗證解決方案。 Spring XD 以此為基礎,並提供了一個輕量級的執行時環境,可以透過簡單的 DSL 輕鬆配置和組裝。
在本部落格中,我們將介紹 Spring XD 的主要元件,即 Streams、Jobs、Taps、Analytics 和用於宣告它們的 DSL,以及執行時架構。 更多詳細資訊可以在 XD 指南中找到。http | file
您透過向預設在 8080 埠執行的 XD Admin Server 發出 HTTP 請求來告訴 Spring XD 建立一個 stream。 在 M2 版本中,我們將提供一個互動式 shell 來與 XD 通訊,但對於 M1 來說,與 XD 互動最簡單的方法是使用 'curl'。
curl -d "http | file" https://127.0.0.1:8080/streams/httptest
stream 的名稱是 httptest
,要監聽的預設 HTTP 埠是 9000
,預設檔案位置是 /tmp/xd/output/${streamname}
。
curl -d "hello world" https://127.0.0.1:9000
您將在檔案 /tmp/xd/output/httptest
中看到字串 hello world
要變更預設值,您可以傳入選項引數
http --port=9090 | file --dir=/var/streams --name=data.txt
M1 中支援的 sources 包括 file、time、HTTP、Tail、Twitter Search、Gemfire (Continuous Queries)、Gemfire (Cache Event)、Syslog 和 TCP。 支援的 sinks 包括 Log、File、HDFS、 Gemfire Distributed Data Grid 和 TCP。 要將 syslog 資料擷取到 HDFS,DSL 非常簡單
syslog | hdfs --namenode="http://192.168.1.100:9000"
您也可以新增您自己的自訂 sources 和 sinks。 可以按照 簡單配方新增 Spring Integration 中現有的 Inbound 和 Outbound Channel Adapters。 未來的版本將新增對 MQTT、RabbitMQ、JMS 和 Kafka 的支援。 我們很樂意收到 pull request 來貢獻您偏好的 source 和 sink 模組。
Stream 的程式設計模型基於 Spring Integration。 Input Sources 將外部資料轉換為 Message,該 Message 包含包含鍵值對的標頭和可以是任何 Java 類型的 payload。 Message 透過 Message Channels 在 stream 中流動。 下圖顯示了具有 Input Source、Processing Step 和 Output Sink 的 stream。
在 DSL 中,pipe 符號對應於將資料從每個處理步驟傳遞到下一個步驟的 channel。 Spring XD 中的 channels 可以是記憶體中的,也可以由中介軟體(例如 Redis、JMS、RabbitMQ 等)支援。 這允許一個簡單的分散式處理模型,我們將在稍後討論。
表示具有處理步驟的 stream 的 DSL 表達式採用以下形式
source | filter | transform | sink
M1 中支援的 processors 包括 filter、transformer、json-field-extractor、json-field-value-filter 和 script。 filter 和 transformer processors 支援使用 Spring Expression Language (SpEL) 以及 Groovy。 要在先前的範例中使用 SpEL 將 HTTP 請求的 payload 轉換為大寫,
http | transform --expression=payload.toUpperCase() | file
script processor 還允許您執行自訂 Groovy 程式碼。
Tap 允許您「監聽」來自另一個 stream 的資料,並在單獨的 stream 中處理資料。 原始 stream 不受 tap 的影響,並且不知道它的存在,類似於電話竊聽。 WireTaps 是 EAI 模式標準目錄的一部分,並且是 Spring XD 使用的 Spring Integration framework 的一部分。
tap 可以從目標 stream 的處理管道中的任何點消耗資料。 例如,如果您有一個名為 mystream 的 stream,定義為source | filter | transform | sink
您可以使用 DSL 建立 tap
tap mystream.filter | sink2
這將在套用 filter 之後但在 transformer 之前點擊 stream 的資料。 因此,未轉換的資料將發送到 sink2。
例如,如果您使用以下命令建立一個名為 httpstream
的 stream
curl -d "http --port=9898 | filter --expression='payload.length() > 5'
| transform --expression=payload.toUpperCase()
| file" https://127.0.0.1:8080/streams/httpstream
然後,若要在 stream 上建立一個名為 httptap
的 tap,該 tap 將過濾的資料 stream 寫入單獨的檔案,請使用以下命令
curl -d "tap httpstream.filter | file --dir=/tmp --name=filtered.txt" https://127.0.0.1:8080/streams/httptap
發佈諸如
curl -d "hello world" https://127.0.0.1:9898
curl -d "he" https://127.0.0.1:9898
curl -d "hello world 2" https://127.0.0.1:9898
將在檔案 /tmp/xd/output/httpstream
中產生 HELLO WORLD 和 HELLO WORLD 2,並在 /tmp/filtered.txt
中產生小寫等效項。 文字 'he'
不會出現在任何一個檔案中。
一個主要的使用案例是在透過其主要 stream 擷取資料的同時執行即時分析。 例如,考慮一個消耗 Twitter 搜尋結果並將其寫入 HDFS 的資料 Stream。 可以在將資料寫入 HDFS 之前建立一個 tap,並將資料從 tap 傳輸到一個計數器,該計數器對應於在推文中提及特定 hashtag 的次數。
詢問 10 位開發人員「即時分析」是什麼,您將得到 20 個答案。 答案範圍從非常簡單(但非常有用)的計數器,到移動平均值,到彙總計數器,到直方圖,到時間序列,到機器學習演算法,再到嵌入式 CEP 引擎。 Spring XD 旨在支援廣泛的這些指標和分析資料結構,作為一個通用的類別庫,可與多種後端儲存技術協同工作。 它們也作為 Sink 的一種型別暴露給 XD,以用於 DSL 表達式中。
在 M1 版本中,支援 Counter、Field Value Counter、Gauge 和 Rich Gauge。 這些指標可以儲存在記憶體中或 Redis 中。 請參閱 JavaDocs 和 使用者指南的分析部分,以取得更多詳細資訊,以及未來版本中將實作的內容的清單。
作為一個範例,考慮收集推文 stream 中 hashtag 頻率的即時計數的情況。 若要使用 SpringXD 執行此操作,請建立一個使用 twitter search source 模組的新 stream 定義,並將其命名為 'spring'
curl -d "twittersearch --query='spring' --consumerKey=<consumer-key> --consumerSecret=<consumer-secret>
| file" https://127.0.0.1:8080/streams/spring
這會將推文儲存在本機檔案系統中。 注意,要獲得consumerKey和consumerSecret您需要註冊一個 twitter 應用程式。 如果您尚未設定,可以在 Twitter 開發人員網站上建立一個應用程式來取得這些認證。
接下來,在 twittersearch source 的輸出上建立一個名為 'springtap' 的 tap,以計算推文中 hashtag 的頻率。
curl -d "tap spring.twittersearch | field-value-counter
--fieldName=entities.hashTags.text
--counterName=hashTagFrequency" https://127.0.0.1:8080/streams/springtap
欄位 entities.hashTags.text
是 JSON 表示中 Spring Social Tweet 物件中 hashtag 的路徑,該物件用於底層實作中。 若要檢視前 5 個 hashtag,請使用 redis-cli 來檢視名為 fieldvaluecounters.hashTagFrequency.
的排序集合的內容。請注意,通常需要幾分鐘的時間才能收集到足夠的具有 hashtag 實體的推文。
> redis-cli
redis 127.0.0.1:6379>ZREVRANGEBYSCORE fieldvaluecounters.hashTagFrequency +inf -inf WITHSCORES LIMIT 0 5
1] "spring"
2] "6"
3] "Turkey"
4] "6"
5] "Arab"
6] "6"
7] "summer"
8] "3"
9] "fashion"
10] "3"
Spring XD 有兩種操作模式 - 單節點和分散式。 第一個是處理所有處理和管理的單一程序。 這種模式可協助您輕鬆入門,並簡化應用程式的開發和測試。 分散式模式允許將處理任務分散到機器叢集上,並且管理伺服器會傳送命令來控制在叢集上執行的處理任務。
M1 版本中的分散式架構很簡單。 stream 的每個部分(稱為模組)都可以在其自己的容器實例中執行。 資料使用 Redis 佇列在模組之間傳遞。 請參閱 架構部分以取得更多詳細資訊。 此版本的重點是正確抽象,例如使 DSL 中的 pipe 符號可以跨各種傳輸方式進行外掛。 其他傳輸方式和效能改進將在未來的版本中提供,並在 Hadoop 叢集中執行。這篇文章未涵蓋的其他主題包括 Tuple 資料結構的介紹,以及如何建立自訂處理器。 下一個版本的重要部分將是支援 XDContainer 執行 Spring Batch 工作。 這些工作可用於協助將資料從 HDFS 匯出到關聯式資料庫,以及協調 Hadoop 工作的執行,無論是 MapReduce、Pig、Hive 或 Cascading 工作,都在叢集上進行。 我們還將提供額外的指標庫,例如彙總計數器、基於 HTTP/JMX 的管理,以及一些基於 Reactor 專案的高效能來源,敬請期待!
在我們繼續努力完成最終 Spring XD 1.0.0 版本之際,我們很樂意聽取您的回饋意見。 如果您有任何疑問,請使用 Stackoverflow(標籤:springxd),要報告任何錯誤或改進,請使用 Jira Issue Tracker 或提交 GitHub issue。