案例研究:使用 CDC Debezium 來源和分析接收器進行即時變更資料擷取 (CDC) 分析

工程 | Christian Tzolov | 2020 年 12 月 14 日 | ...

本文是一系列部落格文章的一部分,探討基於 Java Functions 的全新重新設計的 Spring Cloud Stream 應用程式。

以下是本部落格系列的所有先前部分。

在這篇文章中,我們將研究 Debezium CDC 來源,它允許我們從 MySQL、PostgreSQL、MongoDB、Oracle、DB2 和 SQL Server 等資料庫擷取資料庫變更,並透過各種訊息繫結器(例如 RabbitMQ、Apache Kafka、Azure Event Hubs、Google PubSub 和 Solace PubSub+ 等)即時處理這些變更。

我們還將揭示如何使用 Analytics 接收器 將擷取的資料庫變更轉換為指標,並將它們發布到各種監控系統以進行進一步分析。

本文首先解釋 CDC supplierAnalytics consumer 元件,展示如何在您自己的 Spring 應用程式中以程式設計方式自訂和使用它們。 接下來,我們解釋 CDC 來源Analytics 接收器 如何建立在 supplier 和 consumer 之上,以提供開箱即用、隨時可用的串流應用程式。

最後,我們將示範使用 Spring Cloud Data Flow (SCDF) 部署串流管線是多麼容易,這些管線會即時回應資料庫更新,將變更事件轉換為分析指標,並將它們發布到 Prometheus 以便使用 Grafana 進行分析和視覺化。

變更資料擷取

變更資料擷取 (CDC) 是一種觀察所有寫入資料庫的資料變更,並將它們發布為可以以串流方式處理的事件的技術。 由於您的應用程式資料庫始終在變更,因此 CDC 允許您對這些變更做出反應,並讓您的應用程式以與提交到資料庫相同的順序串流每個資料列層級的變更。

CDC 支援多種使用案例,例如:快取失效、記憶體內資料檢視、更新搜尋索引、透過保持不同資料來源同步來進行資料複寫、即時詐欺偵測、儲存稽核追蹤、資料出處等等。

Spring Cloud Data Flow CDC 來源 應用程式是圍繞 Debezium 建置的,Debezium 是一種流行的、開放原始碼的、基於日誌的 CDC 實作,支援各種資料庫。 CDC 來源支援各種訊息繫結器,包括 Apache Kafka、Rabbit MQ、Azure Event Hubs、Google PubSub、Solace PubSub+。

注意

CDC source 實作嵌入了 Debezium Engine,並且不依賴 Apache Kafka 也不依賴 ZooKeeper! 您可以將 CDC source 與任何受支援的訊息繫結器一起使用! 但是 Debezium Engine 具有一些 限制 需要考慮。

CDC Debezium Supplier

CDC Debezium Supplier 實作為 java.util.function.Supplier bean,當調用時,它將傳遞給定目錄中檔案的內容。 檔案 supplier 具有以下簽章

Supplier<Flux<Message<?>>>

supplier 的使用者可以訂閱傳回的 Flux<Message<?>,它是一系列訊息或 CDC 變更事件,它們具有複雜的結構。 每個事件由三個部分組成(例如 metadatabeforeafter),如下面的 payload 範例所示

{
  "before": { ... },  // row data before the change.
  "after": { ... },  // row data after the change.
  "source": {  //  the names of the database and table where the change was made.
    "connector": "mysql", "server_id": 223344,"snapshot": "false",
    "name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
    "db": "inventory",  // source database name.
    "table": "customers", // source table name.
  },
  "op": "u",  // operation that made the change.
  "ts_ms": 1607440256301, // timestamp - when the change was made.
  "transaction": null  // transaction information (optional).
}

如果將 cdc.flattening.enabled 屬性設定為 true,則只會將 after 區段作為獨立訊息傳遞。

為了調用 CDC supplier,我們需要指定一個來源資料庫來接收 CDC 事件。 cdc.connector 屬性用於在支援的 mysqlpostgressql serverdb2oraclecassandramongo 來源資料庫類型之間進行選擇。 cdc.config.database.* 屬性有助於配置來源存取。 以下是連接到 MySQL 資料庫的範例配置

# DB type
cdc.connector=mysql

# DB access
cdc.config.database.user=debezium
cdc.config.database.password=dbz
cdc.config.database.hostname=localhost
cdc.config.database.port=3306

# DB source metadata
cdc.name=my-sql-connector
cdc.config.database.server.id=85744
cdc.config.database.server.name=my-app-connector

