聯系我們 - 廣告服務 - 聯系電話:
您的當前位置: > 關注 > > 正文

焦點!配置完Kafka集群后 通過JavaAPI方式來操作

來源:CSDN 時間:2023-01-28 14:00:44


(資料圖)

配置完Kafka集群后,下面通過Java API的方式來操作 需要導入的Jar包

kafka_2.10-0.8.1.1.jar    log4j-1.2.15.jar    metrics-core-2.2.0.jar    scala-library-2.10.1.jar    slf4j-api-1.7.2.jar

以上jar包均可從Kafka的發布包中找的到,在lib目錄下面

生產者(Producers)

代碼:

import java.util.*; import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class TestProducer {    public static void main(String[] args) {        long events = Long.parseLong(args[0]);        Random rnd = new Random();                //在以下屬性中定義了Producer如何找到集群,序列化消息等        Properties props = new Properties();        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");        props.put("serializer.class", "kafka.serializer.StringEncoder");        props.put("partitioner.class", "example.producer.SimplePartitioner");        props.put("request.required.acks", "1");        ProducerConfig config = new ProducerConfig(props);                //定義生產者對象,該類指定了兩個參數的泛型,第一個參數表示分區鍵值的類型,第二參數表示消息類型        Producerproducer = new Producer(config);        for (long nEvents = 0; nEvents < events; nEvents++) {                long runtime = new Date().getTime();                 String ip = “192.168.2.” + rnd.nextInt(255);                String msg = runtime + “,www.example.com,” + ip;                //發送消息到消息中介,test指定要接受消息的主題。               KeyedMessagedata = new KeyedMessage("test", ip, msg);               //執行發送               producer.send(data);        }        producer.close();    }}

Producer配置參數:

metadata.broker.list:定義一個或者多個消息中介(broker),Produder通過broker決定主題leader的位置。這里無需配置所有的broker,但建議配置多于一個。  serializer.class:定義準備傳遞數據給broker時使用哪個序列化器。  partitioner.class:這個是可選項,該類將決定消息將發送到哪個主題分區上。  request.required.acks:該值設置為1后,broker收到消息后將發送一個確認信息給producer。

在上述程序運行之前請確保Kafka已經存在名稱為test的主題,如果沒有可以使用下面命令創建      bin/kafka-create-topic.sh --topic test --replica 3--zookeeper localhost:2181--partition 5然后使用下面命令查看:      bin/kafka-console-consumer.sh --zookeeper localhost:2181--topic test --from-beginning【參考】: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

責任編輯:

標簽:

相關推薦:

精彩放送:

新聞聚焦
Top 岛国精品在线