RabbitMQ 效能與擴充性的路由拓撲

工程 | Helena Edelson | 2011 年 4 月 01 日 | ...

為高度可擴展的系統設計良好的路由拓撲,就像繪製圖表一樣。許多事情需要考量,例如問題、環境的限制、訊息傳遞實作的限制,以及效能策略。我們經常遇到的問題是,在將路由調整到我們的需求時,缺乏彈性和表達性。這正是 RabbitMQ 的突出之處。

基本概念

任何熟悉一般訊息傳遞的人都知道將訊息從 A 路由到 B 的概念。路由可以很簡單或很複雜,當為可擴展的複雜系統設計路由拓撲時,它必須是優雅的。保持乾淨和解耦,組件可以在不同的負載下良好地節流。這可以用簡單的地圖或複雜的圖表來表示。在其最簡單的形式中,路由拓撲可以表示為節點,例如階層式節點

Hierarchical nodes in message routing topology

對於那些 RabbitMQ 或 AMQP 新手(請注意,Rabbit 適用於多種協定,包括 STOMP、HTTP、HTTPS、XMPP 和 SMTP),以下是一些基本組件描述
  • Exchange(交換器) 伺服器內的實體,它從生產者應用程式接收訊息,並可選擇將這些訊息路由到伺服器內部的訊息佇列
  • Exchange type(交換器類型) 特定交換器模型的演算法和實作。與「交換器實例」相反,「交換器實例」是在伺服器內接收和路由訊息的實體
  • Message queue(訊息佇列) 一個具名的實體,用於保存訊息並將其轉發到消費者應用程式
  • Binding(綁定) 一個在訊息佇列和交換器之間建立關聯的實體
  • Routing key(路由金鑰) 交換器可以用於決定如何路由特定訊息的虛擬位址
對於點對點路由,路由金鑰通常是訊息佇列的名稱。對於主題發布-訂閱路由,路由金鑰通常本質上是階層式的

api.agents.agent-{id}.operations.{operationName}

在更複雜的情況下,路由金鑰可以與訊息標頭欄位和/或其內容的路由結合使用。交換器檢查訊息的屬性、標頭欄位、主體內容,以及可能來自其他來源的資料,然後決定如何路由訊息。從上述路由金鑰概念衍生的綁定模式可能看起來像 api.agents.*.operations.*,我們將交換器 E1 綁定到佇列 Q1,綁定模式為 api.agents.*.operations.*,以便發送到 E1 的任何訊息,如果其路由金鑰與綁定模式匹配,則路由到 Q1

Rabbit Broker 的結構與 JMS Broker 不同。每個 RabbitMQ 伺服器都包含至少一個節點(broker),或更常見的是叢集中的節點。每個節點都有一個預設的虛擬主機「/」,並且可以建立進一步的虛擬主機,例如「/development」。Rabbit 虛擬主機就像 Tomcat 虛擬主機,並將 broker 資料劃分為子集。在這些虛擬主機中是交換器和佇列。當使用者使用其憑證連線時,它正在連線到 Rabbit 節點上的虛擬主機。

在這裡,我們連線到 Rabbit 節點,宣告要發布的交換器、要消費的佇列、綁定模式,然後使用 RabbitMQ java client api 發布一些訊息

package org.demo.simple.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class RocketSender {

 public void sendRockets() throws IOException {
     List<String> rocketsWithRoutings = new RocketRouter().build();

     Connection connection = new ConnectionFactory().newConnection();
     Channel channel = connection.createChannel();

     String rocketExchange = "rockets.launched";
     channel.exchangeDeclare(rocketExchange, "topic");
     String rocketQueue = channel.queueDeclare().getQueue();
     channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");

     for (String rocketTo : rocketsWithRoutings) {
         channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
     }

     channel.close();
     connection.close();
 }
}