cdc.name, cdc.config.database.server.idcdc.config.database.server.name 屬性用於識別和分派傳入的事件。 您可以選擇設定 cdc.flattening.enabled=true 來扁平化 CDC 事件,以僅使用其 after 欄位替換原始變更事件,以建立簡單的 Kafka 記錄。 您也可以選擇使用 cdc.schema=true 將 DB schema 包含到 CDC 事件中。

注意

必須將來源資料庫配置為公開其 Write-Ahead Log API,Debezium 才能夠連接和使用 CDC 事件。 Debezium 連接器文件 提供了有關如何為任何受支援的資料庫啟用 CDC 的詳細說明。 為了我們的示範目的,我們將使用預先配置的 MySQL docker 映像

在自訂應用程式中重複使用 CDC Supplier

CDC supplier 是一個可重複使用的 Spring bean,我們可以將其注入到最終使用者自訂應用程式中。 注入後,可以直接調用它並與資料的自訂處理結合。 這是一個例子。

@Autowired
Supplier<Flux<Message<?>>> cdcSupplier;

public void consumeDataAndSendEmail() {
  Flux<Message<?> cdcData = cdcSupplier.get();
  messageFlux.subscribe(t -> {
      if (t == something)
         //send the email here.
      }
  }
}

在上面的偽代碼中,我們注入 CDC supplier bean,然後使用它調用其 get() 方法以取得 Flux。 然後,我們訂閱該 Flux,並且每次我們透過 Flux 接收到任何資料時,都會應用一些篩選,並根據該資料採取行動。 這只是一個簡單的說明,展示了我們如何重複使用 CDC supplier。 當您在實際應用程式中嘗試此操作時,您可能需要在實作中進行更多調整,例如在進行條件檢查之前,將接收到的資料的預設資料類型從 byte[] 轉換為其他類型。

提示

為了建置獨立的、非串流的應用程式,您可以利用 cdc-debezium-boot-starter。 只需新增 cdc-debezium-boot-starter 依賴項,並實作您的自訂 Consumer<SourceRecord> 處理程序來處理傳入的資料庫變更事件。

CDC Debezium 來源

正如我們在本部落格系列中看到的那樣,所有開箱即用的 Spring Cloud Stream 來源應用程式都已經使用多個開箱即用的通用處理器進行了自動配置。 您可以將這些處理器作為 CDC 來源 的一部分啟用。 這是一個範例,我們運行 CDC 來源並接收資料,然後在將消耗的資料傳送到中介軟體上的目的地之前轉換該資料。

java -jar cdc-debezium-source.jar
 --cdc.connector=mysql --cdc.name=my-sql-connector
 --cdc.config.database.server.name=my-app-connector
 --cdc.config.database.user=debezium --cdc.config.database.password=dbz
 --cdc.config.database.hostname=localhost --cdc.config.database.port=3306
 --cdc.schema=true
 --cdc.flattening.enabled=true
 --spring.cloud.function.definition=cdcSupplier|spelFunction
 --spel.function.expression=payload.toUpperCase()

藉由為 spring.cloud.function.definition 屬性提供值 cdcSupplier|spelFunction,我們啟用了由 CDC 供應器組成的 SpEL 函數。然後,我們提供一個 SpEL 表達式,用於轉換使用 spel.function.expression 的資料。還有其他一些函數可以透過這種方式組合。請參閱此處以獲取更多詳細資訊。

分析消費者 (Analytics Consumer)

分析消費者提供了一個函數,用於計算輸入資料訊息的分析結果,並將其作為指標發布到各種監控系統。它利用 micrometer 函式庫,為最受歡迎的 監控系統提供統一的程式設計體驗,並使用 Spring Expression Language (SpEL) 來定義如何從輸入資料計算指標名稱、值和標籤。

我們可以直接在自訂應用程式中使用 consumer bean,以計算傳遞訊息的分析結果。以下是 Analytics consumer bean 的類型簽章

Consumer<Message<?>> analyticsConsumer

一旦注入到自訂應用程式中,使用者可以直接調用 consumer 的 accept() 方法,並提供一個 Message<?> 物件來計算分析結果並將其發布到後端監控系統。

Message 是一個通用的資料容器。每個 Message 實例都包含一個 payloadheaders,其中包含使用者可擴展的屬性作為鍵值對。 SpEL 表達式用於存取訊息的 headers 和 payload,以計算指標量和標籤。例如,計數器指標可以有一個從輸入訊息 payload 大小計算出的值 amount,並添加一個從 kind header 值中提取的 my_tag 標籤

analytics.amount-expression=payload.lenght()
analytics.tag.expression.my_tag=headers['kind']

分析消費者 (Analytics consumer) 的組態屬性以 analytics.* 前綴開頭。有關可用的分析屬性,請參閱 AnalyticsConsumerProperties。監控組態屬性以 management.metrics.export 前綴開頭。有關組態特定監控系統的資訊,請遵循提供的組態說明

