領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多去年,就在 SpringOne Platform 2018 的中間,我發表了這個系列的第一篇文章, _SpringOne Platform 2018 的反應式革命 (第 1/N 部分)_,其中介紹了我們在 SpringOne Platform 2018 上剛推出的一些重要功能。我想追蹤並重新審視那篇部落格和我去年介紹的概念,並展示在過去一年裡我們取得了多大的進展。總而言之:事情變得*容易多了*!
去年,我們宣布了我們正在努力支援使用名為 R2DBC 的新專案進行反應式 SQL 資料存取的標準。JVM 上傳統的 SQL 資料存取方法(如 JDBC)是阻塞式 API。它們會佔用執行緒來完成其工作,並抵消 Spring 等反應式平台的可擴展性優勢。我們希望改善開發人員的這種情況,因此我們構建了 R2DBC。R2DBC 現在已公開發布一年多,並且在此之前已在內部開發了相當長的時間。我們剛剛發布了 0.0.8 版本。我們即將發布 GA 版本。我不確定何時,但我認為我們將在一個可能為 GA 但尚未達到 1.0 的版本中提供大多數開發人員需要的大部分功能。諸如預存程序之類的功能可能會稍後發布。
自去年以來,我們取得了很大的進展。現在,有許多不同的供應商(一些可用,一些正在積極開發中)提供基於 R2DBC 的資料存取支援,包括 PostgresSQL、MySQL、Google Cloud Spanner、H2、Microsoft SQL Server 等。
而且開始使用*更容易*了。只需訪問我在網路上第二喜歡的地方,*在生產環境之後*,Spring Initializr,然後選擇R2DBC
並選擇PostgresSQL
。點擊Generate
。產生的檔案幾乎可以立即運行。您需要指定連接資訊,但與所有 Spring Boot 自動配置一樣,這些詳細資訊最好在 Java 代码之外處理。您可以在環境、-D
參數和屬性檔案中處理它。以下是我在src/main/resources/application.properties
中的樣子。
spring.r2dbc.url=r2dbc:postgres://127.0.0.1/orders
spring.r2dbc.username=orders
spring.r2dbc.password=orders
這是與去年 Java 代码相比,更加等效的程式碼
package com.example.r2dbc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
@SpringBootApplication
public class R2dbcApplication {
public static void main(String[] args) {
SpringApplication.run(R2dbcApplication.class, args);
}
}
interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Customer {
private Long id;
private String email;
}
很酷吧?這些 API 本身就是反應式的,意味著您可以利用這些 API 提供的重試和組合設施。我們可以更進一步。由於 Spring Framework 5.2 中引入的新的ReactiveTransactionManager
抽象,現在支援反應式交易管理。這個介面已經有實現,類似於 Spring Framework 中的PlatformTransactionManager
,適用於 Spring Data Neo4j RX(新的、反應式優先實現)、Spring Data MongoDB,當然還有 R2DBC。這是一個基於我們的 R2DBC 代码的反應式、事務性服務。
@Service
@RequiredArgsConstructor
class CustomerService {
private final TransactionalOperator transactionalOperator;
@Transactional
Flux<Customer> saveAllWithTransactionalAnnotation(String... emails) {
return this.validCustomersFromEmails(emails);
}
Flux<Customer> saveAllWithTransactionalOperator(String... emails) {
return this.transactionalOperator.transactional(
this.validCustomersFromEmails(emails));
}
private Flux<Customer> validCustomersFromEmails(String... emails) {
return Flux.fromArray(emails)
.map(email -> new Customer(null, email))
.doOnNext(c -> Assert.isTrue(c.getEmail().contains("@"), "the email must contain a '@'"));
}
}
您需要明確配置交易。這是方法
@Configuration
@EnableTransactionManagement
class TransactionConfiguration {
@Bean
ReactiveTransactionManager reactiveTransactionManager(ConnectionFactory cf) {
return new R2dbcTransactionManager(cf);
}
@Bean
TransactionalOperator transactionalOperator(ReactiveTransactionManager txm) {
return TransactionalOperator.create(txm);
}
}
在文章的評論中,有人問到是否有人正在與 Jakarta EE 討論所有這些。我提到了(現在已失效的)ADBA 專案。R2DBC 現在是使反應式 SQL 資料存取正常運作的最佳選擇。我希望您能試一試,親自體驗一下,並提供回饋!
去年,我們還首次推出了對 RSocket 的支援,RSocket 是一種由 Netflix(後來搬到 Facebook)和 Netifi 的人員開發的協定。RSocket 是一種線路協定,它將反應式處理的租戶作為協定本身的一部分。Facebook 開發了不同的 RSocket 客戶端,例如,支援 C++ 和 Java。Java RSocket 客戶端基於 Reactor 專案!RSocket 是一種二進位協定,因此理論上您也可以使用其他語言構建客戶端。
RSocket 是一種通用資料傳輸協定。它與有效負載無關。它不在乎您透過線路傳送什麼。它也是為操作而構建的!它甚至在協定中具有專用的訊息框架,以傳達諸如服務健康狀況和運行時間之類的信息。它支援許多訊息交換模式或樣式,包括但不限於請求-回應、發送即忘、發布-訂閱和流式傳輸。天空是極限!這篇文章不可能徹底介紹所有選項,因此讓我們來看一個簡單的流式傳輸範例,它有兩個元件,一個生產者和一個消費者。為了使其正常工作,我去了 Spring Initializr,選擇了Lombok
和RSocket
。我這樣做了兩次,一次用於生產者,另一次用於消費者。讓我們看看。
與以前一樣,您需要在src/main/resources/application.properties
中提供一些配置。
spring.rsocket.server.port=7777
這將啟動一個 RSocket 服務。這是實際的代码。
package com.example.rsocketservice;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import java.time.Duration;
@SpringBootApplication
public class RsocketServiceApplication {
public static void main(String[] args) {
SpringApplication.run(RsocketServiceApplication.class, args);
}
}
@Controller
class GreetingsController {
@MessageMapping("intervals")
Flux<GreetingResponse> interval(GreetingRequest request) {
return Flux
.interval(Duration.ofMillis(1000))
.map(interval -> new GreetingResponse("Hello (#" + interval + ") " + request.getName() + "!"));
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingRequest {
private String name;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingResponse {
private String message;
}
比去年的代码好多了!它在功能上是等效的,但它做了一件更有趣的事情,例如封送一個物件而不是一個簡單的String
。請記住,RSocket 連接是多路複用、有狀態、長時間存在的連接。您可以打開一個連接並重複使用它來處理多個請求。它們可能會斷開連接,並且由您來在那裡加入重試邏輯。這對速度產生了深遠的影響;您無法相信在典型的 HTTP 中連接和斷開連接花了多少時間!
讓我們看看客戶端,一個消費 Greetings 服務的東西。前往 Spring Initializer,啟動一個新專案,選擇與之前相同的相依性,並且僅更改專案名稱。
package com.example.rsocketclient;
import lombok.*;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Component;
@SpringBootApplication
@RequiredArgsConstructor
public class RsocketClientApplication {
@SneakyThrows
public static void main(String[] args) {
SpringApplication.run(RsocketClientApplication.class, args);
System.in.read();
}
@Bean
RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
return builder.connectTcp("localhost", 7777).block();
}
}
@Log4j2
@Component
@RequiredArgsConstructor
class Client {
private final RSocketRequester rSocketRequester;
@EventListener(ApplicationReadyEvent.class)
public void ready() {
this.rSocketRequester
.route("intervals")
.data(new GreetingRequest("World"))
.retrieveFlux(GreetingResponse.class)
.subscribe(im -> log.info("consuming " + im.getMessage() + "."));
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingRequest {
private String name;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class GreetingResponse {
private String message;
}
還不錯吧?我們可以更進一步,並使用 Spring Security 中對 RSocket 的新支援來保護它,並且我們可以使用 Spring Cloud Gateway 的新 RSocket 支援來充當我們服務的前端。
我迫不及待地想在一年後使用最新最好的東西更新這個系列!