聯(lián)系我們 - 廣告服務(wù) - 聯(lián)系電話:
您的當(dāng)前位置: > 關(guān)注 > > 正文

焦點(diǎn)!配置完Kafka集群后 通過(guò)JavaAPI方式來(lái)操作

來(lái)源:CSDN 時(shí)間:2023-01-28 14:00:44


(資料圖)

配置完Kafka集群后,下面通過(guò)Java API的方式來(lái)操作 需要導(dǎo)入的Jar包

kafka_2.10-0.8.1.1.jar    log4j-1.2.15.jar    metrics-core-2.2.0.jar    scala-library-2.10.1.jar    slf4j-api-1.7.2.jar

以上jar包均可從Kafka的發(fā)布包中找的到,在lib目錄下面

生產(chǎn)者(Producers)

代碼:

import java.util.*; import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class TestProducer {    public static void main(String[] args) {        long events = Long.parseLong(args[0]);        Random rnd = new Random();                //在以下屬性中定義了Producer如何找到集群,序列化消息等        Properties props = new Properties();        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");        props.put("serializer.class", "kafka.serializer.StringEncoder");        props.put("partitioner.class", "example.producer.SimplePartitioner");        props.put("request.required.acks", "1");        ProducerConfig config = new ProducerConfig(props);                //定義生產(chǎn)者對(duì)象,該類指定了兩個(gè)參數(shù)的泛型,第一個(gè)參數(shù)表示分區(qū)鍵值的類型,第二參數(shù)表示消息類型        Producerproducer = new Producer(config);        for (long nEvents = 0; nEvents < events; nEvents++) {                long runtime = new Date().getTime();                 String ip = “192.168.2.” + rnd.nextInt(255);                String msg = runtime + “,www.example.com,” + ip;                //發(fā)送消息到消息中介,test指定要接受消息的主題。               KeyedMessagedata = new KeyedMessage("test", ip, msg);               //執(zhí)行發(fā)送               producer.send(data);        }        producer.close();    }}

Producer配置參數(shù):

metadata.broker.list:定義一個(gè)或者多個(gè)消息中介(broker),Produder通過(guò)broker決定主題leader的位置。這里無(wú)需配置所有的broker,但建議配置多于一個(gè)。  serializer.class:定義準(zhǔn)備傳遞數(shù)據(jù)給broker時(shí)使用哪個(gè)序列化器。  partitioner.class:這個(gè)是可選項(xiàng),該類將決定消息將發(fā)送到哪個(gè)主題分區(qū)上。  request.required.acks:該值設(shè)置為1后,broker收到消息后將發(fā)送一個(gè)確認(rèn)信息給producer。

在上述程序運(yùn)行之前請(qǐng)確保Kafka已經(jīng)存在名稱為test的主題,如果沒(méi)有可以使用下面命令創(chuàng)建      bin/kafka-create-topic.sh --topic test --replica 3--zookeeper localhost:2181--partition 5然后使用下面命令查看:      bin/kafka-console-consumer.sh --zookeeper localhost:2181--topic test --from-beginning【參考】: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

責(zé)任編輯:

標(biāo)簽:

相關(guān)推薦:

精彩放送:

新聞聚焦
Top