阅读 96

kafka生产者 API Demo

添加POM文件

<dependency>
    <groupId>org.apache.kafkagroupId>
    <artifactId>kafka-clientsartifactId>
     <version>0.11.0.0version>
dependency>

API生产者(注意:topic 要先创建: 本案例topic 为 two)

package com.kpwong.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {

        //Create kafka 生产者配置信息
        Properties properties = new Properties();
        //kafka 集群, broker list
        properties.put("bootstrap.servers", "hadoop202:9092");
        properties.put("acks", "all");
        //重试次数
        properties.put("retries", 1);
        //批次大小
        properties.put("batch.size", 16384);
        //等待时间
        properties.put("linger.ms", 1);
        //RecordAccumulator 缓冲区大小 32M
        properties.put("buffer.memory", 33554432);
        // key value 的序列化类
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //创建生产者对象
        KafkaProducer producer = new KafkaProducer(properties);

        //发送数据
        for(int i = 11 ;i <= 20;i++)
        {
            producer.send(new ProducerRecord("two","kpwong--"+i));
        }

        //关闭连接
        producer.close();

    }
}

消费者接受消息.

bin/kafka-console-consumer.sh  --bootstrap-server hadoop202:9092 --topic two

 备注:ProducerRecord:K,V值 的消息:

 

原文:https://www.cnblogs.com/kpwong/p/14052410.html

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