領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多上週的 Spring Data Kay M1 是第一個支援 reactive 資料存取的版本。其初始支援的儲存庫集合 — MongoDB、Apache Cassandra 和 Redis — 都已經包含了 reactive drivers,使其成為此原型非常自然的候選者。讓我們更詳細地了解新的編程模型以及構成該支援的 API。
儲存庫編程模型是 Spring Data 使用者通常處理的最高層級抽象。它們通常包含一組在 Spring Data 提供的介面中定義的 CRUD 方法和特定於網域的查詢方法。以下是 reactive Spring Data 儲存庫定義的樣子
public interface ReactivePersonRepository
extends ReactiveCrudRepository<Person, String> {
Flux<Person> findByLastname(Mono<String> lastname);
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}
如您所見,與您習慣的沒有太大的不同。但是,與傳統儲存庫介面相比,reactive 儲存庫使用 reactive 類型作為返回類型,並且也可以對參數類型執行此操作。新引入的 ReactiveCrudRepository
中的 CRUD 方法當然也使用這些類型。
預設情況下,reactive 儲存庫使用 Project Reactor 類型,但也可以使用其他 reactive 函式庫。我們為這些函式庫提供自訂儲存庫基本介面 (例如 RxJava2CrudRepository
),並且還會根據需要自動調整查詢方法的類型,例如 RxJava 的 Observable
和 Single
。其餘的基本上保持不變。但是請注意,目前的里程碑尚不支援分頁,並且您當然必須在類別路徑上擁有必要的 reactive 函式庫才能啟用對特定函式庫的支援。
與我們在阻塞世界中擁有的類似,對 reactive Spring Data 的支援透過 @Enable…
註釋以及一些基礎結構設定來啟用
@EnableReactiveMongoRepositories
public class AppConfig extends AbstractReactiveMongoConfiguration {
@Bean
public MongoClient mongoClient() {
return MongoClients.create();
}
@Override
protected String getDatabaseName() {
return "reactive";
}
}
請參閱我們如何為基礎結構配置使用不同的基底類別,因為我們需要使用 MongoDB 異步驅動程式。
現在可以像阻塞儲存庫一樣使用儲存庫,除了結果的處理現在可以以 reactive 的方式完成
@RestController
class PersonController {
private final PersonRepository people;
public PersonController(PersonRepository people) {
this.people = people;
}
@GetMapping("/people")
Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {
Flux<Person> result = repository.findByLastname(lastname);
return result.map(it -> it.getFullName());
}
}
請參閱我們如何轉發 Spring Web Reactive 提供的 reactive 參數,將它們管道傳輸到儲存庫中,依序取回一個 Flux
,然後以 reactive 的方式處理執行結果。通常,reactive 查詢方法遵循與已知的儲存庫相同的查詢建立理念。傳遞給查詢方法的參數可以是純參數(例如 String
)封裝參數(Optional<String>
、Stream<String>
)或 reactive 封裝引數(Mono<String>
、Flux<String>
)。如果您使用 reactive 封裝器作為參數類型,則實作會延遲實際查詢建立和執行,直到實際訂閱。
正如傳統儲存庫基於傳統範本實作一樣,reactive 儲存庫建立在 reactive 範本之上。阻塞範本 API 中可用的大多數操作在 reactive 範本中都有對應的項目。我們將把阻塞世界中的更多功能移植到 reactive 範本 API 中,但是某些操作根本無法透過 reactive 驅動程式(尚未)使用,或者在 reactive 世界中沒有意義。
以下是 Spring Data MongoDB 中的 ReactiveMongoOperations
的摘錄。它由 ReactiveMongoTemplate
實現,並使用 Project Reactor 的 reactive 類型 (例如 Mono
和 Flux
) 來封裝回應。某些方法還接受 reactive 類型,以將資料流式傳輸到您的資料儲存中。
public interface ReactiveMongoOperations {
// …
/**
* Map the results of an ad-hoc query on the specified collection to a
* single instance of an object of the specified type.
*/
<T> Mono<T> findOne(Query query, Class<T> entityClass);
/**
* Map the results of an ad-hoc query on the collection for the entity
* class to a List of the specified type.
*/
<T> Flux<T> find(Query query, Class<T> entityClass);
/**
* Insert the object into the specified collection.
*/
<T> Mono<T> insert(T objectToSave, String collectionName);
/**
* Insert the object into the collection for the entity type of the object
* to save.
*/
<T> Mono<T> insert(Mono<? extends T> objectToSave);
// …
}
請注意,所有方法都遵循 reactive 執行模型,不會在調用時執行任何包含任何 I/O 的操作,而僅在訂閱傳回值時執行。
讓我們透過範本插入一些資料
Flux<Person> flux = Flux.just(new Person("Walter", "White"),
new Person("Skyler", "White"),
new Person("Saul", "Goodman"),
new Person("Jesse", "Pinkman"));
template.insertAll(flux).subscribe();
某些方法 (例如 insertAll(…)
) 接受 reactive 類型,以將傳入資料非同步流式傳輸到您的 MongoDB 資料庫中,例如來自您在 Spring Web Reactive 控制器中接收到的 Flux
,該控制器將透過 Jackson 非同步映射 JSON 陣列
@PostMapping("/people")
Flux<People> namesByLastname(@RequestBody Flux<Person> people) {
return template.insertAll(people);
}
如您所見,儲存庫和範本 API 都允許您以 reactive、非阻塞的方式描述請求處理。也就是說,讓我們更深入地研究 Redis 對 reactive 資料存取的支援。
Spring Data Redis 在連線層級上提供了初始 reactive 支援,目前僅在 Lettuce 上提供,因為它是唯一支援 reactive 資料存取的 Redis 驅動程式。由於 Redis 通常在較低的抽象層級上使用,因此 Kay M1 版本從該較低層級的 reactive 抽象開始。LettuceConnectionFactory
允許存取 ReactiveRedisConnection
,依序提供對 Redis 命令的 reactive 版本的存取
使用運算符的功能鏈建立以 reactive 方式存取 Redis 資料的鏈。同樣,所有 I/O 都是非同步的。
ReactiveKeyCommands keyCommands = connection.keyCommands();
keyCommands.randomKey()
.flatMap(keyCommands::type)
.flatMap(System.out::println)
.subscribe();
此程式碼會取得一個隨機金鑰並列印其資料類型。不存在的隨機金鑰會作為空的 Mono
完成。
Reactive Redis 命令有兩種形式:接受純引數和接受命令發布者。命令發布者發出特定的 Redis 命令,以將資料直接流式傳輸到 Redis 中。每個發出的命令在執行命令後發出命令回應。
public interface ReactiveStringCommands {
// …
Mono<Boolean> set(ByteBuffer key, ByteBuffer value);
Flux<BooleanResponse<SetCommand>> set(Publisher<SetCommand> commands);
// …
}
傳統的 Spring Data Redis 在其阻塞 API 上使用 byte[]
來交換資料。如果資料在緩衝區(例如 ByteBuffer
或 Netty 的 ByteBuf
)中可用,則 byte[]
會強制資料重複。Reactive 支援在很大程度上與有效資源使用有關,因此我們決定公開接受和傳回 ByteBuffer
的方法。
我希望這篇部落格文章向您介紹了 Kay 提供的各種抽象層級的 reactive 功能。您可以在我們的範例儲存庫中找到所有這些的可執行範例。
我們期待在 2017 年 1 月發布另一個里程碑版本,然後再推出候選版本。