一個簡單的「已著陸」火箭消費可能看起來像


 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
 channel.basicConsume(rocketQueue, false, queueingConsumer);

 int landed = 0;
 while (landed < launched) {
     QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     String rocketLanded = new String(delivery.getBody());

     if (rocketLanded.equalsIgnoreCase("Alderaan")) {
         System.out.println("That's no moon, that's a space station.");
     }
     landed++;
 }

問題

在考慮哪些路由策略在可擴展環境中表現最佳,且效能本身也可以提高時,有很多選擇。關於訊息傳遞的一大優點是一系列可用的配置,以及找出正確的配置來解決當前和不斷增長的需求。

為了保持簡單,讓我們考慮兩種策略

  1. 高度分割的路由,具有階層式路由金鑰,較少的主題交換器
  2. 大量的直接交換器和佇列,路由分割遠遠較少
每個場景都遵循這個用例:每個必須擴展的應用程式既是生產者又是消費者

從哪裡開始

在深入研究隨著時間推移將乾淨且有效率地擴展的路由解決方案之前,最好先盤點您的環境及其組件。例如,什麼有助於擴展?一般來說,解耦、分散式、非同步、平行處理、抽象層次和間接層次等等。然後考慮哪些元素是當前或潛在的瓶頸。一個基本原則是,高流量/高容量路徑需要更有效率的吞吐量,否則您會面臨分配中的瓶頸風險。一個練習是根據流量或熱圖對這些進行排名。接下來,您可以對您的流量進行分類嗎?是否存在總體模式、主題或類似的訊息類型,以及它們之間的關係是什麼?現在開始考慮整合,如何以及在何處可以提高效率,並應用經過測試的模式來解決這些熱點、解耦以進行擴展並提高效能。

一般路由考量

所有交換器類型的行為都不同。以下是一些一般規則
  • 如果應用程式的圖表中路由金鑰的網域是有限的,那麼許多 fanout 交換器可能是合適的(每個路由金鑰的交換器 1:1 映射)
  • 如果您有潛在無限數量的路由金鑰,請考慮主題交換器
  • 對於主題路由,效能會隨著綁定數量的增加而降低
  • Fanout 交換器非常快,因為它們沒有路由要處理,但如果綁定到大量佇列,情況就會改變
  • Direct 交換器是主題交換器的一種更快形式,前提是您不需要萬用字元
  • 跨越 100,000 多個佇列進行問題排查可能很繁瑣,相比之下,具有更多綁定、更少交換器和佇列的拓撲則不然
  • 非常大量的交換器和佇列會佔用更多記憶體,這可能很重要,但這實際上取決於情況
截至 RabbitMQ 2.4.0(2011 年 3 月 23 日發布),一種新的主題路由演算法最佳化可用,其峰值速度比以前的主題演算法快 60 倍。因此,一個建議是減少交換器和佇列,增加路由,因為時間增加現在已降至最低。

效能

什麼是便宜的?

就記憶體成本而言,是交換器和綁定。在 RabbitMQ 所基於的 Erlang 中,每個節點(broker)都是一個進程,每個佇列也是如此。預設情況下,Erlang VM 進程限制設定為 1M,可以提高。但是,交換器不是出於可擴展性原因的進程,它只是 RabbitMQ 內建 Mnesia 資料庫中的一行。在叢集中,宣告交換器會使其出現在叢集的所有節點上,而宣告佇列只會在其中一個節點上建立它。這解釋了為什麼交換器在節點重新啟動或在叢集中建立節點後仍然存在,但佇列卻不行。

請注意綁定變動。在策略編號 2 中,如果您建立許多新的佇列及其綁定,每當消費者連接時,您可能會遇到問題。例如,假設交換器 E1...En 正在向其發布許多訊息,每當消費者 Cm 連接時,它都會從自己的佇列建立到所有 E1...En 的綁定,這可能會導致問題,具體取決於連線速率。

