取得領先
VMware 提供培訓和認證,以加速您的進度。
了解更多這是 Spring Cloud Stream 2.0.0.RELEASE 準備發布前的一系列預先發布部落格中的第二篇。
Spring Cloud Stream 2.0 引入了輪詢消費者 (Polled Consumers),應用程式可以控制訊息處理速率。
Spring Cloud Stream 具有生產者和消費者的概念;當使用訊息傳遞範例時,MessageChannel
s 會繫結到目的地 (例如 Kafka 主題、Rabbit Exchanges/Queues)。 至今,在消費者端,只要有閒置的消費者可用,就會傳遞訊息。 實際上,broker 控制了傳遞速率;通常,下一個訊息會在處理完目前的訊息後立即傳遞。
2.0 引入了輪詢消費者 (Polled Consumers),應用程式可以控制訊息消耗的速率。 Kafka 和 RabbitMQ binder 支援輪詢消費者。
使用輪詢消費者時,我們不是將 MessageChannel
繫結到目的地,而是繫結 PollableMessageSource
; 例如,可以像這樣配置 PolledProcessor
繫結
public interface PolledProcessor {
@Input
PollableMessageSource destIn();
@Output
MessageChannel destOut();
}
訊息來源具有方法
boolean poll(MessageHandler handler);
在處理程序的 handleRequest
方法退出之前,不會確認訊息。
MessageHandler
是 spring-messaging 中的介面;您可以提供標準的 Spring Integration 訊息處理程序之一,也可以提供您自己的實作 (通常是 lambda)。 由於 handleMessage
方法採用 Message<?>
引數,因此沒有類型資訊,並且不會轉換 payload。
但是,訊息轉換 正如本系列第一篇部落格中所討論的,也可以應用於輪詢消費者。 為了將類型資訊傳達給轉換服務,我們在重載的 poll()
方法中提供參數化的類型參考
boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)
並且訊息 payload 將轉換為類型,它可以很簡單,例如,內容類型為 text/plain
new ParameterizedTypeReference<String>() {}
或者更複雜,例如,JSON 內容類型
new ParameterizedTypeReference<Map<String, Foo>>() {}
以下簡單的 Spring Boot 應用程式提供了一個完整的範例; 它接收 String payload,將它們轉換為大寫,然後將結果轉發到另一個目的地。
@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {
private final Logger logger =
LoggerFactory.getLogger(Blog2Application.class);
public static void main(String[] args) {
SpringApplication.run(Blog2Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source,
MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(m -> {
String payload = (String) m.getPayload();
logger.info("Received: " + payload);
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
if (result) {
logger.info("Processed a message");
}
else {
logger.info("Nothing to do");
}
Thread.sleep(5_000);
}
};
}
public static interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
}
應用程式現在可以控制消耗訊息的速率。
如需更多資訊,請參閱參考手冊中的 使用輪詢消費者 (Using Polled Consumers)。
我們鼓勵您使用以下工具之一提供意見反應
請享用!