Reactor 1.0.0.M1 - JVM 上非同步快速資料應用程式的基礎

工程 | Jon Brisbin | 2013年7月18日 | ...

我非常興奮地宣布 Project Reactor 的第一個里程碑版本!Project Reactor 是一個基礎框架,用於在 JVM 上建構非同步、快速資料應用程式。Reactor 1.0.0.M1 中的一些優點包括:反應式組合輔助工具 Stream 和 Promise、TcpServer 和 TcpClient,以及 Groovy 和 Spring 支援。受到 Reactive Extensions、RxJava、新的 JDK 8 Stream API(以及 Scala 和其他...)的啟發,這些 Composables 使協調非同步任務變得非常簡單。它們支援使用 Consumers 的傳統回呼樣式程式設計,但也提供簡潔的組合 API,其中包含 map(Function fn)filter(Predicate<t> p)</t>batch(int size) 等方法。

Reactor 解決了什麼問題?

Reactor 從一開始就被設計為一個高效能、高擴展性的平台,用於建構下一代大數據應用程式。在將應用程式擴展到數百、數千甚至數百萬使用者時,非同步架構明顯優於每個請求一個執行緒的架構。Reactor 的非同步基礎為每秒處理數萬或數十萬—甚至數百萬—事件的大數據應用程式提供了堅實的基礎。它提供了簡單的工具來將非同步任務鏈接在一起,並使執行這些任務像呼叫單一方法一樣容易。

使用 Reactor 進行組合

Stream 是一種以非同步方式處理流經應用程式的資料的簡單方法。在 Reactor 中,Stream 實際上包含兩個部分:Deferred(發布者)和實際的 Stream(消費者)。您可以在 Stream 上分配處理常式,以使用組合方法和簡單的回呼來處理資料。

在將資料放入佇列以使用 Stream 和 JDK Lambda 進行進一步處理之前,轉換和篩選進入應用程式的資料,看起來會像這樣


// Create Environment in which Reactors operate
Environment env = new Environment();

// Create a Stream using the high-speed LMAX Disruptor RingBuffer
Deferred<Trade, Stream<Trade>> incoming = Streams.<Trade>defer()
		.env(env)
		.dispatcher(Environment.RING_BUFFER)
		.get();

// Work with the incoming trades
Stream<Trade> trades = incoming.compose();
Stream<Order> orders = trades.map(trade -> tradeService.placeTrade(trade));

// Filter out large orders from small
Stream<Order> highPriority = orders.filter(order -> order.getSize() >= 1000);
Stream<Order> lowPriority = orders.filter(order -> order.getSize() < 1000);

// Consume the orders in different ways
highPriority.consume(order -> orderService.executeNow(order));
lowPriority.consume(order -> orderService.executeLater(order));

TCP 支援

M1 還包括一個易於使用的 TCP 客戶端和伺服器。由速度驚人的 Netty 網路程式庫提供支援,Reactor 驅動的 syslog 伺服器可以在伺服器級硬體上每秒擷取約 100 萬條訊息。Reactor TCP 支援包括一個簡單的 Codec 工具,它很容易擴展到核心中提供的預設編解碼器集之外,並且旨在通過使用 Reactor 的 Buffer 類來實現輕量化,Buffer 類提供了諸如對資料進行極高效視圖之類的功能,以及大量用於處理標準 Java NIO ByteBuffers 的輔助方法—但無需直接處理 ByteBuffer 的痛苦。

Reactor 的 TCP 支援開箱即用就支援 JSON。要建立一個使用 JSON 作為協定的基於 TCP 的 RPC 伺服器,就像這樣簡單


TcpServer<Pojo, Pojo> server = new TcpServerSpec<Pojo, Pojo>(NettyTcpServer.class)
		.env(env)
		.codec(new JsonCodec<>(Pojo.class))
		.consume(conn -> {
			conn.consume(data -> {
				// handle incoming data
			});
		})
		.get()
		.start();

Groovy 和 Spring 支援

Reactor M1 還提供出色的 Groovy 支援。它提供了輔助工具,使使用 Closures 消費事件變得非常簡潔。毋庸置疑,在 Groovy 中編寫 Reactor 事件處理程式非常容易。使用 Closures 處理 Reactors 使非同步程式碼實際上可讀!


def env = new Environment()

// Create Reactor using default RingBuffer Dispatcher
def reactor = Reactors.reactor().env(env).get()

reactor.on('topic') { String s ->
	// handle data
}

// Publish an event to a topic
r1.notify 'topic', 'Hello World!'

Reactor M1 還包括 Spring 支援,使編寫事件驅動的 POJO 像 MVC Controller 一樣容易。通過使用 @On 注釋方法,組件掃描拾取的 bean 可以自動連接到 Reactor 並收到事件通知。

一個簡單的基於 JavaConfig 的 Spring 配置可能如下所示


public class HandlerBean {
	@On(reactor = "@rootReactor", selector = '$("test")')
	public void handleTest() {
		// event 'test' was fired
	}
}

@Configuration
public class AnnotatedHandlerConfig {

	@Bean
	public Environment env() {
		return new Environment();
	}

	@Bean
	public Reactor rootReactor() {
		return env().getRootReactor();
	}
}

只需將您的 Reactor 注入到服務層,當事件準備就緒時,使用 notify() 方法在 Reactor 上發布它們。

Artifacts、原始碼和文件

Maven artifacts 可在 SpringSource Artifactory 儲存庫中找到。在 Gradle 專案中,您可以像這樣拉取 Reactor


ext {
  reactorVersion = '1.0.0.M1'
}

repositories {
	maven { url 'http://repo.springsource.org/libs-milestone' }
  mavenCentral()
}

dependencies {
  // Reactor Core
  compile 'org.projectreactor:reactor-core:$reactorVersion'
}

原始碼可在 GitHub 上取得:https://github.com/reactor/reactor

加入 Reactor Google+ 社群 以隨時了解 Reactor 的最新動態,或 在 Twitter 上關注我們 @ProjectReactor。

文件可在 GitHub WikiAPI Javadoc 上取得。

您也可以在 GitHub Issues 上提交問題並追蹤開發進度。

來參加 SpringOne!

我們將在今年的 SpringOne 上舉辦 關於 Reactor 的完整會議。如果您還沒有計劃參加,那麼您真的應該參加!議程排滿了關於 Spring 社群正在做的令人興奮的事情的精彩會議。快來加入我們!

SpringOne2GX 2013,9 月 9-12 日,加利福尼亞州聖克拉拉

我迫不及待地想深入下一個衝刺,為 1.0 GA 而努力。我們很樂意邀請您一同參與!

取得 Spring 電子報

隨時關注 Spring 電子報

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部