使用 Spring、協程和 Kotlin Flow 進行反應式開發

工程 | Sébastien Deleuze | 2019 年 4 月 12 日 | ...

自從我們在 2017 年 1 月宣布 Spring Framework 官方支援 Kotlin 以來,發生了很多事情。Kotlin 在 Google I/O 2017 上被宣布為官方 Android 開發語言,我們繼續改進 Spring 產品組合對 Kotlin 的支援,而 Kotlin 本身也隨著 協程等關鍵新功能而不斷發展。

我想藉著 Spring Framework 5.2 的第一個里程碑的機會,提供一個關於 Spring 和 Kotlin 結合的現況概述。我會盡力專注於具體的改進,因為我相信 Spring 和 Kotlin 具有相同的務實思維。

我認為這一切都與選擇有關。我們(Spring 團隊)提供的選擇,以及您作為應用程式開發人員在啟動新的 Spring Boot 應用程式時必須做出的選擇。例如:

  • 我應該使用哪種語言?

  • 使用 @Controller 註解還是函數式風格?

  • Spring MVC 還是 WebFlux?

這些問題顯然具有高度的主觀性,通常取決於專案的背景,但我將分享我個人的一些看法。

Java 還是 Kotlin?

Java 是顯而易見的預設選擇,但 Kotlin 是一種越來越受歡迎的替代方案。是什麼原因會讓開發人員從 Java 轉向 Kotlin?當人們問我時,我通常會說 Kotlin 允許 Java 開發人員利用他們現有的技能來編寫更簡短、更安全和更具表達性的程式碼。但要做出明智的選擇,我們應該找出更具體的要點。

我最喜歡的 Kotlin 功能是它將所謂的(多個)「數十億美元的錯誤」null 變成了一種安全功能。Java 的錯誤不是 null 本身,而是沒有在其類型系統中明確管理 null,導致出現類似於我們在動態語言中觀察到的問題。Kotlin 擁抱 null,透過在它的類型系統中使用它來 處理值的缺失。在 Kotlin 中,像 String 這樣的類型不可為空,因此可以安全使用而無需預防措施,而像 String? 這樣的類型可以為空,應該謹慎使用。好消息是 Kotlin 編譯器會在編譯時引發潛在的錯誤,您可以使用 安全呼叫Elvis 運算子如果非 null 則執行區塊來優雅地處理它們。與 Java Optional 不同,Kotlin 的 null 安全性也適用於輸入參數,並且不會強迫您使用影響效能和程式碼可讀性的包裝器。

DSL 也是 Kotlin 的另一個優勢。Gradle Kotlin DSL(在 start.spring.io 上的支援 即將推出)是一個很好的例子,它允許使用非常豐富和靈活的 API,並具有出色的可發現性和信心,這要歸功於 Kotlin 的靜態類型特性。Spring Framework 為 bean 定義函數式路由 甚至 MockMvc 提供 Kotlin DSL。

我可以詳細說明很多其他轉換的好理由,例如具有預設值的可選參數與 Java API(如 Spring)的良好互通性擴充函數具體化的類型參數以避免類型擦除、資料類別 或預設情況下鼓勵的不可變性,但我認為您應該只 透過範例學習 Kotlin,最終由 參考文件提供協助,並做出自己的判斷。您也可以按照此逐步 使用 Kotlin 的 Spring Boot 教學

所以假設我將為我的下一個 Spring Boot 專案選擇 Kotlin ;-)

使用 @Controller 註解還是函數式風格?

正如我在簡介中所說,選擇取決於上下文,並且是一個品味問題。我非常喜歡 使用 Kotlin 進行函數式路由,因為這種語言具有非常棒的 DSL 和函數式程式設計功能。我甚至正在研究如何透過實驗性的 Kofu DSL for Spring Boot 以函數式的方式定義 Spring Boot 應用程式配置,該配置在 Spring Fu 儲存庫中進行孵化。

但是今天,假設我的團隊由多年來習慣使用 @Controller 程式設計模型的開發人員組成,並且我不想同時更改所有內容,所以讓我們保留 @Controller

Spring MVC 還是 WebFlux?

我們在 Web 框架方面提出的選擇如下。

您可以繼續使用 Spring MVC 以及所有相關的知名技術,我們將繼續改進這些技術:Tomcat、JPA 等。您甚至可以使用 WebClient 現代 API 而不是 RestTemplate 來利用一些反應式功能。

