取得領先
VMware 提供培訓和認證,以加速您的進度。
瞭解更多本文是部落格系列的一部分,該系列探討基於 Java Function 的新設計的 Spring Cloud Stream 應用程式。 在本文中,我們將研究 Elasticsearch Sink,它允許我們在 Elasticsearch 中索引記錄,以及其對應的 Consumer 函數。
以下是本部落格系列的所有先前部分。
在我們研究 Elasticsearch sink 應用程式之前,讓我們先看看為 sink 提供支援的 consumer 函數。正如我們之前在其他 sink 應用程式中所見,consumer 是一個標準的 java.util.function.Consumer
,它接受一個 Message<?>
。該 consumer 依賴 Spring Boot 對 Elasticsearch 的支援,它會自動配置來自 Elasticsearch 的 RestHighLevelClient
。該 consumer 支援具有以下 payload 類型的訊息。
字串 (String)
java.util.Map
來自 Elasticsearch 的 XContentBuilder。
使用 consumer 時,要使用的 Elasticsearch 索引由屬性 elasticsearch.consumer.index
給定。
您可以透過設定 INDEX_ID
訊息標頭,設定每個訊息要使用的 Elasticsearch ID。 或者,您可以設定 elasticsearch.consumer.id
屬性,該屬性接受 SpEL 表達式。 如果未設定這些屬性,Elasticsearch 將自動產生 ID。
透過將屬性 elasticsearch.consumer.async
設定為 true
,我們可以使索引操作成為非同步的。
我們可以將 consumer 函數注入到應用程式中,並直接呼叫其 accept
方法,以將記錄索引到 ElasticSearch。
例如,假設我們如下將 consumer bean 注入到應用程式中。
@Autowired
ElasticsearchConsumer elasticsearchConsumer
然後,我們可以使用以下 java.util.Map
來索引記錄。
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", 100);
jsonMap.put("dateOfBirth", 1471466076564L);
jsonMap.put("fullName", "John Doe");
final Message<Map<String, Object>> message = MessageBuilder.withPayload(jsonMap).build();
elasticsearchConsumer.accept(message);
以上地圖上的相同資訊可以以純 JSON
格式提供,也可以使用來自 Elasticsearch 的 XContentBuilder 來提供。
正如我們在先前的部落格中所見,當與 Spring Cloud Stream 結合以使其成為 sink 應用程式時,consumer 函數會變得更加強大。 它具有與中間件技術以無縫方式通信的固有能力。 sink 應用程式從諸如 Apache Kafka 或 RabbitMQ 之類的中間件系統消耗資料,並將其傳送到 Elasticsearch。 我們已經為 Kafka 和 RabbitMQ 提供了現成的 Elasticsearch 變體。
讓我們逐步執行執行 Apache Kafka 的獨立 Elasticsearch sink 應用程式的步驟。
首先,繼續下載 sink 應用程式。 由於 sink 尚未普遍可用,因此讓我們使用最新的 milestone 版本。
wget https://repo.spring.io/milestone/org/springframework/cloud/stream/app/elasticsearch-sink-kafka/3.0.0-M4/elasticsearch-sink-kafka-3.0.0-M4.jar
在執行應用程式之前,請確保您已執行 Easticsearch。 這是在 docker 容器中啟動單節點 Elasticsearch 叢集的快速方法。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
我們還需要確保 Kafka 正在執行。
然後如下執行應用程式
java -jar elasticsearch-sink-kafka-3.0.0-M4.jar
--spring.cloud.stream.bindings.input.destination=data-in --elasticsearch.consumer.index=testing
透過提供輸入目的地屬性,我們要求 sink 從 Kafka 主題 data-in
接收資料,並將資料傳送到 Elasticsearch 索引 testing
。
將一些測試資料傳送到 Kafka 主題。 例如,如果您在連接埠 9092
上在本機執行 Kafka,則可以使用 Kafka 主控台 producer 指令碼來傳送它,如下所示。
kafka-console-producer.sh --broker-list localhost:9092 --topic data-in
然後傳送以下 JSON
資料。
{"key1":"value1"}
我們可以透過呼叫以下端點來驗證資料是否已索引。
curl localhost:9200/testing/_search
同樣,我們也可以下載 Elasticsearch sink 應用程式的 RabbitMQ 變體,並針對 RabbitMQ 叢集執行它。
正如我們在本系列中先前多次看到的那樣,當這些 Spring Cloud Stream 應用程式作為 Spring Cloud Data Flow 上的資料管線的一部分執行時,它們會變得更加強大且具有彈性。
我們上面看到的 Elasticsearch 可以與許多其他應用程式結合使用。 例如,TCP source 應用程式可能會從來源接收資料,然後將資料轉儲到中間件目的地,Elasticsearch sink 從該目的地消耗並索引資料。 然後,分析應用程式可以使用此索引來產生儀表板。 這只是一個範例,並且有幾個這樣的用例。 Spring Cloud Data Flow 使使用者可以無縫地協調這些管線。 我們鼓勵您查看我們在先前部落格中概述的有關如何在 Spring Cloud Data Flow 上部署應用程式的步驟。 使用相同的步驟,也可以部署 Elasticsearch sink 應用程式。
在本部落格中,我們看到了 Elasticsearch consumer 函數及其對應的 Spring Cloud Stream sink 的工作方式。 可以將 consumer 函數注入到自訂應用程式中,以與其他業務邏輯結合。 sink 應用程式是現成的,可用於 Kafka 和 RabbitMQ 中間件變體。
在本部落格系列中,我們還有更多集即將推出。 請繼續關注。