JSON数据全量写入Clickhouse(含在Shell中处理JSON数据)
前言
我司需对全国车厂数据做分析以及监控(缴费、异常抬杆、进出、车辆类型、牌照等),JAVA那边的小伙伴负责部署Driver端实时把各节点底表数据同步上传至kafka中供数据组使用。
为了保证数据不丢失,除了实时接入数据,每天在凌晨会接入一版昨日的全量数据。数据冗余则通过Clickhouse的ReplacingMergeTree()
引擎解决
前置准备
阿里云OSS(离线数据从OSS接入)
ossutil (钱不到位,我司购买的oss不支持以目录的形式挂载到Linux中,所以使用该工具同步)
jq(在Linux中处理JSON数据)
Clickhouse(数据落地)
实施
一、安装ossfs
各版本下载地址以及安装教程:help.aliyun.com/document_de…
1、下载安装
[root@hadoop-prod-datanode1 ~]# wget http://gosspublic.alicdn.com/ossutil/1.7.7/ossutil64 [root@hadoop-prod-datanode1 ~]# chmod 755 ossutil64 复制代码
2、使用交互式配置生成配置文件
[root@hadoop-prod-datanode1 ~]# ./ossutil64 config 复制代码
根据提示分别设置Endpoint、AccessKey ID、AccessKey Secret和STSToken参数
参数说明如下:
endpoint:填写Bucket所在地域的Endpoint。各地域Endpoint详情,请参见访问域名和数据中心
您也可以增加
http://
或https://
指定ossutil访问OSS使用的协议,默认使用HTTP协议。例如使用HTTPS协议访问深圳的Bucket,设置为https://oss-cn-shenzhen.aliyuncs.com
。accessKeyID、accessKeySecret:填写账号的AccessKey。 *使用阿里云账号或RAM用户访问时,AccessKey的获取方式,请参见获取AccessKey。
使用STS临时授权账号访问时,AccessKey的获取方式,请参见使用STS临时访问凭证访问OSS。
stsToken:使用STS临时授权账号访问OSS时需要配置该项,否则置空即可。
stsToken生成方式参见临时访问凭证。
二、安装JQ工具解析json
oss中的json数据格式为json包json,Clickhouse并不支持这种格式写入,所以需要把json拆成一条一条的
1、下载jq
[root@hadoop-prod-datanode1 ~]# yum install jq 复制代码
2、拿一个json测下功能可用性
[root@hadoop-prod-datanode1 ~]# jq -c .[] /data/park/park_data/xxx.json > /data/park/park_parse_data/xxx.json 复制代码
三、编写脚本批量写入Clickhouse
写入格式详见Ck官网 clickhouse.com/docs/en/int…
1、编写功能处理脚本
#!/bin/bash #*********************************************** #Author: CoolCoolのXinXing #Version: 1.0 #Date: 2021-07-08 #FileName: park_extr_insert_clickhouse.sh #Description: 车厂全量数据入库 #*********************************************** #获取当前时间 date=$(date -d "1 day ago" +"%Y-%m-%d") echo "---------------------------${date}--------------------------------" #获取原始数据路径 path="/data/park/park_data/$date" #模糊匹配使oss中的文件名对应上Clickhouse中的表 args=$1 parkTableJson="$args*.json" #模式匹配要处理(导入)哪张表 case $args in 'VechicleExit') tabName='out_vehicle_record' ;; 'VechicleEnter') tabName='in_vehicle_record' ;; 'TempStopPayment') tabName='pay_record' ;; 'MonthlyRent') tabName='token_record' ;; esac #把匹配到的所有json文件目录放入数组以供遍历 array1=$(ls $path/$parkTableJson) #创建本地数据文件夹,按天分目录存放 mkdir -p /data/park/parse_park_data/$date echo "-------------------${tabName}-----------------------------------" #循环处理 for value in ${array1[*]} do #把原始路径替换为本地数据路径落地解析拆分后的json parsePath=${value/park_data/parse_park_data} jq -c .[] $value > $parsePath #判断文件是否为空,Ck插入空数据会报错 if [ -s $parsePath ]; then #输出文件记录数,以便在日志中回溯 echo `wc -l $parsePath` #插入数据到Clickhouse clickhouse-client -h 10.196.17.27 --port=9999 --database="park" -u default --password 123456 --query="insert into park.${tabName} FORMAT JSONEachRow" < $parsePath else echo "文件内容为空不做处理:$parsePath" fi done 复制代码
2、拉取oss增量数据并调用上面的脚本
#!/bin/bash #*********************************************** #Author: CoolCoolのXinXing #Version: 1.0 #Date: 2021-07-08 #FileName: park_extr_all.sh #Description: 车厂全量数据入库 #*********************************************** #从OSS增量拉取车厂昨日全量数据 ossutil64 cp -r oss://park-client/file/ /data/park/park_data/ --update sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "VechicleExit" sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "VechicleEnter" sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "TempStopPayment" sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "MonthlyRent" 复制代码
后言
写点实战博客,具体要用到的技术例如:oss、jq、cilckhouse语法,这些都是工具,我认为没有必要在博客中讲的清清楚楚,毕竟百度 google一搜啥都有。重点分享的是思路和一点点经验。参考现有同学们的思路你可以扩展出自己的实现方式,这样就不局限于本文中的这些工具啦,条条大路通罗马,小年轻思路要打开
作者:XinXing
链接:https://juejin.cn/post/7020980356257939470