但我們也提供了一個反應式堆疊,其中包括 WebFlux,這是一個基於 Reactive Streams 的 Web 框架,適用於那些需要更高可擴展性、對延遲免疫(適用於面向微服務的架構)和更好的流處理能力的開發人員。生態系統的其他部分,如 Spring Data 和 Spring Security 也提供了反應式支援。

Java 中使用 Reactor API 的 WebFlux

到目前為止,使用 WebFlux 使用 Spring 反應式堆疊需要一個相當大的轉變,即透過使用 Reactor MonoFlux 或 RxJava 類似類型等 API,將 IO 相關功能(Web、持久性)從命令式切換為聲明式/函數式風格。與命令式程式設計相比,這種顛覆性的方法提供了真正的優勢,但它也非常不同,並且需要一個非同小可的學習曲線。

讓我們看看這在具體程式碼中的含義,並讓我們藉此機會向您展示如何使用 R2DBC(基於 Reactive Streams 的 JDBC 替代方案)和 Spring Data R2DBC 以反應式的方式存取 SQL 資料庫。

如果我們選擇 Java,我們將編寫以下 UserRepository 類別,該類別公開一個反應式 API,以使用 Spring Data R2DBC 提供的 DatabaseClient API 存取 SQL 資料庫。

class UserRepository {

	private final DatabaseClient client;

	public UserRepository(DatabaseClient client) {
		this.client = client;
	}

	public Mono<Long> count() {
		return client.execute().sql("SELECT COUNT(*) FROM users")
			.as(Long.class).fetch().one();
	}

	public Flux<User> findAll() {
		return client.select().from("users").as(User.class).fetch().all();
	}

	public Mono<User> findOne(String id) {
		return client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).as(User.class).fetch().one();
	}

	public Mono<Void> deleteAll() {
		return client.execute().sql("DELETE FROM users").then();
	}

	public Mono<Void> save(User user) {
		return client.insert().into(User.class).table("users")
			.using(user).then();
	}

	public Mono<Void> init() {
		return client.execute().sql("CREATE TABLE ...").then()
			.then(deleteAll())
			.then(save(new User("smaldini", "Stéphane", "Maldini")))
			.then(save(new User("sdeleuze", "Sébastien", "Deleuze")))
			.then(save(new User("bclozel", "Brian", "Clozel")));
	}
}

注意

儲存使用者可以以 fork-join 方式完成,因為這些操作彼此不依賴,但為了進行比較,我使用以 then() 鏈接的循序操作。

你可以看到,在這樣的 API 中,void 變成了 Mono<Void>User 變成了 Mono<User>。這使得它們可以以非阻塞的方式使用,並提供對豐富的操作符集合的訪問。但這也強制要求使用 Mono 包裝器,並顯著改變你使用這些 API 的方式。例如,如果某些操作需要像 init() 方法中那樣依序執行,而使用命令式程式碼可以很直接地做到,那麼在這裡我們必須使用 then 操作符構建一個宣告式的管道。

Flux<User> 提供了更高的附加價值,因為它允許將傳入的使用者作為流來處理,而使用 List<User> (通常用於阻塞堆疊中)則意味著在處理之前將所有資料載入到記憶體中。請注意,我們也可以在這裡使用 Mono<List<User>>

在控制器端,你可以看到 Spring WebFlux 原生支援這些反應式型別,而且你還可以觀察到基於 Reactive Streams 的 API 的另一個特性,即異常主要被用作反應式型別所攜帶的錯誤訊號,而不是像常規命令式程式碼中那樣拋出。

@RestController
public class UserController {

	private final UserRepository userRepository;

	public UserController(UserRepository userRepository) {
		this.userRepository = userRepository;
	}

	@GetMapping("/")
	public Flux<User> findAll() {
		return userRepository.findAll();
	}

