反應式程式設計筆記 Part II:編寫一些程式碼

工程 | Dave Syer | 2016 年 6 月 13 日 | ...

在本文中,我們將繼續探討反應式程式設計系列,並專注於透過實際的程式碼範例來解釋一些概念。最終結果應該是讓您更了解反應式程式設計的不同之處,以及其函數式的特性。此處的範例相當抽象,但它們為您提供了一種思考 API 和程式設計風格的方式,並開始感受它的不同之處。我們將看到反應式的元素,並學習如何控制資料流,以及在必要時在背景執行緒中處理。

設定專案

我們將使用 Reactor 程式庫來說明我們需要闡述的重點。程式碼也可以使用其他工具輕鬆編寫。如果您想嘗試程式碼並查看其運作方式,而無需複製貼上任何內容,則可以在 Github 中找到包含測試的工作範例。

若要開始,請從 https://start.spring.io 取得一個空白專案,並新增 Reactor Core 相依性。使用 Maven:

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.0.0.RC2</version>
		</dependency>

使用 Gradle 也很類似:

    compile 'io.projectreactor:reactor-core:3.0.0.RC2'

現在讓我們編寫一些程式碼。

函數式的特性是什麼?

反應式程式設計的基本建構區塊是事件序列,以及兩個主角:事件的發佈者和訂閱者。將序列稱為「串流」也可以,因為它本質上就是串流。如果需要,我們將使用小寫「s」的「串流」一詞,但 Java 8 有一個不同的 java.util.Stream,因此盡量不要混淆。我們將盡力將敘述重點放在發佈者和訂閱者上(這也是 Reactive Streams 的目標)。

Reactor 是我們將在範例中使用的程式庫,因此我們將堅持使用那裡的符號,並將發佈者稱為 Flux(它實作了 Reactive Streams 的介面 Publisher)。RxJava 程式庫非常相似,並且具有許多並行功能,因此在這種情況下,我們將討論 Observable,但程式碼會非常相似。(Reactor 2.0 將其稱為 Stream,如果我們也需要談論 Java 8 Streams,這會令人困惑,因此我們只會在 Reactor 3.0 中使用新程式碼。)

產生器

Flux 是特定 POJO 類型事件序列的發佈者,因此它是泛型的,即 Flux<T>T 的發佈者。Flux 有一些靜態便利方法,可以從各種來源建立自身的實例。例如,若要從陣列建立 Flux

Flux<String> flux = Flux.just("red", "white", "blue");

我們剛剛產生了一個 Flux,現在我們可以對它做一些事情。實際上,您只能對它做兩件事:對它進行操作(轉換它,或將其與其他序列組合),訂閱它(它是一個發佈者)。

單值序列

您經常會遇到已知只有一個或零個元素的序列,例如,透過 ID 尋找實體的存放庫方法。Reactor 有一個 Mono 類型,表示單值或空的 FluxMono 具有與 Flux 非常相似的 API,但更集中,因為並非所有運算子都適用於單值序列。RxJava 在版本 1.x 中也有一個附加元件,稱為 Single,以及用於空序列的 Completable。Reactor 中的空序列是 Mono<Void>

運算子

Flux 上有很多方法,幾乎所有方法都是運算子。我們不會在這裡查看所有方法,因為有更好的地方可以查看(例如 Javadocs)。我們只需要了解運算子是什麼,以及它可以為您做什麼。

例如,若要要求將 Flux 內部的事件記錄到標準輸出,您可以呼叫 .log() 方法。或者您可以使用 map() 轉換它:

Flux<String> flux = Flux.just("red", "white", "blue");

Flux<String> upper = flux
  .log()
  .map(String::toUpperCase);

在此程式碼中,我們透過將輸入中的字串轉換為大寫來轉換它們。到目前為止,這很簡單。

這個小範例有趣之處 — 如果您不習慣,甚至會感到震撼 — 是尚未處理任何資料。甚至沒有記錄任何內容,因為實際上,什麼都沒發生(試試看您就會明白)。在 Flux 上呼叫運算子相當於為稍後建構執行計畫。它是完全宣告式的,這也是人們稱其為「函數式」的原因。運算子中實作的邏輯僅在資料開始流動時執行,而這只有在有人訂閱 Flux(或等效於 Publisher)時才會發生。

處理資料序列的相同宣告式、函數式方法存在於所有反應式程式庫中,也存在於 Java 8 Streams 中。請考慮這個外觀相似的程式碼,使用與 Flux 相同內容的 Stream

Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
    System.out.println(value);
    return value.toUpperCase();
});

我們對 Flux 所做的觀察也適用於此處:沒有處理任何資料,這只是一個執行計畫。但是,FluxStream 之間存在一些重要的差異,這使得 Stream 不適合用於反應式用例。Flux 有更多的運算子,其中大部分只是便利性,但真正的差異在於您想要使用資料時,因此這就是我們接下來需要查看的內容。

提示