為了減輕綁定變動,請考慮交換器到交換器的綁定,這是 2.3.1 版的新功能。每個消費者都可以有自己的輔助交換器 Ym,它不得自動刪除。然後將所有 E1...En 綁定到 Ym。這樣,這些綁定始終存在。在這種情況下,每當消費者 Cm 連接時,它只需要宣告其佇列並將該佇列綁定到 Ym。如果 Ym 是 fanout 交換器,它將非常快,並將每個連線的綁定變動率降低到 1,而不是每個連線可能為 n。

Exchange-to-Exchange Binding

用例

交換器到交換器的可擴展用例

考慮一個具有自主代理程式的伺服器應用程式。每個代理程式都在虛擬機器上,該虛擬機器是彈性擴展系統的一部分。當每個代理程式啟動時,它會向伺服器發送一條訊息,表明它已上線,然後是許多其他訊息,例如身份驗證和資料傳輸。如果我們有 1,000 個代理程式,每個代理程式宣告 50 個 direct 交換器、佇列和綁定,那麼每個代理程式都必須知道伺服器的佇列,以便履行佇列上綁定合約。宣告操作。這不是一個可擴展的解決方案。

現在考慮建立共享主題交換器:一個用於代理程式到伺服器路徑的交換器,另一個用於伺服器到代理程式路徑的交換器,第三個用於處理未經身份驗證的代理程式,該代理程式僅路由到那些不需要安全性的佇列。現在我們使用綁定模式、訊息路由金鑰進行分割,並為每個伺服器啟動一組這些,以供連線到它的所有代理程式共享。然後,在其最簡單的形式中,當每個代理程式上線時,它會宣告一個私有交換器和佇列,並將其交換器綁定到共享主題交換器。

我們的關係現在由交換器到交換器的映射表示,這降低了變動率,並使代理程式無需「知道」伺服器佇列即可解耦。使用這種模式,系統是乾淨、解耦且可擴展的。

彈性擴展用例

讓我們將之前的場景更進一步。我們已經在場景 2 上使用了主題發布-訂閱路由:許多直接路由。現在假設系統需求增加到在資料中心內擴展伺服器應用程式叢集,其中包含 50,000 個或更多代理程式。我們如何節流不同的負載?

經過身份驗證的客戶端交換器路由從代理程式到伺服器的訊息。它處理所有操作,將訊息發布到單一消費者佇列,包括那些產生最高訊息頻率的佇列。在當前拓撲下,這是一個潛在的瓶頸,對於 10,000 個客戶端或每天 86,400,000 條訊息,每分鐘約有 60,000 條訊息。這很容易解決,RabbitMQ 每天可以處理超過 10 億條訊息,具體取決於您的配置,例如您是否持久化訊息。

我們的伺服器應用程式正在運行 RabbitMQ 叢集。請記住,在叢集中,宣告交換器會使其出現在所有節點上,而宣告佇列只會在其中一個節點上建立它,因此我們必須配置一個解決方案。

生產者和消費者之間的負載平衡

為了有效率地處理這些潛在的極高負載,隨著更多客戶端應用程式(代理程式)上線,我們可以透過幾種方式修改此拓撲。首先,從上述配置進行最佳化,以跨 Rabbit 叢集負載平衡訊息。我們可以為 Rabbit 叢集中的每個節點建立一個佇列。如果我們有四個節點,對於每個高流量佇列,我們為該操作建立 hfq.{0,1,2,3}。現在,每個代理程式都可以透過零到三之間的數字隨機選擇一個節點,或者更複雜的循環配置實作來發布到該節點。使用 RabbitMQ,可以使用 RPC 呼叫,或者您可以使用 Rabbit management plugin 來 GET 節點數量,您可以在循環配置演算法中使用它。

具有循環配置分發的工作佇列

工作佇列或任務佇列通常用於在多個工作人員之間有效率地分配耗時的任務,並輕鬆地平行處理工作。此外,此拓撲適用於消除執行資源密集型任務的需求以及必須阻塞直到它們完成的需求。運行多個工作佇列允許在它們之間分配這些任務。

