領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多嗨,Spring 粉絲們!在這一期的 Spring 小技巧中,我們要來看看阿里巴巴的 Apache RocketMQ。我們之前在 Spring 小技巧中討論過阿里巴巴。請查看先前的 Spring 小技巧,我們探討了一些 Spring Cloud Alibaba。
為了使用 Apache RocketMQ,您需要按照 RocketMQ 快速入門中的步驟進行操作。這一期的 Spring 小技巧介紹了 Apache RocketMQ,它最初是阿里巴巴內部開發和使用的技術,並在著名的中國銷售節日 11/11(類似於美國的「網購星期一」或「黑色星期五」)的磨練中得到驗證。有點像那樣,但規模大得多。在 2019 年,阿里巴巴(單獨,沒有其他電子商務引擎參與)在 24 小時內創造了近 400 億美元的收入。這需要發送數兆條訊息,並透過可以擴展以滿足需求的系統。RocketMQ 是他們唯一可以信任的東西。
執行 Apache RocketMQ 時,您需要使用 Java 8。(當然,在編寫連接到 Apache RocketMQ 的 Spring 應用程式時,可以使用任何版本的 Java。)我使用 SDK 管理器(「SDKman」- sdk
)切換到適當的 Java 版本。
sdk use java 8.0.242.hs-adpt
如果尚未安裝,這將安裝一個可運作的版本。完成後,您需要執行 NameServer。
${ROCKETMQ_HOME}/bin/mqnamesrv
然後您需要執行 Broker 本身。
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876
如果您想使用基於 SQL 的過濾,則需要在 Broker 的組態檔 $ROCKETMQ_HOME/conf/broker.conf
中新增一個屬性,然後告知 RocketMQ 使用該組態。
enablePropertyFilter = true
我使用類似這樣的腳本來啟動所有內容。
export JAVA_HOME=$HOME/.sdkman/candidates/java/8.0.242.hs-adpt
${ROCKETMQ_HOME}/bin/mqnamesrv &
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 -c ${ROCKETMQ_HOME}/conf/broker.conf
讓我們看看一個簡單的 Producer 類別,它使用 Spring Boot 自動組態和 RocketMQTemplate
。
為了使用它,您需要在 Spring Initializr 上建立一個新專案。我使用最新版本的 Java 產生了一個新專案,然後確保包含 Lombok
。我們還需要 Apache RocketMQ 客戶端和適當的 Spring Boot 自動組態
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
自動組態將建立與正在執行的 Apache RocketMQ Broker 的連線,並由某些屬性告知。
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=greetings-producer-group
第一個屬性 name-server
告知應用程式 Apache RocketMQ Nameserver 的位置。然後,Nameserver 知道 Broker 的位置。您還需要為 Producer 和 Consumer 指定一個群組。在這裡,我們使用 greetings-producer-group
。
package com.example.producer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import java.time.Instant;
@RequiredArgsConstructor
@SpringBootApplication
public class ProducerApplication {
@Bean
ApplicationListener<ApplicationReadyEvent> ready(RocketMQTemplate template) {
return event -> {
var now = Instant.now();
var destination = "greetings-topic";
for (var name : "Tammie,Kimly,Josh,Rob,Mario,Mia".split(",")) {
var payload = new Greeting("Hello @ " + name + " @ " + now.toString());
var messagePostProcessor = new MessagePostProcessor() {
@Override
public Message<?> postProcessMessage(Message<?> message) {
var headerValue = Character.toString(name.toLowerCase().charAt(0));
return MessageBuilder
.fromMessage(message)
.setHeader("letter", headerValue)
.build();
}
};
template.convertAndSend(destination, payload, messagePostProcessor);
}
};
}
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
private String message;
}
我想不到比這更簡單的了!這是一個簡單的 for 迴圈,處理每個名稱,建立一個新的 Greeting
物件,然後使用 RocketMQTemplate
將 Payload 發送到 Apache RocketMQ 主題 greetings-topic
。在這裡,我們使用了 RocketMQTemplate
物件的 Overload,它接受 MessagePostProcessor
。MessagePostProcessor
是一個回呼,我們可以在其中轉換將要發送的 Spring Framework Message
物件。在此範例中,我們貢獻了一個標頭值 letter
,其中包含名稱的第一個字母。我們將在 Consumer 中使用它。
讓我們看看 Consumer。從 Spring Initializr 產生一個新的 Spring Boot 應用程式,並確保新增 Apache RocketMQ 自動組態。您還需要在客戶端的 application.properties
中指定 Nameserver。
自動組態支援定義實作 RocketMQListener<T>
的 Bean,其中 T
是 Consumer 將接收的 Payload 類型。在這種情況下,Payload 是 Greeting
。
package com.example.consumer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.spring.annotation.SelectorType.SQL92;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
private String message;
}
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
consumerGroup = "simple-group"
)
class SimpleConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info(greeting.toString());
}
}
在本範例中,SimpleConsumer
只是記錄來自 Apache RocketMQ 中 greetings-topic
主題的所有傳入訊息。在這裡,Consumer 將處理主題上的 *所有* 訊息。讓我們看看另一個不錯的功能 - 選擇器 - 它可以讓我們有選擇地處理傳入的訊息。讓我們用兩個新的 Listener 替換現有的 RocketMQ Listener。每個 Listener 都將使用 SQL92 相容的 Predicate 來判斷是否應處理傳入的訊息。一個 Listener 僅處理具有與 m
、k
或 t
相符的 letter
標頭的訊息。另一個 Listener 僅處理那些 letter
標頭與 j
相符的訊息。
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
selectorExpression = " letter = 'm' or letter = 'k' or letter = 't' ",
selectorType = SQL92,
consumerGroup = "sql-consumer-group-mkt"
)
class MktSqlSelectorConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info("'m', 'k', 't': " + greeting.toString());
}
}
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
selectorExpression = " letter = 'j' ",
selectorType = SQL92,
consumerGroup = "sql-consumer-group-j"
)
class JSqlSelectorConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info("'j': " + greeting.toString());
}
}
還不錯吧?除了在 24 小時內處理數兆條訊息外,Apache RocketMQ 還支援許多其他功能!它可以將長尾訊息儲存在磁碟上,而不會降低效能。它支援訊息的序列化(排序)、交易、批次處理等。它甚至支援排程訊息 - 僅在特定間隔後傳遞的訊息。不用說,我是 Apache RocketMQ 的忠實粉絲。