領先一步
VMware 提供培訓和認證,以加速您的進展。
了解更多為高度可擴展的系統設計良好的路由拓撲,就像繪製圖表一樣。許多事情需要考量,例如問題、環境的限制、訊息傳遞實作的限制,以及效能策略。我們經常遇到的問題是,在將路由調整到我們的需求時,缺乏彈性和表達性。這正是 RabbitMQ 的突出之處。
api.agents.agent-{id}.operations.{operationName}
在更複雜的情況下,路由金鑰可以與訊息標頭欄位和/或其內容的路由結合使用。交換器檢查訊息的屬性、標頭欄位、主體內容,以及可能來自其他來源的資料,然後決定如何路由訊息。從上述路由金鑰概念衍生的綁定模式可能看起來像 api.agents.*.operations.*
,我們將交換器 E1
綁定到佇列 Q1
,綁定模式為 api.agents.*.operations.*
,以便發送到 E1
的任何訊息,如果其路由金鑰與綁定模式匹配,則路由到 Q1
。
Rabbit Broker 的結構與 JMS Broker 不同。每個 RabbitMQ 伺服器都包含至少一個節點(broker),或更常見的是叢集中的節點。每個節點都有一個預設的虛擬主機「/」,並且可以建立進一步的虛擬主機,例如「/development」。Rabbit 虛擬主機就像 Tomcat 虛擬主機,並將 broker 資料劃分為子集。在這些虛擬主機中是交換器和佇列。當使用者使用其憑證連線時,它正在連線到 Rabbit 節點上的虛擬主機。
在這裡,我們連線到 Rabbit 節點,宣告要發布的交換器、要消費的佇列、綁定模式,然後使用 RabbitMQ java client api 發布一些訊息
package org.demo.simple.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public final class RocketSender {
public void sendRockets() throws IOException {
List<String> rocketsWithRoutings = new RocketRouter().build();
Connection connection = new ConnectionFactory().newConnection();
Channel channel = connection.createChannel();
String rocketExchange = "rockets.launched";
channel.exchangeDeclare(rocketExchange, "topic");
String rocketQueue = channel.queueDeclare().getQueue();
channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");
for (String rocketTo : rocketsWithRoutings) {
channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
}
channel.close();
connection.close();
}
}
一個簡單的「已著陸」火箭消費可能看起來像
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(rocketQueue, false, queueingConsumer);
int landed = 0;
while (landed < launched) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
String rocketLanded = new String(delivery.getBody());
if (rocketLanded.equalsIgnoreCase("Alderaan")) {
System.out.println("That's no moon, that's a space station.");
}
landed++;
}
為了保持簡單,讓我們考慮兩種策略
請注意綁定變動。在策略編號 2 中,如果您建立許多新的佇列及其綁定,每當消費者連接時,您可能會遇到問題。例如,假設交換器 E1...En
正在向其發布許多訊息,每當消費者 Cm
連接時,它都會從自己的佇列建立到所有 E1...En
的綁定,這可能會導致問題,具體取決於連線速率。
為了減輕綁定變動,請考慮交換器到交換器的綁定,這是 2.3.1 版的新功能。每個消費者都可以有自己的輔助交換器 Ym
,它不得自動刪除。然後將所有 E1...En
綁定到 Ym
。這樣,這些綁定始終存在。在這種情況下,每當消費者 Cm
連接時,它只需要宣告其佇列並將該佇列綁定到 Ym
。如果 Ym 是 fanout 交換器,它將非常快,並將每個連線的綁定變動率降低到 1,而不是每個連線可能為 n。
現在考慮建立共享主題交換器:一個用於代理程式到伺服器路徑的交換器,另一個用於伺服器到代理程式路徑的交換器,第三個用於處理未經身份驗證的代理程式,該代理程式僅路由到那些不需要安全性的佇列。現在我們使用綁定模式、訊息路由金鑰進行分割,並為每個伺服器啟動一組這些,以供連線到它的所有代理程式共享。然後,在其最簡單的形式中,當每個代理程式上線時,它會宣告一個私有交換器和佇列,並將其交換器綁定到共享主題交換器。
我們的關係現在由交換器到交換器的映射表示,這降低了變動率,並使代理程式無需「知道」伺服器佇列即可解耦。使用這種模式,系統是乾淨、解耦且可擴展的。
經過身份驗證的客戶端交換器路由從代理程式到伺服器的訊息。它處理所有操作,將訊息發布到單一消費者佇列,包括那些產生最高訊息頻率的佇列。在當前拓撲下,這是一個潛在的瓶頸,對於 10,000 個客戶端或每天 86,400,000 條訊息,每分鐘約有 60,000 條訊息。這很容易解決,RabbitMQ 每天可以處理超過 10 億條訊息,具體取決於您的配置,例如您是否持久化訊息。
我們的伺服器應用程式正在運行 RabbitMQ 叢集。請記住,在叢集中,宣告交換器會使其出現在所有節點上,而宣告佇列只會在其中一個節點上建立它,因此我們必須配置一個解決方案。
RabbitMQ 動態地將訊息推送到磁碟以釋放 RAM,因此佇列的記憶體佔用空間不取決於其內容。在佇列閒置 10 秒或更長時間後,它將「休眠」,這會在該佇列上引起 GC。因此,佇列所需的記憶體量可能會大幅縮減。例如,1000 個空的閒置佇列可能佔用 10MB 的 RAM。當它們都處於活動狀態時(即使是空的),它們當然可能會消耗更多的記憶體,具體取決於記憶體碎片。強制它們恢復休眠狀態以測試行為很困難,因為 Erlang VM 不會立即將記憶體交還給作業系統。
但是,您可以觀察到一個大型進程休眠並且記憶體嚴重碎片化,因為回收的記憶體量可能足以迫使 VM 將記憶體交還給作業系統。如果您運行一個穩定增加 Rabbit 記憶體佔用空間的測試,您可以觀察到休眠對閒置進程的影響,因為它降低了記憶體使用率的增加速度。
Erlang 是一個多執行緒 VM,它利用多個核心。它向開發人員呈現綠色執行緒,這些綠色執行緒被稱為「進程」,因為與執行緒不同,它們在概念上不共享位址空間。這是關於 Erlang VM 和進程 的一個有趣的自白。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>