領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多Reactor 團隊坦白說有點興奮,終於能夠宣布 Reactor 2.0 的初始里程碑版本發布了!本次更新包含了完全符合規範的 Reactive Streams 實作,在完全重寫的 Stream
和 Promise
API 中!這對 Reactor 使用者來說是一大步。它開啟了與其他 Reactive Streams 實作的整合,例如 Akka Streams、Ratpack、RxJava 等等。Reactor 提供了一個穩固的基礎,讓您能夠建構具有嚴苛高吞吐量和低延遲需求的現代 #uberfastdata
應用程式。
Reactor 2.0 的主要變更是 Stream API。事實上,程式碼庫的大部分其他部分在 1.1 和 2.0 之間都只是稍微調整或保持不變。Stream
和 Promise
則不然。這些組件已從頭開始完全重寫,以利用 Reactive Streams 規範,在函數式反應式串流管線中提供完全非阻塞的反壓。
Akka 的 Dr. Roland Kuhn 雄辯滔滔地談論過這個主題,如果您有興趣探索非阻塞反壓背後的原理,我們鼓勵您觀看他在會議上關於這個主題的演講,其中大部分可在 YouTube 上找到。
簡而言之 (TL;DR)
反壓是發布者和訂閱者關係的倒置,其中 Subscriber
對 Publisher
說「給我接下來 N 個可用的項目」,而不是 Publisher
對 Subscriber
說「拿走我擁有的所有這些項目,無論你是否可以處理它們」。由於 Publisher
是被動地向 Subscriber
提供資料元素,而不是反過來,因此不需要(在完全 Reactive Streams 管線中)緩衝資料,因為資料的 inflight 永遠不會超過您可以處理的量。實際上,某些緩衝或佇列是必要的,但像 Reactor 這樣的函式庫消除了您需要擔心如何完成此操作的需求,因此您可以編寫完全反應式的程式碼,以回應變得可用的資料,而不是試圖弄清楚必須採用哪些 BlockingQueue
或其他種類的低效率方案組合,以確保非同步組件彼此正確隔離。
Reactor 團隊投入了令人沮喪的大量時間來實作 Reactive Streams 規範的全面實作。Reactor 的 Stream
組件為您提供了有用且易於理解的掛鉤,您可以在其上掛載業務邏輯,以便您只需專注於編寫適當範圍的函數式組件,該組件將回應單個資料元素,而不必用大量的樣板邏輯來弄髒您的程式碼,以處理將資料從一個 Thread 傳遞到另一個 Thread、執行有界佇列和緩衝,以及通常在使用反應式、非同步組件時需要的各種其他任務。
您可以在 Reactive Geocoder Demo 中找到程式碼中如何呈現的範例,Reactor 團隊在本年度於德州達拉斯舉行的 SpringOne 中討論了這個範例(重播 可在 InfoQ 上提供給 SpringOne2GX 2014 參與者,稍後將公開)。
以下是一小段程式碼片段,展示如何建立新的 Stream
、將業務邏輯附加到它,然後將資料發布到其中。
// by default Streams use the Disruptor RingBufferDispatcher
HotStream<String> helloStream = Streams.defer(env);
helloStream.map(s -> "Hello " + s + "!")
.consume(log::info);
helloStream.broadcastNext("World");
當您執行此程式碼時,您將看到記錄的文字「Hello World!」。您還應該注意到,記錄是從 RingBuffer thread 進行的,而不是從您的主 thread 進行的。換句話說,您剛剛將一個任務提交到另一個 Thread 以非同步執行,將結果轉換為其他內容,然後使用 Reactive Streams 非阻塞、基於需求的背壓來回應結果,而無需任何基於 Future 的嘈雜阻塞程式碼!
您也可以建立「冷」串流,這與使用 RxJava 的 Observable
非常相似。
// stream contains the single value "Hello World!"
Stream<String> helloStream = Streams.just("World");
helloStream.map(s -> "Hello " + s + "!")
.consume(log::info);
當您執行此程式碼時,您將看到記錄的文字「Hello World!」,與之前的範例類似。這裡的不同之處在於,我們從未必須呼叫 broadcastNext(String)
方法,因為當我們附加 Consumer<String>
時,這已為我們處理。您可以像建立 RxJava Observable
一樣,從任何值或值集合中建立串流。這讓您可以將標準 Java Collection API 與反應式串流 API 混合使用。
像 Spark、Storm 和其他大數據函式庫這樣的串流 API 證明,當在沒有無限資源的系統上執行時(基本上是我們在雲端運行的任何東西),以更函數式和反應式的方式處理資料更有效率,而且由於用於建構處理管線的 DSL 的宣告式、自我記錄性質,因此也更易於理解(在許多情況下)。當您將業務邏輯歸結為其本質時,您真的會注意到沒有多少東西不能表示為轉換或消費者函數。您要么取得輸入並產生輸出,要么只是取得輸入。Reactor 的 Stream API 沉浸在這個範例中,因此為您提供了大量(誰能告訴我這個電影參考:「您會說我擁有一大堆皮納塔嗎?」)選項,用於在資料通過您的管線時處理資料。除了像 map(Function<T,V>)
和 filter(Predicate<T>)
這樣的簡單函數之外,還有更複雜的選項,例如 buffer(int)
或 buffer(int, long, TimeUnit)
。後者提供了非常實用的長度和基於時間的「微批次處理」。例如,對於一組難以跨 WAN 連線傳送的資料庫更新,您可能希望緩衝它們,直到達到一定數量或經過一定的逾時時間。
// create a stream backed by a load-balanced, round-robin assigned Dispatcher
Stream<Update> updateStream = Streams.defer(env, env.getDefaultDispatcherFactory().get());
updateStream.buffer(1024, 350, TimeUnit.MILLISECONDS)
.consume(driver::batchUpdate);
這將收集串流更新,直到收集到 1024 個更新或 350 毫秒過期(以先到者為準)。然後,它將通過傳遞一個 List<Update>
,其中包含 1024 個元素或 350 毫秒內收集到的元素數量,來觸發下游處理。這讓您可以編寫非常非常有效率的系統,以批次處理大量資料,以最大限度地減少網路頻寬使用量並最大化吞吐量(同時仍保持可預測的延遲)。
除了微批次處理之外,Stream
還提供了諸如 filter
、flatMap
、movingBuffer
、join
、merge
、sample
、sort
和許多其他操作,這些操作在很大程度上是不言自明的。與 Scala 的集合 API 或 RxJava 的 Observable 非常相似,Reactor 的 Stream
提供了函數式和反應式的方式來快速、有效率且以極高的吞吐量處理資料,同時保持可預測的低延遲。毫不誇張地說,您可以使用 Stream
作為基礎組件來編寫您的整個應用程式,該組件用於提交非同步任務以供執行,也用於以反應式方式處理傳統的資料集合,然後通過將即時資料與歷史資料結合來混合這兩種方法。
有時需要將資料串流拆分為平行管線以進行並行處理。Reactor 的 Stream
提供了一種非常方便的方法,可以使用 parallel(int)
操作來執行此操作。您只需將您的業務邏輯附加到 parallel
呼叫後提供的 Stream
,資料將在下游管線之間進行循環分配,以進行並行處理。
HotStream<String> stream = Streams.defer(env);
// by default uses number of CPUs as thread count
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info));
以下是一個有趣的 Reactive Streams 實作範例,它在您的程式碼中顯示自身:當您執行此程式碼時,您不會得到任何輸出。.parallel()
操作不會在管線中產生「需求」。在 Reactive Streams 系統中,是管線的末端將資料拉入操作,而不是生產者推送資料。由於此管線末端沒有終端操作,因此資料無法被拉取通過。實際上,這通常不是問題,因為您實際上想要在真實應用程式中處理資料。在此範例中,我們只需在 .parallel()
後面新增 .drain()
呼叫即可產生需求並拉取資料通過。我們可能不會在生產系統中執行此操作,但對於測試和演示,我們可以輕鬆地使用 drain。
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info))
.drain();
使用反應式系統時,有時很難理解為什麼事情不像您期望的那樣運作。雖然函式庫在改善 IDE 內部非同步串流的即時偵錯過程方面沒有太多可以做的,但始終有經過考驗的豐富記錄方法。Reactor 新增了幾個有點隱藏的方法,稱為 .debug()
和 .log()
,它們應該可以幫助您了解串流是如何建構以及它們在做什麼。.debug()
方法將為您提供串流如何連線的輸出。它將顯示哪些動作連接到哪些動作,以及每個動作中目前可用的容量。.log()
方法將記錄動作附加到您的串流,並輸出訂閱和發布事件。
如果我們在上述範例中的 .parallel()
之前新增 .log()
呼叫,我們將獲得額外的記錄,以告訴我們正在發生什麼事
stream.log()
.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info))
.drain();
將產生
[ringBuffer-1] INFO r.r.a.LoggerAction - onSubscribe: {capacity=0/8188 [0%], current=0, pending=0, waiting=0} [main] INFO r.r.a.LoggerAction - subscribe: ConcurrentAction-{dispatcher=RingBuffer:8192, max-capacity=8188} [ringBuffer-1] INFO r.r.a.LoggerAction - request: 9223372036854775807 [ringBuffer-1] INFO r.r.a.LoggerAction - onNext: World [ringBufferGroup-2] INFO r.r.StreamTests - Hello World!
要將現有應用程式升級到 Reactor 2.0,您可能只需要調整一些內容。如果您在 Reactor 1.1 中使用串流,您會發現 Reactor 2.0 串流在發布值方面有所不同。.broadcastNext()
方法定義在 Action
子類別和 HotStream
上,但未定義在某些其他操作上。Reactor 1.1 使用 Deferred
來發布值,因此您的程式碼需要進行調整,以將發布者類型變更為可以存取 .broadcastNext()
方法的類型。如果您使用純 Reactor
或基於 Spring 和註解的事件處理,您幾乎不需要變更任何內容。
要存取里程碑 artifacts,請在您選擇的建置系統中使用 http://repo.spring.io/libs-milestone
儲存庫。例如,如果使用 Gradle(當然您正在使用 Gradle,對吧?),只需像這樣配置您的 repositories
區塊
repositories {
maven { url 'http://repo.spring.io/libs-milestone' }
mavenCentral()
}
要報告錯誤、追蹤 Reactor 2.0 的開發進度、閱讀 wiki 或以其他方式參與 Reactor 社群,請造訪 Reactor 的 GitHub 首頁:https://github.com/reactor/reactor。您也可以在這裡線上閱讀 JavaDoc:http://reactor.github.io/docs/api/2.0.0.M1/index.html