取得領先
VMware 提供培訓和認證,以加速您的進度。
了解更多應用程式產生的資料比以往更多,在分析之前,首先要解決的巨大挑戰是處理負載。Apache Kafka 正是為了解決這個挑戰而生。它最初由 LinkedIn 設計,隨後在 2011 年開源。該專案旨在提供一個統一的、高吞吐量、低延遲的平台,用於處理即時資料饋送。其設計深受交易日誌的影響。它是一個訊息傳遞系統,類似於傳統的訊息傳遞系統,如 RabbitMQ、ActiveMQ、MQSeries,但它非常適合日誌聚合、持久訊息傳遞、快速(每秒 *數百* 兆位元組!)讀取和寫入,並且可以容納大量客戶端。 自然地,這使其非常適合雲端規模的架構!
Kafka 為許多大型生產系統提供動力。 LinkedIn 使用它進行活動資料和操作指標,為 LinkedIn 新聞饋送和 LinkedIn Today 以及進入 Hadoop 的離線分析提供支援。 Twitter 使用它作為其流處理基礎架構的一部分。 Kafka 為 Foursquare 提供線上到線上和線上到離線訊息傳遞功能。 它用於將 Foursquare 監控和生產系統與基於 Hadoop 的離線基礎架構整合。 Square 使用 Kafka 作為匯流排,將所有系統事件傳輸到 Square 的各個資料中心。 這包括指標、日誌、自訂事件等。 在消費者端,它輸出到 Splunk、Graphite 或類似 Esper 的即時警報。 Netflix 每天使用它傳輸 300-600 億條訊息。 Airbnb、Mozilla、Goldman Sachs、Tumblr、Yahoo、PayPal、Coursera、Urban Airship、Hotels.com 和看似無窮無盡的其他大型網路明星也使用它。 顯然,它在某些強大的系統中發揮了作用!
有多種不同的方式可以安裝 Apache Kafka。 如果您使用的是 OSX,並且您使用的是 Homebrew,它可以像 brew install kafka
一樣簡單。 您也可以從 Apache 下載最新的發布版本。 我下載了 kafka_2.10-0.8.2.1.tgz
,解壓縮它,然後在其中您會發現還有 Apache Zookeeper 和 Kafka 的發布版本,因此不需要其他任何東西。 我將 Apache Kafka 安裝在我的 $HOME
目錄下,在另一個目錄 bin
下,然後我建立了一個環境變數 KAFKA_HOME
,它指向 $HOME/bin/kafka
。
首先啟動 Apache Zookeeper,指定它需要的配置屬性檔案的位置
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
Apache Kafka 發布版本附帶 Zookeeper 和 Kafka 的預設配置檔案,這使得入門變得容易。 在更進階的用例中,您需要自訂這些檔案。
然後啟動 Apache Kafka。 它也需要一個配置檔案,如下所示
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
server.properties
檔案包含連接到 Apache Zookeeper 的預設值(zookeeper.connect
)、應該透過 sockets 傳輸多少資料、預設有多少分割區以及 broker ID(broker.id
- 在叢集中必須是唯一的)。
在同一目錄中還有其他指令碼可用於傳送和接收虛擬資料,這對於確定一切是否已啟動並執行非常有用!
現在 Apache Kafka 已經啟動並執行,讓我們看看如何從我們的應用程式中使用 Apache Kafka。
Kafka broker 叢集由一個或多個伺服器組成,每個伺服器可以執行一個或多個 broker 處理程序。 Apache Kafka 被設計為具有高可用性; 沒有主節點。 所有節點都是可互換的。 資料從一個節點複製到另一個節點,以確保在發生故障時仍然可用。
在 Kafka 中,topic 是一種類別,類似於 JMS 目的地或 AMQP exchange 和佇列。 Topic 被分割,訊息應該傳送到哪個 Topic 的哪個分割區的選擇由訊息產生者決定。 分割區中的每個訊息都會被分配一個唯一的序列 ID,即它的 offset。 更多的分割區允許更大的並行消耗,但這也會導致 broker 上出現更多的檔案。
產生者將訊息傳送到 Apache Kafka broker topic,並指定每個訊息要使用的分割區。 訊息產生可以是同步的或非同步的。 產生者還指定他們想要什麼樣的複製保證。
消費者監聽 topic 上的訊息並處理已發布訊息的饋送。 如果您使用過其他訊息傳遞系統,那麼正如您所期望的那樣,這通常(並且有用!)是非同步的。
與 Spring XD 和許多其他分散式系統一樣,Apache Kafka 使用 Apache Zookeeper 來協調叢集資訊。 Apache Zookeeper 提供了一個共享的階層式命名空間(稱為 znodes),節點可以共享這些命名空間以了解叢集拓撲和可用性(這也是 Spring Cloud 即將支援它的另一個原因)。
Zookeeper 在您與 Apache Kafka 的互動中非常重要。 例如,Apache Kafka 有兩個不同的 API 可用作消費者。 較高階的 API 更容易上手,它可以處理處理分割區等的所有細微差別。 它將需要引用 Zookeeper 執行個體來保持協調狀態。
現在讓我們轉向使用 Spring 的 Apache Kafka。
最近發布的 適用於 Apache Kafka 1.1 的 Spring Integration 非常強大,並提供了用於處理較低階 Apache Kafka API 和較高階 API 的入站適配器。
目前,該適配器首先是 XML 配置,儘管已經在開發適用於適配器的 Spring Integration Java 配置 DSL,並且里程碑版本已經可用。 我們現在將在這裡查看這兩者。
為了使所有這些範例都能正常工作,我新增了 libs-milestone-local Maven 儲存庫 並使用了以下依賴項
首先,讓我們看看如何使用 Spring Integration 出站適配器將來自 Spring Integration 流的 Message<T>
執行個體傳送到外部 Apache Kafka 執行個體。 該範例非常簡單:名為 inputToKafka
的 Spring Integration channel
作為一個管道,將 Message<T>
訊息轉發到出站適配器 kafkaOutboundChannelAdapter
。 適配器本身可以從 kafka:producer-context
元素中指定的預設值獲取其配置,也可以從適配器本地配置覆寫中獲取配置。 給定的 kafka:producer-context
元素中可能有一個或多個配置。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputToKafka">
<int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="inputToKafka">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="event-stream"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
</beans>
以下是來自 Spring Boot 應用程式的 Java 程式碼,透過將訊息傳送到傳入的 inputToKafka
MessageChannel
來觸發使用出站適配器的訊息傳送。
package xml;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
@SpringBootApplication
@EnableIntegration
@ImportResource("/xml/outbound-kafka-integration.xml")
public class DemoApplication {
private Log log = LogFactory.getLog(getClass());
@Bean
@DependsOn("kafkaOutboundChannelAdapter")
CommandLineRunner kickOff(@Qualifier("inputToKafka") MessageChannel in) {
return args -> {
for (int i = 0; i < 1000; i++) {
in.send(new GenericMessage<>("#" + i));
log.info("sending message #" + i);
}
};
}
public static void main(String args[]) {
SpringApplication.run(DemoApplication.class, args);
}
}
在 Spring Integration 1.1 發布後不久,Spring Integration rockstar Artem Bilan 開始努力新增 Spring Integration Java 配置 DSL 類似物,結果非常漂亮! 它還不是 GA(您現在需要新增 libs-milestone
儲存庫),但我鼓勵您嘗試一下並試用一下。 它對我來說效果很好,Spring Integration 團隊總是渴望盡可能獲得早期回饋! 以下是一個範例,示範了如何從兩個不同的 IntegrationFlow
傳送和使用訊息。 產生者類似於上面的 XML 範例。
此範例中的新增功能是輪詢消費者。 它是以批次為中心的,並且會以固定的間隔拉取它看到的所有訊息。 在我們的程式碼中,收到的訊息將是一個 map,它的 key 是 topic,value 是另一個 map,其中包含分割區 ID 和批次(在本例中為 10 條記錄)的記錄。 還有一個基於 MessageListenerContainer
的替代方案,它會在訊息到達時處理它們。
package jc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.integration.dsl.kafka.KafkaHighLevelConsumerMessageSourceSpec;
import org.springframework.integration.dsl.kafka.KafkaProducerMessageHandlerSpec;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* Demonstrates using the Spring Integration Apache Kafka Java Configuration DSL.
* Thanks to Spring Integration ninja <a href="https://spring.dev.org.tw/team/artembilan">Artem Bilan</a>
* for getting the Java Configuration DSL working so quickly!
*
* @author Josh Long
*/
@EnableIntegration
@SpringBootApplication
public class DemoApplication {
public static final String TEST_TOPIC_ID = "event-stream";
@Component
public static class KafkaConfig {
@Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
private String topic;
@Value("${kafka.address:localhost:9092}")
private String brokerAddress;
@Value("${zookeeper.address:localhost:2181}")
private String zookeeperAddress;
KafkaConfig() {
}
public KafkaConfig(String t, String b, String zk) {
this.topic = t;
this.brokerAddress = b;
this.zookeeperAddress = zk;
}
public String getTopic() {
return topic;
}
public String getBrokerAddress() {
return brokerAddress;
}
public String getZookeeperAddress() {
return zookeeperAddress;
}
}
@Configuration
public static class ProducerConfiguration {
@Autowired
private KafkaConfig kafkaConfig;
private static final String OUTBOUND_ID = "outbound";
private Log log = LogFactory.getLog(getClass());
@Bean
@DependsOn(OUTBOUND_ID)
CommandLineRunner kickOff(
@Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
return args -> {
for (int i = 0; i < 1000; i++) {
in.send(new GenericMessage<>("#" + i));
log.info("sending message #" + i);
}
};
}
@Bean(name = OUTBOUND_ID)
IntegrationFlow producer() {
log.info("starting producer flow..");
return flowDefinition -> {
Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> spec =
(KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata)->
metadata.async(true)
.batchNumMessages(10)
.valueClassType(String.class)
.<String>valueEncoder(String::getBytes);
KafkaProducerMessageHandlerSpec messageHandlerSpec =
Kafka.outboundChannelAdapter(
props -> props.put("queue.buffering.max.ms", "15000"))
.messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.addProducer(this.kafkaConfig.getTopic(),
this.kafkaConfig.getBrokerAddress(), spec);
flowDefinition
.handle(messageHandlerSpec);
};
}
}
@Configuration
public static class ConsumerConfiguration {
@Autowired
private KafkaConfig kafkaConfig;
private Log log = LogFactory.getLog(getClass());
@Bean
IntegrationFlow consumer() {
log.info("starting consumer..");
KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
.consumerProperties(props ->
props.put("auto.offset.reset", "smallest")
.put("auto.commit.interval.ms", "100"))
.addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
.topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
.maxMessages(10)
.valueDecoder(String::new));
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));
return IntegrationFlows
.from(messageSourceSpec, endpointConfigurer)
.<Map<String, List<String>>>handle((payload, headers) -> {
payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
return null;
})
.get();
}
}
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
該範例大量使用了 Java 8 lambdas。
生產者會花一些時間來確定在單次傳送作業中將傳送多少訊息、金鑰和值的編碼方式(畢竟,Kafka 只知道 byte[]
陣列),以及訊息應該同步或非同步傳送。在下一行,我們設定輸出配接器本身,然後定義一個 IntegrationFlow
,以便所有訊息都透過 Kafka 輸出配接器傳送出去。
消費者會花一些時間來確定要連接到哪個 Zookeeper 實例、在一個批次中要接收多少訊息 (10) 等等。一旦接收到訊息批次,它們就會被交給 handle
方法,我在這裡傳入一個 lambda,它將枚舉 payload 的主體並將其列印出來。沒什麼特別的。
Apache Kafka 是一個訊息匯流排,當它被用作整合匯流排時,它會非常強大。 然而,它真正發揮作用是因為它夠快且夠可擴展,因此可用於通過處理管道路由大數據。 如果您正在進行資料處理,您真的會想要Spring XD! Spring XD 使使用 Apache Kafka 變得非常簡單(因為該支持建立在 Apache Kafka Spring Integration 配接器之上!),可以應用在複雜的流處理管道中。 Apache Kafka 作為 Spring XD 的來源(資料來自的地方)和接收器(資料去的地方)公開。
Spring XD 公開了一個超級方便的 DSL,用於建立類似 bash
的管道和過濾器流程。 Spring XD 是一個集中式運行時,用於管理、擴展和監控資料處理作業。 它建立在 Spring Integration、Spring Batch、Spring Data 和 Spring for Hadoop 之上,成為一站式資料處理商店。 Spring XD Jobs 從來源讀取資料,透過可能計數、過濾、豐富或轉換資料的處理組件運行它們,然後將它們寫入接收器。
Spring Integration 和 Spring XD ninja Marius Bogoevici,他在 Spring Integration 和 Spring XD 的 Apache Kafka 實作中完成了大量近期工作,他整理了一個非常棒的範例,展示了如何使完整的 Spring XD 和 Kafka 流程工作。 README
會引導您完成 Apache Kafka、Spring XD 和必要的 topic 的所有設定。 然而,本質是當您使用 Spring XD shell 和 shell DSL 來組合一個流時。 Spring XD 組件是被命名的組件,它們是預先配置的,但有很多參數可以使用 XD shell 和 DSL 透過 --..
參數來覆蓋。(順便說一句,這個 DSL 是由 Spring Expression language 的著名人物 Andy Clement 編寫的!) 這是一個範例,它配置一個流來從 Apache Kafka 來源讀取資料,然後將訊息寫入名為 log
的組件,它是一個接收器。 在這種情況下,log
可以是 syslogd、Splunk、HDFS 等。
xd> stream create kafka-source-test --definition "kafka --zkconnect=localhost:2181 --topic=event-stream | log" --deploy
就是這樣! 當然,這只是 Spring XD 的一個嘗試,但希望您會同意這些可能性令人著迷。
使用 Lattice 輕鬆設定一個 Kafka 安裝範例,Lattice 是一個分散式運行時,它支援非常流行的 Docker 映像格式等容器格式。Spotify 提供了一個 Docker 映像,它設定了一個並置的 Zookeeper 和 Kafka 映像。 您可以輕鬆地將其部署到 Lattice 叢集,如下所示
ltc create --run-as-root m-kafka spotify/kafka
從那裡,您可以輕鬆地擴展 Apache Kafka 實例,並且更輕鬆地從基於雲端的服務使用 Apache Kafka。
您可以在我的 GitHub 帳戶上找到 此部落格的程式碼。
我們只是觸及了表面!
如果您想了解更多(為什麼不想呢?),請務必查看 Marius Bogoevici 和 Mark Pollack 博士即將舉辦的 使用 Spring XD 和 Apache Kafka 的反應式資料管道網路研討會,他們將展示使用 RxJava、Spring XD 和 Apache Kafka 有多麼容易!