領先一步
VMware 提供培訓和認證,以加速您的進展。
瞭解更多Reactor 團隊很高興宣布,Reactor 框架的一些重大更新現已在 Reactor 彈性、非同步、快速資料框架的 1.1.0.RELEASE 版本中提供。此版本包含許多錯誤修正,並重寫了主要元件,使其速度更快,而且可能更重要的是,在記憶體使用方面更有效率。Reactor 1.1 現在包含來自 Goldman Sachs [1] 的出色 gs-collections
函式庫,它為處理各種地圖和集合提供了非常流暢的 API。
以下列出 Reactor 1.0 與 1.1 之間的部分變更:
版本 1.0 中 Stream API 的一些更有用的新增功能包括 Stream.window
和 Stream.timeout
方法。這可讓您收集給定時間段內的值,並將其傳遞到處理鏈中。例如,若要處理每 500 毫秒收集的任何值,請使用 window
Deferred<Pojo, Stream<Pojo>> in = Streams.defer(env);
// add all collected values every half-second
in.compose()
.window(500)
.consume(values -> service.addAll(values));
// another service emits data into the `Deferred`
Pojo p;
while(null != (p = input.next())) {
in.accept(p);
}
RingBuffer
的穩健 HashWheelTimer 實作gs-collections
5.0 [1] 的新 Consumer Registry 實作如果您需要以更可預測的方式控制記憶體使用量,Reactor 包含一個分配 API,該 API 可以由您需要的任何特定池化實作來支援。Reactor 1.1 隨附兩種實作:基於 RingBuffer 的 Allocator
和參考計數的 Allocator
。
基於 RingBuffer 的 Allocator
可以配置為非常像標準 Disruptor RingBuffer
,並具有事件處理程序。但是,如果您只需要阻止生產者並使用基於插槽的分配策略,那麼使用 RingBuffer 進行分配非常簡單
Allocator<Event<Buffer>> pool = new RingBufferAllocatorSpec<Event<Buffer>>()
.ringSize(16 * 1024)
.allocator(() -> new Event<Buffer>(null))
.waitStrategy(new BusySpinWaitStrategy())
.get();
// in your code, maintain a `Reference` you can release
Reference<Event<Buffer>> ref = pool.allocate();
// pass your data POJO to other services
Event<Buffer> ev = ref.get().setData(buffer);
service.invoke(ev);
// when you're done, release the reference
ref.release();
記錄可能非常不利於非同步應用程式的效能,尤其是使用像 RingBuffer 這樣的技術的應用程式,RingBuffer 使用單一執行緒來支援許多任務。如果該執行緒因一個任務執行 IO 寫入記錄項目而被封鎖,則可能會向後級聯到應用程式並導致其完全停止運作。
Reactor 包含一個適用於 Logback [2] 的高效非同步 Appender
實作,它將實際附加操作移至專用的記錄執行緒。這應有助於減輕大多數應用程式中由記錄引起的執行緒壓力。但有時即使這樣也不夠,並且需要更高輸送量的解決方案。這就是 Reactor 基於 Java Chronicle 的 Appender
派上用場的地方。
Java Chronicle [3] 是一個高速訊息傳遞函式庫,它使用記憶體對應檔案來實現快速高效的資料持久性。Reactor 通過提供一個 Appender
將其與 Logback 集成,該 Appender
記錄來自您應用程式的原始事件資料,但不必調用下游 appender。這表示您的記錄事件儲存在 Chronicle
中,但處於原始狀態。需要一個額外的公用程式來後處理「持久」記錄檔,並將這些事件發送到「真實」appender(例如檔案或資料庫),或查看 Chronicle
並尋找符合給定模式的條目。這在生產環境中非常有用,在生產環境中,如果應用程式正常運作,您不必關心記錄,但如果出現問題,您可以輕鬆地從 Chronicle
中提取資料到標準記錄檔中以進行鑑識分析。
若要配置 Reactor DurableAsyncAppender
進行高速記錄,只需在您的 Logback 配置中宣告它。以下是在 logback.xml
配置中使用它的一個範例
<appender name="chronicle" class="reactor.logback.DurableAsyncAppender">
<!-- Uncomment to have log events also sent to a "normal" file appender -->
<!--appender-ref ref="logfile"/-->
<basePath>log/</basePath>
<backlog>2097152</backlog>
</appender>
如果出現問題,您可以使用隨附的公用程式分析 chronicle,方法是將從 chronicle 中提取的事件定向到給定的「真實」Appender
。此範例調用了記錄公用程式(reactor-logback.jar
構件必須在類別路徑上),並從 log/
目錄讀取持久記錄檔,從 logback.xml
讀取 Logback 配置,然後將所有 ERROR 訊息輸出到 logfile
appender,該 appender 在 logback.xml
配置檔中定義。
java reactor.logback.DurableLogUtility --path log/ --config logback.xml --output logfile --level ERROR
Groovy 2.3.0 剛剛發布,包含 大量新功能和效能改進,以及 lambda 閉包支援和其他酷炫的 JDK 8 功能。Reactor 的 Groovy 支援已準備好在 Groovy 2.3 中使用,同時仍與 JDK 7 上的 Groovy 2.2 相容。
TCP 模組已進行改進,其中包含對 UDP 的支援以及基於 ZeroMQ 的新實作。[4]
Reactor 中的 ZeroMQ 支援具有 tcp
和 inproc
支援,並提供簡潔流暢的 API,可使用 Reactor 的高效編解碼器設施快速建立用戶端和伺服器。
ZeroMQ<JsonData> zmq = new ZeroMQ<>(reactorEnv)
.codec(new JacksonJsonCodec());
zmq.router("inproc://queue")
.consume(channel -> channel.consume(service::invoke));
zmq.dealer("inproc://queue")
.consume(channel -> {
JsonData data;
while(null != (data = in.next())) {
channel.sendAndForget(data);
}
});
構件可在 Maven Central 和 repo.spring.io/libs-release
中取得。請注意,Spring 支援的坐標在 1.1 版本中已變更為 org.projectreactor.spring:reactor-spring-*
[6]。
參考文件可在 GitHub wiki 中取得。
更新的 API 文件位於 GitHub pages 網站上。
[1] - https://github.com/goldmansachs/gs-collections
[2] - http://logback.qos.ch/
[3] - https://github.com/OpenHFT/Java-Chronicle
[4] - http://zeromq.org/
[5] - http://openjdk.java.net/projects/code-tools/jmh/
[6] - http://repo.spring.io/libs-release/org/projectreactor/spring/