kafka表引擎使用
1 创建kafka topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 3 --replication-factor 1 --topic order
2 验证生产者消费者
# 生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order
# 订单消息
{"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳", "order_date":"2021-03-09 23:01"}
# 创建消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order --from-beginning
消费结果
{"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳", "order_date":"2021-03-09 23:01"}
消费者能成功消费到消息,kafka在本机可用
3 创建clickhouse kafka表引擎
我们需要使用kafka表引擎创建一张用于消费kafka数据的表,建表语句
CREATE TABLE kafka_order_consumer
(
id UInt64 COMMENT '订单id',
goods_name String COMMENT '商品名称',
price Decimal32(2) COMMENT '商品价格',
user_name String COMMENT '用户名称',
addr String COMMENT '收货地址',
order_date DateTime COMMENT '订单日期'
)ENGINE = Kafka()
SETTINGS
kafka_broker_list = '192.168.211.10:9092',
kafka_topic_list = 'order',
kafka_group_name = 'ck',
kafka_format = 'JSONEachRow'
kafka_broker_list
kafka地址
kafka_topic_list
topic主题名称
kafka_group_name
消费者组
kafka_format
kafka消息的数据类型
4 创建本地表
CREATE TABLE kafka_order_mergetree
(
id UInt64 COMMENT '订单id',
goods_name String COMMENT '商品名称',
price Decimal32(2) COMMENT '商品价格',
user_name String COMMENT '用户名称',
addr String COMMENT '收货地址',
order_date DateTime COMMENT '订单日期'
)ENGINE = MergeTree()
partition by toYYYYMM(order_date)
order by id
5 创建物化视图
CREATE MATERIALIZED VIEW consumer TO kafka_order_mergetree AS
SELECT id, goods_name, price, user_name, addr, order_date FROM kafka_order_consumer;
kafka_order_mergetree
最终存储的本地表
kafka_order_consumer
消费kafka的消费者表
6 kafka生产者发送数据
# 进入终端
bin/kafka-console-producer.sh --broker-list 192.168.211.10:9092 --topic order
# json数据
{"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳", "order_date":"2021-03-09 23:01:00"}
{"id":2, "goods_name":"小力丸", "price":888.88, "user_name":"jack", "addr":"广东深圳", "order_date":"2021-03-09 23:39:00"}
{"id":3, "goods_name":"营养快线", "price":666.66, "user_name":"jack", "addr":"广东深圳", "order_date":"2021-03-09 23:46:00"}
7 查看本地表数据
select * from kafka_order_mergetree;
查询结果
┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
│ 2 │ 小力丸 │ 888.88 │ jack │ 广东深圳 │ 2021-03-09 23:39:00 │
└────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘
┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
│ 3 │ 营养快线 │ 666.66 │ jack │ 广东深圳 │ 2021-03-09 23:46:00 │
└────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘
┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
│ 1 │ 大力丸 │ 999.99 │ jack │ 广东深圳 │ 2021-03-09 23:01:00 │
└────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘
8 中间的小坑
一开始kafka发送数据时,order_date字段的时间格式为yyyy-MM-dd HH:mm
时间格式不正确,导致时间格式转换错误,修正的数据后重新发送数据,查看kafka消费表和本地表都没有数据,一度怀疑是clickhouse没有消费到kafka的数据,所以又通过kafka-console-consumer.sh
的方式以同样的消费者组ck
去消费消息,发现并没有消费到任何消息,说明消息已经被ck消费过了,接着重置了ck
消费者组的offset,发现查kafka_order_consumer
表报了错,怀疑是不是只要中间有一条数据有问题,将导致接下来的所有的数据都会消费失败;于是删除了topic,检查了数据order_date
字段的日期格式,重新生产数据发现可以了;
后面查看错误日志发现了...
DB::UnionBlockInputStream::~UnionBlockInputStream(): Code: 41, e.displayText() = DB::ParsingException: Cannot parse datetime 2021-03-09 23:01"}{}:
或许我一开始就应该去clickhouse的log目录下看日志的!!!!天色不早了,这个问题有待进一步深入研究...
作者:FyK_21f8
原文链接:https://www.jianshu.com/p/a23e1aa20861