Kafka
消息队列有点对点方式和发布订阅方式
消息队列作用有解耦、削峰、异步
Kafka核心概念
Broker:Kafka服务节点
Topic:对消息进行分类
Partition:对主题进行分区
Replication:Leader和Flower,对分区备份
Group:每个消费者组只能消费一条消息
Docker安装Kafka
安装zookeeper
docker pull wurstmeister/zookeeper docker run -d --name myZK -p 2181:2181 wurstmeister/zookeeper:latest 复制代码
安装kafka
docker pull wurstmeister/kafka docker run -d \ --name myKafka \ -p 9092:9092 \ -e KAFKA_BROKER_ID=0 \ -e KAFKA_ZOOKEEPER_CONNECT=182.42.113.36:2181 \ -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://182.42.113.36:9092 \ #把kafka的地址端口注册给zookeeper -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ #配置kafka的监听端口 wurstmeister/kafka 复制代码
Windows安装Kafka
配置config/server.properties
#节点id broker.id=0 #存储数据的地方 log.dir=/usr/local/kafka/data #缓存数据的时间 log.retention.hours=168 #zk连接的主机和端口 zookeeper.connect=localhost:2181 #zk连接超时时间 zookeeper.connection.timeout.ms=6000 复制代码
启动kafka
#后台运行,指定配置文件server.properties ./bin/kafka-server-start.sh -daemon ./config/server.properties 复制代码
查看启动日志
tail -f -n 1000 ./logs/kafkaServer.out 复制代码
Kafka基本使用
查看zk中的kafka
#存储的节点信息 ls /brokers/ids #存储leader信息 ls /controller #记录leader迭代信息 ls /controller_epoch 复制代码
主题
#创建topic,分区和副本 ./kafka-topics.sh --zookeeper 182.42.113.36:2181 --create --topic topicName --partitions 1 --replication-factor 1 #查看topic ./kafka-topics.sh --zookeeper 182.42.113.36:2181 --list 复制代码
生产者
#生产者发送消息 ./kafka-console-producer.sh --broker-list 182.42.113.36:9092 --topic myNewTopic 复制代码
消费者
#从最后一条消息开始接收消息 ./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --topic myNewTopic #从起始位置接收消息 ./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --from-beginning --topic myNewTopic 复制代码
消费者组
#同一组中不能有多个消费者接受同一消息 ./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --consumer-property group.id=myGroup --topic myNewTopic #CURRENT-OFFSET:当前偏移 LOG-END-OFFSET:消息总量 LAG:剩余消息数量 ./kafka-consumer-groups.sh --bootstrap-server 182.42.113.36:9092 --describe --group myGroup 复制代码
日志
docker中的kafka主题的分区信息存放在/kafka/kafka-logs-7d0690199775/中
__consumer_offset-n存放消费者消费主题的偏移量,通过hash函数提交到对应的分区
Kafka集群
创建多个Kafka服务
docker run -d \ --name myKafka \ -p 9093:9093 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_ZOOKEEPER_CONNECT=182.42.113.36:2181 \ -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://182.42.113.36:9093 \ -e KAFKA_LISTENERS=PLAINTEXT://182.42.113.36:9093 \ wurstmeister/kafka 复制代码
查看zk
zkCli.cmd -server 182.42.113.36:2181 ls /brokers/ids 复制代码
查看主题
#Leader:当前节点的leader分区 Replicas:向别的节本的副本上同步 Isr:副本的存活状态 ./kafka-topics.sh --zookeeper 182.42.113.36:2181 --describe --topic topicName 复制代码
发送消息
./kafka-console-producer.sh --broker-list 182.42.113.36:9092, 182.42.113.36:9093, 182.42.113.36:9094 --topic topicName 复制代码
消费消息
./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092, 182.42.113.36:9093, 182.42.113.36:9094 --from-beginning --topic topicName 复制代码
Java客户端
导包
kafka-client
slf4j-api
生产者
Properties prop = new Properties(); prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "182.42.113.36:9092, 182.42.113.36:9093, 182.42.113.36:9094"); prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(prop); ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topicName", 0, "myKey", "myValue"); //同步 RecordMetadata message = producer.send(producerRecord).get(); //异步 RecordMetadata message = producer.send(producerRecord, (recordMetadata, e) -> { if(e != null) { System.out.println("消息发送失败"); } else { System.out.println("消息发送成功"); } }).get(); System.out.println("==消息发送完毕=="); System.out.println(message); 复制代码
同步发送
ack=0时,发送到kafka服务后立即返回结果
ack=1时,发送到kafka中leader后返回结果
ack=-1时,发送到kafka中min.insync.replicas个broker后返回结果
//设置ack prop.put(ProducerConfig.ACKS_CONFIG, "-1"); //设置重试次数和重试间隔 prop.put(ProducerConfig.RETRIES_CONFIG, 3); prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); 复制代码
消息缓冲区(生产者中有缓冲区,和发送数据的线程)
//设置发送缓冲区大小 prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 65535); //设置一次发送的大小 prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 65535); //设置发送最大事件间隔 prop.put(ProducerConfig.LINGER_MS_CONFIG, 10); 复制代码
消费者
Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "182.42.113.36:9092, 182.42.113.36:9093, 182.42.113.36:9094"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumer"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop); consumer.subscribe(Arrays.asList("topicName")); while(true) { ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord<String, String> record : records) { System.out.println("【接收到消息】" + record.value()); } } 复制代码
手动提交和自动提交
自动提交是消费者将消息poll下来提交offset
手动提交是消费者将消息消费过后再提交offset
//设置自动提交和提交间隔时间 prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); //设置手动同步提交 prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); consumer.commitAsync(); //设置手动异步提交 prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); consumer.commitAsync((offsets, e) -> {}); 复制代码
长轮询poll
//设置一次poll的最大拉取条数 prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); //在拉取时设置最大时长为1秒 ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); //设置两次拉取的最大间隔时间,如果超过,就将消费者踢出消费者组 prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000 * 30); 复制代码
心跳检查
//设置消费者每1秒向kafka发送一次心跳 prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); //设置kafka每10秒检查一次消费者心跳,如果没有就将消费者踢出消费者组 prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000); 复制代码
指定分区和偏移量消费
//指定0号分区消费 consumer.assign(Arrays.asList(new TopicPartition("topicName", 1))); //offset从0开始消费 consumer.seekToBeginning(Arrays.asList(new TopicPartition("topicName", 1))); //指定消费偏移量开始消费 consumer.seek(new TopicPartition("topicName", 1), 3); //从该消费者组的偏移量开始消费 prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 复制代码
整合SpringBoot
导入依赖
org.springframework.kafka
配置文件
spring: kafka: producer: retries: 3 #发送失败重试次数 batch-size: 65535 #设置一次发送大小 buffer-memory: 65534 #设置缓冲区大小 acks: 1 #设置应答机制 key-serializer: ...StringSerializer value-serializer: ...StringSerializer consumer: group-id: default-group #消费者组 enable-auto-commit: false #设置手动提交 auto-offset-reset: earliest #设置消费者组最早偏移 key-deserializer: ...StringDeserializer value-deserializer: ...StringDeserializer max-poll-record: 500 #一次最大拉取数 listener: ack-mode: MANUAL #一批poll处理后再提交offset ack-mode: MANUAL_IMMEDIATE #每一条消息单独提交offset 复制代码
生产者
@Autowired private KafkaTemplate<String, String> kafkaTemplate; public void send() { kafkaTemplate.send("topicName", 0, "myKey", "myValue"); } 复制代码
消费者
@KafkaListener(groupId="myGroup", topicPartitions={ @TopicPartition(topic="t1", partition="0"), @TopicPartition(topic="t2", partition="1"), }) public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); //手动提交offset ack.acknowledge(); } 复制代码
Kafka机制
Rebalance(分区数或消费者组发生变化时)
公式计算得到每个消费者消费几个分区
轮询
sticky,在不打乱原来分配的基础上进行分配
HW和LWO
HighWaterMark,当分片中的消息同步到所有副本中才可被消费者访问
LogEndOffset,消息最后的偏移量,即当LogEndOffset都达到的消息才可被消费
防止消息丢失
设置ack为1或-1可以防止消息丢失
min.insync.replicas设置为副本数
防止消费者重复消费
由于网络抖动,生产者未收到ack,导致生产者重复发送
解决幂等性从实际业务出发
保证消息顺序消费
指定key,将消息顺序发送到同一分区中
解决消息挤压问题
增加消费者或提升消费者数量
作者:锄禾___
链接:https://juejin.cn/post/7015741031773634574