阅读 107

kafka表引擎使用

1 创建kafka topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 3 --replication-factor 1 --topic order

2 验证生产者消费者

# 生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order

# 订单消息
{"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:01"}
# 创建消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order --from-beginning

消费结果

{"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:01"}

消费者能成功消费到消息,kafka在本机可用

3 创建clickhouse kafka表引擎

我们需要使用kafka表引擎创建一张用于消费kafka数据的表,建表语句

CREATE TABLE kafka_order_consumer
(
    id UInt64 COMMENT '订单id',
    goods_name String COMMENT '商品名称',
    price Decimal32(2) COMMENT '商品价格',
    user_name String COMMENT '用户名称',
    addr String COMMENT '收货地址',
    order_date DateTime COMMENT '订单日期'
)ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = '192.168.211.10:9092',
    kafka_topic_list = 'order',
    kafka_group_name = 'ck',
    kafka_format = 'JSONEachRow'

kafka_broker_list kafka地址
kafka_topic_list topic主题名称
kafka_group_name 消费者组
kafka_format kafka消息的数据类型

4 创建本地表

CREATE TABLE kafka_order_mergetree
(
    id UInt64 COMMENT '订单id',
    goods_name String COMMENT '商品名称',
    price Decimal32(2) COMMENT '商品价格',
    user_name String COMMENT '用户名称',
    addr String COMMENT '收货地址',
    order_date DateTime COMMENT '订单日期'
)ENGINE = MergeTree()
    partition by toYYYYMM(order_date)
    order by id 

5 创建物化视图

CREATE MATERIALIZED VIEW consumer TO kafka_order_mergetree AS
    SELECT id, goods_name, price, user_name, addr, order_date FROM kafka_order_consumer;

kafka_order_mergetree 最终存储的本地表
kafka_order_consumer 消费kafka的消费者表

6 kafka生产者发送数据

# 进入终端
bin/kafka-console-producer.sh --broker-list 192.168.211.10:9092 --topic order

# json数据
{"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:01:00"}
{"id":2, "goods_name":"小力丸", "price":888.88, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:39:00"}
{"id":3, "goods_name":"营养快线", "price":666.66, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:46:00"}

7 查看本地表数据

select * from kafka_order_mergetree;

查询结果

┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
│  2 │ 小力丸     │ 888.88 │ jack      │ 广东深圳 │ 2021-03-09 23:39:00 │
└────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘
┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
│  3 │ 营养快线   │ 666.66 │ jack      │ 广东深圳 │ 2021-03-09 23:46:00 │
└────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘
┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
│  1 │ 大力丸     │ 999.99 │ jack      │ 广东深圳 │ 2021-03-09 23:01:00 │
└────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘

8 中间的小坑

一开始kafka发送数据时,order_date字段的时间格式为yyyy-MM-dd HH:mm时间格式不正确,导致时间格式转换错误,修正的数据后重新发送数据,查看kafka消费表和本地表都没有数据,一度怀疑是clickhouse没有消费到kafka的数据,所以又通过kafka-console-consumer.sh的方式以同样的消费者组ck去消费消息,发现并没有消费到任何消息,说明消息已经被ck消费过了,接着重置了ck消费者组的offset,发现查kafka_order_consumer表报了错,怀疑是不是只要中间有一条数据有问题,将导致接下来的所有的数据都会消费失败;于是删除了topic,检查了数据order_date字段的日期格式,重新生产数据发现可以了;

后面查看错误日志发现了...

DB::UnionBlockInputStream::~UnionBlockInputStream(): Code: 41, e.displayText() = DB::ParsingException: Cannot parse datetime 2021-03-09 23:01"}{}:

或许我一开始就应该去clickhouse的log目录下看日志的!!!!天色不早了,这个问题有待进一步深入研究...

作者:FyK_21f8

原文链接:https://www.jianshu.com/p/a23e1aa20861

文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