案例研究:使用 HTTP 來源和 JDBC Sink 建置並執行串流應用程式

工程 | David Turanski | 2020 年 8 月 10 日 | ...

簡介

在本系列中,到目前為止,我們已介紹了新的基於 Java 函數的串流應用程式,以及函數組合。我們也提供了如何從供應器建立來源和從消費者建立 Sink 的詳細範例。在這裡,我們將繼續旅程,進行後續一系列案例研究中的第一個。每個案例研究都示範如何在各種情境中使用一個或多個可用的預先封裝 Spring Boot 串流應用程式,以建置資料串流管線。

今天,我們將展示兩個最常用的應用程式:HTTP 來源JDBC Sink。我們將使用它們來建置一個簡單的服務,該服務接受 HTTP POST 請求並將內容儲存到資料庫表格中。我們將首先將這些應用程式作為獨立的 Spring Cloud Stream 應用程式執行,然後展示如何使用 Spring Cloud Data Flow 來協調相同的管線。這將以逐步教學的方式呈現,我們鼓勵您在閱讀時按照步驟操作。

準備環境

這個簡單的串流應用程式由兩個透過訊息 Broker 通訊的遠端程序組成。預先封裝的串流應用程式可與 Apache Kafka 或 RabbitMQ 搭配使用。在這裡,我們將使用 Apache Kafka。JDBC Sink 將資料插入資料庫。在本範例中,我們將使用 MySQL。

application schematic

假設我們從頭開始,且開發環境中沒有可用的 Kafka 或 MySQL。為了執行此範例,我們將使用 Docker 來玩一下。因此,我們需要在本機上執行 Docker。稍後我們將使用 Spring Cloud Data Flow,因此我們將利用 Data Flow 的 docker-compose 安裝。這是開始使用 Data Flow 的最簡單方法。它會啟動多個容器,包括 MySQL 和 Kafka。為了使這些後端服務可供獨立應用程式使用,我們需要調整標準安裝以發布端口,並變更 Kafka 的宣告主機名稱。

注意

我已在 Mac OS 上執行此設定,並預期類似的設定也適用於 Windows。如果您遇到問題或有任何實用技巧要分享,請在評論區留言。

首先,讓我們建立一個名為 http-jdbc-demo 的目錄,並從 github 下載 docker-compose.yml 到該目錄

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml

curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml -o docker-compose.yml

為了能夠從本機主機連線到 Kafka 和 MySQL,我們將下載另一個 YAML 位元來覆蓋或自訂設定。

wget -O shared-kafka-mysql.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/shared-kafka-mysql.yml

接下來,我們需要取得本機的 LAN IP 位址。在 Mac 上,您可以透過幾種方式執行此操作,例如

dig +short $(hostname)

ping $(hostname)

LAN IP 位址也可以從 Docker 容器存取,而容器內部的 localhost127.0.0.1 則指向自身。我們需要將環境變數 KAFKA_ADVERTISED_HOST_NAME 設定為此值。我們還需要設定其他幾個環境變數

export KAFKA_ADVERTISED_HOST_NAME=$(dig +short $(hostname))
export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0

並在 Data Flow 中註冊最新的串流應用程式

export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven

現在,從我們的專案目錄中,我們可以啟動 Data Flow 叢集

docker-compose -f docker-compose.yml -f shared-kafka-mysql.yml up

這將顯示大量記錄訊息並持續執行,直到您終止它(例如,Ctrl-C),這將停止所有容器。保持此終端機開啟。

開啟新的終端機並輸入

docker ps

這將列出 Data Flow 叢集的執行中容器。我們稍後會查看 Data Flow。此時,請確保 dataflow-kafka 容器在 PORTS 下顯示 0.0.0.0:9092→9092/tcp,且 dataflow-mysql 也類似地顯示 0.0.0.0:3306→3306/tcp

docker ps

建立資料庫表格

我們可以設定 JDBC Sink 應用程式自動初始化資料庫,但為了簡化,我們將手動建立它。我們可以使用任何 JDBC 資料庫工具,或從 dataflow-mysql 容器中執行 mysql 來執行此操作

docker exec -it dataflow-mysql mysql -uroot -p

系統會提示您輸入密碼。DB 認證已在 docker-compose.yml 中設定。如果您不想在那裡尋找,使用者名稱是 root,密碼是 rootpw

