Spring Cloud Stream 2.0 - Polled Consumers (輪詢消費者)

工程 | Gary Russell | 2018 年 2 月 27 日 | ...

這是 Spring Cloud Stream 2.0.0.RELEASE 準備發布前的一系列預先發布部落格中的第二篇。

前言

Spring Cloud Stream 2.0 引入了輪詢消費者 (Polled Consumers),應用程式可以控制訊息處理速率。

簡介

Spring Cloud Stream 具有生產者和消費者的概念;當使用訊息傳遞範例時,MessageChannels 會繫結到目的地 (例如 Kafka 主題、Rabbit Exchanges/Queues)。 至今,在消費者端,只要有閒置的消費者可用,就會傳遞訊息。 實際上,broker 控制了傳遞速率;通常,下一個訊息會在處理完目前的訊息後立即傳遞。

2.0 引入了輪詢消費者 (Polled Consumers),應用程式可以控制訊息消耗的速率。 Kafka 和 RabbitMQ binder 支援輪詢消費者。

詳細資訊

使用輪詢消費者時,我們不是將 MessageChannel 繫結到目的地,而是繫結 PollableMessageSource; 例如,可以像這樣配置 PolledProcessor 繫結

public interface PolledProcessor {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

訊息來源具有方法

boolean poll(MessageHandler handler);

在處理程序的 handleRequest 方法退出之前,不會確認訊息。

MessageHandler 是 spring-messaging 中的介面;您可以提供標準的 Spring Integration 訊息處理程序之一,也可以提供您自己的實作 (通常是 lambda)。 由於 handleMessage 方法採用 Message<?> 引數,因此沒有類型資訊,並且不會轉換 payload。

但是,訊息轉換 正如本系列第一篇部落格中所討論的,也可以應用於輪詢消費者。 為了將類型資訊傳達給轉換服務,我們在重載的 poll() 方法中提供參數化的類型參考

boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)

並且訊息 payload 將轉換為類型,它可以很簡單,例如,內容類型為 text/plain

  • new ParameterizedTypeReference<String>() {}

或者更複雜,例如,JSON 內容類型

  • new ParameterizedTypeReference<Map<String, Foo>>() {}

整合在一起

以下簡單的 Spring Boot 應用程式提供了一個完整的範例; 它接收 String payload,將它們轉換為大寫,然後將結果轉發到另一個目的地。

@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {

  private final Logger logger =
  	  LoggerFactory.getLogger(Blog2Application.class);

  public static void main(String[] args) {
    SpringApplication.run(Blog2Application.class, args);
  }

  @Bean
  public ApplicationRunner runner(PollableMessageSource source,
  	    MessageChannel dest) {
    return args -> {
      while (true) {
        boolean result = source.poll(m -> {
          String payload = (String) m.getPayload();
          logger.info("Received: " + payload);
          dest.send(MessageBuilder.withPayload(payload.toUpperCase())
              .copyHeaders(m.getHeaders())
              .build());
        }, new ParameterizedTypeReference<String>() { });
        if (result) {
          logger.info("Processed a message");
        }
        else {
          logger.info("Nothing to do");
        }
        Thread.sleep(5_000);
      }
    };
  }

  public static interface PolledProcessor {

    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();

  }

}

結論

應用程式現在可以控制消耗訊息的速率。

如需更多資訊,請參閱參考手冊中的 使用輪詢消費者 (Using Polled Consumers)

我們鼓勵您使用以下工具之一提供意見反應

請享用!

取得 Spring 電子報

訂閱 Spring 電子報,與我們保持聯繫

訂閱

取得領先

VMware 提供培訓和認證,以加速您的進度。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

檢視所有