	@GetMapping("/{id}")
	public Mono<User> findOne(@PathVariable String id) {
		return userRepository
			.findOne(id)
			.switchIfEmpty(Mono.error(
				new CustomException("This user does not exist");
	}

	@PostMapping("/")
	public Mono<Void> save(User user) {
		return userRepository.save(user);
	}
}

Kotlin 中使用協程的 WebFlux API

重要的是要理解 Spring 的反應式支援是建立在 Reactive Streams 之上,並考慮到互操作性,而且 Reactor 被用於 2 種不同的目的

  • 它是我們在 Spring 反應式基礎架構中普遍使用的 Reactive Streams 實作

  • 它也是預設暴露的反應式公共 API

但 Spring 的反應式支援從一開始就被設計成可以輕鬆適應其他非同步或反應式 API,例如 CompletableFuture、RxJava 2、以及現在的協程。在這種情況下,我們仍然在內部利用 Reactor,並在公共 API 層面適應不同的終端使用者反應式 API。

如果你喜歡這種方法,當然可以繼續在 Kotlin 中使用 FluxMono,但 Spring Framework 5.2 引入了一個新的主要特性:我們現在可以使用 Kotlin 協程,以更命令式的方式利用 Spring 反應式堆疊。

協程是 Kotlin 的輕量級執行緒,允許以命令式的方式編寫非阻塞程式碼。在語言層面,以 suspend 關鍵字標識的暫停函數為非同步操作提供了一個抽象,而在函式庫層面,kotlinx.coroutines 提供了諸如 async { } 之類的函數,以及諸如 Flow 之類的型別,它在協程世界中相當於 Flux

kotlinx-coroutines-corekotlinx-coroutines-reactor 依賴項在 classpath 中時,就會啟用協程支援。

build.gradle.kts

dependencies {
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

那麼,如果使用 Kotlin 而不是 Java 編寫,並使用協程和 Flow 而不是 MonoFluxUserRepositoryUserController 會是什麼樣子?

class UserRepository(private val client: DatabaseClient) {

	suspend fun count(): Long =
		client.execute().sql("SELECT COUNT(*) FROM users")
			.asType<Long>().fetch().awaitOne()

	fun findAll(): Flow<User> =
		client.select().from("users").asType<User>().fetch().flow()

	suspend fun findOne(id: String): User? =
		client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).asType<User>()
			.fetch()
			.awaitOneOrNull()

	suspend fun deleteAll() =
		client.execute().sql("DELETE FROM users").await()

	suspend fun save(user: User) =
		client.insert().into<User>().table("users").using(user).await()

	suspend fun init() {
		client.execute().sql("CREATE TABLE IF NOT EXISTS users (login varchar PRIMARY KEY, firstname varchar, lastname varchar);").await()
		deleteAll()
		save(User("smaldini", "Stéphane", "Maldini"))
		save(User("sdeleuze", "Sébastien", "Deleuze"))
		save(User("bclozel", "Brian", "Clozel"))
	}
}

你可以看到,在這裡我們不是返回 Mono<User>,而是在一個可以在命令式使用的暫停函數中返回 User(或者更確切地說是它的可空變體 User?)。init() 方法實作的差異很好地說明了這一點,因為我們現在使用常規的命令式程式碼,而不是鏈式的 then 調用。

但是等等,我如何在直接在 DatabaseClient 型別上使用協程?它是一個基於 MonoFlux 的反應式 API。之所以能夠實現這一點,是因為 Spring Data R2DBC 也提供了 Kotlin 擴展(例如,請參閱 這個),它允許你在導入後在 DatabaseClient 上添加基於協程的方法。按照慣例,暫停方法以 await 作為前綴或以 AndAwait 作為後綴,並獲得與基於 Mono 的對應方法相似的名稱。

現在讓我們更深入地了解這個 Flow<User> 返回型別。首先,請注意,我們指的是 kotlinx.coroutines.flow.Flow,而不是 java.util.concurrent.Flow,它是 Java 9+ 提供的 Reactive Streams 容器型別。

你將像使用 Java 8+ 的 Stream 或其 Kotlin 等效項 Sequence 一樣使用 Flow API,但巨大的差異在於它適用於非同步操作並管理背壓。因此,它在協程世界中相當於 Flux,適用於熱或冷流、有限或無限流,具有以下主要差異:

  • Flow 是基於推送的,而 Flux 是推拉混合的。

  • 背壓是透過暫停函數實現的。

  • Flow 只有一個 單個暫停 collect 方法,操作符則被實現為 擴展

  • 由於協程,操作符很容易實現

  • 擴展允許將自訂操作符添加到 Flow

  • Collect 操作是暫停函數。

  • map 操作符 支援非同步操作(無需 flatMap),因為它接受一個暫停函數參數。

現在讓我們看一下控制器的協程版本。

@RestController
class UserController(private val userRepository: UserRepository) {

	@GetMapping("/")
	fun findAll(): Flow<User> =
		userRepository.findAll()

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): User? =
		userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")

	@PostMapping("/")
	suspend fun save(user: User) =
		userRepository.save(user)
}

同樣,你可以看到程式碼非常接近於我們使用 Spring MVC 時使用的常規命令式程式碼。

除了為基於 FluxMono 的 API(如 WebClientServerRequestServerResponse)提供協程擴展之外,Spring WebFlux 現在還原生支援用於帶有 @Controller 註解的類別的暫停函數和 Flow 返回型別。

使用命令式程式碼進行非同步操作

讓我們利用 WebClient 協程擴展來看看如何鏈式調用非同步調用。我們將請求一個遠端 HTTP 端點以取得額外的 UserDetail1UserDetail2

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails {
		val userDetail1 = client.get().uri("/userdetail1/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail1>()
		val userDetail2 = client.get().uri("/userdetail2/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail2>()
		return UserWithDetails(user, userDetail1, userDetail2)
	}
}

在這裡,我們使用 WebClient 協程擴展(如 awaitExchange()awaitBody())以純粹命令式的方式執行非同步和非阻塞操作。而且由於 Flow map 操作符 接受一個暫停函數參數,我們可以在其中執行這樣的操作,而無需像在 Java 的反應式 API 中那樣使用 flatMap

平行分解

如前所述,協程預設是依序執行的,但它們也可以用於平行執行操作。讓我們重構之前的範例,以便同時執行 2 個遠端調用。

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails = coroutineScope {
		val asyncDetail1 = async {
			client.get().uri("/userdetail1/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail1>()
		}
		val asyncDetail2 = async {
			client.get().uri("/userdetail2/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail2>()
		}
		UserWithDetails(user, asyncDetail1.await(), asyncDetail2.await())
	}
}

在這裡,我們利用 結構化併發,透過 async {} 建立器建立 Deferred<UserDetail1>Deferred<UserDetail2> 實例來觸發 2 個使用者詳細資訊的平行檢索,然後透過調用 2 個 await() 方法來等待它們完成,這些方法將在 UserDetail1UserDetail2 實例可用時返回。

結論

我認為將 Spring 反應式堆疊與這樣的協程和 Kotlin Flow API 結合使用,可以在命令式和宣告式方法之間提供一個有趣的權衡。它允許以一種非常平易近人的方式利用 WebFlux 和 Spring Data 反應式的可擴展性和功能。

Spring WebFlux 和 Spring Data 中的協程支援將作為即將發布的 Spring Boot 2.2 版本的一部分提供。你可以閱讀 參考文檔,並且可以期待進一步的改進,例如協程支援 RSocket @MessageMapping 端點和 RSocketRequester 擴展。Spring Data Moore 也將在 Spring Data MongoDB、Cassandra 和 Redis 上提供類似的協程擴展。並且 Spring Data 可以在某個時候提供對 協程儲存庫 的支援。我們還將使 Reactor 和協程上下文可互操作,以支援安全性和反應式交易。

最後,我要感謝許多才華橫溢的工程師,沒有他們,這一切都不可能實現

  • Kotlin 團隊的 Roman Elizarov 和 Vsevolod Tolstopyatov 對於他們在協程和 Flow 上的出色工作表示感謝

  • 感謝 Konrad Kaminski 最初的社群驅動的 Spring 協程支援

  • 感謝 Jake Wharton 圍繞統一 Rx 和協程的早期原型設計

  • 感謝 Stéphane Maldini 和 David Karnok 的啟發性工作

  • 感謝 Juergen Hoeller、Rossen Stoyanchev 和 Brian Dussault 的信任

  • 感謝 Mark Paluch 和 Oliver Drotbohm 在持久層面的支持

如同以往,我期待各位的回饋,也歡迎 Kotlin 團隊針對 Flow API 提供意見,畢竟它目前還在預覽模式。歡迎來 Devoxx FranceJAXSpring I/OSunny Tech 聽我即將發表的演講,以了解更多資訊。

乾杯!

訂閱 Spring 電子報

隨時掌握 Spring 的最新資訊,訂閱電子報

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進制檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部