領先一步
VMware 提供培訓和認證,以加速您的進度。
了解更多今天我們很高興宣布 Spring XD 的 1.0 M2 版本(下載)。Spring XD 是一個統一、分散式且可擴展的系統,用於資料擷取、即時分析、批次處理和資料匯出。該專案的目標是簡化大數據應用程式的開發。
Spring XD 的第二個里程碑版本引入了幾項新功能,讓您更容易擷取和處理即時資料串流,以及協調基於 Hadoop 的批次作業。在這篇部落格文章中,我們將介紹
在單節點模式下啟動 Spring XD ($XD_HOME/bin/xd-singlenode),並在另一個視窗中啟動 Shell。 以下範例示範如何建立一個簡單的串流,將透過 HTTP 發布的資料寫入檔案。 請注意,Shell 提供命令的 Tab 鍵完成提示。
$bin>./xd-shell
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name httpStream --definition "http | file"
xd:>tap create --name httpTap --definition "tap httpStream | counter"
xd:>http post --target https://127.0.0.1:9000 --data "helloworld"
您可以列出所有串流和 Tap 以驗證它們是否已建立
xd:>stream list
Stream Name Stream Definition
----------- -----------------
httpStream http | file
xd:>tap list
Tap Name Stream Name Tap Definition
-------- ----------- ------------------------
httpTap httpStream tap httpStream | counter
如果您檢查位於目錄 /tmp/xd/output/httpStream.out
中的檔案,您將會看到 hello world 訊息。
xd:>! cat /tmp/xd/output/httpStream.out
The httpTap is simply counting messages. To see the name of the counter created and its value, use the counter shell command
xd:>counter list
Counter name
------------
httpTap
xd:>counter display --name httpTap
1
在單節點模式下,計數器位於記憶體中,但也支援 Redis,這是未使用單節點模式時的預設值。 您可以使用 –analytics redis
命令列引數啟用 Redis 支援。
若要建立將資料儲存在 Hadoop 中的 Twitter 串流,以及推文中主題標籤頻率的即時計數器,請執行下列命令。 請注意,要取得consumerKey和consumerSecret您需要註冊一個 Twitter 應用程式。 如果您尚未設定,您可以在 Twitter Developers 網站上建立一個應用程式以取得這些憑證。
xd:> stream create bieberStream --definition "twittersearch --consumerKey=<your-key> --consumerSecret=<your-secret> --query=bieber | hdfs"
xd:> tap create --name bieberHashTap --definition "tap bieberStream | field-value-counter --fieldName=entities.hashTags.text --counterName=bieberHashCount"
xd:> hadoop config fs --namenode hdfs://127.0.0.1:8020
xd:> hadoop fs cat /xd/bieberStream/bieberStream-0.log
... see fun tweets here ...
xd:> fieldvaluecounter display --name bieberHashCount
FieldName=bieberHashCount
------------------------- - -----
VALUE - COUNT
mtvhottest | 57
MTVHottest | 31
MTVhottest | 10
mtvhottets | 3
MtvHottest | 2
MTVHott | 2
JustinBieber | 2
MTVH | 2
MTVHOTTEST | 2
KCAMEXICO | 1
BeliebersAreProudOfJustin | 1
MyBeliebers | 1
在計數器主題上,引入了一種新的聚合計數器類型,它將訊息中欄位的計數聚合到每年、每月、每日、每小時和每分鐘的時間 Bucket 中。
只需幾行 Shell 命令,您就取得了很大的成就! 請查看使用者指南,以取得有關所有 Shell 命令的詳細資訊。
到目前為止顯示的串流處理管線是線性的,但通常需要支援更複雜的流程。 為了開始處理這種情況,M2 中引入了具名管道。 您可以使用具名管道來代替來源或接收器模組。 為了與 Unix 主題保持一致,從特定管道來源/接收資料使用 `>’ 字元,並且名稱以 `:` 為前綴
這是一個範例,示範如何使用具名管道來共用由不同輸入來源驅動的資料管線。
xd:>stream create out --definition ":foo > file --name=demo"
xd:>stream create in1 --definition "http > :foo"
xd:>stream create in2 --definition "time > :foo"
xd:>http post --target https://127.0.0.1:9000 --data "hello"
查看輸出檔案
xd:>! cat /tmp/xd/output/demo.out
您將會看到 ‘hello’ 這個字與時間戳記值交錯在一起。 將訊息扇出到多個串流,以及支援根據訊息內容將訊息路由到不同串流的計畫將在未來的里程碑版本中推出。
同樣值得注意的是,我們增加了對 4 個 Hadoop 版本的支援
您可以透過傳入命令列選項 –hadoopDistribution
,來選擇啟動 XDContainer 時要使用的特定發行版本 JAR。 您應該也可以使用其他 Hadoop 發行版本,例如 Hadoop 1.2.x。 我們將在後續版本中新增其他發行版本的明確選項。 值得注意的是,有一個範例示範如何在 範例儲存庫中使用 Spring XD 與 Pivotal HD 的 HAWQ 功能。
M1 版本提供了本機和 Redis 佇列備份的傳輸,用於模組之間的通訊,以 DSL 中的管道符號表示。 M2 版本提供了對 Rabbit 備份傳輸的支援,讓您可以利用功能完善的訊息代理程式來擷取串流。
可以使用 Spring XD 執行批次作業,並設定觸發器來啟動這些作業。 作為範例,我們可以重複使用 Hadoop 中的經典 wordcount 範例,以提供具有兩個步驟的簡單工作流程協調。 第一個步驟是將檔案複製到 HDFS 中,第二個步驟是執行 wordcount MapReduce 作業。
若要執行該範例,請複製 spring-xd-samples 儲存庫並建置範例 batch-wordcount。 然後如下所示複製 JAR、設定和資料檔案。
$ cd batch-wordcount
$ mvn clean assembly:assembly
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/modules/job/* $XD_HOME/modules/job
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/lib/* $XD_HOME/lib
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/nietzsche-chapter-1.txt /tmp
現在停止並在單節點模式下重新啟動 Spring XD ($XD_HOME/bin/xd-singlenode)。 然後在 Shell 中執行以下命令
xd:> job create --name wordCountJob --definition "wordcount"
或者,也可以指定一個 cron 表達式,該表達式排程作業以供執行。 您可以透過檢視 Map Reduce 作業的輸出驗證結果
xd:> hadoop config fs --namenode hdfs://127.0.0.1:8020
xd:> hadoop fs cat /count/out/part-r-00000
也支援在工作流程中執行其他步驟,例如執行 Hive 或 Pig 腳本。 若要撰寫這些類型的工作流程,請參閱 Spring for Apache Hadoop 參考指南。 也支援非 Hadoop 的步驟。
下一個版本的主要主題是透過合併 Spring Batch Admin 專案的元件,來公開更多批次作業的管理功能。 您將能夠透過將訊息傳送到具名管道來觸發批次作業,以及從具名管道接收作業狀態通知。 這樣您就可以輕鬆地根據資料可用性來設定批次作業的觸發,例如
file --dir "/data/inbound" | jobParameterCreator > :wordCountJob
當檔案在目錄 /data/inbound
中可用時,將透過將訊息傳送到具名管道 :wordCountJob.. 啟動 wordcount 批次作業。 執行批次作業時,將有一個資料串流可供您使用訊息,其中包含有關 JobExecution、StepExecution 等的資料。
:wordCountJob.notifications > filter --expression "payload.status.equals('COMPLETED')" | email --address "[email protected]"
使用管道在串流和作業之間交換資料是您可以開始看到 Spring XD 如何採取步驟來統一串流和批次處理這兩個領域的領域之一。 敬請期待!