Hive和Kafka在数据稽核和同步中的应用

    曹建华 徐晨敏 郭昱含

    

    

    

    【摘要】? ? 中国电信自主测评管理平台使用了Hadoop数据仓库工具Hive对基础数据进行合规性稽核,稽核后的数据通过Sqoop工具同步至Oracle关系数据库。针对多批次百万级数据量并行同步会导致Oracle负载过大影响正常OLTP的情况,通过应用Kafka消息队列,将Hive与Oracle之间的数据并行同步改为异步模式下可按需设置串行/并行同步,问题得到有效解决。

    【关键词】? ? Hadoop? ? Hive? ? Sqoop? ? Kafka

    Application of hive and Kafka in data audit and synchronization

    Cao jianhua, Xu chenmin, Guo yuhan(Customer service operation support center of China Telecom Group,Shanghai 200041)

    Abstract:China Telecom independent evaluation management platform uses Hive which is a Hadoop data warehouse tool to audit the basic data, and the audited data is synchronized to Oracle relational database through sqoop tool. When multiple batches of millions of data are synchronized in parallel, Oracle load will be too large, which will affect the normal OLTP. By applying Kafka message queue, the data parallel synchronization between hive and Oracle can be changed to asynchronous mode, and either serial or parallel synchronization can be set on demand. The problem has been effectively solved.

    Key words:Hadoop、Hive、Sqoop、Kafka

    引言

    中国电信自主测评管理平台用于支撑建立“客户说了算”的服务评价体系,负责对测评数据进行全流程管理,其中包括基础数据质量稽核和用户免打扰处理,以提升整体测评质量。用户免打扰处理是指对已标识的特殊用户不纳入测评,对曾经测评过的用户在一定期限内避免做二次测评。用户满意度测评分若干个指标,其中综合满意度测评单批次基础数据达百万级,为对基础数据进行高效的稽核,本平台采用了Hadoop分布式系统中的Hive数据仓库工具和oracle关系数据库相结合的技术方案,前者用于数据稽核和免打扰处理,后者用于存储稽核后的数据便于数据查询应用。

    本文重点介绍采用Hive进行数据稽核及与Oracle之间进行数据同步的技术实现和优化方案。

    一、技术方案

    1.1技术方案选型

    目前中国电信自主测评覆盖了公众、政企、触点、以及NPS等20多个指标,在对用户进行满意度测评前,需要对基础测评数据进行稽核、精准抽样,以提升整体测评质量。数据稽核和免打扰处理包括号码长度校验、是否数字化校验、省份和本地网归属校验、批次内重复数据校验、与测评免打扰库数据重复性校验,其中公众综合满意度测评基础数据量单批次达上百万,并且根据测评场景要求多个批次数据经常要并行稽核,因此对数据稽核处理能力提出了较高的要求。传统上数据处理通常使用关系型数据库如Oracle,本平台还搭建了Hadoop分布式系统(版本2.7.1),其中包含支持SQL的Hive工具。Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询语言(称为HiveQL)。下表是Hive与关系数据库数据处理的对比:

    本次数据稽核不对原数据字段做修改,而是根据稽核情况追加数据标签。考虑到单批次处理的数据规模和并行处理要求,采用Hive作为数据稽核处理工具,稽核之后的数据通过sqoop工具同步至oracle关系数据库,方便数据的查询和应用。具体处理过程如图1。

    1.2稽核和免打扰处理具体实现

    如图2所示,数据稽核和同步采用HDFS Shell脚本嵌HiveSQL文件的方式,通过Linux的crontab定时任务工具触发。各个测评指标的稽核脚本相互独立,只要扫描到有需要稽核的数据便开始执行。

    下面是以其中某个测评指标为例的部分关键代码:

    #step1:执行加载数据到Hive的sql脚本文件

    hive -hiveconf dt=$op_time -hiveconf type1=$type1 -hiveconf file1=$file1 -f /app/data/shell/10000load.sql

    #其中10000load.sql代碼如下

    load data local inpath ‘${hiveconf:file1} overwrite into table ctd.table_temp partition(dt=${hiveconf:dt},type1=${hiveconf:type1});

    #step2:执行稽核打标和免打扰处理的sql脚本文件

    hive? ?-hiveconf? ?dt=$dt? -hiveconf type1=$type1 -hiveconf file_batch1=$file_batch1 -f /app/data/ shell/business_yhfw.sql

    由于该sql文件语句较为复杂,由于篇幅所限此处不再详细展开。

    #step3:执行数据导出至Oracle数据库的shell脚本

    sh /app/data/shell/Sqoop_Export.sh $dt $type1

    #其中Sqoop_Export.sh脚本中关键代码如下

    sqoop export --table oracle_tablename --connect ***:thin:*** --username *** --password *** --export-dir '/apps/hive/warehouse/ctd.db/dt='$op_time'/type1='$type1''? ? ? ?\

    --columns UUID,COL1,COL2,…,COLN --input-fields-terminated-by '\001' --input-lines-terminated-by '\n' --input-null-string '\\N' --input-null-non-string '\\N'

    需要说明的是,HiveSQL语法、表模型设计、执行计划和计算引擎是影响Hive执行性能的主要因素,具体调优方法可见本文参考文献[2]。

    1.3数据同步优化方案

    上述方案在具体应用过程中,各类测评指标数据稽核任务独立进行,稽核完毕后即调用Sqoop工具将数据同步至oracle数据库。当超过百万数据量的多个任务并行写入Oracle时,会导致其OLTP(On-Line Transaction Processing)受到严重影响。为了解决这个问题,引入了Hadoop中的Kafka消息队列,将数据并行同步优化为异步模式下可按数据量规模设置串行/并行同步,具体流程图如图3。

    Kafka是一个分布式的、高吞吐量、高可扩展性的消息系统,它基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。Kafka 基于页缓存和磁盘顺序写的方式实现了写数据的超高性能,还具有数据压缩、同时支持离线和实时数据处理等优点,适用于大批量日志压缩收集、监控数据聚合等需要异步处理的场。应用Kafka要避免消息不丢失不重复消费,需要设置生产者和消费者的相关配置参数,其生产者和消费者默认模式都采用at least once(至少一次),即消息不会丢失,但可能被处理多次。本方案使用的Kafka版本为2.11-0.9.0.1,可支持在生产者设置enable.idempotent参数为true,同时在消费者设置enable.auto.commit参数为false,并自行控制offset(偏移量)的提交,來实现exactly once(精确一次)模式。

    本方案中通过shell脚本实现生产者向Kafka发布稽核完成的任务主题消息。为便于对Kafka中的partition进行offset操作,应用Java语言实现消费者订阅主题消息,以获取到需要数据同步的具体任务消息,再通过shell脚本调用sqoop工具实现数据从Hadoop同步至oracle数据库。

    消费者监听到主题消息时,会先行判断该消息对应的数据稽核任务中数据量规模,当超过设定阈值时,采用单线程执行数据串行同步,降低对oralce数据库的压力;当低于设定阈值时,采用多线程执行数据并行同步,提升同步效率。

    关键代码如下:

    #调整2.2中step3代码:由执行数据同步改为发送数据同步消息,异步处理

    #sh /app/data/shell/Sqoop_Export.sh $dt $type1

    sh /app/data/shell/ kafkaproject/start_kafka.sh $dt $topic $path $file_count $type1 $limit

    #其中start_kafka.sh消息发送的关键代码

    cat /app/data/ctd/shell/dsfcpeq/kafkaproject/batchMessage.txt | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic} | > /dev/null

    #step4:应用java实现消费者的关键代码

    @Component

    public class KafkaConsumer {

    ...

    // 多线程池

    ExecutorService fixedThreadPool = null;

    // 单线程池

    final ExecutorService singleThreadPool = ThreadPoolFactory.getNewSingleThreadPool();

    //监听Kafka消息

    @KafkaListener(topics = “#{‘${spring.kafka.consumer.topics}.split(‘,)}”)

    public void onMessage(ConsumerRecord record) {

    // 获取任务消息内容

    ReqPara reqPara = JSON.parseObject(record.value().toString(), ReqPara.class);

    ...

    //根据数据稽核任务中数据规模等条件设置数据同步方式(单线程串行/多线程并行)

    if (!StringUtils.isEmpty(size) && !StringUtils.isEmpty(limit) && Integer.parseInt(size) > Integer.parseInt(limit)) {

    singleThreadPool.execute(runnable);

    } else {

    ...

    fixedThreadPool = ThreadPoolFactory.getNewFixedThreadPool(CpuCores * 2);

    …

    fixedThreadPool.execute(runnable);

    }

    }

    //线程实现,调用shell脚本触发sqoop同步数据

    private Runnable newThread(List pathAndParams) {

    return new Runnable() {

    @Override

    public void run() {

    …

    ProcessBuilder processBuilder = new ProcessBuilder(pathAndParams);

    processBuilder.redirectErrorStream(true);

    exec = processBuilder.start();

    …

    }

    }

    }

    }

    本案例只是Kafka應用的其中一角,在自主测评管理平台中,还借助Kafka实现了对全网测评执行能力的统筹管理,基于工作流和数据流统一调度CATI测评、智能语音测评、互联网测评等能力平台。Kafka是成长最快的开源项目之一,正在成为管理和处理流式数据的利器。它虽然类似于ActiveMQ、RabbitMQ等消息队列产品,但它以集群的方式运行可以自由伸缩,可以满足数据个性化存储的要求,其流式处理能力可支持动态地处理派生流和数据集。更多关于其安装配置、消息生产与消费、管理监控的知识可详见本文参考文献[3]。

    1.4更换Hive引擎提升数据处理效率

    HiveSQL最后都会转化成各个计算引擎所能执行的任务,目前Hive支持MapReduce(MR)、Tez和Spark 3种计算引擎。本平台使用了Hive1.2.1版本,其默认使用MR作为执行引擎。由于MapReduce中间计算均需要写入磁盘,而Spark是放在内存中整体处理效率更高,所以可通过修改Hive的引擎即设置成Hive on Spark模式来提升数据稽核处理的效率。

    需要提醒的是Hive与Spark存在版本兼容的要求,安装配置过程较为复杂,且上述使用的shell脚本也需要同步调整,具体本文不再赘述。

    二、总结

    本文介绍了基于Hive的大批量数据稽核处理的技术实现方案,并通过优化HiveSQL语法和更换计算引擎进一步提升了数据处理效率。针对多个大批量数据并行同步导致oracle的OLTP受到严重影响的问题,并通过引入Kafka将数据并行同步优化为异步模式下可按数据量规模设置串行/并行同步,兼顾了性能和效率。本应用案例对于大数据量稽核和异步处理场景具有较高的可参考性。

    参? 考? 文? 献

    [1]张良均,樊哲,位文超,刘名军. Hadoop与大数据挖掘[M].北京:机械工业出版社,2016:25-27

    [2]林志煌. Hive性能调优[M].北京:机械工业出版社·华章图文,2020:2-10

    [3]Neha Narkhede等著.Kafka权威指南[M].薛命灯译.北京:人民邮电出版社,2017:15-35