輸入以下命令 - 您應該可以複製並貼上整個內容 - 以建立資料庫和表格。

CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
	 id INT NOT NULL AUTO_INCREMENT,
	 name VARCHAR(255) NOT NULL,
	 street VARCHAR(255) NOT NULL,
	 city VARCHAR(255) NOT NULL,
	 PRIMARY KEY (id));

輸入 exit; 以退出。

create database

執行應用程式

此時,我們已準備好執行 HTTP 來源和 JDBC Sink。Spring Boot 可執行 jar 已發布到 Spring Maven 儲存庫。我們需要使用 Kafka Binder 建置的 jar

wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/http-source-kafka/3.0.0-SNAPSHOT/http-source-kafka-3.0.0-SNAPSHOT.jar

wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/jdbc-sink-kafka/3.0.0-SNAPSHOT/jdbc-sink-kafka-3.0.0-SNAPSHOT.jar

我們將在不同的終端機工作階段中執行這些 jar。我們需要設定這些應用程式使用相同的 Kafka Topic,我們稱之為 jdbc-demo-topic。Spring Cloud Stream Kafka Binder 將自動建立此 Topic。我們還需要設定 JDBC Sink 以連線到我們的資料庫,並將資料對應到我們建立的表格。我們將發布如下所示的 JSON

{
 “name”:”My Name”,
 “address”: {
      “street”:”My Street”,
       “city”: “My City”
    }
}

我們希望將這些值插入 Demo 資料庫的 People 表格中,欄位為 namestreetcity

啟動 JDBC Sink

開啟我們下載 jar 的新終端機工作階段並執行

java -jar jdbc-sink-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://127.0.0.1:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.consumer.table-name=People --jdbc.consumer.columns=name,city:address.city,street:address.street --spring.cloud.stream.bindings.input.destination=jdbc-demo-topic

請注意將欄位對應到欄位的 jdbc.consumer.columns 語法。

啟動 HTTP 來源

開啟我們下載 jar 的新終端機工作階段並執行

java -jar http-source-kafka-3.0.0-SNAPSHOT.jar --server.port=9000 --spring.cloud.stream.bindings.output.destination=jdbc-demo-topic

在這裡,我們將來源的 HTTP 端口設定為 9000(預設為 8080)。此外,非常重要的是,來源的輸出目的地必須與 Sink 的輸入目的地相符。

發布一些資料

接下來,我們需要將一些資料發布到 https://127.0.0.1:9000

curl https://127.0.0.1:9000 -H'Content-Type:application/json' -d '{"name":"My Name","address":{"street":"My Street","city":"My City"}}}

驗證資料是否已儲存

再次,找到一個開啟的終端機工作階段並

docker exec -it dataflow-mysql mysql -uroot -p

使用 rootpw 登入並查詢表格

query standalone

如果您看到此訊息,恭喜!獨立的 Spring Cloud Stream 應用程式已如預期般運作。我們現在可以終止我們的獨立應用程式 (Ctrl-C)。保持 docker-compose 程序執行,以便我們可以查看 Data Flow。

使用 Spring Cloud Data Flow

正如我們所看到的,即使我們沒有編寫任何程式碼,在「裸機」上執行這些應用程式也需要許多手動步驟。這些步驟包括

  • 自訂 docker-compose 設定,或在本機上安裝 Kafka 和 MySQL

  • 使用 Maven URL 下載所需版本的串流應用程式(我們碰巧知道在這裡要使用哪些應用程式)

  • 確保 Spring Cloud Stream 目的地繫結已正確設定,以便應用程式可以通訊

  • 尋找並閱讀 文件 以取得設定屬性(我們已經這樣做了以準備此範例),並正確設定它們。

  • 管理多個終端機工作階段

在以下章節中,我們將看到使用 Spring Cloud Data Flow 執行此操作可消除所有這些步驟,並提供更豐富的整體開發體驗。

開啟 Data Flow 儀表板

若要開始使用,請開啟 Data Flow 儀表板 https://127.0.0.1:9393/dashboard。這會將您帶到「應用程式」檢視,我們可以在其中看到已註冊的預先封裝應用程式。我們稍早執行的 docker-compose 命令已執行此步驟,使用我們提供的 URL 來取得串流應用程式的最新快照版本,包括我們剛執行的相同 jar 檔案。

datflow ui applications

建立並部署串流

在儀表板中,從左側選單選擇 Streams,然後按一下 Create Streams 以開啟圖形化串流編輯器。

