領先一步
VMware 提供訓練和認證,以加速您的進展。
了解更多我很榮幸地宣布,Reactor 這個強大的基礎函式庫,用於在 JVM 上建構反應式、快速資料應用程式的函式庫,已經正式 GA!
Reactor 提供了必要的抽象概念,以建構高吞吐量、低延遲(我們現在稱之為「快速資料」)的應用程式,這些應用程式絕對必須處理每秒數千、數萬甚至數百萬個並行請求。
您應該關注 Reactor,因為現代應用程式與非人類消費者(例如手機和手機上運行的應用程式)產生的資料量,已超出傳統的每個連線執行緒伺服器所能支援的範圍,因此 Reactor 為您提供了工具和抽象概念,讓您能夠建構這些大規模的應用程式,而不會陷入管理狀態和在非同步應用程式中傳遞事件的繁瑣細節。現代 JVM 應用程式必須建立在非同步和反應式元件的穩固基礎之上,這些元件能有效率地管理極少量的系統執行緒上執行大量任務。Reactor 專門設計來幫助您建構這類型的應用程式,而不會妨礙您,也不會強迫您在帶有主觀意識的模式中工作。
Reactor 本身深受同名的著名設計模式影響,但它不僅僅從該模式中汲取靈感。其中也包含Actor 模型和傳統的事件驅動回呼程式設計的元素。
雖然 Reactor 是 Spring IO 平台的基礎部分,但 核心 Reactor 函式庫不依賴 Spring。Reactor core 是一個獨立的函式庫,它唯一的外部依賴項是 SLF4J 和出色的 LMAX Disruptor RingBuffer 函式庫。
在 Reactor 的核心之上,還有其他可選組件,以促進針對常見模式開發應用程式。Reactor 的一些內建、一流的支援包括
@CompileStatic
,並提供全面的環境建構和事件連接 DSL。Reactor 從頭開始設計,旨在使其功能靈活高效,以便它可以不妨礙您,並幫助您盡可能快速地處理應用程式中的資料。在其最快的配置中,標準的 RingBuffer 支援的 Reactor 可以在標準開發人員筆記型電腦上每秒發布超過 1000-1500 萬個事件。高效能的 Processor
抽象概念可以每秒將超過 1 億個事件推送到您的應用程式中。您的應用程式如何處理資料以減慢 Reactor 的速度可能會因任務而異。但是,在最佳的無操作模式下,吞吐量高達如此,應用程式不會坐在那裡等待 Reactor 完成其工作!
Reactor 核心包含一些基本抽象概念,這些概念的靈感來自 JDK 8 的新函數式抽象概念(在某些情況下直接基於它們),例如 Function<T,V>
、Consumer<T>
、Supplier<T>
和 Predicate<T>
。不僅 Reactor 本身建立在這些抽象概念的基礎之上,您的應用程式也可以利用它們。在未來的某個時候,JDK 8 的採用將普及到 Reactor 可以簡單地從 Reactor 中刪除這些抽象概念,並依賴 JDK 8 中的那些抽象概念。在那之前,您的 JDK 6 和 7 應用程式現在就可以從這些函數式抽象概念中受益。
受到 Reactive Extensions for .NET、Netflix 的 RxJava、JDK 8 Stream 抽象概念以及許多其他函式庫(更不用說 20 年的事件驅動電腦科學)的啟發,Reactor 提供了一種「反應式」程式設計模型,使協調非同步任務變得更加容易。Stream<T>
和 Promise<T>
等抽象概念使鏈結非阻塞動作變得容易且簡潔,而且沒有回呼義大利麵麵條程式碼!
@Inject
AsyncDataLoader loader;
Promise<Buffer> p = loader.get("U-U-I-D")
.map(new Function<Buffer, Data>() {
public Data apply(Buffer buff) {
// transform data
Data data = parser.parse(buff);
return data;
}
})
.filter(new Predicate<Data>() {
public boolean test(Data data) {
// check Data for certain conditions being true
return null != data.getName();
}
})
.consume(new Consumer<Data>() {
public void accept(Data data) {
// only Data that passes the Predicate test makes it here...
}
});
// Promises can also block like a Future
Buffer buff = p.await();
這些動作(map
、filter
、consume
)中的每一個都是執行(可能)非同步的不同動作。在傳統的多執行緒環境中,必須加入圍繞在 Futures 上阻塞和等待完成的嘈雜程式碼片段。但是,使用 Reactor,您只需以反應式方式將動作鏈結在一起,以便動作在先前的動作完成時「反應」資料。
Reactor 包含對 Groovy 語言的一流支援。它支援使用閉包作為回呼,具有強大的 DSL 來配置 Reactor 環境,並提供一些非常酷的運算符多載,用於編寫簡潔的程式碼。
Clojurewerkz 有一個名為 Meltdown 的函式庫,它是基於 Reactor 的。可以輕鬆地添加其他 JVM 語言支援。Reactor 的 API 設計為可擴展的,以便非 Java 語言可以從 Reactor 中的工具中受益。
Reactor 已為 Java 8 做好準備,因此我們先來看看一些使用 JDK 8 的出色 Lambda 功能的 Reactor 程式碼
import static reactor.event.selector.Selectors.*;
// Only create one of these per JVM
static Environment env = new Environment();
// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
.<String>on($("topic"), ev -> System.out.prinltn("greeting: " + ev.getData()));
r.notify("topic", Event.wrap("Hello World!"));
Reactor 希望實現的目標之一是減少您必須編寫的程式碼量;上面的程式碼非常簡潔。但即使在 Java 6 和 7 中,它也非常簡潔
import static reactor.event.selector.Selectors.*;
// Only create one of these per JVM
static Environment env = new Environment();
// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
.on($("topic"), new Consumer<Event<String>>() {
public void accept(Event<String> ev) {
System.out.prinltn("greeting: " + ev.getData());
}
});
r.notify("topic", Event.wrap("Hello World!"));
在 Groovy 中,它甚至更簡潔(正如您所期望的那樣),因為語言支援負責將某些物件轉換為正確的類型,並允許使用閉包
def env = new Environment()
def r = Reactors.reactor(env).on("topic") { String greeting ->
println "greeting: $greeting"
}
r.notify "topic", "Hello World!"
Dispatcher
負責在給定的 Thread
上執行任務。Dispatcher
有多種內建實作,可以在呼叫執行緒、來自池的執行緒、使用單執行緒事件迴圈樣式分派或最快的分派器:RingBufferDispatcher
中執行任務,它使用 LMAX Disruptor RingBuffer 分派任務。
每當您在 Reactor 中建立組件時,通常會指定在分派事件時要使用的 Dispatcher
。RingBuffer 中的事件分派非常有效率,而不是使用執行緒池,這在大量應用程式中可能會變得非常消耗 CPU 和 GC。使用 RingBufferDispatcher
每秒可以分派數千萬個事件。
Selector
是動作到事件金鑰的動態映射。當您將動作指派給 Reactor
時,您會透過註冊 Selector
來告知它要回應哪些事件金鑰。有幾種內建實作可以比對 Object.equals()
之類的東西、執行基於字串的正規表示式比對、URI 範本比對,以便您可以使用熟悉的括號分隔符號表示法來比對 URI、Class.isAssignableFrom()
比對以僅選擇那些從通用抽象概念繼承的金鑰、Predicate
比對以允許您建立基於作用域述詞的任意 Predicate<T>
Selector,甚至還有一個可選的 JsonPathSelector
,它使用 JsonPath 來使用 JsonPath 表達式查詢金鑰中的資料。
您會在範例中注意到使用了一些東西,作為 Java 開發人員,您可能會對此感到有些困惑:用於建立 Selector
[1] 的 $
捷徑方法。如果您使用過 jQuery 進行 Web 開發,那麼您會感到賓至如歸,因為 $
方法只是建立 Selector
的捷徑,就像 jQuery 在編寫 $(".css-class")
之類的東西時建立 CSS 查詢一樣。如果美元符號對您來說太不尋常,Reactor 始終嘗試提供多種完成某項任務的方法;您可以使用 Selectors.object(T)
或 ObjectSelector.objectSelector()
靜態建立方法(或僅使用建構子建立 ObjectSelector
的新實例)。
[1]:除了 $(T)
之外,還有其他用於建立 Selector 的捷徑輔助方法。有 R(String)
用於建立 RegexSelector、T(Class<?>)
用於建立 ClassSelector,以及 U(String)
用於建立 UriTemplateSelector。
Reactor 的 Promise
和 Stream
提供了一種反應式、組合式的方式來協調多個非同步任務,而不會產生過多的回呼義大利麵麵條程式碼。Promise
是一個有狀態的組件,可以在您的應用程式中傳遞,並表示將從另一個執行緒填充的值。與傳統的 Future
類似,Promise
可以阻塞呼叫執行緒。但更重要的是,Promise
可以輕鬆轉換值並執行整個處理鏈。
Stream
與 Promise
類似,因為它提供了一個組合 API 來對未來的值做出反應。但 Stream
與 Promise
的不同之處在於,它旨在處理多個值的傳遞。
為了在 Promise
或 Stream
中填充值,您需要建立一個 Deferred
,它是 Consumer<T>
。您可以將此 Deferred
傳遞到您的服務層,以將最終值傳達回呼叫者。
// Only create one of these per JVM
static Environment env = new Environment();
public class DataLoader {
public Promise<Buffer> load(String key) {
Deferred<Buffer, Promise<Buffer>> deferred = Promises.defer(env);
// submit work to be done in another thread
// like reading data from a datastore
datastore.load(key, deferred);
return deferred.compose();
}
}
// Your service layer uses this API
@Inject
DataLoader loader;
loader.load("obj-key")
.onSuccess(new Consumer<Buffer>() {
public void accept(Buffer b) {
// handle eventual data
}
})
.onError(new Consumer<Throwable>() {
public void accept(Throwable t) {
// handle errors
}
});
Scala 的 Tuple 類別是一種型別安全的方式,可以傳遞封裝其他值的單個物件,而無需建立特定於應用程式的單次使用「持有者」Bean。Reactor 將此功能整合到其自身對 Tuple
類別的解釋中。
Tuple 非常易於使用。您可以使用 Tuple.from(T1, T2, …)
方法建立一個,並且可以使用 Tuple.getT1()
到 Tuple.getTN()
方法從中取得值。
reactor.on($("topic"), new Consumer<Event<Tuple2<URI, Buffer>>>() {
public void accept(Event<Tuple2<URI, Buffer>> ev) {
URI uri = tup.getT1();
Buffer buff = tup.getT2();
// deal with request from uri.getPath()
}
});
// notify consumers of new request
reactor.notify("topic", Event.wrap(Tuple.from(requestUri, request)));
查看 Tuple API 文件,了解所有可能性。
Reactor 隨附功能齊全的 TCP 用戶端和伺服器抽象概念。它們提供了一種簡單的方式來建構可以支援大量用戶端的基於 TCP 的應用程式。Reactor TCP 支援中的基本抽象概念是通用的,可以建立多個實作來利用不同的 TCP 技術。但是,內建實作利用了出色的 Netty 函式庫 來進行非同步 IO。
Reactor 是開放原始碼且採用 Apache 授權。開發人員和使用者社群只是一群普通人,他們希望共同努力,為在 JVM 上建構反應式 FastData 應用程式建立一個絕佳的基礎。加入我們的社群以了解有關 Reactor 的更多資訊,或透過您希望看到的任何改進做出貢獻。
若要快速開始使用 Reactor 並查看各種情境中的一些程式碼,請查看快速入門
https://github.com/reactor/reactor-quickstart
或範例
https://github.com/reactor/reactor-samples
若要 fork 原始碼、閱讀 wiki 或提交問題,請在 GitHub 上造訪我們
https://github.com/reactor/reactor
您可以加入 Google Group 來提出問題或以其他方式為圍繞 Reactor 的討論做出貢獻
https://groups.google.com/forum/#!forum/reactor-framework
存取 Maven 構件以包含在您的專案中
<dependencies>
<!-- core components -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- groovy support -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-groovy</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- tcp client/server -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-tcp</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- spring support -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-spring</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
</dependencies>