阅读 471

JSON数据全量写入Clickhouse(含在Shell中处理JSON数据)

前言

我司需对全国车厂数据做分析以及监控(缴费、异常抬杆、进出、车辆类型、牌照等),JAVA那边的小伙伴负责部署Driver端实时把各节点底表数据同步上传至kafka中供数据组使用。

为了保证数据不丢失,除了实时接入数据,每天在凌晨会接入一版昨日的全量数据。数据冗余则通过Clickhouse的ReplacingMergeTree()引擎解决

前置准备

  • 阿里云OSS(离线数据从OSS接入)

  • ossutil (钱不到位,我司购买的oss不支持以目录的形式挂载到Linux中,所以使用该工具同步)

  • jq(在Linux中处理JSON数据)

  • Clickhouse(数据落地)

实施

一、安装ossfs

各版本下载地址以及安装教程:help.aliyun.com/document_de…

1、下载安装

[root@hadoop-prod-datanode1 ~]# wget http://gosspublic.alicdn.com/ossutil/1.7.7/ossutil64 [root@hadoop-prod-datanode1 ~]# chmod 755 ossutil64 复制代码

2、使用交互式配置生成配置文件

[root@hadoop-prod-datanode1 ~]# ./ossutil64 config 复制代码

根据提示分别设置Endpoint、AccessKey ID、AccessKey Secret和STSToken参数

参数说明如下:

  • endpoint:填写Bucket所在地域的Endpoint。各地域Endpoint详情,请参见访问域名和数据中心

    • 您也可以增加http://https://指定ossutil访问OSS使用的协议,默认使用HTTP协议。例如使用HTTPS协议访问深圳的Bucket,设置为https://oss-cn-shenzhen.aliyuncs.com

  • accessKeyID、accessKeySecret:填写账号的AccessKey。 *使用阿里云账号或RAM用户访问时,AccessKey的获取方式,请参见获取AccessKey。

    • 使用STS临时授权账号访问时,AccessKey的获取方式,请参见使用STS临时访问凭证访问OSS。

  • stsToken:使用STS临时授权账号访问OSS时需要配置该项,否则置空即可。

    • stsToken生成方式参见临时访问凭证。

二、安装JQ工具解析json

oss中的json数据格式为json包json,Clickhouse并不支持这种格式写入,所以需要把json拆成一条一条的

1、下载jq

[root@hadoop-prod-datanode1 ~]# yum install jq 复制代码

2、拿一个json测下功能可用性

[root@hadoop-prod-datanode1 ~]# jq -c .[] /data/park/park_data/xxx.json > /data/park/park_parse_data/xxx.json 复制代码

三、编写脚本批量写入Clickhouse

写入格式详见Ck官网 clickhouse.com/docs/en/int…

1、编写功能处理脚本

#!/bin/bash #*********************************************** #Author:        CoolCoolのXinXing #Version:       1.0 #Date:          2021-07-08 #FileName: park_extr_insert_clickhouse.sh #Description:   车厂全量数据入库 #*********************************************** #获取当前时间 date=$(date -d "1 day ago" +"%Y-%m-%d") echo "---------------------------${date}--------------------------------" #获取原始数据路径 path="/data/park/park_data/$date" #模糊匹配使oss中的文件名对应上Clickhouse中的表 args=$1 parkTableJson="$args*.json" #模式匹配要处理(导入)哪张表 case $args in     'VechicleExit')       tabName='out_vehicle_record'     ;;     'VechicleEnter')       tabName='in_vehicle_record'     ;;     'TempStopPayment')       tabName='pay_record'     ;; 'MonthlyRent')   tabName='token_record' ;; esac #把匹配到的所有json文件目录放入数组以供遍历 array1=$(ls $path/$parkTableJson) #创建本地数据文件夹,按天分目录存放 mkdir -p /data/park/parse_park_data/$date echo "-------------------${tabName}-----------------------------------" #循环处理 for value in ${array1[*]} do #把原始路径替换为本地数据路径落地解析拆分后的json parsePath=${value/park_data/parse_park_data} jq -c .[] $value > $parsePath     #判断文件是否为空,Ck插入空数据会报错     if [ -s $parsePath ]; then             #输出文件记录数,以便在日志中回溯             echo `wc -l $parsePath`             #插入数据到Clickhouse             clickhouse-client -h 10.196.17.27 --port=9999 --database="park" -u default --password 123456 --query="insert into park.${tabName} FORMAT JSONEachRow" < $parsePath     else         echo "文件内容为空不做处理:$parsePath"     fi done 复制代码

2、拉取oss增量数据并调用上面的脚本

#!/bin/bash #*********************************************** #Author:        CoolCoolのXinXing #Version:       1.0 #Date:          2021-07-08 #FileName: park_extr_all.sh #Description:   车厂全量数据入库 #*********************************************** #从OSS增量拉取车厂昨日全量数据 ossutil64 cp -r oss://park-client/file/ /data/park/park_data/ --update sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "VechicleExit" sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "VechicleEnter" sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "TempStopPayment" sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "MonthlyRent" 复制代码

后言

写点实战博客,具体要用到的技术例如:oss、jq、cilckhouse语法,这些都是工具,我认为没有必要在博客中讲的清清楚楚,毕竟百度 google一搜啥都有。重点分享的是思路和一点点经验。参考现有同学们的思路你可以扩展出自己的实现方式,这样就不局限于本文中的这些工具啦,条条大路通罗马,小年轻思路要打开


作者:XinXing
链接:https://juejin.cn/post/7020980356257939470


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