使用工作佇列,預設情況下,Rabbit 使用循環配置分發方法,將每條訊息按順序發送到下一個消費者。每個消費者接收大致相同數量的訊息。如果您宣告一個佇列並啟動 3 個競爭的消費者,將它們綁定到交換器,並發送 20,000 條訊息,則訊息零將路由到第一個消費者,訊息一路由到第二個消費者,訊息二路由到第三個消費者,依此類推。如果我們開始累積任務積壓,我們可以簡單地添加更多工作人員,從而使系統能夠輕鬆擴展。

效能

記憶體

以上兩種選項都不一定會在 RabbitMQ 中引起高負載。可以建立的交換器和佇列數量沒有硬性限制,並且在一個 broker 上運行 100,000 個佇列是可以的。透過正確的調整和足夠的 RAM,您可以運行超過一百萬個佇列。

RabbitMQ 動態地將訊息推送到磁碟以釋放 RAM,因此佇列的記憶體佔用空間不取決於其內容。在佇列閒置 10 秒或更長時間後,它將「休眠」,這會在該佇列上引起 GC。因此,佇列所需的記憶體量可能會大幅縮減。例如,1000 個空的閒置佇列可能佔用 10MB 的 RAM。當它們都處於活動狀態時(即使是空的),它們當然可能會消耗更多的記憶體,具體取決於記憶體碎片。強制它們恢復休眠狀態以測試行為很困難,因為 Erlang VM 不會立即將記憶體交還給作業系統。

但是,您可以觀察到一個大型進程休眠並且記憶體嚴重碎片化,因為回收的記憶體量可能足以迫使 VM 將記憶體交還給作業系統。如果您運行一個穩定增加 Rabbit 記憶體佔用空間的測試,您可以觀察到休眠對閒置進程的影響,因為它降低了記憶體使用率的增加速度。

Erlang 是一個多執行緒 VM,它利用多個核心。它向開發人員呈現綠色執行緒,這些綠色執行緒被稱為「進程」,因為與執行緒不同,它們在概念上不共享位址空間。這是關於 Erlang VM 和進程 的一個有趣的自白。

交易

對 10,000 條訊息進行交易可能需要長達四分鐘才能發布。一個名為 Publisher Confirms 的新 RabbitMQ 功能比相同的但具有交易性的程式碼快 100 多倍。如果您沒有明確要求實作交易,但確實需要驗證,您可以考慮此選項。

重點

以下是一些最終的重點,可幫助您從實作中擠出最大的效能提升
  • 新的主題路由演算法最佳化在峰值時快 60 倍
  • 使用萬用字元 '*' 的主題綁定模式(匹配單個單字)比 '#'(匹配零個或多個單字)快得多。萬用字元 '#' 在路由表中處理的時間比 '*' 長
  • 交換器到交換器的綁定提高了去耦性、提高了拓撲靈活性、減少了綁定變動,並有助於提高效能
  • 交換器和綁定非常輕量級
  • RabbitMQ Publisher Confirms 比 AMQP 交易快 100 多倍
  • 在佇列閒置 >=10 秒後,它將「休眠」,從而在佇列上引起 GC,從而大幅減少該佇列所需的記憶體
  • 工作佇列有助於平行處理和分配工作負載
  • 在 Rabbit 叢集中分配工作佇列有助於擴展
  • 負載平衡您的拓撲
這絕不是關於該主題的論文,確實還有更多的模式、拓撲和效能細節需要考慮。與往常一樣,策略取決於許多因素,但我希望這足以幫助您或至少讓您朝著正確的方向思考。

取得

GitHub 上的 RabbitMQ 原始碼 RabbitMQ 二進位下載和外掛程式 Erlang 下載 Spring AMQP Java 和 .NET 中的 RabbitMQ API Hyperic 監控 RabbitMQ Maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>

取得 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進展。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看全部