搶先一步
VMware 提供培訓和認證,以加速您的進展。
瞭解更多本文是部落格系列文章的一部分,該系列探討了基於 Java 函數的全新設計 Spring Cloud Stream 應用程式。在本集中,我們將探索 JDBC 供應商和基於 Spring Cloud Stream 的來源。我們將了解如何從關聯式資料庫匯出資料,並使用檔案消費者和對應的 Spring Cloud Stream 檔案接收器將其轉儲到檔案中。我們將研究幾種不同的方法,可以使用這些方法執行 JDBC 來源並將資料傳送到檔案。
以下是本部落格系列的所有先前部分。
查詢資料庫並處理結果是一個非常基本的企業用例。甚至可以追溯到資訊技術革命的大型主機時代,我們可以看到這種模式被廣泛使用。在過去幾十年中,SQL 已確立為與資料庫通訊的典型語言。Java 從最初的版本開始,就使用一個名為 Java 資料庫連線能力(通常稱為 JDBC)的函式庫,為基於資料庫的應用程式增加了企業級支援。在 Java 的早期,許多應用程式都是使用原始 JDBC 函式庫編寫的。Spring Framework 從一開始就支援使用資料庫的核心用例,它提供了一種基於 JDK 中的 JDBC 函式庫的範本模式 - JdbcTemplate。Spring Data 專案在此範本周圍增加了許多額外功能。當 Spring Integration 出現時,它利用了 Spring 中的這種支援,並提供了許多額外元件,以便資料可以通過通道適配器、閘道等使用。在最新的版本中,這也是本部落格的主題,我們意識到我們可以採用這些 Spring Integration 元件,然後將它們作為簡單的 Java 供應商來查詢資料庫。我們將深入了解如何存取、在自訂應用程式中重複使用以及用作 Spring Cloud Stream 來源的此供應商的所有詳細資訊。
JDBC 供應商是一個元件,它被實作為 java.util.function.supplier
bean。當被調用時,它將從資料庫表格傳遞資料。JDBC 供應商具有以下簽章。
Supplier<Flux<Message<?>>> jdbcSupplier()
預設情況下,JDBC 供應商根據資料庫表格的列來分割資料,其中資料庫的每一列都表示為 java.util.Map
資料結構。例如,這是一個包含一些資料的資料庫表格。
ID
名稱
1
Bob
2
Jane
3
John
當我們針對此表格調用 JDBC 供應商時,我們將獲得 Flux 的 Message 物件,這些物件包含一個 Map
作為有效負載。第一個訊息將有一個 map,其鍵為 ID
和 NAME
,值分別為 1
和 Bob
。第二個訊息將具有相同的鍵的 map,但值為 2
和 Jane
,依此類推。我們也可以要求提供資料,而無需將其分割成單獨的訊息。為此,我們可以使用屬性 jdbc.supplier.split
,並將其設定為 false
(預設值為 true
)。當我們停用分割時,與上述 JDBC 供應商簽章有一個重要的區別。它的簽章然後變為 Supplier<Message<?>>
,而不是擁有單獨的 Message<Map>
,我們得到一個單一的 Message<List<Map>
。如果我們將其應用於上面給出的範例,我們將得到一個包含 3 個元素的單一列表,並且每個元素將保存一個包含資料庫表格每一列的 Map
。
JDBC 供應商需要一個 SQL 查詢才能在每次調用時執行。此查詢是強制性的,必須通過屬性 jdbc.supplier.query
提供。我們也可以使用屬性 jdbc.supplier.update
來強制供應商忽略已讀取的列。我們將在本文章稍後的部分看到如何完成此操作的範例。
上面提到的兩種 Supplier
bean 類型——我們看到的預設類型(資料分割)和我們停用資料分割的另一種類型——都命名為 jdbcSupplier
。我們可以在我們的自訂應用程式中使用該名稱限定詞注入它們。我們需要確保根據我們是否分割資料來使用正確的類型。例如,如果我們使用預設值,即分割資料,那麼我們可以如下所示自動裝配 JDBC 供應商。
@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;
另一方面,如果我們使用屬性 jdbc.supplier.split
停用分割,那麼我們需要使用類型 Supplier<Message<?>
注入它。
注入後,我們可以調用 Supplier
的 get
方法,然後開始接收資料。
與我們在 先前的部落格 中看到的檔案供應商類似,檔案消費者 也是一個可重複使用的 bean,我們可以將其注入到我們的自訂應用程式中,並使用它在目錄中建立新檔案。該 bean 被實作為 java.util.function.Consumer
。對於新手讀者來說,為什麼它被稱為消費者,但實際上並沒有從檔案中消費任何東西,這可能會導致一些混淆。雖然被命名並實作為消費者,但檔案消費者不是輪詢或讀取檔案的檔案消費者,而是一個接受資料然後寫入檔案的消費者。“消費”檔案的用例由檔案供應商處理。
這是檔案消費者的類型簽章。
Consumer<Message<?>> fileConsumer()
由於它是一個消費者,因此僅在資料處理管道的末端使用此元件才有意義。消費者接受傳入的資料並使用它寫入檔案。當我們通過綁定器實作將檔案消費者與 Spring Cloud Stream 結合使用時,它將成為一個接收器應用程式,它從中介軟體目標(例如 Kafka 主題或 RabbitMQ 交換器)消費資料。
在許多企業用例中,檔案消費者都很有用。對於任何在有新資料可用時建立檔案或附加到檔案的業務案例,檔案消費者都很有用。
當使用檔案消費者時,我們可以通過屬性 file.consumer.directory
和 file.consumer.name
分別為其提供要寫入的檔案的目錄和名稱。如果我們不設定它們,它將使用消費者設定的預設值。預設情況下,資料將附加到檔案中,這可以通過屬性 file.consumer.mode
進行更改。有關更多詳細資訊,請參閱 FileConsumerProperties 的配置選項。
當將上述功能元件與 Spring Cloud Stream 結合使用時,它們會變得更加強大。這些函數可以用於訊息傳遞應用程式,而 Spring Cloud Stream 使它們更容易以與中介軟體無關的方式使用。JDBC 供應商用於建置 JDBC 來源,該來源可以與許多不同的中介軟體系統一起使用。同樣,檔案消費者被用作 檔案接收器 應用程式的骨幹,該應用程式也可以與不同的訊息傳遞系統一起使用。
在以下章節中,我們將獨立執行這些應用程式,並驗證它們是否按預期工作。
首先,建立一個新目錄 jdbc-file-demo
。
mkdir jdbc-file-demo && cd jdbc-file-demo
我們將執行 Kafka 變體的 jdbc-source 和 file-sink 應用程式。我們將使用 Apache Kafka 作為中介軟體執行這些應用程式。對於 JDBC 來源,我們將使用 MySQL 作為資料庫。我們為 Kafka 和 MySQL 提供了一個方便的 docker-compose 腳本
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/kafka-mysql-only.yml
啟動 docker 容器
docker-compose up
執行 docker ps
並確保您看到所有三個元件都在啟動並執行中(Kafka、Zookeeper 和 MySQL)。
現在我們已經準備好必要的基礎架構,讓我們繼續設定我們的 MySQL 資料庫,然後再執行我們的應用程式。
docker exec -it jdbc-file-blog-mysql mysql -uroot -p
使用 rootpw
作為密碼。在終端機上,輸入以下命令來設定資料庫和表格。
CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));
上面的 schema 非常容易理解,但 tag
列需要一些解釋。它用於避免從我們已經讀取的表格中讀取重複資料。我們的想法是,我們為查詢返回的每一列更新 tag
列,這樣它就不會包含在後續查詢中。我們將在下面看到它的詳細資訊。
保持 MySQL 的終端機工作階段開啟,因為我們稍後會回到它。
讓我們執行現成的檔案接收器獨立應用程式。
wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-sink-kafka/3.0.0-SNAPSHOT/file-sink-kafka-3.0.0-SNAPSHOT.jar
然後如下所示執行它。
java -jar file-sink-kafka-3.0.0-SNAPSHOT.jar --file.consumer.directory=/tmp/processed-file --file.consumer.name=output.txt --spring.cloud.stream.bindings.input.destination=jdbc-file-demo
讓我們回顧一下我們試圖做什麼的詳細資訊。我們要求檔案接收器應用程式從 kafka 主題 jdbc-file-demo
消費資料,然後在檔案系統上的目錄 /tmp/processed-file
中產生一個名為 output.txt
的檔案。預設情況下,每個傳入的 Kafka 主題記錄都作為新行附加到檔案中。如果您將 file.consumer.binary
值設定為 true
,則該檔案將以二進制形式寫入。您可以在 這裡 找到所有可用的配置。
與我們執行檔案接收器的方式類似,我們現在將獲取並執行基於 Kafka 的 jdbc 來源應用程式。
wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/jdbc-source-kafka/3.0.0-SNAPSHOT/jdbc-source-kafka-3.0.0-SNAPSHOT.jar
然後執行,
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://localhost:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0
我們正在向 JDBC 來源提供以下配置資訊。
資料來源 URL - 在本例中,是我們正在執行的 MySQL 變體的 JDBC URL。
資料來源使用者憑證
來源應用程式要執行的 SQL 查詢
資料傳送到的 Kafka 主題(這設定為與檔案接收器從中消費資料的同一主題)
用於標記記錄的更新 SQL 陳述式
請注意,當我們稍早建立表格時,我們新增了一個名為 tag
的列,以避免讀取我們已讀取的重複記錄。我們的主要 SQL 查詢(通過屬性 jdbc.supplier.query
)將僅讀取 tag
值不為 null 的那些記錄。然後每次來源讀取記錄時,標籤都會更新為值 1
,以便下次執行查詢時,將省略該記錄。如果沒有通過 jdbc.supplier.update
提供更新陳述式(如我們上面所做的那樣),則每次查詢都將提供表格中的所有記錄。如果我們不提供此陳述式,但仍然想避免重複,那麼我們需要使用一些複雜的策略來使用某些 元資料儲存 來追蹤我們到目前為止消費的內容。提供一個 schema 來支援標誌(例如我們的範例中的 tag
),然後在每次讀取時更新它,這是一種更容易避免重複的策略。
JDBC 來源是使用輪詢器調用的。這與在自訂非 Spring Cloud Stream 應用程式中直接使用 JDBC 供應商不同,在後一種情況下,必須手動調用供應商。預設情況下,Spring Cloud Stream 為 JDBC 來源提供了一個輪詢器,它每秒輪詢一次。可以使用屬性 spring.cloud.stream.poller.fixedDelay
更改此排程。有關輪詢的更多控制項,請參閱 這裡。
現在我們正在執行這兩個應用程式,讓我們將資料插入表格中。
前往您的 MySQL 終端機工作階段並輸入以下插入陳述式。
mysql> insert into People values (1, 'Bob', 'First Street', 'First City', NULL);
現在前往目錄 /tmp/processed-file
,檔案接收器正在其中寫入檔案,並尋找名為 output.txt
的檔案。開啟檔案並驗證其內容。它應包含以下內容。
{"id":1,"name":"Bob","street":"First Street","city":"First City"}
將更多資料填入表格中。
mysql> insert into People values (2, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (3, 'Mary', 'First Street', 'First City', NULL);
驗證我們是否在 output.txt
檔案中看到新資料。
停止執行 JDBC 來源應用程式,然後如下所示重新執行。
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:mariadb://localhost:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"
正如我們在 先前的部落格 中看到的以及在 這裡 解釋的那樣,所有現成的來源都自動配置了許多有用的函數,因此您可以通過屬性啟動它們。在使用上面的新配置選項執行時,我們將 filterFunction 與 jdbcSupplier
組合在一起,從而為 JDBC 供應商產生的資料新增篩選功能。我們使用屬性 spring.cloud.stream.function.definition
組合它,並給它值 jdbcSupplier|filterFunction
。然後通過屬性 filter.function.expression
,我們提供一個 JSONPath 表達式來篩選掉所有偶數 ID。
現在,如果您在表格中插入更多資料,您將只看到 ID 為奇數的記錄被寫入檔案。
嘗試將這些記錄輸入表格中。
mysql> insert into People values (200, 'John', 'First Street', 'First City', NULL);
mysql> insert into People values (201, 'Mary', 'First Street', 'First City', NULL);
mysql> insert into People values (202, 'Alice', 'First Street', 'First City', NULL);
mysql> insert into People values (203, 'Bob', 'First Street', 'First City', NULL);
mysql> insert into People values (204, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (205, 'Doe', 'First Street', 'First City', NULL);
我們將看到該檔案不包含 ID 為 200、202 和 204 的記錄,因為它們已被篩選掉。
JDBC 供應商隨附了流行的開源 JDBC 驅動程式。目前,它包含適用於 MySQL、PostgreSQL 和 Microsoft SQL Server 資料庫的驅動程式。這使我們能夠快速切換針對特定資料庫(例如 MySQL)運行的相同 JDBC 來源應用程式,以針對 PostgreSQL 運行,而無需進行任何程式碼更改,只需在部署時進行配置更改即可。讓我們採用我們針對 MySQL 運行的 JDBC 來源,這次針對 PostgreSQL 運行。
首先,我們將在 docker 容器中運行 PostgreSQL。
docker run --rm --name pg-docker -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=demo -d -p 5432:5432 postgres
登入到 psql
工作階段(或使用 PGAdmin 等 UI 工具)。
docker run -it --rm --network host postgres psql -h localhost -d demo -U test
使用 test
作為密碼。
然後建立此表格
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));
停止目前的 JDBC 來源並使用以下配置選項重新執行它
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:postgresql://localhost:5432/demo --spring.datasource.username=test --spring.datasource.password=test --jdbc.supplier.query="select id, name, street, city from people where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update people set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"
這與我們第二次針對 MySQL 運行 JDBC 來源時的配置選項幾乎相同,但這次資料來源屬性已更改為針對 PostgreSQL 資料庫運行。
在 psql 提示符下,插入與之前使用 MySQL 相同的資料。您會注意到只有 ID 為奇數的資料才會附加到檔案中。
如果我們要為商業資料庫新增 JDBC 驅動程式,那麼我們需要手動進行這些變更。執行步驟很簡單,如下所示。
複製 stream-application 儲存庫
在 maven 配置 中新增我們想要的驅動程式(例如 Oracle JDBC 驅動程式)作為依賴項。將其範圍設定為 runtime
。
從儲存庫的根目錄:./mvnw clean install -pl :jdbc-supplier
使用供應商變更產生應用程式:./mvnw clean install -pl :jdbc-source
cd applications/source/jdbc-source/apps
- 在這裡,我們可以找到基於 Kafka 和 RabbitMQ 的 jdbc-source 應用程式
建置我們想要的應用程式變體。
本部落格詳細介紹了 JDBC 供應商以及如何在 Spring Cloud Stream JDBC 來源中使用此供應商。我們還看到了檔案消費者及其在 Spring Cloud Stream 中的接收器對應項。然後,我們深入研究了如何使用一些變體獨立執行這些應用程式,並在此過程中探索了各種功能。最後,我們看到了如何在各種資料庫之間輕鬆切換 JDBC 來源,以及如何新增新的資料庫驅動程式。
本系列將繼續。在接下來的幾週內,我們將研究更多函數和應用程式。