Sebastien Deleuze 在 反應式類型 上有一篇有用的部落格文章,他在文章中透過查看它們定義的類型以及您將如何使用它們來描述各種串流和反應式 API 之間的差異。FluxStream 之間的差異在那裡得到了更詳細的強調。

訂閱者

若要讓資料流動,您必須使用 subscribe() 方法之一訂閱 Flux。只有這些方法才能讓資料流動。它們會回溯您在序列上宣告的運算子鏈(如果有的話),並要求發佈者開始建立資料。在我們一直在處理的範例範例中,這表示基礎字串集合會被迭代。在更複雜的用例中,它可能會觸發從檔案系統讀取檔案、從資料庫提取資料或呼叫 HTTP 服務。

以下是在動作中的 subscribe() 呼叫:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe();

輸出為:

09:17:59.665 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

因此,我們可以從中看到,沒有引數的 subscribe() 的效果是要求發佈者傳送所有資料 — 只有一個 request() 被記錄,並且它是「無界限的」。我們還可以查看為發佈的每個項目(onNext())、序列的結尾(onComplete())和原始訂閱(onSubscribe())的回呼。如果您需要,您可以使用 Flux 中的 doOn*() 方法自行監聽這些事件,這些方法本身是運算子,而不是訂閱者,因此它們本身不會導致任何資料流動。

subscribe() 方法已多載,其他變體為您提供不同的選項來控制發生的情況。一種重要且方便的形式是使用回呼作為引數的 subscribe()。第一個引數是 Consumer,它為您提供每個項目的回呼,您也可以選擇性地新增一個 Consumer 來處理錯誤(如果有的話),以及一個普通的 Runnable,以便在序列完成時執行。例如,僅使用每個項目的回呼:

Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
.subscribe(System.out::println);

以下是輸出:

09:56:12.680 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

我們可以透過多種方式控制資料流,使其成為「有界限的」。控制的原始 API 是您從 Subscriber 取得的 Subscription。上面簡短呼叫 subscribe() 的等效長形式是:

.subscribe(new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(String t) {
        System.out.println(t);
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }

});

若要控制流程,例如,一次最多使用 2 個項目,您可以更聰明地使用 Subscription

.subscribe(new Subscriber<String>() {

    private long count = 0;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2);
    }

    @Override
    public void onNext(String t) {
        count++;
        if (count>=2) {
            count = 0;
            subscription.request(2);
        }
     }
...

Subscriber 一次「批次處理」2 個項目。這是一個常見的用例,因此您可能會考慮將實作提取到便利類別中,這也會使程式碼更具可讀性。輸出如下所示:

09:47:13.562 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

事實上,批次處理訂閱者是一個非常常見的用例,以至於 Flux 中已經提供了便利方法。上面的批次處理範例可以像這樣實作:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe(null, 2);

(請注意使用請求限制呼叫 subscribe())。以下是輸出:

10:25:43.739 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

提示

像 Spring Reactive Web 這樣會為您處理序列的程式庫可以處理訂閱。能夠將這些問題向下推到堆疊中是件好事,因為它可以讓您免於用非業務邏輯來混亂您的程式碼,使其更具可讀性,更易於測試和維護。因此,作為一個規則,如果您可以避免訂閱序列,或者至少將該程式碼推入處理層,並從業務邏輯中移出,那是一件好事。

執行緒、排程器和背景處理

以上所有記錄的一個有趣特點是它們都在「main」執行緒上,這是呼叫 subscribe() 的呼叫端的執行緒。這突顯了一個重點:Reactor 在執行緒方面非常節儉,因為這為您提供了獲得最佳效能的最大機會。如果您在過去 5 年中一直在與執行緒和執行緒池以及非同步執行作鬥爭,試圖從您的服務中榨取更多效能,那麼這可能是一個令人驚訝的說法。但這是事實:在沒有任何切換執行緒的必要條件下,即使 JVM 經過最佳化以非常有效率地處理執行緒,在單一執行緒上進行運算始終更快。Reactor 已將控制所有非同步處理的鑰匙交給您,並且它假設您知道自己在做什麼。

Flux 提供了一些配置器方法,用於控制執行緒邊界。例如,您可以使用 Flux.subscribeOn() 配置在背景執行緒中處理訂閱:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.parallel())
.subscribe(null, 2);

結果可以在輸出中看到:

13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()

提示

如果您自己編寫此程式碼或複製貼上它,請記得等待處理停止,然後 JVM 才會結束。

請注意,訂閱和所有處理都在單一背景執行緒「parallel-1-1」上進行 — 這是因為我們要求我們的主要 Flux 的訂閱者在背景中。如果項目處理是 CPU 密集型的,這很好(但實際上在背景執行緒中是毫無意義的,因為您為上下文切換付出了代價,但並沒有更快地獲得結果)。您可能還希望能夠執行 I/O 密集型且可能阻塞的項目處理。在這種情況下,您希望盡快完成它,而不會阻塞呼叫端。執行緒池仍然是您的朋友,這也是您從 Schedulers.parallel() 獲得的。若要將個別項目的處理切換到單獨的執行緒(最多達到池的限制),我們需要將它們分解為單獨的發佈者,並針對每個發佈者要求在背景執行緒中取得結果。一種方法是使用稱為 flatMap() 的運算子,它將項目對應到 Publisher(可能是不同類型的),然後對應回新類型的序列:

