搶先一步
VMware 提供訓練和認證,以加速您的進展。
了解更多此部落格系列的其他部分
第 1 部分:Spring Cloud Stream Kafka 應用程式中的交易簡介
第 2 部分:Spring Cloud Stream Kafka 應用程式中由生產者啟動的交易
第 3 部分:在 Spring Cloud Stream 中與外部交易管理器同步 Kafka 應用程式
第 4 部分:使用 Spring Cloud Stream 和 Apache Kafka 的交易式回滾策略
在我們這個系列的前幾次討論中,對交易如何在 Spring Cloud Stream Kafka 應用程式中運作進行了基本分析之後,我們終於來到了這個關鍵問題:**精確一次語義**,這是串流應用程式中備受討論且必要的功能。在本部落格系列的這部分中,我們將探討如何透過 Apache Kafka 交易在 Spring Cloud Stream 應用程式中獲得精確一次語義。從前面章節中了解交易如何運作,可以相對容易地理解 Spring Cloud Stream Kafka 應用程式如何實現精確一次語義。
這裡需要注意的重要一點是,除了我們在本部落格系列之前的文章中已經看到的程式碼之外,我們不需要編寫任何新的程式碼來實現 **精確一次語義**。本部落格闡明了在 Spring Cloud Stream Kafka 應用程式中充分支援精確一次語義所需的某些期望。
精確一次語義在分散式運算中很難實現。我們超出範圍來審查所有技術細節,以了解為什麼這是一項如此困難的任務。有興趣了解精確一次語義的所有基礎知識以及為什麼在分散式系統中如此難以實現的讀者,可以參考有關該主題的更廣泛文獻。Confluent 的這篇部落格 是理解這些技術挑戰以及 Apache Kafka 為實現這些挑戰而實施的解決方案的良好起點。
雖然我們不會深入探討細節,但值得看一下 Apache Kafka 提供的不同傳遞保證。主要有三種傳遞保證
在 **至少一次** 的傳遞語義中,應用程式可能會接收資料一次或多次,但保證至少接收一次。使用 **最多一次** 語義的傳遞保證,應用程式可能會接收資料零次或一次,這意味著存在資料遺失的可能性。另一方面,**精確一次** 語義保證,顧名思義,僅傳遞一次。根據應用程式的使用案例,您可以使用這些保證中的任何一種。預設情況下,Apache Kafka 提供至少一次傳遞保證,這意味著記錄可能會傳遞多次。如果您的應用程式可以處理重複記錄或沒有記錄的後果,則使用非精確一次保證可能還可以。相反,如果您處理任務關鍵型資料,例如金融系統或醫療資料,則必須保證精確一次的傳遞和處理,以避免可怕的後果。由於像 Apache Kafka 這樣的系統具有分散式性質,因此由於許多移動部件的性質,通常很難實現精確一次語義。
我們在部落格系列之前的文章中看到了許多不同的情境。Apache Kafka 中的精確一次語義解決了 **讀取-處理-寫入**(或 **消費-轉換-生產**)應用程式。有時會對我們「一次」到底在做什麼產生困惑。是最初的消費、資料的處理,還是最後的生產部分?Apache Kafka 保證整個 **讀取->處理-寫入** 序列的精確一次語義。在這個序列中,讀取和處理部分始終是 **至少一次** – 例如,如果處理或寫入的某部分因任何原因而失敗。當您依賴精確一次傳遞時,交易非常關鍵,這樣資料的最終發佈才能成功完成或回滾。一個潛在的副作用是,最初的消費和處理可能會多次發生。例如,如果交易被回滾,則消費者偏移量不會更新,並且下一次輪詢(如果是 Spring Cloud Stream 內的重試或應用程式重新啟動時)會重新傳遞相同的記錄並再次處理。因此,保證在消費和處理(轉換)部分是至少一次,這是理解的關鍵點。任何以 read_committed
隔離等級執行的下游消費者都只會從上游處理器獲得精確一次的消息。因此,務必了解,在精確一次傳遞的世界中,處理器和下游消費者都必須協調才能從精確一次語義中受益。任何以 read_uncommitted
隔離等級運行的生產主題的消費者都可能會看到重複的資料。
另一個需要記住的重點是,由於記錄的消費及其處理可能會多次發生,因此應用程式程式碼需要遵循冪等模式,如果您的程式碼與外部系統(例如資料庫)互動,這主要是需要關注的問題。在這種情況下,由應用程式確保使用者程式碼不會產生副作用。
讓我們重新檢視我們之前看到的用於簡單的消費-處理-生產迴圈的程式碼。
@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
正如我們之前看到的,為了使此應用程式具有交易性,我們必須提供 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
組態屬性,並提供適當的值。在 Spring Cloud Stream 中,僅需提供此屬性即可使上述程式碼片段完全具備精確一次傳遞能力。整個端對端流程在交易邊界內執行(儘管在上面的範例中我們有兩個交易)。當容器調用監聽器時,我們有一個在容器中啟動的外部 Kafka 交易,以及另一個由交易攔截器啟動的 JPA 交易。當 StreamBridge
發送發生時,相同的交易資源會從初始 Kafka 交易中使用,但它不會在控制權返回到容器後才提交。當方法退出時,JPA 交易被提交。假設這裡出現問題,並且資料庫操作拋出異常。在這種情況下,JPA 不會提交,它會回滾,並且異常會傳播回監聽器容器,此時 Kafka 交易也會被回滾。另一方面,如果 JPA 操作成功,但 Kafka 發佈失敗並拋出異常,則 JPA 不會提交,而是會回滾,並且異常會傳播到監聽器。
在上面的程式碼中,如果我們沒有與外部交易管理器同步,而只是發佈到 Kafka,那麼我們不需要使用 @Transactional
註解,我們甚至可以將 txCode
方法中的程式碼內聯為消費者 lambda 的一部分
@Bean
public Consumer<PersonEvent> process() {
return pe -> {
Person person = new Person();
person.setName(pe.getName());
PersonEvent event = new PersonEvent();
event.setName(person.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
在這種情況下,我們只有一個 Kafka 交易,該交易由容器在調用監聽器時啟動。當程式碼透過 StreamBridge
發送方法發佈記錄時,KafkaTemplate
使用來自初始交易的相同交易生產者工廠。
這兩種情境的故事都是我們完全是交易性的,並且最終發佈僅針對交易完成一次。隔離等級為 read_committed
的下游消費者應該精確一次消費它們。
在本系列中,到目前為止,我們還沒有談到 Kafka Streams。具有諷刺意味的是,最初 Kafka Streams 應用程式是 Apache Kafka 添加交易支援和精確一次語義的原因,但我們還沒有談到它。原因是,在 Kafka Streams 應用程式中實現精確一次語義非常簡單,幾乎微不足道。正如他們所說,這只是一個組態旋鈕。要了解有關 Kafka Streams 中精確一次語義的更多資訊,請參閱 Confluent 的這篇 部落格
與常規的基於 Kafka 客戶端的應用程式一樣,在 Kafka Streams 的情況下,當您在 **消費-處理-生產** 模式中產生最終輸出時,精確一次保證就會發揮作用,這意味著只要下游消費者使用 read_committed
隔離等級,他們就會精確一次消費產生的資料。
Kafka Streams 組態屬性 processing.guarantee
屬性在 Kafka Streams 應用程式中啟用精確一次語義。您可以透過設定 spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee
屬性在 Spring Cloud Stream 中設定它。您需要將值設定為 exactly_once
。預設情況下,Kafka Streams 使用值 at_least_once
。
在有狀態的 Kafka Streams 應用程式中通常發生的三個主要活動是
模式是接收和處理記錄。在此過程中,任何狀態資訊都會實體化到狀態儲存中,本質上是更新特定的變更日誌主題。最後,出站記錄會發佈到另一個 Kafka 主題。如果您注意到此模式,它看起來與我們已經看到的許多情境類似,除了狀態儲存部分。當將 processing.guarantee
設定為 exactly_once
時,Kafka Streams 保證,如果在這些活動期間發生異常或應用程式崩潰,則整個單元將以原子方式回滾,就像什麼都沒有發生一樣。在應用程式重新啟動時,處理器再次消費記錄、處理記錄,並最終發佈資料。由於此發佈在幕後以交易方式發生,因此隔離等級為 read_committed
的任何下游消費者都不會在發佈成功之前消費該記錄,從而處理實現交易性所需的一切(例如提交消費記錄的偏移量等等),從而保證精確一次傳遞。
Kafka Streams 的精確一次傳遞保證適用於從 Kafka 相關活動的角度來看,記錄的端對端消費、處理和發佈。當存在外部系統時,它不提供此保證。例如,假設您的程式碼與外部系統(例如資料庫插入或更新操作)有互動。在這種情況下,由應用程式決定它希望如何參與交易。Spring 的交易支援在這種情況下再次派上用場。我們不想在這裡重複程式碼。但是,正如我們在本系列中多次看到的那樣,您可以將與資料庫互動的程式碼封裝在一個單獨的方法中,使用 @Transactional
註解對其進行註解,並提供適當的交易管理器,例如我們已經看到的 JPA 交易管理器。當此類方法拋出異常時,JPA 交易會回滾,並且異常會傳播到 Kafka Streams 處理器程式碼,最終將其傳播回 Kafka Streams 框架本身,然後框架會回滾原始 Kafka 交易。再次重複一遍,重要的是要理解,從串流拓撲中的處理器調用的這些操作必須編碼為處理冪等性,因為「精確一次」僅適用於整個過程,而不適用於序列中的讀取和處理。
正如我們在本文開頭已經提到的,**精確一次傳遞** 語義是分散式運算中一個複雜的主題。但是,憑藉 Kafka 本身為實現精確一次語義提供的解決方案,以及 Spring 在 Spring for Apache Kafka 和 Spring Cloud Stream 框架中的支援,在 Spring Cloud Stream Kafka 應用程式中實現精確一次傳遞語義相對容易。