dataflow create stream

將 HTTP 來源和 JDBC Sink 拖放到編輯器窗格中,然後使用滑鼠連線兩個控制點。或者,您可以直接在頂端的文字方塊中輸入 Data Flow 串流定義 DSL:http | jdbc

接下來,我們需要設定應用程式。如果您按一下任一個應用程式,您會看到一個 Options 連結。開啟 JDBC Sink 的選項視窗。您將看到所有可用的設定屬性都列出,並附有簡短說明。以下螢幕截圖顯示部分檢視;我們需要捲動才能看到其餘部分。

datflow ui jdbc options

與之前一樣,我們需要提供 URL、使用者名稱、密碼、表格和欄位。在這裡,我們需要將 JDBC URL 變更為 jdbc:mariadb://mysql:3306/Demo,因為主機名稱 mysql 對應於 docker-compose.yml 中定義的 MySQL 服務名稱。此外,我們將 HTTP 端口設定為 20000,因為它在已發布端口的設定範圍內。如需更多詳細資訊,請參閱 skipper-server 設定

![]](https://github.com/spring-cloud/stream-applications/blob/gh-pages/img/http-jdbc/datflow-ui-stream-configured.png?raw=true)

讓我們看一下自動產生的串流定義 DSL

http --port=20000 | jdbc --password=rootpw --username=root --url=jdbc:mariadb://mysql:3306/Demo --columns=name,city:address.city,street:address.street --table-name=People

此 DSL 可用於腳本或 Data Flow 用戶端應用程式,以自動化串流建立。我們的設定已完成,但 Spring Cloud Stream 目的地繫結在哪裡?我們不需要它們,因為 Data Flow 會為我們處理連線。

選取 Create Stream 按鈕,並將串流命名為 http-jdbc

dataflow ui deploy 1

若要部署串流,請按一下播放按鈕

play button

接受預設部署屬性,然後按一下頁面底部的 Deploy stream

根據需要按一下 Refresh 按鈕。大約一分鐘後,您應該會看到我們的串流已部署。

dataflow ui deploy 2

發布一些資料並驗證是否已儲存

在這裡,我們將發布一些不同的值到端口 20000

curl https://127.0.0.1:20000 -H'Content-Type:application/json' -d '{"name":"Your Name","address":{"street":"Your Street","city":"Your City"}}}'

當我們再次執行查詢時,我們應該會看到表格中新增了一列。

query dataflow

做得好!

Data Flow 應用程式部署

精明的讀者會注意到,即使平台本身在容器中執行,也沒有為已部署的應用程式建立 Docker 容器。在 Data Flow 架構中,Skipper 伺服器負責部署串流應用程式。在本機設定中,Skipper 使用 Local Deployer 在其 localhost 上執行 jar 檔案,就像我們在獨立執行應用程式時所做的一樣。若要查看是否是這種情況,我們可以在 Skipper 容器中執行 ps

docker exec -it skipper ps -ef

docker exec ps

若要查看主控台記錄,請使用 stdout 路徑

docker exec -it skipper more /tmp/1596916545104/http-jdbc.jdbc-v4/stdout_0.log

tail -f 命令也適用。

如果部署成功,也可以從 UI 查看應用程式記錄。

dataflow ui app log

但如果部署失敗,我們可能需要深入了解以進行疑難排解。

注意

本機 Data Flow 安裝適用於本機開發和探索,但我們不建議用於生產環境。生產級 Spring Cloud Data Flow OSS 以及商業授權產品可用於 KubernetesCloud Foundry

摘要

我們剛剛仔細研究了如何從預先封裝的 Spring Cloud Stream 應用程式建置簡單的資料串流管線,以將透過 HTTP 發布的 JSON 內容儲存到關聯式資料庫中。我們使用了 Docker 和 docker-compose 來安裝本機環境,然後我們部署了來源和 Sink 應用程式,首先在「裸機」上,然後使用 Spring Cloud Data Flow。希望我們學到了一些關於使用 Spring Cloud Stream、Data Flow、Docker 容器、HTTP 來源和 JDBC Sink 的有趣知識。

敬請期待…​

在接下來的幾週內,我們將介紹更多 Spring Cloud Stream 和 Spring Cloud Data Flow 的案例研究,每個案例研究都將探索不同的串流應用程式和功能。

取得 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

瞭解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

瞭解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

檢視全部