領先一步
VMware 提供培訓和認證,以加速您的進展。
瞭解更多在本系列中,到目前為止,我們已介紹了新的基於 Java 函數的串流應用程式,以及函數組合。我們也提供了如何從供應器建立來源和從消費者建立 Sink 的詳細範例。在這裡,我們將繼續旅程,進行後續一系列案例研究中的第一個。每個案例研究都示範如何在各種情境中使用一個或多個可用的預先封裝 Spring Boot 串流應用程式,以建置資料串流管線。
今天,我們將展示兩個最常用的應用程式:HTTP 來源和 JDBC Sink。我們將使用它們來建置一個簡單的服務,該服務接受 HTTP POST 請求並將內容儲存到資料庫表格中。我們將首先將這些應用程式作為獨立的 Spring Cloud Stream 應用程式執行,然後展示如何使用 Spring Cloud Data Flow 來協調相同的管線。這將以逐步教學的方式呈現,我們鼓勵您在閱讀時按照步驟操作。
這個簡單的串流應用程式由兩個透過訊息 Broker 通訊的遠端程序組成。預先封裝的串流應用程式可與 Apache Kafka 或 RabbitMQ 搭配使用。在這裡,我們將使用 Apache Kafka。JDBC Sink 將資料插入資料庫。在本範例中,我們將使用 MySQL。
假設我們從頭開始,且開發環境中沒有可用的 Kafka 或 MySQL。為了執行此範例,我們將使用 Docker 來玩一下。因此,我們需要在本機上執行 Docker。稍後我們將使用 Spring Cloud Data Flow,因此我們將利用 Data Flow 的 docker-compose 安裝。這是開始使用 Data Flow 的最簡單方法。它會啟動多個容器,包括 MySQL 和 Kafka。為了使這些後端服務可供獨立應用程式使用,我們需要調整標準安裝以發布端口,並變更 Kafka 的宣告主機名稱。
注意
我已在 Mac OS 上執行此設定,並預期類似的設定也適用於 Windows。如果您遇到問題或有任何實用技巧要分享,請在評論區留言。
首先,讓我們建立一個名為 http-jdbc-demo
的目錄,並從 github 下載 docker-compose.yml
到該目錄
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml
或
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml -o docker-compose.yml
為了能夠從本機主機連線到 Kafka 和 MySQL,我們將下載另一個 YAML 位元來覆蓋或自訂設定。
wget -O shared-kafka-mysql.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/shared-kafka-mysql.yml
接下來,我們需要取得本機的 LAN IP 位址。在 Mac 上,您可以透過幾種方式執行此操作,例如
dig +short $(hostname)
或
ping $(hostname)
LAN IP 位址也可以從 Docker 容器存取,而容器內部的 localhost
或 127.0.0.1
則指向自身。我們需要將環境變數 KAFKA_ADVERTISED_HOST_NAME
設定為此值。我們還需要設定其他幾個環境變數
export KAFKA_ADVERTISED_HOST_NAME=$(dig +short $(hostname))
export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
並在 Data Flow 中註冊最新的串流應用程式
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
現在,從我們的專案目錄中,我們可以啟動 Data Flow 叢集
docker-compose -f docker-compose.yml -f shared-kafka-mysql.yml up
這將顯示大量記錄訊息並持續執行,直到您終止它(例如,Ctrl-C),這將停止所有容器。保持此終端機開啟。
開啟新的終端機並輸入
docker ps
這將列出 Data Flow 叢集的執行中容器。我們稍後會查看 Data Flow。此時,請確保 dataflow-kafka
容器在 PORTS
下顯示 0.0.0.0:9092→9092/tcp
,且 dataflow-mysql
也類似地顯示 0.0.0.0:3306→3306/tcp
。
我們可以設定 JDBC Sink 應用程式自動初始化資料庫,但為了簡化,我們將手動建立它。我們可以使用任何 JDBC 資料庫工具,或從 dataflow-mysql
容器中執行 mysql
來執行此操作
docker exec -it dataflow-mysql mysql -uroot -p
系統會提示您輸入密碼。DB 認證已在 docker-compose.yml 中設定。如果您不想在那裡尋找,使用者名稱是 root
,密碼是 rootpw
。
輸入以下命令 - 您應該可以複製並貼上整個內容 - 以建立資料庫和表格。
CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
PRIMARY KEY (id));
輸入 exit;
以退出。
此時,我們已準備好執行 HTTP 來源和 JDBC Sink。Spring Boot 可執行 jar 已發布到 Spring Maven 儲存庫。我們需要使用 Kafka Binder 建置的 jar
wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/http-source-kafka/3.0.0-SNAPSHOT/http-source-kafka-3.0.0-SNAPSHOT.jar
wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/jdbc-sink-kafka/3.0.0-SNAPSHOT/jdbc-sink-kafka-3.0.0-SNAPSHOT.jar
我們將在不同的終端機工作階段中執行這些 jar。我們需要設定這些應用程式使用相同的 Kafka Topic,我們稱之為 jdbc-demo-topic
。Spring Cloud Stream Kafka Binder 將自動建立此 Topic。我們還需要設定 JDBC Sink 以連線到我們的資料庫,並將資料對應到我們建立的表格。我們將發布如下所示的 JSON
{
“name”:”My Name”,
“address”: {
“street”:”My Street”,
“city”: “My City”
}
}
我們希望將這些值插入 Demo
資料庫的 People
表格中,欄位為 name
、street
和 city
。
開啟我們下載 jar 的新終端機工作階段並執行
java -jar jdbc-sink-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://127.0.0.1:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.consumer.table-name=People --jdbc.consumer.columns=name,city:address.city,street:address.street --spring.cloud.stream.bindings.input.destination=jdbc-demo-topic
請注意將欄位對應到欄位的 jdbc.consumer.columns
語法。
開啟我們下載 jar 的新終端機工作階段並執行
java -jar http-source-kafka-3.0.0-SNAPSHOT.jar --server.port=9000 --spring.cloud.stream.bindings.output.destination=jdbc-demo-topic
在這裡,我們將來源的 HTTP 端口設定為 9000(預設為 8080)。此外,非常重要的是,來源的輸出目的地必須與 Sink 的輸入目的地相符。
接下來,我們需要將一些資料發布到 https://127.0.0.1:9000。
curl https://127.0.0.1:9000 -H'Content-Type:application/json' -d '{"name":"My Name","address":{"street":"My Street","city":"My City"}}}
再次,找到一個開啟的終端機工作階段並
docker exec -it dataflow-mysql mysql -uroot -p
使用 rootpw
登入並查詢表格
如果您看到此訊息,恭喜!獨立的 Spring Cloud Stream 應用程式已如預期般運作。我們現在可以終止我們的獨立應用程式 (Ctrl-C)。保持 docker-compose 程序執行,以便我們可以查看 Data Flow。
正如我們所看到的,即使我們沒有編寫任何程式碼,在「裸機」上執行這些應用程式也需要許多手動步驟。這些步驟包括
自訂 docker-compose 設定,或在本機上安裝 Kafka 和 MySQL
使用 Maven URL 下載所需版本的串流應用程式(我們碰巧知道在這裡要使用哪些應用程式)
確保 Spring Cloud Stream 目的地繫結已正確設定,以便應用程式可以通訊
尋找並閱讀 文件 以取得設定屬性(我們已經這樣做了以準備此範例),並正確設定它們。
管理多個終端機工作階段
在以下章節中,我們將看到使用 Spring Cloud Data Flow 執行此操作可消除所有這些步驟,並提供更豐富的整體開發體驗。
若要開始使用,請開啟 Data Flow 儀表板 https://127.0.0.1:9393/dashboard。這會將您帶到「應用程式」檢視,我們可以在其中看到已註冊的預先封裝應用程式。我們稍早執行的 docker-compose 命令已執行此步驟,使用我們提供的 URL 來取得串流應用程式的最新快照版本,包括我們剛執行的相同 jar 檔案。
在儀表板中,從左側選單選擇 Streams
,然後按一下 Create Streams
以開啟圖形化串流編輯器。
將 HTTP 來源和 JDBC Sink 拖放到編輯器窗格中,然後使用滑鼠連線兩個控制點。或者,您可以直接在頂端的文字方塊中輸入 Data Flow 串流定義 DSL:http | jdbc
。
接下來,我們需要設定應用程式。如果您按一下任一個應用程式,您會看到一個 Options
連結。開啟 JDBC Sink 的選項視窗。您將看到所有可用的設定屬性都列出,並附有簡短說明。以下螢幕截圖顯示部分檢視;我們需要捲動才能看到其餘部分。
與之前一樣,我們需要提供 URL、使用者名稱、密碼、表格和欄位。在這裡,我們需要將 JDBC URL 變更為 jdbc:mariadb://mysql:3306/Demo
,因為主機名稱 mysql
對應於 docker-compose.yml
中定義的 MySQL 服務名稱。此外,我們將 HTTP 端口設定為 20000
,因為它在已發布端口的設定範圍內。如需更多詳細資訊,請參閱 skipper-server 設定。
讓我們看一下自動產生的串流定義 DSL
http --port=20000 | jdbc --password=rootpw --username=root --url=jdbc:mariadb://mysql:3306/Demo --columns=name,city:address.city,street:address.street --table-name=People
此 DSL 可用於腳本或 Data Flow 用戶端應用程式,以自動化串流建立。我們的設定已完成,但 Spring Cloud Stream 目的地繫結在哪裡?我們不需要它們,因為 Data Flow 會為我們處理連線。
選取 Create Stream
按鈕,並將串流命名為 http-jdbc
。
若要部署串流,請按一下播放按鈕
接受預設部署屬性,然後按一下頁面底部的 Deploy stream
。
根據需要按一下 Refresh
按鈕。大約一分鐘後,您應該會看到我們的串流已部署。
在這裡,我們將發布一些不同的值到端口 20000
curl https://127.0.0.1:20000 -H'Content-Type:application/json' -d '{"name":"Your Name","address":{"street":"Your Street","city":"Your City"}}}'
當我們再次執行查詢時,我們應該會看到表格中新增了一列。
做得好!
精明的讀者會注意到,即使平台本身在容器中執行,也沒有為已部署的應用程式建立 Docker 容器。在 Data Flow 架構中,Skipper 伺服器負責部署串流應用程式。在本機設定中,Skipper 使用 Local Deployer 在其 localhost
上執行 jar 檔案,就像我們在獨立執行應用程式時所做的一樣。若要查看是否是這種情況,我們可以在 Skipper 容器中執行 ps
docker exec -it skipper ps -ef
若要查看主控台記錄,請使用 stdout
路徑
docker exec -it skipper more /tmp/1596916545104/http-jdbc.jdbc-v4/stdout_0.log
tail -f
命令也適用。
如果部署成功,也可以從 UI 查看應用程式記錄。
但如果部署失敗,我們可能需要深入了解以進行疑難排解。
注意
本機 Data Flow 安裝適用於本機開發和探索,但我們不建議用於生產環境。生產級 Spring Cloud Data Flow OSS 以及商業授權產品可用於 Kubernetes 和 Cloud Foundry。
我們剛剛仔細研究了如何從預先封裝的 Spring Cloud Stream 應用程式建置簡單的資料串流管線,以將透過 HTTP 發布的 JSON 內容儲存到關聯式資料庫中。我們使用了 Docker 和 docker-compose 來安裝本機環境,然後我們部署了來源和 Sink 應用程式,首先在「裸機」上,然後使用 Spring Cloud Data Flow。希望我們學到了一些關於使用 Spring Cloud Stream、Data Flow、Docker 容器、HTTP 來源和 JDBC Sink 的有趣知識。
在接下來的幾週內,我們將介紹更多 Spring Cloud Stream 和 Spring Cloud Data Flow 的案例研究,每個案例研究都將探索不同的串流應用程式和功能。