阅读 112

Kafka

消息队列有点对点方式和发布订阅方式

消息队列作用有解耦、削峰、异步

Kafka核心概念

Broker:Kafka服务节点

Topic:对消息进行分类

Partition:对主题进行分区

Replication:Leader和Flower,对分区备份

Group:每个消费者组只能消费一条消息

Docker安装Kafka

安装zookeeper

docker pull wurstmeister/zookeeper docker run -d --name myZK -p 2181:2181 wurstmeister/zookeeper:latest 复制代码

安装kafka

docker pull wurstmeister/kafka docker run -d \ --name myKafka \ -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=182.42.113.36:2181 \ -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://182.42.113.36:9092 \ #把kafka的地址端口注册给zookeeper -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ #配置kafka的监听端口 wurstmeister/kafka 复制代码

Windows安装Kafka

配置config/server.properties

#节点id broker.id=0 #存储数据的地方 log.dir=/usr/local/kafka/data #缓存数据的时间 log.retention.hours=168 #zk连接的主机和端口 zookeeper.connect=localhost:2181 #zk连接超时时间 zookeeper.connection.timeout.ms=6000 复制代码

启动kafka

#后台运行,指定配置文件server.properties ./bin/kafka-server-start.sh -daemon ./config/server.properties 复制代码

查看启动日志

tail -f -n 1000 ./logs/kafkaServer.out  复制代码

Kafka基本使用

查看zk中的kafka

#存储的节点信息 ls /brokers/ids #存储leader信息 ls /controller #记录leader迭代信息 ls /controller_epoch 复制代码

主题

#创建topic,分区和副本 ./kafka-topics.sh --zookeeper 182.42.113.36:2181 --create --topic topicName --partitions 1 --replication-factor 1  #查看topic ./kafka-topics.sh --zookeeper 182.42.113.36:2181 --list 复制代码

生产者

#生产者发送消息 ./kafka-console-producer.sh --broker-list 182.42.113.36:9092 --topic myNewTopic 复制代码

消费者

#从最后一条消息开始接收消息 ./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --topic myNewTopic #从起始位置接收消息 ./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --from-beginning --topic myNewTopic 复制代码

消费者组

#同一组中不能有多个消费者接受同一消息 ./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --consumer-property group.id=myGroup --topic myNewTopic #CURRENT-OFFSET:当前偏移   LOG-END-OFFSET:消息总量    LAG:剩余消息数量  ./kafka-consumer-groups.sh --bootstrap-server 182.42.113.36:9092 --describe --group myGroup 复制代码

日志

  • docker中的kafka主题的分区信息存放在/kafka/kafka-logs-7d0690199775/中

  • __consumer_offset-n存放消费者消费主题的偏移量,通过hash函数提交到对应的分区

Kafka集群

创建多个Kafka服务

docker run -d \ --name myKafka \ -p 9093:9093 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_ZOOKEEPER_CONNECT=182.42.113.36:2181 \ -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://182.42.113.36:9093 \ -e KAFKA_LISTENERS=PLAINTEXT://182.42.113.36:9093 \  wurstmeister/kafka 复制代码

查看zk

zkCli.cmd -server 182.42.113.36:2181 ls /brokers/ids 复制代码

查看主题

#Leader:当前节点的leader分区    Replicas:向别的节本的副本上同步    Isr:副本的存活状态 ./kafka-topics.sh --zookeeper 182.42.113.36:2181 --describe --topic topicName  复制代码

发送消息

./kafka-console-producer.sh --broker-list 182.42.113.36:9092, 182.42.113.36:9093, 182.42.113.36:9094 --topic topicName 复制代码

消费消息

./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092, 182.42.113.36:9093, 182.42.113.36:9094 --from-beginning --topic topicName 复制代码

Java客户端

导包

  • kafka-client

  • slf4j-api

生产者

Properties prop = new Properties(); prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "182.42.113.36:9092, 182.42.113.36:9093, 182.42.113.36:9094"); prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(prop); ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topicName", 0, "myKey", "myValue"); //同步 RecordMetadata message = producer.send(producerRecord).get(); //异步 RecordMetadata message = producer.send(producerRecord, (recordMetadata, e) -> {     if(e != null) {         System.out.println("消息发送失败");     } else {         System.out.println("消息发送成功");     } }).get(); System.out.println("==消息发送完毕=="); System.out.println(message); 复制代码

