Hadoop于2006年1月28日诞生,至今已有10年,它改变了企业对数据的存储、处理和分析的过程,加速了大数据的发展,形成了自己的极其火爆的技术生态圈,并受到非常广泛的应用。在2016年Hadoop十岁生日之际,InfoQ策划了一个Hadoop热点系列文章,为大家梳理Hadoop这十年的变化,技术圈的生态状况,回顾以前,激励以后。
近十年来,随着Hadoop生态系统的不断完善,Hadoop早已成为大数据事实上的行业标准之一。面对当今互联网产生的巨大的TB甚至PB级原始数据,利用基于Hadoop的数据仓库解决方案Hive早已是Hadoop的热点应用之一。达观数据团队长期致力于研究和积累Hadoop系统的技术和经验,并构建起了分布式存储、分析、挖掘以及应用的整套大数据处理平台。
本文将从Hive原理、数据分析平台架构、数据分析实战、Hive优化等四个方面来分享一些关于系统架构和Hive的心得和实战经验,希望大家有所收获。
1 Hive原理
Hadoop是一个流行的开源框架,用来存储和处理商用硬件上的大规模数据集。对于HDFS上的海量日志而言,编写Mapreduce程序代码对于类似数据仓库的需求来说总是显得相对于难以维护和重用,Hive作为一种基于Hadoop的数据仓库解决方案应运而生,并得到了广泛应用。
Hive是基于Hadoop的数据仓库平台,由Facebook贡献,其支持类似SQL的结构化查询功能。Facebook设计开发Hive的初衷就是让那些熟悉sql编程方式的人也可以更好的利用hadoop,hive可以让数据分析人员只关注于具体业务模型,而不需要深入了解Map/Reduce的编程细节,但是这并不意味着使用hive不需要了解和学习Map/Reduce编程模型和hadoop。对于Hive分析人员来说,深入了解Hadoop和Hive的原理和Mapreduce模型,对于优化查询总有益处。
1.1 Hive组件与模型
Hive的组件总体上可以分为以下几个部分:用户接口(UI)、驱动、编译器、元数据(Hive系统参数数据)和执行引擎。Hive中包含4中数据模型:Tabel、ExternalTable、Partition、Bucket。
图:hive数据模型
a) Table:每一个Table在Hive中都有一个相应的目录来存储数据;
b) Partition:表中的一个Partition对应于表下的一个目录,所有的Partition数据都存储在对应的目录中;
c) Buckets:对指定列计算的hash,根据hash值切分数据,目的是为了便于并行,每一个Buckets对应一个文件;
d) External Table指向已存在HDFS中的数据,可创建Partition。
读时验证机制
与传统数据库对表数据进行写时严重不同,Hive对数据的验证方式为读时模式,即只有在读表数据的时候,hive才检查解析具体的字段、shema等,从而保证了大数据量的快速加载。
如果表schema与表文件内容不匹配,Hive会尽其所能的去读数据。如果schema中表有10个字段,而文件记录却只有3个字段,那么其中7个字段将为null;如果某些字段类型定位为数值类型,但是记录中却为非数值字符串,这些字段也将会被转换为null。Hive会努力catch读数据时遇到的错误,并努力返回。既然Hive表数据存储在HDFS中且Hive采用的是读时验证方式,定义完表的schema会自动生成表数据的HDFS目录,且我们可以以任何可能的方式来加载表数据或者利用HDFS API将数据写入文件,同理,当我们若需要将hive数据写入其他库(如oracle),也可以直接通过api读取数据再写入目标库。
再次注意,加载或者写入的数据内容要和表定义的schema一致,否则将会造成字段或者表为空。
1.2 HQL翻译成MapReduce Job
Hive编译器将HQL代码转换成一组操作符(operator),操作符是Hive的最小操作单元,每个操作符代表了一种HDFS操作或者MapReduce作业。Hive中的操作符包括:TableScanOperator、ReduceSinkOperator、JoinOperator、SelectOperator、FileSinkOperator、FilterOperator、GroupByOperator、MapJoinOperator等。
Hive语句
INSERT OVERWRITE TABLE read_log_tmp SELECT a.userid,a.bookid,b.author,b.categoryid FROM user_read_log a JOIN book_info b ON a.bookid = b.bookid;
其执行计划为:
图:join的任务执行流程
1.3 与一般SQL的区别
Hive 视图与一般数据库视图
Hive视图只支持逻辑视图,不支持物化视图,即每次对视图的查询hive都将执行查询任务,因此视图不会带来性能上的提升。作为Hive查询优化的一部分,对视图的查询条件语句和视图的定义查询条件语句将会尽可能的合并成一个条件查询。
Hive索引与一般数据库索引
Hive1.2.1版本目前支持的索引类型有CompactIndexHandler和Bitmap。
CompactIndexHandler 压缩索引通过将列中相同的值得字段进行压缩从而减小存储和加快访问时间。需要注意的是Hive创建压缩索引时会将索引数据也存储在Hive表中。对于表tb_index (id int, name string) 而言,建立索引后的索引表中默认的三列一次为索引列(id)、hdfs文件地址(_bucketname)、偏移量(offset)。
Bitmap 位图索引作为一种常见的索引,如果索引列只有固定的几个值,那么就可以采用位图索引来加速查询。利用位图索引可以方便的进行AND/OR/XOR等各类计算,Hive0.8版本开始引入位图索引,位图索引在大数据处理方面的应用广泛,比如可以利用bitmap来计算用户留存率(索引做与运算,效率远好于join的方式)。如果Bitmap索引很稀疏,那么就需要对索引压缩以节省存储空间和加快IO。Hive的Bitmap Handler采用的是EWAH(https://github.com/lemire/javaewah)压缩方式。
2 数据分析平台
2.1 架构与模块
达观数据分析平台包括数据收集加载模块、数据分析计算模块、任务调度系统以及可视化系统。
图:数据分析平台基本框架
数据收集模块
数据模块负责收集移动端app、网页端以及服务器端大量的日志数据。移动端可自行开发数据上报功能或者使用sdk来上报数据。网页端利用植入的js将用户的行为进行上报,服务器端通过http server来收集上报的数据。服务器端的日志信息可以通过DX模块(一个跨库的数据交换系统)来将待处理数据推入hive数据分析平台。除此之外,数据来源还包括大量的user 、item基本数据等等。数据收集完成将所有需要处理分析的原始数据推入hadoop平台。从物理形式来看,即将待分析数据写入HDFS。
数据ETL模块
一般而言,上报的数据都是非结构化或者半结构化的。ETL(抽取、转换、加载)模块负责将所有的非结构或者半结构的数据转换成结构化的数据并加载到hive库表中。例如对于用户访问日志(可能是web server日志),我们需要从每行日志中抽取出用户的标识(cookie、imei或者userid),ip来源、url等。从形式上来看,ETL将HDFS的原始数据结构化,以表的形式提供分析。
数据分析与计算
根据业务需求和功能,利用HQL实现各种统计分析。一个Hive任务的来源表可能是多个,结果数据也有可能会写入多张表。
图:Hive任务执行输入输出
任务调度系统
从上图可以看出,Hive任务之间存在依赖关系,不至于Hive任务之间存在依赖,Hive任务与DX任务之间、DX任务之间都可能存在某种依赖关系,达观数据分析平台支持的任务类型还包括MR任务、shell任务等,达观数据分析平台自行开发司南调度系统来完成平台中所有任务的调度。关于司南调度系统可见后续讨论。
数据分析平台模块
图:数据分析平台基本模块
接下来将陆续介绍,数据分析平台中的两个重要模块:DX数据交换系统以及任务调度系统。
2.2 DX数据交换
DX系统可以在关系型数据库、Hive、FTP等系统之间实现数据的交换。DX定义了Writer和Reader接口来抽象对数据的读写操作,对于各种存储类型的数据,需定制他们的实现方法。
关系型数据库利用JDBC实现其读写功能;对于Hive而言,直接利用HDFS API实现对HDFS文件的读写,由于Hive的读时验证机制,需要在读写Hive表文件时,定义其字段个数、名称等信息,保证与表定义一致;FTP文件目前的处理方法是先将数据从FTP服务器拉下来,然后将读取文件内容,写入Hive数据库。
以上过程是其他数据源到Hive的数据传输过程,Hive数据同样可以通过DX系统写入其他数据源。
2.3 任务调度
达观数据分析平台开发的司南调度系统将任务分为资源依赖型和实践依赖型。时间依赖型任务类似于crontab定时任务一样,到时触发其执行。资源依赖型任务需要其依赖的资源都满足时才会触发其执行。可调度的任务类型包括DX任务、Hive任务、MR任务、shell任务等。司南系统中最为关键的是dispatcher模块,该模块通过zookeeper来调度任务在agent(执行任务的代理服务器,需要设置多个)上的运行,关于zookeeper如何协调分布式应用的一致性在此不再累述。
2.4 架构演化
达观数据分析平台在使用过程中,不断提高其易用性和稳定性。在大量的研究和开发过程中,平台从无到有,走出第一步到功能完善、发挥巨大的业务价值。
从分散的数据交换到集中的数据交换系统
在使用统一的数据交换系统DX后,各业务系统的数据可以更好的进行汇聚和打通,进行统一的分析和处理。
从分散的作业调度到集中的任务调度系统
每天几千规模的任务数使得任务的调度极其困难,特别是当任务之间存在依赖关系时,显然简单的通过crontab已经无法满足业务的需求。司南调度系统保证所有任务有序正确的运行。
从批量式处理到集成流式处理
随着实时统计分析的需求越来越多,hive查询基于MR任务来实现的缺点日益明显(任务启动开销大)。为了提供实时的数据分析请求,平台开始引入storm流式计算模型。Storm以数据流为驱动。触发计算,每来一条数据就产生一次计算结果,时效性非常高,在业界也得到了丰富的应用。
从关系型数据库到Hbase
初期,数据分析的结果数据都是通过DX导入关系型数据库,以便数据可视化平台调用或者其他系统使用,大量的数据造成关系数据库的日益庞大,带来严重的性能问题。HBase是一个开源、列式分布式的数据库,基于HDFS文件系统,可以方面的和Hive进行集成。经过集成HBase,为可视化平台和线上系统提供服务,降低DX任务量,降低访问延迟。
3 Hive分析实践
3.1 Schema设计
没有通用的schema,只有合适的schema。在设计Hive的schema的时候,需要考虑到存储、业务上的高频查询造成的开销等等,设计适合自己的数据模型。
设置分区表
对于Hive来说,利用分区来设计表总是必要的,分区提供了一种隔离数据和优化查询的便利的方式。设置分区时,需要考虑被设置成分区的字段,按照时间分区一般而言就是一个好的方案,其好处在于其是按照不同时间粒度来确定合适大小的数据积累量,随着时间的推移,分区数量的增长是均匀的,分区的大小也是均匀的。
避免小文件
虽然分区有利于隔离数据和查询,设置过多过细的分区也会带来瓶颈,主要是因为过多的分区意味着文件的数目就越多,过多增长的小文件会给namecode带来巨大的性能压力。同时小文件过多会影响JOB的执行,hadoop会将一个job转换成多个task,即使对于每个小文件也需要一个task去单独处理,带来性能开销。因此,hive表设计的分区不应该过多过细,每个目录下的文件足够大,应该是文件系统中块大小的若干倍。
选择文件格式
Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。
在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。rcfile的聚合运算不一定总是存在,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。
3.2 统计分析
本节将从排序和窗口函数两个方面的介绍Hive的统计分析功能。
排名热门排名在实际的业务场景中经常遇见。例如最受欢迎的书籍、销量TOP100的商品等等。再实际情况下,我们不仅需要考虑各量化指标,还需要考虑置信度问题。
最简单的排名:ORDER BY value LIMIT n
上述查询仅仅考虑了量化指标,排名不够平滑,波动较大。
各种排名方法众多,达观数据分析平台在进行item 排名多采用基于用户投票的排名算法。如基于威尔逊区间的排名算法,该算法可以较好的解决小样本的不准确问题。
图:威尔逊区间
窗口分析函数
Hive提供了丰富了数学统计函数,同时也提供了用户自定义函数的接口,用户可以自定义UDF、UDAF、UDTF Hive 0.11版本开始提供窗口和分析函数(Windowing and Analytics Functions),包括LEAD、LAG、FIRST_VALUE、LAST_VALUE、RANK、ROW_NUMBER、PERCENT_RANK、CUBE、ROLLUP等。窗口函数与聚合函数一样,都是对表子集的操作,从结果上看,区别在于窗口函数的结果不会聚合,原有的每行记录依然会存在。窗口函数的典型分析应用包括:按分区聚合(排序,top n问题)、行间计算(时间序列分析)、关联计算(购物篮分析)。
我们以一个简单的行间计算的例子说明窗口函数的应用(关于其他函数的具体说明,请参考hive文档)。用户阅读行为的统计分析需要从点击书籍行为中归纳统计出来。用户浏览日志结构如下表所示,每条记录为用户的单次点击行为。
通过对连续的用户点击日志分析,通过Hive提供的窗口分析函数可以计算出用户各章节的阅读时间。
SELECT userid, bookid, chapterid, end_time – start_time as read_time
FROM
(
SELECT userid, bookid, chapterid, log_time as start_time,
lead(log_time,1,null) over(partition by userid, bookid order by log_time) as end_time
FROM user_read_log where pt=’2015-12-01’
) t;
通过上述查询既可以找出2015-12-01日所有用户对每一章节的阅读时间。只能通过开发mr代码或者实现udaf来实现上述功能。
窗口分析函数关键在于定义的窗口数据集及其对窗口的操作,通过over(窗口定义语句)来定义窗口。日常分析和实际应用中,经常会有窗口分析应用的场景,例如基于分区的排序、集合、统计等复杂操作。例如我们需要统计每个用户阅读时间最多的3本书:
图:行间计算示意图及代码
窗口函数使得Hive的具备了完整的数据分析功能,在实际的应用环境中,达观数据分析团队大量使用hive窗口分析函数来实现较为复杂的逻辑,提高开发和迭代效率。
3.3 用户画像
用户画像即基于真实数据的用户模型。简单来说,用户画像提取了用户的属性信息、行为信息,从而归纳统计出其人口学特征、偏好特征等。建立用户模型的首要任务就是提取特征,既包括用户基本特征,也包括行为特征和统计特征。
用户模型本质上就是刻画用户兴趣的模型,而用户的兴趣模型是多维度、多尺度的。刻画用户模型还需要从时间上进行度量,甚至是进行多尺度的组合,根据用户行为统计时间的长短,可以将用户的偏好分为短期偏好和长期偏好。偏好的权重即为用户的偏好程度的度量。
对用户偏好的描述,还需要考虑置信度的问题,例如对于一个阅读行为极其稀疏的用户来说,刻画其阅读类别偏好是毫无意义的。
图:用户画像刻画
3.4 反作弊分析
众所周知,存在排名就可能存在作弊。搜索广告、索互联网刷单、刷榜现象层出不穷。一般来说,作弊的目的都是为了提高自己的排名,或者是降低对手的排名。利用Hive对数据进行分析可以过滤掉较明显的作弊数据,达到数据清洗的目的。
例如对于一个刷榜作弊行为,需要作弊着不断刷日志行为来提高其排名,我们可以指定若干规则来过滤作弊数据。如同IP同物品同行为数目异常、同用户ID行为频次异常、同物品ID行为频次异常等等。如下图,如果相比于所有item的平均增长趋势,如果某item的增长趋势相对平均水平过大,那么其作弊的概率就比较高。
图:作弊数据趋势与平均趋势数据对比
作弊分析还需要结合业务需求和特点,采用合适的机器学习算法来进行更进一步的判断和过滤,达到反作弊的目标。
4 Hive优化
达观的数据仓库基于Hive搭建,每日需要处理大量的计算流程,Hive的稳定性和性能至关重要。众多的任务需要我们合理的调节分配集群资源,合理的配置各参数,合理的优化查询。Hive优化包含各个方面,如job个数优化、job的map/reducer个数优化、并行执行优化等等,本节将主要讨论HQL中的无时不在的JOIN的优化经验。
4.1 Join语句
对于上述的join语句,其中book_info表数量为千规模,
INSERT OVERWRITE TABLE read_log_tmp
SELECT a.userid,a.bookid,b.author
FROM user_read_log a JOIN book_info b ON a.bookid = b.bookid;
该语句的执行计划为:
图:map join的任务执行流程
对于小数据量,hive会自动采取map join的方式来优化join,从mapreduce的编程模型来看,实现join的方式主要有map端join、reduce端join。Map端join利用hadoop 分布式缓存技术通过将小表变换成hashtable文件分发到各个task,map大表时可以直接判断hashtable来完成join,注意小表的hashtable是放在内存中的,在内存中作匹配,因此map join是一种非常快的join方式,也是一种常见的优化方式。如果小表够小,那么就可以以map join的方式来完成join完成。Hive通过设置hive.auto.convert.join=true(默认值)来自动完成map join的优化,而无需显示指示map join。缺省情况下map join的优化是打开的。
Reduce端join需要reducer来完成join过程,对于上述join代码,reduce 端join的mr流程如下,
图:reduce端join的mapreduce过程
相比于map join, reduce 端join无法再map过程中过滤任何记录,只能将join的两张表的所有数据按照join key进行shuffle/sort,并按照join key的hash值将<key,value>对分发到特定的reducer。Reducer对于所有的键值对执行join操作,例如0号(bookid的hash值为0)reducer收到的键值对如下,其中T1、T2表示记录的来源表,起到标识作用:
图:reduce端join的reducer join
Reducer端join无法避免的reduce截断以及传输的大量数据都会给集群网络带来压力,从上图可以看出所有hash(bookid) % reducer_number等于0的key-value对都会通过shuffle被分发到0号reducer,如果分到0号reducer的记录数目远大于其他reducer的记录数目,显然0号的reducer的数据处理量将会远大于其他reducer,因此处理时间也会远大于其他reducer,甚至会带来内存等其他问题,这就是数据倾斜问题。对于join造成的数据倾斜问题我们可以通过设置参数set Hive.optimize.skewjoin=true,让hive自己尝试解决join过程中产生的倾斜问题。
4.2 Group by
语句我们对user_read_log表按userid goup by语句来继续探讨数据倾斜问题,首先我们explain group by语句:
explain select userid,count(*) from user_read_log group by userid
图:goup by的执行计划
Group by的执行计划按照userid的hash值分发数据,同时在map端也做了本地reduce,group by的shuffle过程是按照hash(userid)来分发的,实际应用中日志中很多用户都是未注册用户或者未登录,userid字段为空的记录数远大于userid不为空的记录数,当所有的空userid记录都分发到特定某一个reducer后,也会带来严重的数据倾斜问题。造成数据倾斜的主要原因在于分发到某个或某几个reducer的数据量远大于其他reducer的数据量。
对于group by造成的数据倾斜问题,我们可以通过设置参数
set hive.map.aggr=true (开启map端combiner);
set hive.groupby.skewindata=true;
这个参数的作用是做reduce操作的时候,拿到的key并不是所有相同值给同一个Reduce,而是随机分发,然后reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。虽然多了一轮MR任务,但是可以有效的减少数据倾斜问题可能带来的危险。
Hive解决数据倾斜
正确的设置Hive参数可以在某种程度上避免的数据倾斜问题,合适的查询语句也可以避免数据倾斜问题。要尽早的过滤数据和裁剪数据,减少后续处理的数据量,使得join key的数据分布较为均匀,将空字段随机赋予值,这样既可以均匀分发倾斜的数据:
select userid,name from user_info a
join (
select case when userid is null then cast(rand(47)*100000 as int)
else userid
from user_read_log
) b on a.userid = b.userid
如果用户在定义schema的时候就已经预料到表数据可能会存在严重的数据倾斜问题,Hive自0.10.0引入了skew table的概念,如建表语句
CREATE TABLE user_read_log (userid int,bookid, …)
SKEWED BY (userid) ON (null) [STORED AS DIRECTORIES];
需要注意的是,skew table只是将倾斜特别严重的列的分开存储为不同的文件,每个制定的倾斜值制定为一个文件或者目录,因此在查询的时候可以通过过滤倾斜值来避免数据倾斜问题:
select userid,name from user_info a
join (
select userid from user_read_log where pt=’2015’ and userid is not null
) b on a.userid = b.userid
可以看出,如果不加过滤条件,倾斜问题还是会存在,通过对skew table加过滤条件的好处是避免了mapper的表扫描过滤操作。
4.3 Join的物理优化
Hive内部实现了MapJoinResolver(处理MapJoin)、SkewJoinResolver(处理倾斜join)、CommonJoinResolver(处理普通Join)等类来实现join的查询物理优化(/org/apache/hadoop/hive/ql/optimizer/physical)。
CommonJoinResolver类负责将普通Join转换成MapJoin,Hive通过这个类来实现mapjoin的自动优化。对于表A和表B的join查询,会产生3个分支:
1) 以表A作为大表进行Mapjoin;
2) 以表A作为大表进行Mapjoin;
3) Map-reduce join
由于不知道输入数据规模,因此编译时并不会决定走那个分支,而是在运行时判断走那个分支。需要注意的是要像完成上述自动转换,需要将hive.auto.convert.join.noconditionaltask设置为true(默认值),同时可以手工控制转载进内存的小表的大小(hive.auto.convert.join.noconditionaltask.size)。
MapJoinResolver 类负责迭代各个mr任务,检查每个任务是否存在map join操作,如果有,会将local map work转换成local map join work。
SkewJoinResolver类负责迭代有join操作的reducer任务,一旦单个reducer产生了倾斜,那么就会将倾斜值得数据写入hdfs,然后用一个新的map join的任务来处理倾斜值的计算。虽然多了一轮mr任务,但是由于采用的map join,效率也是很高的。良好的mr模式和执行流程总是至关重要的。
5 总结
本文详细介绍了达观大数据分析平台的基本架构和原理,基于hadoop/hive的大数据分析平台使海量数据的存储、分析、挖掘逐步成为现实,并带来意想不到的益处。作为数据分析平台主力军的Hive仍然处在不断的发展之中,将HQL理解成Mapreduce程序、理解Hadoop的核心能力是更好的使用和优化Hive的根本。达观数据团队也将紧跟技术发展潮流,结合自身的业务需求,采取合理的框架架构,提升数据平台的处理能力。