YMNNALFT:使用 Project Reactor 的反應式資料流

工程 | Josh Long | 2021 年 1 月 6 日 | ...

歡迎來到另一個單元 你可能不需要另一個函式庫 (YMNNALFT)! 自 2016 年以來,我花了很多時間在 我的 Spring Tips 影片中闡明(或試圖闡明!)Spring 生態系統中一些更巨大的機會。 然而,今天,我以不同的精神來到你面前,想要專注於那些小而有時隱藏的瑰寶,它們做著很棒的事情,並且可以讓你免於額外的第三方依賴及其隱含的複雜性。

我們涵蓋了一些減少複雜性的瑰寶也是一件好事,因為世界是一個令人困惑、複雜和眼花繚亂的地方,而這在反應式資料流組成的世界中尤其明顯。 生活來得很快,資料更是如此。 資料來自所有事物(網路服務、資料庫、記憶體內計算、執行緒程式碼等)。 資料有多種不同的尺寸和形狀(一筆記錄、十筆記錄、無限筆記錄等)。 資料有不同的容量,使消費者飢餓或不堪重負。 資料以不同的節奏和時間到達:現在、稍後、一次全部、週期性等。 與尚未實現的資料相比,談論已經在記憶體中,手頭上的資料要自然得多。 與在並行案例中處理資料相比,在單執行緒案例中談論資料更自然。

考慮起來令人困惑! 從歷史上看,在程式碼中解決不同的資料維度甚至更加乏味。 也就是說,直到反應式程式設計出現。

反應式程式設計提供了一個統一的世界觀,使我們能夠以易於使用的 DSL 與運算子的形式來思考(可能異步且可能並行的)整合的混亂世界。 這些運算子支援反應式資料流管道的定義和組成。 反應式程式設計提供了一種結構化的並行處理模式,極大地簡化了編寫安全、可擴展、資源高效的程式碼。

有一些很棒的函式庫(例如 RxJavaAkka Streams)以非常相似的方式運作。 如果你沒有特別的想法,但想要一個世界級的選項並且已經在使用 Spring,那麼你最好使用 Project Reactor。 它包含在內了!

Spring 團隊開發了 Project Reactor,以支援 Spring 生態系統中的反應式工作。 你不需要 Spring 即可使用 Project Reactor,但 Spring 生態系統中的所有反應式 API 都是以 Project Reactor 為基礎來進行資料流選擇。 Microsoft 強制要求使用 Project Reactor 建立其所有 SDK 客戶端和 API。 Facebook 使用 Project Reactor 為其 RSocket 協定開發了 Java 客戶端。 Project Reactor 已經很成熟 - 它自 2010 年以來就存在了! - 但一直在增長新功能。 如果你仍然沒有得到你需要的東西,它可以透過可互操作的 Reactive Streams 類型與其他反應式資料流函式庫完美地協同工作。

讓我們看一個範例,瞭解 Project Reactor 如何簡化不同資料流來源和接收器的組成,並且幾乎消除了任何手動執行緒程式碼。 這是一個巨大的勝利。 請記住:只有一個人真正瞭解如何編寫安全、有用的多執行緒 Java 程式碼......而且不是你! 我不知道是誰。 這並不重要。 不要考驗命運; 讓 Project Reactor 幫助你。

你需要以下依賴項。

  • Spring Initializr 上的反應式 Web - org.springframework.boot : spring-boot-starter-webflux

現在,讓我們看一個範例。 這個範例示範了給定不同種類的資料時,正規化處理有多麼容易。 在這個範例中,我們查看 Java 8 java.util.Stream<T?>CompletableFuture<T>,但天空才是極限。 在大多數反應式應用程式中,你不一定會將非反應式類型轉換為反應式類型(例如 Flux<T>Mono<T>)。 這些範例會更簡單。 這個範例假設你有兩個資料來源並且需要組成它們。

package bootiful.rx;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

@SpringBootApplication
public class BootifulApplication {

	CompletableFuture<String> returnCompletableFuture(int counter) {
		return CompletableFuture.supplyAsync(() -> {
			var start = System.currentTimeMillis();
			try {
				Thread.sleep((long) (Math.max((Math.random() * 10), 5) * 1000));
			}
			catch (InterruptedException e) {
				// threads smdh
			}
			var stop = System.currentTimeMillis();
			var delta = stop - start;
			return "(" + Thread.currentThread().getName() + ") Hello, #" + counter + "! (after " + delta + " ms.)";
		});
	}

	Stream<Integer> returnStream() {
		return Stream.iterate(0, integer -> integer + 1);
	}

	@Bean
	ApplicationListener<ApplicationReadyEvent> begin() {
		return event -> {

			Flux<String> count = Flux//
					.fromStream(this.returnStream()) //
					.take(10) //
					.flatMap(c -> Flux.zip(Mono.just(c), Mono.fromCompletionStage(this.returnCompletableFuture(c)))) //
					.map(tuple -> tuple.getT2() + " #" + tuple.getT1()); //

			count.subscribe(System.out::println);
		};
	}

	public static void main(String[] args) {
		SpringApplication.run(BootifulApplication.class, args);
	}

}

你知道使用 Reactor 還能得到什麼嗎? 運算子支援重試、錯誤處理、逾時等,如果這些運算子未包含在 Project Reactor 中,你還會將它們外包給另一個第三方函式庫。 雙贏。

我可以繼續談論給定像 Project Reactor 這樣的東西所呈現的機會。 確實,我做到了。 查看我的書 Reactive Spring 以獲得(更多)資訊。

嗯? 你玩得開心,也許學到了一些東西嗎? 與往常一樣,我很想收到你的來信,所以 請在 Twitter 上發聲 (@starbuxman) ! 我將在本週晚些時候返回另一個 YMNNALFT 單元,所以一定要錯過。 我有關於 Easy RPC、「*Utils 物件花園」、使用 Micrometer 進行多維指標以及許多其他主題的單元。

取得 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速你的進度。

瞭解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔。

瞭解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部