分析接收器 (Analytics Sink)

如同 CDC Source 的情況,Spring Cloud Stream 開箱即用的應用程式也提供了一個基於 Analytics consumer分析接收器

此接收器適用於 Apache KafkaRabbitMQ 繫結器變體。當用作 Spring Cloud Stream 接收器時,分析消費者會自動組態為從相應的中介軟體系統(例如,來自 Kafka topic 或 RabbitMQ exchange)接收資料。

在 Spring Cloud Data Flow 上執行

單獨執行 CDC sourceAnalytics sink 都可以,但 Spring Cloud Data Flow 使它們更容易作為 pipeline 執行。基本上,我們想要編排如下所示的資料流

scdf pipelines

cdc-log pipeline 部署了一個 cdc-source,它使用 JSON 訊息格式將所有資料庫變更串流到 log-sink 中。同時,cdc-analytics-tap pipeline 將 cdc-source 輸出 tapanalytics-sink 中,以計算來自 CDC 事件的 DB 統計資料,並將其發布到時間序列資料庫 (TSDB),例如 Prometheus 或 Wavefront。 Grafana 儀表板用於視覺化這些變更。

Spring Cloud Data Flow 安裝說明說明了如何在任何受支援的雲端平台上安裝 Spring Cloud Data Flow。

以下我們將簡要介紹設定 Spring Cloud Data Flow 的步驟。首先,我們需要取得用於執行 Spring Cloud Data Flow 的 docker-compose 檔案,Prometheus 和 Grafana

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose.yml

wget -O docker-compose-prometheus.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose-prometheus.yml

此外,取得這個額外的 docker-compose 檔案以安裝一個來源 MySQL 資料庫,該資料庫已組態為公開 cdc-debezium 連接到的預寫日誌。

wget -O mysql-cdc.yml https://gist.githubusercontent.com/tzolov/48dec8c0db44e8086916129201cc2c8c/raw/26e1bf435d58e25ff836e415dae308edeeef2784/mysql-cdc.yml

mysql-cdc 使用 debezium/example-mysql 映像,並帶有庫存、範例資料庫

invetory db

我們需要設定一些環境變數才能正確執行 Spring Cloud Data Flow。

export DATAFLOW_VERSION=2.7.1
export SKIPPER_VERSION=2.6.1
export STREAM_APPS_URI=https://dataflow.spring.io/kafka-maven-latest

現在我們已準備好一切,可以開始執行 Spring Cloud Data Flow 和所有其他輔助元件了。

docker-compose -f docker-compose.yml -f docker-compose-prometheus.yml -f mysql-cdc.yml up

提示

若要使用 RabbitMQ 而不是 Apache Kafka,您可以下載一個額外的 docker-compose 檔案,如 RabbitMQ 而不是 Kafka 指示中所述,並將 STREAM_APPS_URI 變數設定為 https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-rabbit-maven

提示

若要使用 Wavefront 而不是 Prometheus 和 Grafana,請遵循 Wavefront 指示。

一旦 SCDF 啟動並執行,請前往 https://127.0.0.1:9393/dashboard。然後前往左側的 Streams 並選取 Create Stream。從來源應用程式中選取 cdc-debezium,然後從接收器應用程式中選取 loganalytics,以定義 cdc-log = cdc-debezium | logcdc-analytic-tap = :cdc-log.cdc-debezium > analytics pipeline。您可以點擊應用程式的選項以選取所需的屬性。

為了更快地引導啟動,您可以複製/貼上以下現成的 pipeline 定義程式碼片段

cdc-log = cdc-debezium --cdc.name=mycdc --cdc.flattening.enabled=false --cdc.connector=mysql --cdc.config.database.user=debezium --cdc.config.database.password=dbz --cdc.config.database.dbname=inventory --cdc.config.database.hostname=mysql-cdc --cdc.config.database.port=3307 --cdc.stream.header.offset=true --cdc.config.database.server.name=my-app-connector --cdc.config.tombstones.on.delete=false | log

cdc-analytic-tap = :cdc-log.cdc-debezium > analytics --analytics.name=cdc --analytics.tag.expression.table=#jsonPath(payload,'$..table') --analytics.tag.expression.operation=#jsonPath(payload,'$..op') --analytics.tag.expression.db=#jsonPath(payload,'$..db')

cdc-log pipeline 部署了一個 cdc-debezium source,它連接到 mysql-cdc:3307 上的 MySQL 資料庫,並將 DB 變更事件串流到 log sink。有關可用的組態選項,請參閱 cdc-debezium 文件

