Spring 小技巧:Apache RocketMQ

工程 | Josh Long | 2020 年 2 月 25 日 | ...

嗨,Spring 粉絲們!在這一期的 Spring 小技巧中,我們要來看看阿里巴巴的 Apache RocketMQ。我們之前在 Spring 小技巧中討論過阿里巴巴。請查看先前的 Spring 小技巧,我們探討了一些 Spring Cloud Alibaba

執行 Apache RocketMQ

為了使用 Apache RocketMQ,您需要按照 RocketMQ 快速入門中的步驟進行操作。這一期的 Spring 小技巧介紹了 Apache RocketMQ,它最初是阿里巴巴內部開發和使用的技術,並在著名的中國銷售節日 11/11(類似於美國的「網購星期一」或「黑色星期五」)的磨練中得到驗證。有點像那樣,但規模大得多。在 2019 年,阿里巴巴(單獨,沒有其他電子商務引擎參與)在 24 小時內創造了近 400 億美元的收入。這需要發送數兆條訊息,並透過可以擴展以滿足需求的系統。RocketMQ 是他們唯一可以信任的東西。

執行 Apache RocketMQ 時,您需要使用 Java 8。(當然,在編寫連接到 Apache RocketMQ 的 Spring 應用程式時,可以使用任何版本的 Java。)我使用 SDK 管理器(「SDKman」- sdk)切換到適當的 Java 版本。

sdk use java 8.0.242.hs-adpt

如果尚未安裝,這將安裝一個可運作的版本。完成後,您需要執行 NameServer。

${ROCKETMQ_HOME}/bin/mqnamesrv 

然後您需要執行 Broker 本身。

${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876

如果您想使用基於 SQL 的過濾,則需要在 Broker 的組態檔 $ROCKETMQ_HOME/conf/broker.conf 中新增一個屬性,然後告知 RocketMQ 使用該組態。

enablePropertyFilter = true

我使用類似這樣的腳本來啟動所有內容。

export JAVA_HOME=$HOME/.sdkman/candidates/java/8.0.242.hs-adpt
${ROCKETMQ_HOME}/bin/mqnamesrv &  
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 -c ${ROCKETMQ_HOME}/conf/broker.conf

從 Java 程式碼使用 Apache RocketMQ

讓我們看看一個簡單的 Producer 類別,它使用 Spring Boot 自動組態和 RocketMQTemplate

為了使用它,您需要在 Spring Initializr 上建立一個新專案。我使用最新版本的 Java 產生了一個新專案,然後確保包含 Lombok。我們還需要 Apache RocketMQ 客戶端和適當的 Spring Boot 自動組態

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.0.4</version>
</dependency>

自動組態將建立與正在執行的 Apache RocketMQ Broker 的連線,並由某些屬性告知。

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=greetings-producer-group

第一個屬性 name-server 告知應用程式 Apache RocketMQ Nameserver 的位置。然後,Nameserver 知道 Broker 的位置。您還需要為 Producer 和 Consumer 指定一個群組。在這裡,我們使用 greetings-producer-group


package com.example.producer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;

import java.time.Instant;

@RequiredArgsConstructor
@SpringBootApplication
public class ProducerApplication {

	@Bean
	ApplicationListener<ApplicationReadyEvent> ready(RocketMQTemplate template) {
		return event -> {

			var now = Instant.now();
			var destination = "greetings-topic";

			for (var name : "Tammie,Kimly,Josh,Rob,Mario,Mia".split(",")) {

				var payload = new Greeting("Hello @ " + name + " @ " + now.toString());
				var messagePostProcessor = new MessagePostProcessor() {

					@Override
					public Message<?> postProcessMessage(Message<?> message) {
						var headerValue = Character.toString(name.toLowerCase().charAt(0));
						return MessageBuilder
							.fromMessage(message)
							.setHeader("letter", headerValue)
							.build();
					}
				};
				template.convertAndSend(destination, payload, messagePostProcessor);
			}
		};
	}

	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class, args);
	}
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
	private String message;
}

我想不到比這更簡單的了!這是一個簡單的 for 迴圈,處理每個名稱,建立一個新的 Greeting 物件,然後使用 RocketMQTemplate 將 Payload 發送到 Apache RocketMQ 主題 greetings-topic。在這裡,我們使用了 RocketMQTemplate 物件的 Overload,它接受 MessagePostProcessorMessagePostProcessor 是一個回呼,我們可以在其中轉換將要發送的 Spring Framework Message 物件。在此範例中,我們貢獻了一個標頭值 letter,其中包含名稱的第一個字母。我們將在 Consumer 中使用它。

讓我們看看 Consumer。從 Spring Initializr 產生一個新的 Spring Boot 應用程式,並確保新增 Apache RocketMQ 自動組態。您還需要在客戶端的 application.properties 中指定 Nameserver。

自動組態支援定義實作 RocketMQListener<T> 的 Bean,其中 T 是 Consumer 將接收的 Payload 類型。在這種情況下,Payload 是 Greeting

package com.example.consumer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

import static org.apache.rocketmq.spring.annotation.SelectorType.SQL92;

@SpringBootApplication
public class ConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ConsumerApplication.class, args);
	}
}

@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
	private String message;
}

@Log4j2
@Service
@RocketMQMessageListener(
	topic = "greetings-topic",
	consumerGroup = "simple-group"
)
class SimpleConsumer implements RocketMQListener<Greeting> {

	@Override
	public void onMessage(Greeting greeting) {
		log.info(greeting.toString());
	}
}

在本範例中,SimpleConsumer 只是記錄來自 Apache RocketMQ 中 greetings-topic 主題的所有傳入訊息。在這裡,Consumer 將處理主題上的 *所有* 訊息。讓我們看看另一個不錯的功能 - 選擇器 - 它可以讓我們有選擇地處理傳入的訊息。讓我們用兩個新的 Listener 替換現有的 RocketMQ Listener。每個 Listener 都將使用 SQL92 相容的 Predicate 來判斷是否應處理傳入的訊息。一個 Listener 僅處理具有與 mkt 相符的 letter 標頭的訊息。另一個 Listener 僅處理那些 letter 標頭與 j 相符的訊息。


@Log4j2
@Service
@RocketMQMessageListener(
	topic = "greetings-topic",
	selectorExpression = " letter = 'm' or letter = 'k' or letter = 't' ",
	selectorType = SQL92,
	consumerGroup = "sql-consumer-group-mkt"
)
class MktSqlSelectorConsumer implements RocketMQListener<Greeting> {

	@Override
	public void onMessage(Greeting greeting) {
		log.info("'m', 'k', 't': " + greeting.toString());
	}
}


@Log4j2
@Service
@RocketMQMessageListener(
	topic = "greetings-topic",
	selectorExpression = " letter = 'j' ",
	selectorType = SQL92,
	consumerGroup = "sql-consumer-group-j"
)
class JSqlSelectorConsumer implements RocketMQListener<Greeting> {

	@Override
	public void onMessage(Greeting greeting) {
		log.info("'j': " + greeting.toString());
	}
}

還不錯吧?除了在 24 小時內處理數兆條訊息外,Apache RocketMQ 還支援許多其他功能!它可以將長尾訊息儲存在磁碟上,而不會降低效能。它支援訊息的序列化(排序)、交易、批次處理等。它甚至支援排程訊息 - 僅在特定間隔後傳遞的訊息。不用說,我是 Apache RocketMQ 的忠實粉絲。

取得 Spring 電子報

保持與 Spring 電子報的聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔。

了解更多

即將舉辦的活動

查看 Spring 社群中所有即將舉辦的活動。

查看全部