取得領先
VMware 提供培訓和認證,以加速您的進度。
了解更多Spring 的粉絲們您好! 這個 Spring 週真是瘋狂! 我正在參加 2018 年 SpringOne Platform,沉浸在所有令人興奮的社群活動中,與來自世界各地熱愛 Pivotal 和 Spring 的人們互動! 我剛被邀請與一位來自越南的女士合影,當時我在距離展覽會五英里的地方 - 在我碰巧去吃社群晚餐的當地購物中心! Pivotal 將來自如此多不同文化和地方的人們聚集在一起真是太棒了。
今天,2018 年 9 月 26 日星期三,真是個 *特別的日子*! 當然,本週發生了很多精彩的事情,但今天對我來說非常特別。 今天我們揭開了 Pivotal 在兩個方面所做的驚人工作。 我只想在這篇文章中簡要地談一下這些主題。 毫無疑問,在未來的幾週裡,您會從我們這裡聽到更多關於這些的信息!
首先,我們宣布了我們在嘗試支援使用 R2DBC 進行反應式 SQL 資料存取標準方面的工作。 R2DBC 目前還處於早期階段,但非常令人興奮。 到目前為止,當我們談論資料存取時,我都會很快提醒人們,雖然他們可以在反應式應用程式中使用 JDBC,但他們將擴展該互動的問題視為自己的責任。 他們需要配置更多執行緒到分配給任何 SQL 資料存取發生的反應式串流的 Scheduler
。 這是必要的,因為 JDBC 是一個從根本上阻塞和同步的 API。 它沒有執行 IO,然後在有活動時回呼您的能力; 客戶端執行緒正在等待回覆。 R2DBC 提供了一種替代方案。 它 *並不是* JDBC 的封裝,而是支援建立在原生反應式 SQL 資料庫驅動程式之上的功能性反應式資料存取。 我們有一個 SPI 層和一個支援 PostgreSQL 的實作。
讓我們看一個例子。 為了使這個工作,我去了 Spring Initializr 並選擇了 Reactive Web
和 Lombok
。 我確保選擇 Spring Boot 的 SNAPSHOT
版本。 您不需要 Spring Boot 本身的 SNAPSHOT,*本身*,但您會希望 Spring Initializr 將 Spring 快照儲存庫添加到您的建置中,以便您可以解析 r2dbc-postgresql
依賴項。 然後,我(手動!可怕!)編輯了 Maven 建置檔案 pom.xml
,並在建置中添加了對 io.r2dbc:r2dbc-postgresql:1.0.0.BUILD-SNAPSHOT
依賴項的依賴。
package s1p.r2dbc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.PostgresqlResult;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.net.URI;
@Log4j2
@SpringBootApplication
public class PostgresqlApplication {
public static void main(String args[]) {
SpringApplication.run(PostgresqlApplication.class, args);
}
@Bean
PostgresqlConnectionFactory connectionFactory(
@Value("${spring.datasource.url}") String url) {
URI uri = URI.create(url);
String host = uri.getHost();
String userInfo = uri.getUserInfo();
String user = userInfo, pw = "";
if (userInfo.contains(":")) {
user = userInfo.split(":")[0];
pw = userInfo.split(":")[1];
}
String name = uri.getPath().substring(1);
PostgresqlConnectionConfiguration configuration = PostgresqlConnectionConfiguration
.builder() //
.database(name) //
.host(host) //
.username(user) //
.password(pw) //
.build();
return new PostgresqlConnectionFactory(configuration);
}
}
@Log4j2
@Service
class CustomerService {
private final ConnectionFactory connectionFactory;
CustomerService(PostgresqlConnectionFactory pgc) {
this.connectionFactory = pgc;
}
Flux<Result> delete(Long id) {
return Mono.from(this.connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("DELETE FROM customers where id = $1")
.bind("$1", id) //
.execute());
}
Flux<Result> create(Long id, String email) {
return Mono.from(this.connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("INSERT INTO customers(id,email) VALUES($1, $2)")
.bind("$1", id) //
.bind("$2", email) //
.add().execute());
}
Flux<Customer> all() {
return Mono
.from(this.connectionFactory
.create())
.flatMapMany(connection -> Flux.from(
connection.createStatement("select * from customers").execute())
.flatMap(result -> result.map((row, rowMetadata) -> new Customer(row.get("id", Long.class),
row.get("email", String.class)))));
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Customer {
private Long id;
private String email;
}
很酷吧? 這些 API 是原生反應式的,意味著您可以利用這些 API 提供的重試和組合功能。
我們也首次亮相了對 RSocket 的支援,RSocket 是一種由 Netflix(其中一些人後來轉到 Facebook)的人員開發的協定。 RSocket 是一種線路協定,它將反應式處理的租戶作為協定本身的一部分。 Facebook 開發了兩個 RSocket 客戶端:一個用 C++ 編寫,另一個用 Java 編寫。 Java RSocket 客戶端建立在 Reactor 專案之上! RSocket 是一種二進位制協定,因此從理論上講,您也可以使用其他語言建置客戶端。
RSocket 是一種通用資料傳輸協定。 它支援多種訊息交換模式或樣式,包括但不限於請求-回應、fire-and-forget、發布-訂閱和串流。 沒有限制! 這篇文章無法徹底介紹所有選項,因此讓我們看一個簡單的串流範例,該範例有兩個元件:生產者和消費者。 為了使這個工作,我去了 Spring Initializr,選擇了 Lombok
,然後我選擇了最新(穩定)版本的 Spring Boot。 在建置檔案 pom.xml
中,我添加了兩個依賴項:io.rsocket:rsocket-transport-netty:0.11.5
和 io.rsocket:rsocket-core:0.11.5
。
這裡的目標是示範這些 API 是多麼簡單,以及多麼靈活。 您可以直接使用它們,或者,例如 Netifi 所做的那樣,將它們作為一整套基礎架構的基礎,以支援更進階的用例。
這個簡單範例的關鍵在於生產者將每秒發出一個新記錄,永遠! 只要宇宙沒有經歷過熱寂,這個範例就應該繼續進行!
首先,讓我們看一個微不足道的生產者範例。 我將其設置為監聽 ApplicationReadyEvent
,然後才開始提供請求。 我想保持 Java 進程的運行(並因此監聽請求),所以作為一個有點笨拙的方法,我使用 System.in.read()
來輪詢控制台以獲取輸入。 請記住,此 API 是非阻塞且異步的! 如果我們不協助它,它不會保持主執行緒的存活。
package s1p.rsocket.producer;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
@SpringBootApplication
@Log4j2
public class ProducerApplication implements ApplicationListener<ApplicationReadyEvent> {
public static void main(String[] args) throws IOException {
SpringApplication.run(ProducerApplication.class, args);
System.in.read();
}
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
SocketAcceptor sa = (connectionSetupPayload, rSocket) ->
Mono.just(new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) { // produce a result when asked...
return Flux
.interval(Duration.ofMillis(1000)) // ...every one second
.map(aLong -> DefaultPayload.create("interval: " + aLong));
}
});
RSocketFactory
.receive()
.acceptor(sa)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.onTerminateDetach()
.subscribe(nettyContextCloseable -> log.info("started the server @ " + Instant.now().toString()));
}
}
消費者同樣簡單。 它向服務發起請求,從服務取得反應式串流 Publisher<T>
,並迭代每個記錄,將二進位制有效負載解壓縮為 String
,並在它到達時記錄它。 在這裡,我也使用 System.in.read()
作為保持執行緒運行的一種方式。
package s1p.rsocket.consumer;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import java.io.IOException;
@Log4j2
@SpringBootApplication
public class ConsumerApplication implements ApplicationListener<ApplicationReadyEvent> {
public static void main(String[] args) throws IOException {
SpringApplication.run(ConsumerApplication.class, args);
System.in.read();
}
@Override
public void onApplicationEvent(ApplicationReadyEvent evt) {
RSocketFactory
.connect()
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.flatMapMany(socket ->
socket
.requestStream(DefaultPayload.create("Hello"))
.map(Payload::getDataUtf8)
.doFinally(signal -> socket.dispose())
)
.subscribe(name -> log.info("consuming " + name + "."));
}
}
生產者和消費者都連接到 localhost:7000
並開始互動。 RSocket 像 HTTP 一樣,實際上並不關心訊息的有效負載是什麼。 畢竟,它是線路上的二進位制資料。
在 SpringOne Platform 上錄製了很多擴展這兩個主題的講座! 它們很快就會上線。(您不希望您在現場嗎?)Spring 在這裡扮演著重要的角色,因為它是端到端反應式的。 想像一下! 您可以建置使用 RSocket 進行服務到服務通訊的反應式微服務。 為什麼不使用 @Tailable
反應式 Spring Data MongoDB 儲存庫方法對 MongoDB 資料集中的資料進行某種連續查詢,並透過 RSocket 串流這些結果,因為它們會到達您的 MongoDB 儲存庫中? 您也可以反向操作。 您的 RSocket 客戶端可以串流大量要寫入 MongoDB 的資料,使用剛發布的 Spring Data Lovelace 中 Spring Data MonogDB 中的新 *反應式* 事務支援! 為什麼不使用 R2DBC 並透過 RSocket 以串流方式傳回大量資料。 RSocket 也非常適合邊緣! 例如,您可以透過 websocket 執行 RSocket。 想像一下:基於 RSocket 的服務可以直接被 HTML5 客戶端使用。 或者,您可以使用 Spring WebFlux 建立反應式 HTTP 服務,進而調用基於 RSocket 的服務,進而調用 R2DBC。 正如他們所說,天空可能是無限的和異步的!
Reactor 團隊 和 Spring 團隊的 Ben Hale 等人,已經在這些專案上努力工作了一年多,所以我知道他們非常興奮能與您分享這個消息,並回答您在反應式革命中拿起眾所周知的武器時遇到的任何問題!