Bootiful Azure:與 Azure Service Bus 整合 (4/6)

工程 | Josh Long | 2019 年 1 月 14 日 | ...

這是總共 6 部分系列的第 4 部分,每週一和週四發布新文章,介紹適用於 Spring 開發人員的 Microsoft Azure。 如果沒有 Microsoft 的 Asir Vedamuthu Selvasingh、Yitao Dong、Bruno Borges、Brian Benz 和 Theresa Nguyen 的投入,我不可能完成這篇文章。 您可以在 Github 上找到此系列的程式碼。 在您閱讀文章時,請在 Twitter (@starbuxman) 上與我聯繫,提供任何意見或問題。 您也可以在我的 Spring Tips (@SpringTipsLive) 文章 Bootiful Azure 中了解有關 Microsoft Azure 的更多資訊

以下是所有文章

Azure Service Bus 是一種雲端訊息服務和整合技術。 就像 CosmosDB 一樣,它也盡可能地具有彈性。 它像 RabbitMQ 一樣 支援 AMQP 1.0 協定。 AMQP 是一種彈性的線路協定。 除了與訊息佇列互動之外,該協定本身還包含管理訊息佇列的指示。 AMQP 訊息佇列非常適合整合,因為它們與語言和平台無關。 在 AMQP 訊息佇列中,生產者將訊息發送到交換器 (exchanges),然後交換器將訊息路由到佇列 (queues),消費者從佇列中讀取訊息。 交換器負責決定將訊息傳送到哪個佇列。 它以多種方式執行此操作,但通常涉及查看訊息標頭中稱為路由金鑰 (routing key) 的金鑰。

交換器和佇列之間的這種間接性使 AMQP 比基於 JMS 的訊息佇列更具彈性,在基於 JMS 的訊息佇列中,生產者將訊息直接發送到消費者從中讀取的 Destination 物件。 這表示生產者和消費者透過他們選擇的 Destination 連結在一起。 此外,JMS 是一個 JVM 的 API,它不是一個線路協定。 因此,生產者和消費者取決於他們使用的程式庫的版本是否正確。 也就是說,您也可以透過 JMS API 使用 Azure Service Bus

就像我說的,Azure Service Bus 非常有彈性!

AMQP 模型具有說明意義,因為基本上,Azure Service Bus 的原生模型看起來像 AMQP。 在 Azure Service Bus 中,您擁有主題或佇列,您可以將訊息傳送到其中。 然後,訊息會連接到訂閱,消費者從訂閱中讀取訊息。 讓我們建立一個簡單的範例,該範例會傳送然後使用訊息。 我們將不使用 AMQP 或 JMS,僅使用普通的 Microsoft Azure ServiceBus API。

在 Microsoft Azure 上設定 Azure Service Bus

您需要佈建一個 servicebus 命名空間、一個主題(我們將訊息傳送到該主題,多個消費者可以從中監聽)和一個訂閱(主題或佇列的消費者)以連接到該主題。 這是一個執行此操作的範例腳本。

#!/usr/bin/env bash

destination=messages
topic=${destination}-topic
subscription=${destination}-subscription
namespace=bootiful
rg=$1

az servicebus namespace create --resource-group $rg \
    --name ${namespace}

az servicebus topic create --resource-group $rg \
    --namespace-name ${namespace} \
    --name ${topic}

az servicebus topic subscription create --resource-group $rg  \
    --namespace-name ${namespace} --topic-name ${topic} \
    --name ${subscription}

您需要一個連接字串才能將您的 Spring 應用程式連接到 servicebus。 執行此命令並記下 primaryConnectionString 屬性值,以供稍後使用。

az servicebus namespace authorization-rule keys list --resource-group bootiful --namespace-name bootiful --name RootManageSharedAccessKey

將 Azure Service Bus 引入您的 Spring 應用程式

將以下相依性新增到您的建置:com.microsoft.azure : azure-servicebus-spring-boot-starter

我們將編寫兩個元件:一個生產者和一個消費者。 在真實的應用程式中,這些東西自然會存在於單獨的應用程式和單獨的流程中。 畢竟,訊息傳輸是為了支援不同應用程式的整合。 我們先來看消費者。 消費者需要在其他東西產生訊息之前註冊訂閱者,因此我們將使這些 bean 成為有序的 - Spring 容器將根據我們給定的 Ordered 值,在另一個之前對它們的初始化進行排序。

package com.example.bootifulazure;

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Log4j2
@Component
class ServiceBusConsumer implements Ordered {

    private final ISubscriptionClient iSubscriptionClient;

    ServiceBusConsumer(ISubscriptionClient isc) {
        this.iSubscriptionClient = isc;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void consume() throws Exception {

        this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {

            @Override
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                log.info("received message " + new String(message.getBody()) + " with body ID " + message.getMessageId());
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public void notifyException(Throwable exception, ExceptionPhase phase) {
                log.error("eeks!", exception);
            }
        });

    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

當訊息到達時,我們會記錄其 bodymessageId

現在,讓我們看看生產者。

package com.example.bootifulazure;

import com.microsoft.azure.servicebus.ITopicClient;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.time.Instant;

@Log4j2
@Component
class ServiceBusProducer implements Ordered {

    private final ITopicClient iTopicClient;

    ServiceBusProducer(ITopicClient iTopicClient) {
        this.iTopicClient = iTopicClient;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void produce() throws Exception {
        this.iTopicClient.send(new Message("Hello @ " + Instant.now().toString()));
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}

非常簡單,對吧? 這些類別的重點在於 consume()produce() 方法。 消費者先運行,然後是生產者。 如果您曾經使用訊息傳輸技術,您可能會發現缺少任何種類的目的地(主題或佇列)有點令人費解。 該設定都存在於屬性中(例如您 application.properties 檔案中的屬性),並且在自動設定 ITopicClientISubscriptionClient 時使用。 如果您想從多個目的地傳送訊息或使用訊息,只需自行定義相關的 bean,並確保不要在應用程式的屬性中指定 azure.service-bus.connection-string,否則預設的 Spring Boot 自動設定會啟動並嘗試為您建立這些 bean。

取得 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

檢視全部