阅读 105

Kafka 常用操作命令

Kafka 常用操作命令

操作命令

  • 查询系统的所有 Topic

# 获取所有的主题 ./bin/kafka-topics.sh --list --zookeeper localhost:2181 # 或者 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list  # 结果 #__consumer_offsets # myTopic 复制代码

  • __consumer_offsets_x 是系统的主题,是判断消费者消费的偏移量,一同会有50 个分区映射到0-49

  • 一个主题会对应多个日志目录,每个文件夹对应着一个分区

  • 创建一个 Topic

# 创建一个myTopic 3个分区,且一个副本,并且注册中心为 localhost:2181 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic yourTopic --replication-factor 1 --partitions 3 复制代码

  • 查询 Topic 的详细信息

# 查询yourTopic主题的详细信息 ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic yourTopic # 返回结果 Topic: yourTopic PartitionCount: 3 ReplicationFactor: 1 Configs:  Topic: yourTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: yourTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: yourTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0 复制代码

  • 查询 Topic 所有的分区信息

复制代码

  • 开启一个 Producer(生产者)

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic yourTopic 复制代码

  • 创建一个 Consumer(消费者)

# --from-beginning 可以消费历史数据 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourTopic --from-beginning 复制代码

Kafka主题

  • topic (主题) 相关的脚本

    • --bootstrap-server 与 --zookeeper

    • bin/kafka-topics.sh

    • 观察参数输出提示

  • 如果在发送消息时,所指定的主题并不存在,那么根据 Kafka 的配置,可能会有如下的两种情况发生。

    • Kafka Server 会报错,告诉发送者该主题不存在,需要先创建好主题后再发送消息。

    • Kafka Server 会自动创建所指定的主题,并将所发送的消息归类到所创建的这个主题下面。

  • 之所以会有如上两种区别,关键在于Kafka的配置文件中的一个参数项:

    • auto.create.topic.enable = true

  • 如果将该参数项指定为 true,那么在发送消息时,如果所指定的主题不存在,Kafka 就会帮我们自动创建该主题反之,则会报错。

  • __consumer_offsets 是Kafka Server 所创建的用于标识消费者偏移量的主题(Kafka 中的消息都是顺序保存在磁盘上的,通过 offset

偏移量来标识消息的顺序),它由Kafka Server 内部使用

  • 如果想要查看某个具体主题(如yourTopic),执行如下命令即可

    # 查询yourTopic主题的详细信息 ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic yourTopic # 返回结果 Topic: yourTopic PartitionCount: 3 ReplicationFactor: 1 Configs:  Topic: yourTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: yourTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: yourTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0 复制代码

    信息;后续的每一行则给出一个分区的信息,如果只有一个分区,那么就只会显示出一行,正如上述输出那样。

    • 主题名:yourTopic

    • 当前分区:0

    • Leader Broker: 0

    • 副本:0

    • isr (in-sync replica): 0

    • 主题名:yourTopic

    • 分区数:3

    • 副本数:1

    • 上述第一行表示信息为:

    • 第二行信息表示为:

    • 第一行显示出所有的分区信息(yourTopic 主题的分区数是3 , 即在之前创建主题时所指定的 --partition 3 这个参数所确定的)的一个总结

  • 还可以查看 Kafka Server 自己所创建的用于管理消息偏移量的主题:__consumer_offsets 的详细信息,执行如下命令

./bin/kafka-topics.sh --describe --topic __consumer_offsets --zookeeper localhost:2181 复制代码

  • 执行结果可以看到,该主题有50个分区,副本数为1,同时也输出了相应的配置信息

  • 从第二行开始,列出了每个分区的信息,分区从0到49。由于我们这里使用了单台 Kafka Server ,因此可以卡进到每个分区的 Leader 都是0

这表示每个分区的 Leader 都是同一台 Server,即我们所启动这台 Kafka Server .

Kafka 中的 Zookeeper

  • 那么,这些主题都是保存在 Zookeeper 中的,  Kafka 是重度依赖Zookeeper 的, Zookeeper 保存了Kafka所需的元信息,以及关于主题、消息

