掘金 后端 ( ) • 2022-08-09 15:00

大数据开发:利用shell脚本重刷指定日期区间的历史数据,下沉到clickhouse

在做数据ETL的时候,可能会遇到这样的情景: 需要对某个每天新增原始数据的表,进行数据清理,提取出有价值的数据内容,然后重新下沉到另一张表中,也就是一个初步的ETL过程。由于需要处理的历史数据可能是多天的,所以需要我们编写脚本来去指定日期,自动运行。而不用我们每天都要去手动执行一遍。

本次使用的数据库是Clickhouse,因此会借助一些ck中的函数来方便我们的处理。具体的相关CK函数的内容,可以参考:

JSON函数¶

首先列出整体的代码内容:

#!/bin/bash
startDate=$1
endDate=$2



####log_correct函数打印正确的输出到日志文件
function log_correct () {
DATE=`date "+%Y-%m-%d %H:%M:%S"` ####显示打印日志的时间
USER=$(whoami) ####那个用户在操作
echo "${DATE} ${USER} execute $0 [INFO] $@"  ######($0脚本本身,$@将参数作为整体传输调用)
}


function log_error ()
{
DATE=`date "+%Y-%m-%d %H:%M:%S"`
USER=$(whoami)
echo "${DATE} ${USER} execute $0 [ERROR] $@" 
}

log_correct "执行的开始日期为: ${startDate}"
log_correct "执行的结束日期为: ${endDate}"


if [ $# -eq 2 ];
then
    ############ execute sink clickhouse #############
    while :
    do
        start_date="$startDate 00:00:00"
        end_date="$startDate 23:59:59"
        log_correct "正在执行${startDate}日期的数据"
        sql="
            insert into countly_device_info_i_r_local
            select app_key as appKey,
                   device_id as deviceId,
                   log_time as nginxTime, 
                   visitParamExtractRaw(data, 'rooted') as rooted,
                   visitParamExtractString(data, 'sys_name') as sys_name,
                   visitParamExtractString(data, 'carrier') as carrier,
                   visitParamExtractString(data, 'sys_version') as sys_version,
                   visitParamExtractString(data, 'name') as name,
                   visitParamExtractString(data, 'phone_no') as phone_no,
       
            from 
            (
                select app_key, device_id, log_time, trim(BOTH '\"' FROM extract(data, '\"{.*}\"')) as data
                from 
                (
                    select app_key, device_id, log_time, data from countly_simple where log_time>="\'${start_date}\'" and log_time<="\'${end_date}\'" and data like '%devInfo%'
                ) tmp where and isValidJSON(data)=1
            ) tmp2;
            "

        
        clickhouse-client --multiline -u default  -h 10.105.220.210 --password ch20482048 --query "${sql}" 2>>/home/webedit/sinkLog.txt
        
        ############ Check whether the SQL statement is successfully executed #############
        exitCode=$?
        if [ $exitCode -ne 0 ];
        then
            log_error "sink execute is failed!!! ${sql}"
            exit $exitCode
        else
            log_correct "${startDate}日期的数据输入导入完成}"
        fi


        if [[ ${startDate} -eq ${endDate} ]];
        then
            break
        fi

        startDate=$(date -d "${startDate} 1 days" "+%Y-%m-%d")

    done

else
    log_error "请输入执行的开始时间和结束时间,输入的两个日期参数格式必须是yyyy-MM-dd"
fi

首先利用定义了log函数来对脚本运行过程中的信息进行输出

利用while循环来遍历需要处理的日期区间。由于我数据表的日期字段是DateTime类型,所以需要添加时分秒的信息。

接下来,编写SQL。

首先内层SQL限定了所需要处理的时间日期区间的数据,以及筛选出data字段具有特定格式的数据条目。

然后外层SQL借助了Clickhouse的字符串查找函数extract,利用了正则表达式,匹配出JSON格式的数据部分。然后利用trim函数,做数据清理,删除掉JSON格式开头和结尾的大括号前后的"。同时,做了一些限定,比如isValidJSON(data)=1表示只选取清理后的data字段是JSON格式的数据条目。

保证了得到的每条数据条目都是JSON格式的数据后,利用Clickhouse的JSON函数提取出需要获取的字段,value为String类型的字段使用visitParamExtractString,由于rooted字段的value是Boolean类型,所以使用visitParamExtractRaw函数。

最后利用insert into [table] select ...的方式,将etl清理好的数据导入到新表中。

执行过程如下所示:

[webedit@bigdata-client-m220-106 ~]$ sh shelltest.sh 2021-12-17 2021-12-20
2021-12-20 17:44:45 webedit execute shelltest.sh [INFO] 执行的开始日期为: 2021-12-17
2021-12-20 17:44:45 webedit execute shelltest.sh [INFO] 执行的结束日期为: 2021-12-20
2021-12-20 17:44:45 webedit execute shelltest.sh [INFO] 正在执行2021-12-17日期的数据
2021-12-20 17:44:48 webedit execute shelltest.sh [INFO] 2021-12-17日期的数据输入导入完成}
2021-12-20 17:44:48 webedit execute shelltest.sh [INFO] 正在执行2021-12-18日期的数据
2021-12-20 17:44:50 webedit execute shelltest.sh [INFO] 2021-12-18日期的数据输入导入完成}
2021-12-20 17:44:50 webedit execute shelltest.sh [INFO] 正在执行2021-12-19日期的数据
2021-12-20 17:44:51 webedit execute shelltest.sh [INFO] 2021-12-19日期的数据输入导入完成}
2021-12-20 17:44:51 webedit execute shelltest.sh [INFO] 正在执行2021-12-20日期的数据
2021-12-20 17:44:52 webedit execute shelltest.sh [INFO] 2021-12-20日期的数据输入导入完成}

查看对应的Clickhouse数据库,可以发现数据导入到对应的数据表中。Clickhouse本身的速度确实很快,千万级别的数据也是几秒就能实现查询和插入了。