搶先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多本文是部落格系列文章的一部分,該系列探討了基於 Java Function 重新設計的 Spring Cloud Stream 應用程式。在本章中,我們將探討如何使用 Spring Cloud Stream Applications 和 Spring Cloud Data Flow 來實作一個非常常見的 ETL 使用案例:從遠端服務擷取檔案。具體來說,我們將研究如何從 S3、SFTP 和 FTP 擷取檔案。
以下是迄今為止部落格系列中包含的內容
為了闡明觀點,Spring Cloud Data Flow 幾年來一直支援使用 SFTP 進行遠端檔案擷取。自從我撰寫了這篇文章以來,基本架構沒有改變,但是正如我們將看到的,新的串流應用程式允許更簡單和更靈活的解決方案。
檔案擷取架構從遠端檔案來源開始,該來源輪詢遠端目錄並為偵測到的每個檔案發布訊息。術語遠端檔案來源指的是任何提供此功能的來源應用程式。迄今為止,這包括 Amazon S3 來源、SFTP 來源和 FTP 來源。
可以將這些來源中的每一個配置為將遠端目錄中的檔案同步到本機目錄。在這種情況下,底層 Supplier
function 產生的訊息酬載是本機檔案路徑。供應商的輸出會轉換為 Task Launch Request。我們稍後將解釋這是如何完成的。請求由 Task Launcher
sink 接收,該 sink 通過其 REST API 將其發布到 Data Flow Server,以啟動批次作業來擷取檔案內容。在下面顯示的範例中,作業將 CSV 檔案的每一行插入到資料庫表格中。
如果我們在雲端平台(例如 Kubernetes 或 Cloud Foundry)上執行,我們需要配置共用磁碟區(例如使用 NFS),以便 task 應用程式可以存取來源下載的檔案。
這是使用 Spring Cloud Data Flow 的建議檔案擷取架構。以下特性使其非常具有彈性
檔案擷取作業是使用 Spring Batch 實作的。它非常適合大型檔案處理,其中暫時性故障可能需要作業從上次中斷的地方重新啟動 - Spring Batch 專門設計來處理這種情況。
Task Launcher sink 使用 PollableMessageSource,以便它可以先確認 Data Flow 可以接受 task 請求,然後再從輸入佇列中提取 task 啟動請求。Data Flow 配置了允許的最大並行 task 執行次數。sink 使用 Data Flow API 檢查是否已達到此限制,然後再接受下一個請求。這種流程控制(類似於背壓)可以防止平台資源飽和,當例如 100 個檔案被放入遠端目錄時,這種飽和很容易發生。
需要共用磁碟區以使批次作業能夠在必要時從上次提交的交易繼續處理。
可以實作這種工作負載而無需 Spring Cloud Data Flow 或 Spring Batch。我們將把它作為讀者的練習。
我們所說的遠端檔案來源是什麼意思?Amazon S3、SFTP 和 FTP 來源應用程式在 Spring Integration 中共享一個共同的血統,因此行為基本相同。例如,擴展 AbstractInboundFileSynchronizer 的類別用於將本機目錄與遠端目錄同步。基底類別包括配置 FileListFilter
以指定要包含哪些檔案。通常,這用於模式比對檔案名稱。此外,此元件使用 MetadataStore 來追蹤本機目錄中已有哪些檔案以及上次修改時間,以便僅同步新的或已變更的檔案。預設情況下,中繼資料儲存區是記憶體內實作。這表示當來源重新啟動時,我們可能會收到已處理檔案的事件。為了解决這個問題,可以輕鬆自訂這些來源中的每一個,以使用多個可用的 持久性實作之一。AbstractFileSynchronizer
也支援使用 SpEL 表達式來建立本機檔案名稱、自動遠端檔案刪除等等。
除了檔案同步之外,這些來源中的每一個都包括 file.consumer.mode
屬性,該屬性可以是以下之一
contents
- 酬載是檔案內容位元組陣列
ref
- 酬載是本機檔案路徑
lines
- 每個酬載都是檔案中的一行
此外,每個來源都提供 list-only
選項,其中酬載包含有關遠端檔案的中繼資料,並且不執行同步。
SFTP 來源從 SFTP 伺服器取用檔案。由於 SFTP 是最常用的遠端檔案服務,因此此元件具有最先進的功能。實際上,在上一代串流應用程式中,SFTP 是我們為檔案擷取架構支援的唯一來源。隨著它發展為支援 task 啟動請求,我們最終實作了一個專門針對檔案擷取使用案例的特殊變體。sftp-datalow
來源旨在與 tasklauncher-dataflow
sink 配合使用,嵌入了將酬載轉換為 task 啟動請求的程式碼。在目前版本中,我們已停用此變體,轉而使用 function 組合。此外,可以設定 sftp 來源來輪詢多個遠端目錄,在每個目錄之間輪換。在此配置中,輪換演算法可以是 fair
- 每個遠端目錄獲得一個輪詢 - 或不輪詢 - 持續輪詢每個遠端目錄,直到沒有新檔案。它還支援 sftp.supplier.stream=true
,它將直接串流內容,而無需同步到本機目錄。
FTP 來源與 SFTP 來源非常相似,只是它使用 FTP 並且不加密傳輸中的資料,因此安全性較低。它提供相同的核心功能,但目前不支援多個遠端目錄、list-only
或 stream
模式。
Amazon S3 來源 以其他來源為模型,並支援與其他來源相同的檔案消費者模式以及 list-only
模式。在這種情況下,s3.supplier.remote-dir
指的是 S3 儲存貯體。當使用 list-only
時,酬載包含 S3ObjectSummary
,它提供有關 S3 物件的中繼資料。S3 本身提供的功能比 FTP/SFTP 更豐富。
除了 AWS S3 之外,此來源現在還可以與 S3 相容的實作(例如 Minio)一起使用。
在之前的版本中,這被稱為 tasklauncher-dataflow
sink。最初,我們也有獨立的 task 啟動器,每個受支援的平台一個。這些啟動器由於易於使用和彈性(如上所述)已被棄用,轉而使用 Data Flow 後端實作。因此,我們從名稱中刪除了「Data Flow」。現在它簡稱為 tasklauncher-sink。
sink 建置在對應的 tasklauncher-function 之上,該 function 可以用於任何獨立應用程式中,以將 task 啟動請求發送到 Data Flow。這實作為 Function<LaunchRequest, Optional<Long>>
。LaunchRequest 是一個簡單的值物件,至少包含要啟動的 task 的名稱。此 task 必須使用相同的名稱在 Data Flow 中定義。或者,啟動請求包括命令列引數和部署屬性。如果請求已提交,則 function 會傳回唯一的 task ID 作為 long 類型。如果 Data Flow 伺服器指示 task 平台已達到其最大執行 task 數、無法連線到 data flow 伺服器或請求無效,則不會提交請求。
Task Launcher sink 從排程的 task 中調用其基底 function,該 task 由 DynamicPeriodicTrigger 觸發,DynamicPeriodicTrigger 允許在執行時更新其週期。在這種情況下,我們使用它來實作指數退避。從一秒的初始週期開始,如果觸發器將退避,最終退避到每 30 秒,如果
沒有排隊的啟動請求
平台已在執行最大數量的 task
如果這些條件中的任何一個發生變化,來源會將週期重置為其初始值。當然,初始和最大觸發週期都是可配置的。
觸發的 task 檢查伺服器是否可以接受新的啟動請求,如果可以,則它使用 PollableMessageSource 輪詢輸入佇列。如果存在請求,它將通過其 REST API 將請求發布到 Data Flow。
新的基於 function 的架構為 function 組合 提供一流的支援。作為此策略的一部分,某些常見的 function 可以與任何來源組合。值得注意的是,這包括 task-launch-request-function。這表示現在可以將任何遠端檔案來源配置為產生 task 啟動請求。task 啟動請求 function 可以評估 SpEL 表達式。例如,每個 task 啟動請求可以提供不同的檔案路徑作為命令列引數。
讓我們深入研究一個範例,看看它是如何運作的。我們將使用 S3 來源、Task Launcher Sink、Spring Cloud Data Flow、S3 相容服務和一個簡單的 Spring Batch 應用程式來處理檔案。
為了簡單起見,我們將使用 Docker Compose 在本機上執行所有操作。
為此範例建立專案目錄,開啟終端機工作階段,然後 cd 到專案目錄。下載 SCDF docker-compose 檔案。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.1/spring-cloud-dataflow-server/docker-compose.yml
設定 Data Flow 和 Skipper 版本,以及匯入最新串流應用程式的 URI。然後執行 docker-compose
export DATAFLOW_VERSION=2.6.1
export SKIPPER_VERSION=2.5.1
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
docker-compose up
SCDF docker-compose.yml
掛載當前目錄,因此這些檔案可從容器存取,路徑為 /root/scdf
。
我們將使用 Minio 作為 S3 儲存,並將在 Docker 容器中執行它以繫結到 minio
目錄。我們將資料檔案新增至 minio/mybucket
。這將作為我們的遠端目錄。
我們也建立一個 download
目錄作為我們的共用本機目錄。download
目錄位於共用磁碟區上,任何需要它的應用程式容器都可以存取它。在本例中,S3 來源從 S3 下載檔案,批次應用程式將擷取資料並將其寫入資料庫表格。在生產環境中,這將是一個外部持久性磁碟區,例如專用伺服器上的 NFS 掛載目錄。
mkdir -p minio/mybucket
mkdir download
在 S3 儲存貯體位置建立資料檔案 name-list.csv
。我們碰巧有一個您可以下載的檔案
wget -o minio/mybucket/name-list.csv https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest/data/name-list.csv
其中包含 firstname
,lastname
行。批次作業將為檔案中的每一行在 people
表格中插入一行。
我們的專案目錄現在應如下所示
.
├── docker-compose.yml
├── download
└── minio
└── mybucket
└── name-list.csv
我們將執行 Minio,建立磁碟區掛載以將其容器的 /data
路徑繫結到 minio
目錄。這將建立一個 S3 儲存貯體 mybucket
,其中包含 name-list.csv
。
docker run --mount type=bind,source="$(pwd)"/minio,target=/data -p 9000:9000 -e "MINIO_ACCESS_KEY=minio" -e "MINIO_SECRET_KEY=minio123" minio/minio server /data
此時,如果您願意,可以開啟瀏覽器到 https://127.0.0.1:9000 並使用上述憑證登入並檢視儲存貯體。
現在我們已經設定了本機環境,我們可以協調我們的檔案擷取管道。
我們碰巧擁有我們需要的應用程式,已發布到 repo.spring.io
Maven 儲存庫。原始程式碼在這裡。
若要註冊此應用程式,請開啟瀏覽器到 https://127.0.0.1:9393/dashboard 並導覽到 Apps 頁面。按一下 Add Application(s)
並使用 URI 註冊名為 fileingest
的 Task
應用程式。
maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT
然後按一下 Register the application(s)
應用程式註冊後,我們需要建立一個 task 定義,該定義將在 task 啟動請求中引用。我們將 task 命名為 fileingest
,與應用程式相同。
現在我們將建立一個串流,以便為 S3 儲存貯體中的每個新檔案啟動 fileingest
task。由於我們已經在 S3 儲存貯體中放置了一個檔案,因此我們預期它將被下載到我們的共用 download
目錄。當這種情況發生時,task 啟動請求將被發送到 Task Launcher sink,後者將啟動 fileingest
task 來處理它。
在左側選單欄上選取 Streams
,然後按一下 Create stream(s)
。將下面的串流定義剪下並貼到文字區域中。
注意
在 S3 端點 URL 中替換您主機的 LAN IP 位址。由於 localhost
解析為容器自己的 IP,因此我們需要使用 LAN IP。有多種方法可以取得此值。
ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'
在 OS/X 上適用於我。
另外
dig +short $(hostname)
以前一直有效,直到我的雇主將我的機器放在他們的網域上。
以下是串流定義
s3 --spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction --file.consumer.mode=ref --s3.common.endpoint-url=http:<lan-ip-address>:9000 --s3.common.path-style-access=true --s3.supplier.remote-dir=mybucket --s3.supplier.local-dir=/root/scdf/download --cloud.aws.credentials.accessKey=minio --cloud.aws.credentials.secretKey=minio123 --cloud.aws.region.static=us-east-1 --cloud.aws.stack.auto=false --task.launch.request.taskName=fileingest --task.launch.request.argExpressions='localFilePath=payload' | tasklauncher --spring.cloud.dataflow.client.server-uri=http://dataflow-server:9393
串流定義基本上是 s3|tasklauncher
,但 S3 來源需要一些配置。為了分解它
spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction
- function 組合的秘訣。在這裡,我們使用逗號作為組合分隔符,而不是標準的 |
。如果以這種方式使用 |
,DSL 解析器會感到困惑。我們將 s3supplier
(S3 來源的主要 function)與 taskLaunchRequestFunction
(應用程式上下文中的 Function Bean)組合在一起,如果我們願意使用它,則可供任何標準來源使用。
file.consumer.mode=ref
- 酬載是下載檔案的路徑。
s3.common.endpoint-url
- 我們 Minio 實例的 S3 服務端點。如果您使用的是 AWS S3,則不需要此項。
s3.common.oath-style-access=true
- Minio 需要此項
s3.supplier.remote-dir=mybucket
- 我們的來源將監控的 S3 儲存貯體
3.supplier.local-dir=/root/scdf/download
- 從容器角度來看的本機目錄路徑
cloud.aws.credentials.accessKey=minio
cloud.aws.credentials.secretKey=minio123
- 憑證直接使用 spring-cloud-aws
屬性名稱
cloud.aws.region.static=us-east-1
- AWS s3 SDK 需要一個區域,Minio 忽略此區域
cloud.aws.stack.auto=false
- 不要為 AWS 執行任何特殊操作。
task.launch.request.taskName=fileingest
- 要啟動的 task 的名稱。這是必需的,但可以通過 SpEL 表達式動態設定。
task.launch.request.argExpressions='localFilePath=payload'
- 每次我們啟動 task 時,我們都希望將檔案位置作為命令列引數傳遞,在本例中,我們的擷取 task 正在尋找名為 localFilePath
的引數,該值是針對每個訊息評估的訊息酬載。此路徑位於配置的本機目錄 /root/scdf/download/<filename>
中。因此批次應用程式可以看到它。
在這種情況下,task launcher sink 僅需要 Data Flow Server URI。對於在 skipper 容器中執行的 sink,主機名稱為 dataflow-server
。
建立串流並為其命名。
使用 play
按鈕部署串流。這將開啟一個頁面,讓您可以檢閱配置並進行任何變更。按一下頁面底部的 Deploy stream
。
串流部署完成後,移至 Tasks
頁面,最終(在 30 秒內)您應該會看到 fileingest
task 已完成。
您也可以看到,檔案已複製到 download
目錄
.
├── docker-compose.yml
├── download
│ └── name-list.csv
└── minio
└── mybucket
└── name-list.csv
從 Executions
標籤頁,您可以取得有關 task 執行的更多詳細資訊。
由於這也是 Spring Batch 應用程式,您可以移至 Jobs
頁面,移至 ingestJob
,然後按一下 info
圖示以顯示 Job 執行詳細資訊
工作詳細資訊報告它執行了 5494 次寫入。Data Flow Server 將所有任務應用程式的 DataSource 配置為使用其資料庫來記錄任務和工作執行狀態。在此示範中,我們使用相同的 DataSource 來寫入應用程式資料。我們可以連線到 dataflow-mysql
容器以查詢表格
docker exec -it dataflow-mysql mysql -u root -p
使用密碼 rootpw
登入並查詢表格
如果您讀到這裡,感謝您的時間和關注。如果您執行了示範,恭喜您!
即使以最簡單的形式來看,這對於 Data Flow 來說也是相當進階的使用案例。在這裡,我們提供了 Spring Batch 應用程式。通常,您會編寫自己的應用程式(儘管有傳言說 Spring Cloud Task 的下一個版本將包含可配置的批次應用程式)。除此之外,我們不需要編寫任何程式碼,就能擁有功能完整、雲端原生、事件驅動的 ETL 管道,以將資料從 S3 擷取到關聯式資料庫。事件驅動意味著新資料會在到達時立即被擷取並供使用者使用,而不是在夜間排程執行並在隔天提供資料。檔案可以並行處理,並在需要時執行多個工作實例。由於 Data Flow 限制了平台上同時執行的任務數量,因此這種架構可以處理非常高的負載,而不會耗盡平台資源。
一種可行的替代方案是從 S3 擷取檔案,這樣就不需要將檔案複製到共用檔案系統。在這種情況下,S3 來源可以配置為 list only=true
,以便它將提供遠端 S3 路徑。然後,批次工作會連線到 S3 並直接處理遠端檔案。這個 Stack Overflow 貼文 提供了一些關於如何做到這一點的提示。
但是,當使用 S/FTP 時,這種方法不太理想,因為這些是檔案傳輸協定,而且直接串流受到限制。如果您不使用持久性磁碟區,並且工作由於某種原因而失敗,您必須從頭開始,可能必須從部分完成的狀態手動倒回。使用 S/FTP 實作此管道與我們在此處展示的非常相似。
這篇文章是關於新功能型 Spring Cloud Stream 應用程式系列文章的一部分。請在未來幾週內關注更多文章。