cdc-analytic-tap pipeline 將 cdc-debezium source 的輸出 tap 到 analytics sink 中,並將 cdc 事件串流到 analytics sink。分析建立一個 指標計數器 (稱為 cdc),並使用 SpEL 表達式 從串流訊息 payload 中計算指標標籤 (例如 db、table 和 operations)。

例如,讓我們修改 MySQL inventory 資料庫中的 customers 表。更新交易作為變更事件傳送到 cdc-debezium source,後者將原生 DB 事件轉換為如下所示的統一訊息 payload

{
  "before": {
    "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "[email protected]"
  },
  "after": {
    "id": 1004, "first_name": "Anne2", "last_name": "Kretchmar", "email": "[email protected]"
  },
  "source": {
    "version": "1.3.1.Final", "connector": "mysql", "server_id": 223344, "thread": 5,
    "name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
    "db": "inventory",
    "table": "customers",
  },
  "op": "u",
  "ts_ms": 1607440256301,
  "transaction": null
}

以下 SpEL 表達式用於從 CDC 訊息 payload 中計算 3 個標籤 (dbtableoperation)。這些標籤會分配給發布到 Prometheus 的每個 cdc 指標。

--analytics.tag.expression.db=#jsonPath(payload,'$..db')
--analytics.tag.expression.table=#jsonPath(payload,'$..table')
--analytics.tag.expression.operation=#jsonPath(payload,'$..op')

以下螢幕擷取畫面顯示了選取所有屬性後的外觀

scdf create streams

建立部署 cdc-logcdc-analytics-tap pipeline,接受所有預設選項。您可以選擇使用 Group Actions 同時部署兩個串流。

部署串流後,您可以透過 SCDF UI 或使用 Skipper docker 容器檢查已部署應用程式的日誌,如文件中說明。如果您檢查 Log sink 應用程式的日誌,您應該會看到類似於以下的 CDC JSON 訊息

cdc event log

接下來,使用 按鈕 (或直接開啟 localhost:3000) 前往 Grafana 儀表板,並以使用者:`admin` 和密碼:`admin` 登入。您可以瀏覽 Applications 儀表板以檢查已部署 pipeline 的效能。現在您可以匯入 CDC Grafana Dashboard-Prometheus.json 儀表板,並看到類似於以下的儀表板

grafana dashboard

以下查詢已用於匯總 Prometheus 內的 cdc_total 指標

sort_desc(topk(10, sum(cdc_total) by (db, table)))
sort_desc(topk(100, sum(cdc_total) by (op)))

提示

您可以開啟 Prometheus UI,網址為 https://127.0.0.1:9090 以檢查組態以及執行一些臨時 PQL 查詢。

產生 DB 活動

您可以連接到 localhost:3307 上的庫存 CDC MySQL 資料庫 (使用者:root 和密碼:debezium),並開始修改資料。

以下 docker 命令顯示如何連接到 mysql-cdc

docker exec -it mysql-cdc  mysql -uroot -pdebezium --database=inventory

以下指令碼有助於產生多個插入、更新和刪除 DB 交易

for i in {1..100}; do docker exec -it mysql-cdc  mysql -uroot -pdebezium --database=inventory -e'INSERT INTO customers (first_name, last_name, email) VALUES ("value1", "value2", "val@bla"); UPDATE customers SET first_name="value2" WHERE first_name="value1"; DELETE FROM customers where first_name="value2";'; done

您將看到 log-sink 日誌反映了這些變更以及 CDC 儀表板圖表的更新。

結論與未來工作

在這篇部落格中,我們看到了 CDC-debezium 供應者和 Analytics 消費者函式,以及它們對應的 Spring Cloud Stream 來源和接收器如何運作。 供應者和消費者函式可以注入到自定義應用程式中,以便與其他業務邏輯結合。

來源和接收器應用程式開箱即用,可用於 Kafka 和 RabbitMQ 中介軟體變體。

您可以輕鬆構建獨立應用程式,將 Cdc-debezium 供應者與 Geode 消費者結合使用,以建立和維護資料庫資料的記憶體內視圖。 同樣地,您可以將 Cdc-debezium 供應者與 Elasticsearch 消費者結合使用,以即時維護資料庫資料的可搜尋索引。

更令人興奮的是,您可以使用 OOTB cdc-debezium 來源geode 接收器elasticsearch 接收器應用程式來實現上述場景。 您可以在不同的訊息繫結器和來源資料庫上構建這些管線。

這個 Spring One 演示展示了一個進階的使用案例,使用 CDC-debezium 和機器學習來構建信用卡詐欺檢測的串流資料管線。

cdc fraud detection

在這個部落格系列中,我們還有幾個即將推出的章節。 請繼續關注。

取得 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

瞭解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

瞭解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部