偏移量等诸多信息,下面我们就到 Zookeeper 中插件一下相关的内容。

  • 可以通过 Kafka 集成的 Zookeeper 客户端脚本来连接到 Zookeeper Server 上

    • ./zookeeper-shell.sh localhost:2181

  • Zookeeper 命令执行

    是 ls + stat 两个命令的集合体,而stat命令则是用于输出状态信息的。)

    • ls /config/topics

    • ls2 /config/topics

    • 该命令不仅输出了主题的名字,还输出了相关的统计信息,如创建时间、版本号等信息。

    • ls /

    • ls2 /

    • 上述命令除了会列出 Zookeeper 根(/)下面的所有节点外,还会额外输出其他相关的信息(可以任务ls2 命令是ls 命令的增强,ls2 命令相当于

  • Zookeeper 的本质是一种树形结构,有一个跟节点/ 。它下面可以有若干个子节点,子节点下面还可以有子节点,每个子节点有自己的属性等信息,其

结构如下图所示

zk_nodes.jpg

  • Zookeeper 是Kafka的得力助手,同时也是很多系统所依赖的底层协调框架。对于Zookeeper 来说,有很多图形化的客户端能以比较直观的方式列出

各个节点的信息,不过这里还是建议大家先掌握Zookeeper 的命令行操作方式,以加深对其掌握和理解。

多消费者消费

  • 执行如下命令

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic  复制代码

  • 接下来,启动两个 Kafka Consumer, 分别执行如下命令

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning 复制代码

  • 在生产者发送消息

  • 通过这个操作过程,我们能够看看到多个 Kafka Consumer 可以消费同一个主题的同一条信息,这显然就是之前课程中所介绍的,广播的概念。

即多个客户端是可以获取到通一个主题的同一条信息并惊醒消费的。

  • 下面,关闭这两个 Kafka Server (ctrl + c); 然后再分别在这两个控制台窗口中执行上述同样的命令。

  • 我们发现,消费者中会显示出 Kafka Server 中myTopic 主题下已经拥有的N条历史消息。

  • 现在我们再次关闭这两个窗口中 去除 --from-beginning 那么就不会接受到历史接受到的消息。

  • 通过这个过程,实际上我们就讲述了 --from-beginning 参数的作用,它的作用是:

    • 如果消费者尚没有已建立的可用于消费的偏移量(offset),那么就从 Kafka Server 日期中最终开始消费的消息,而非最新消息开始消费。

消费者组

  • 我们停止以上的消费者然后执行如下命令

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --group test 复制代码

  • 如果我们在生产者输入"Hello World" 我们会观察到只会有一个消费者窗口收到消息,另外的消费者是无法收到消息的

  • 如果我们停掉其中一个消费者进程,那么另外一个消费者进程才能收到消息

  • 如果两个在不同的消费者组那么两个消费者进程都能收到消息

主题删除

  • 查询kafka 中已有的主题

./bin/kafka-topics.sh --list --zookeeper localhost:2181 # 返回 __consumer_offsets myTopic myTopic2 yourTopic 复制代码

  • 删除一个主题

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic2 复制代码

  • 该输出表示:主题 myTopic2 已经被标记为删除状态。同时还给出了一个提示信息,即如果没有配置项

delete.topic.enable 设为true ,那么这个删除操作将不会起任何作用。

  • 该配置在 Kafka Server 的 config 目录下的 server.properties 配置文件中进行的配置的, Kafka Server 默认是没有这个配置的。

  • 如果这个时候我们再去主题列表中查询是查询不到的

./bin/kafka-topics.sh --zookeeper localhost:2181 --list 复制代码

  • 查询详细信息会抛出异常,表示该主题不存在。(如果该主题没有被完全删除那么不会有任何输出也不会报错)

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic myTopic2 # 输出如下 Error while executing topic command : Topic 'myTopic2' does not exist as expected [2020-02-06 13:28:08,201] ERROR java.lang.IllegalArgumentException: Topic 'myTopic2' does not exist as expected at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:484) at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:390) at kafka.admin.TopicCommand$.main(TopicCommand.scala:67) at kafka.admin.TopicCommand.main(TopicCommand.scala)  (kafka.admin.TopicCommand$) 复制代码

  • 当主体被完全删除后,日志文件目录下的日志文件夹以及下面的所有文件都会被删除。

  • 从 Kafka Server 的输出日志上可以看到,Kafka Server 是先删除了主题相关的索引信息,然后删除日志信息,即数据文件

[2020-02-06 13:20:33,631] INFO Deleted log /Users/zhengsh/Desktop/kafka-logs/kafka-logs/myTopic-0.875b1d2b9a9141bab80d3c0d0a4dbecd-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment) [2020-02-06 13:20:33,645] INFO Deleted offset index /Users/zhengsh/Desktop/kafka-logs/kafka-logs/myTopic-0.875b1d2b9a9141bab80d3c0d0a4dbecd-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment) [2020-02-06 13:20:33,647] INFO Deleted time index /Users/zhengsh/Desktop/kafka-logs/kafka-logs/myTopic-0.875b1d2b9a9141bab80d3c0d0a4dbecd-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment) [2020-02-06 13:20:33,673] INFO Deleted log for partition myTopic-0 in /Users/zhengsh/Desktop/kafka-logs/kafka-logs/myTopic-0.875b1d2b9a9141bab80d3c0d0a4dbecd-delete. (kafka.log.LogManager) 复制代码

  • 在zookeeper 中查看主题相关的信息是否被删除

./bin/zookeeper-shell.sh localhost:2181 ls /config/topics 复制代码

  • Kafka topic 删除的变化

    增加 delete.topic.enable=true. 这个一个配置项。然而,在Kafka 1.0 中,该参数默认就是 true 因此,无需无需显式指定即可成功删除主题; 如果不希望删除主题,那么就需要显式将 delete.topic.enable=false 添加到 server.properties 的配置文件中。

    被删除了,但与主题相关的消息数据依然还会被保留,需要用户手动到相关的数据目录下自行删除,不过这一切在Kafka1.0 中都发生了变化,在 Kafka1.0 中,当主题被删除后,与主题相关的数据也一并删除,并且是不可逆的。**

    • 另外,在Kafka 1.0 之前的版本中,如果删除了主题,那么被删除的主题名字会保存到 Zookeeper 的 /admin/delete_topics 节点中。**虽然主题

    • 从 Kafka 1.0 开始,主题删除操作以及相关配置与之前的版本比较,发生了较大的变化。

    • 值得注意的是,在Kafka 1.0 之前的版本中,delete.topic.enable 属性默认为false , 若想删除主题,需要在server.properties 配置文件中显式

  • 下面是Kafka 官方文档上的描述以得出的结论

    • Topic deletion is now enabled by default, since the functionality is now stable. Users who wish to retain the

    • previous behavior should set the broker config delete.topic.enable to false Keep in mind that topic deletion removes

    • data and the operation is not reversible (i.e there is no "undelete" operation)


作者:老郑_
链接:https://juejin.cn/post/7022145069851361311


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