Flux.just("red", "white", "blue")
  .log()
  .flatMap(value ->
     Mono.just(value.toUpperCase())
       .subscribeOn(Schedulers.parallel()),
     2)
.subscribe(value -> {
  log.info("Consumed: " + value);
})

請注意,此處使用了 flatMap() 將項目向下推送到「子」發佈者中,在該發佈者中,我們可以控制每個項目的訂閱,而不是整個序列的訂閱。Reactor 內建了預設行為,盡可能長時間地掛在單一執行緒上,因此如果我們希望它在背景執行緒中處理特定項目或項目群組,則需要明確指定。實際上,這是強制並行處理的少數公認技巧之一(請參閱 Reactive Gems 問題以取得更多詳細資訊)。

輸出如下所示:

15:24:36.596 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE

請注意,現在有多個執行緒正在使用項目,並且 flatMap() 中的並行提示確保在任何給定時間處理 2 個項目,只要它們可用即可。我們經常看到 request(1),因為系統正在嘗試在管道中保留 2 個項目,並且通常它們不會同時完成處理。實際上,Reactor 在這裡嘗試非常聰明,並且它會從上游 Publisher 預先提取項目,以盡力消除訂閱者的等待時間(我們在這裡看不到這一點,因為數字很低 — 我們只處理 3 個項目)。

提示

三個項目(「red」、「white」、「blue」)可能太少,無法令人信服地看到多個背景執行緒,因此最好產生更多資料。例如,您可以使用隨機數字產生器來做到這一點。

Flux 也有一個 publishOn() 方法,它與 flatMap() 相同,但適用於監聽器(即 onNext() 或消費者回呼),而不是訂閱者本身:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.newParallel("sub"))
  .publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
    log.info("Consumed: " + value);
});

輸出如下所示:

15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE

請注意,消費者回呼(記錄「Consumed: …​」)位於發佈者執行緒 pub-1-1 上。如果您取出 subscribeOn() 呼叫,您也可能會看到所有第二個資料區塊在 pub-1-1 執行緒上處理。同樣,這是 Reactor 在執行緒方面很節儉 — 如果沒有明確要求切換執行緒,它會為下一個呼叫保留在同一個執行緒上,無論下一個呼叫是什麼。

注意

我們在此範例中將程式碼從 subscribe(null, 2) 變更為在 publishOn() 中新增 prefetch=2。在這種情況下,subscribe() 中的提取大小提示將被忽略。

提取器:來自黑暗面的訂閱者

還有另一種訂閱序列的方法,即呼叫 Mono.block()Mono.toFuture()Flux.toStream()(這些是「提取器」方法 — 它們將您從反應式類型中提取到靈活性較差的阻塞抽象中)。Flux 也有轉換器 collectList()collectMap(),它們將 Flux 轉換為 Mono。它們實際上並未訂閱序列,但它們確實丟棄了您可能對個別項目的訂閱擁有的任何控制權。

警告

一個好的經驗法則是「永遠不要呼叫提取器」。有一些例外情況(否則這些方法將不存在)。一個值得注意的例外情況是在測試中,因為能夠阻塞以允許結果累積很有用。

這些方法作為從反應式程式設計橋接到阻塞程式設計的逃生艙口而存在;例如,如果您需要適應舊版 API(例如 Spring MVC)。當您呼叫 Mono.block() 時,您會丟棄反應式串流的所有優點。這是反應式串流和 Java 8 Streams 之間的關鍵差異 — 原生 Java Stream 只有「全有或全無」訂閱模型,相當於 Mono.block()。當然,subscribe() 也可能阻塞呼叫執行緒,因此它與轉換器方法一樣危險,但您可以進行更多控制 — 您可以透過使用 subscribeOn() 來防止它阻塞,並且您可以透過應用背壓並定期決定是否繼續來滴流項目。

結論

在本文中,我們介紹了反應式串流和 Reactor API 的基礎知識。如果您需要了解更多資訊,有很多地方可以查閱,但沒有任何東西可以替代親自動手編碼,因此請使用 GitHub 中的程式碼(在名為「flux」的專案的測試中,用於本文),或前往 Lite RX Hands On 工作坊。到目前為止,這真的只是間接開銷,而且我們沒有學到任何我們無法使用非反應式工具以更明顯的方式完成的事情。本系列的下一篇文章將更深入地探討反應式模型的阻塞、分派和非同步方面,並向您展示有哪些機會可以獲得真正的益處。

取得 Spring 電子報

保持與 Spring 電子報的連線

訂閱

取得領先

VMware 提供訓練和認證,以加速您的進度。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

檢視全部