同步发送

  • ack=0时,发送到kafka服务后立即返回结果

  • ack=1时,发送到kafka中leader后返回结果

  • ack=-1时,发送到kafka中min.insync.replicas个broker后返回结果

//设置ack prop.put(ProducerConfig.ACKS_CONFIG, "-1"); //设置重试次数和重试间隔 prop.put(ProducerConfig.RETRIES_CONFIG, 3); prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); 复制代码

消息缓冲区(生产者中有缓冲区,和发送数据的线程)

//设置发送缓冲区大小 prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 65535); //设置一次发送的大小 prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 65535); //设置发送最大事件间隔 prop.put(ProducerConfig.LINGER_MS_CONFIG, 10); 复制代码

消费者

Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "182.42.113.36:9092, 182.42.113.36:9093, 182.42.113.36:9094"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumer"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop); consumer.subscribe(Arrays.asList("topicName")); while(true) {     ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));     for(ConsumerRecord<String, String> record : records) {         System.out.println("【接收到消息】" + record.value());     } } 复制代码

手动提交和自动提交

  • 自动提交是消费者将消息poll下来提交offset

  • 手动提交是消费者将消息消费过后再提交offset

//设置自动提交和提交间隔时间 prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); //设置手动同步提交 prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); consumer.commitAsync(); //设置手动异步提交 prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); consumer.commitAsync((offsets, e) -> {}); 复制代码

长轮询poll

//设置一次poll的最大拉取条数 prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); //在拉取时设置最大时长为1秒 ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); //设置两次拉取的最大间隔时间,如果超过,就将消费者踢出消费者组 prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000 * 30); 复制代码

心跳检查

//设置消费者每1秒向kafka发送一次心跳 prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); //设置kafka每10秒检查一次消费者心跳,如果没有就将消费者踢出消费者组 prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000); 复制代码

指定分区和偏移量消费

//指定0号分区消费 consumer.assign(Arrays.asList(new TopicPartition("topicName", 1))); //offset从0开始消费 consumer.seekToBeginning(Arrays.asList(new TopicPartition("topicName", 1))); //指定消费偏移量开始消费 consumer.seek(new TopicPartition("topicName", 1), 3); //从该消费者组的偏移量开始消费 prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 复制代码

整合SpringBoot

导入依赖

  • org.springframework.kafka

配置文件

spring:   kafka:     producer:       retries: 3 #发送失败重试次数       batch-size: 65535 #设置一次发送大小       buffer-memory: 65534 #设置缓冲区大小       acks: 1 #设置应答机制       key-serializer: ...StringSerializer       value-serializer: ...StringSerializer     consumer:       group-id: default-group #消费者组       enable-auto-commit: false #设置手动提交       auto-offset-reset: earliest #设置消费者组最早偏移       key-deserializer: ...StringDeserializer       value-deserializer: ...StringDeserializer       max-poll-record: 500 #一次最大拉取数     listener:       ack-mode: MANUAL #一批poll处理后再提交offset       ack-mode: MANUAL_IMMEDIATE #每一条消息单独提交offset 复制代码

生产者

@Autowired private KafkaTemplate<String, String> kafkaTemplate; public void send() {     kafkaTemplate.send("topicName", 0, "myKey", "myValue"); } 复制代码

消费者

@KafkaListener(groupId="myGroup", topicPartitions={     @TopicPartition(topic="t1", partition="0"),     @TopicPartition(topic="t2", partition="1"), }) public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {     String value = record.value();     //手动提交offset     ack.acknowledge(); } 复制代码

Kafka机制

Rebalance(分区数或消费者组发生变化时)

  • 公式计算得到每个消费者消费几个分区

  • 轮询

  • sticky,在不打乱原来分配的基础上进行分配

HW和LWO

  • HighWaterMark,当分片中的消息同步到所有副本中才可被消费者访问

  • LogEndOffset,消息最后的偏移量,即当LogEndOffset都达到的消息才可被消费

防止消息丢失

  • 设置ack为1或-1可以防止消息丢失

  • min.insync.replicas设置为副本数

防止消费者重复消费

  • 由于网络抖动,生产者未收到ack,导致生产者重复发送

  • 解决幂等性从实际业务出发

保证消息顺序消费

  • 指定key,将消息顺序发送到同一分区中

解决消息挤压问题

  • 增加消费者或提升消费者数量


作者:锄禾___
链接:https://juejin.cn/post/7015741031773634574


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