转载

Hadoop优化 第一篇 : HDFS/MapReduce

比较惭愧,博客很久(半年)没更新了。最近也自己搭了个博客,wordpress玩的还不是很熟,感兴趣的朋友可以多多交流哈!地址是:http://www.leocook.org/

另外,我建了个QQ群:305994766,希望对大数据、算法研发、系统架构感兴趣的朋友能够加入进来,大家一起学习,共同进步(进群请说明自己的公司-职业-昵称)。

1.应用程序角度进行优化

1.1.减少不必要的reduce任务

若对于同一份数据需要多次处理,可以尝试先排序、分区,然后自定义InputSplit将某一个分区作为一个Map的输入,在Map中处理数据,将Reduce的个数设置为空。

1.2.外部文件引用

如字典、配置文件等需要在Task之间共享的数据,可使用分布式缓存DistributedCache或者使用-files

1.3.使用Combiner

combiner是发生在map端的,作用是归并Map端输出的文件,这样Map端输出的数据量就小了,减少了Map端和reduce端间的数据传输。需要注意的是,Combiner不能影响作业的结果;不是每个MR都可以使用Combiner的,需要根据具体业务来定;Combiner是发生在Map端的,不能垮Map来执行(只有Reduce可以接收多个Map任务的输出数据)

1.4.使用合适的Writable类型

尽可能使用二进制的Writable类型,例如:IntWritable, FloatWritable等,而不是Text。因为在一个批处理系统中将数值转换为文本时低效率的。使用二进制的Writable类型可以降低cpu资源的消耗,也可以减少Map端中间数据、结果数据占用的空间。

1.5.尽可能的少创建新的Java对象

a)需要注意的Writable对象,例如下面的写法:

public void map(...) {     …     for (String word : words) {         output.collect(new Text(word), new IntWritable(1));     } }

这样会冲去创建对象new Text(word)和new IntWritable(1)),这样可能会产生海量的短周期对象。更高效的写法见下:

class MyMapper … {  Text wordText = new Text();  IntWritable one = new IntWritable(1);  public void map(...) {   for (String word: words) {   wordText.set(word);    output.collect(wordText, one);   }  } } 

b)对于可变字符串,使用StringBuffer而不是String

String类是经过final修饰的,那么每次对它的修改都会产生临时对象,而SB则不会。

2. Linux系统层面上的配置调优

2.1. 文件系统的配置

a) 关闭文件在被操作时会记下时间戳:noatime和nodiratime

b) 选择I/O性能较好的文件系统(Hadoop比较依赖本地的文件系统)

2.2. Linux文件系统预读缓冲区大小

命令blockdev

2.3. 去除RAID和LVM

2.4. 增大同时打开的文件数和网络连接数

ulimit

net.core.somaxconn

2.5. 关闭swap分区

在Hadoop中,对于每个作业处理的数据量和每个Task中用到的各种缓冲,用户都是完全可控的。

/etc/sysctl.conf

2.6. I/O调度器选择

详情见AMD的白皮书

3. Hadoop平台内参数调优

Hadoop相关可配置参数共有几百个,但是其中只有三十个左右会对其性能产生显著影响。

3.1. 计算资源优化

a) 设置合理的slot(资源槽位)

mapred.tasktracker.map.tasks.maximum / mapred.tasktracker.reduce.tasks.maximum

参数说明:每个TaskTracker上可并发执行的Map Task和Reduce Task数目

默认值:都是2

推荐值:根据具体的节点资源来看,推荐值是(core_per_node)/2~2*(cores_per_node)

单位:无

3.2. 节点间的通信优化

a) TaskTracker和JobTracker之间的心跳间隔

这个值太小的话,在一个大集群中会造成JobTracker需要处理高并发心跳,可能会有很大的压力。

建议集群规模小于300时,使用默认值3秒,在此基础上,集群规模每增加100台,会加1秒。

b) 启用带外心跳(out-of-band heartbeat)

mapreduce.tasktracker.outofband.heartbeat

参数说明:主要是为了减少任务分配延迟。它与常规心跳不同,一般的心跳是一定时间间隔发送的,而带外心跳是在任务运行结束或是失败时发送,这样就能在TaskTracker节点出现空闲资源的时候能第一时间通知JobTracker。

3.3. 磁盘块的配置优化

a) 作业相关的磁盘配置:mapred.local.dir

参数说明:map本地计算时所用到的目录,建议配置在多块硬盘上

b) 存储相关的磁盘配置(HDFS数据存储):dfs.data.dir

参数说明:HDFS的数据存储目录,建议配置在多块硬盘上,可提高整体IO性能

例如:

<property>   <name>dfs.name.dir</name>   <value>/data1/hadoopdata/mapred/jt/,/data2/hadoopdata/mapred/jt/</value> </property>

c) 存储相关的磁盘配置(HDFS元数据存储):dfs.name.dir

参数说明:HDFS的元数据存储目录,建议设置多目录,每个多目录都可保存元数据的一个备份注:要想提升hadoop整体IO性能,对于hadoop中用到的所有文件目录,都需要评估它磁盘IO的负载,对于IO负载可能会高的目录,最好都配置到多个磁盘上,以提示IO性能

3.4. RPC Handler个数和Http线程数优化

a) RPC Handler个数(mapred.job.tracker.handler.count)

参数说明:JobTracker需要并发的处理来自各个TaskTracker的RPC请求,可根据集群规模和并发数来调整RPC Handler的个数。

默认值:10

推荐值:60-70,最少要是TaskTracker个数的4%

单位:无

b) Http线程数(tasktracker.http.threads)

在Shuffle阶段,Reduce Task会通过Http请求从各个TaskTracker上读取Map Task的结果,TaskTracker是使用Jetty Server来提供服务的,这里可适量调整Jetty Server的工作线程以提高它的并发处理能力。

默认值:40

推荐值:50-80+

3.5. 选择合适的压缩算法

mapred.compress.map.output / Mapred.output.compress

map输出的中间结果时需要进行压缩的,指定压缩方式(Mapred.compress.map.output.codec/ Mapred.output.compress.codec)。推荐使用LZO压缩。

3.6. 启用批量任务调度(现在新版本都默认支持了)

a) Fair Scheduler

mapred.fairscheduler.assignmultiple

b) Capacity Scheduler

3.7. 启用预读机制(Apache暂时没有)

Hadoop是顺序读,所以预读机制可以很明显的提高HDFS的读性能。

HDFS预读:

dfs.datanode.readahead :true

dfs.datanode.readahead.bytes :4MB

shuffle预读:

mapred.tasktracker.shuffle.fadvise : true

mapred.tasktracker.shuffle.readahead.bytes : 4MB

3.8.HDFS相关参数优化

1) dfs.replication

参数说明:hdfs文件副本数

默认值:3

推荐值:3-5(对于IO较为密集的场景可适量增大)

单位:无

2) dfs.blocksize

参数说明:

默认值:67108864(64MB)

推荐值:稍大型集群建议设为128MB(134217728)或256MB(268435456)

单位:无

3) dfs.datanode.handler.count

参数说明:DateNode上的服务线程数

默认值:10

推荐值:

单位:无

4) fs.trash.interval

参数说明:HDFS文件删除后会移动到垃圾箱,该参数时清理垃圾箱的时间

默认值:0

推荐值:1440(1day)

单位:无

5) io.sort.factor

参数说明:当一个map task执行完之后,本地磁盘上(mapred.local.dir)有若干个spill文件,map task最后做的一件事就是执行merge sort,把这些spill文件合成一个文件(partition)。执行merge sort的时候,每次同时打开多少个spill文件由该参数决定。打开的文件越多,不一定merge sort就越快,所以要根据数据情况适当的调整。

默认值:10

推荐值:

单位:无

6) mapred.child.java.opts

参数说明:JVM堆的最大可用内存

默认值:-Xmx200m

推荐值:-Xmx1G | -Xmx4G | -Xmx8G

单位:-Xmx8589934592也行,单位不固定

7) io.sort.mb

参数说明:Map Task的输出结果和元数据在内存中占的buffer总大小,当buffer达到一定阀值时,会启动一个后台进程来对buffer里的内容进行排序,然后写入本地磁盘,形成一个split小文件

默认值:100

推荐值:200 | 800

单位:兆

8) io.sort.spill.percent

参数说明:即io.sort.mb中所说的阀值

默认值:0.8

推荐值:0.8

单位:无

9) io.sort.record

参数说明:io.sort.mb中分类给元数据的空间占比

默认值:0.05

推荐值:0.05

单位:无

10) Mapred.reduce.parallel

参数说明:Reduce shuffle阶段copier线程数。默认是5,对于较大集群,可调整为16~25

默认值:5

推荐值:16~25

单位:无

4.系统实现角度调优

https://www.xiaohui.org/archives/944.html

主要针对HDFS进行优化,HDFS性能低下的两个原因:调度延迟和可移植性

4.1. 调度延迟

关于调度延迟主要是发生在两个阶段:

a) tasktracker上出现空余的slot到该tasktracker接收到新的task;

b) tasktracker获取到了新的Task后,到连接上了datanode,并且可以读写数据。

之所以说这两个阶段不够高效,因为一个分布式计算系统需要解决的是计算问题,如果把过多的时间花费在其它上,就显得很不合适,例如线程等待、高负荷的数据传输。

下面解释下会经历上边两个阶段发生的过程:

a) 当tasktracker上出现slot时,他会调用heartbeat方法向jobtracker发送心跳包(默认时间间隔是3秒,集群很大时可适量调整)来告知它,假设此时有准备需要执行的task,那么jobtracker会采用某种调度机制(调度机制很重要,是一个可以深度研究的东东)选择一个Task,然后通过调用heartbeat方法发送心跳包告知tasktracker。在该过程中,HDFS一直处于等待状态,这就使得资源利用率不高。

b) 这个过程中所发生的操作都是串行化的:tasktracker会连接到namenode上获取到自己需要的数据在datanode上的存储情况,然后再从datanode上读数据,在该过程中,HDFS一直处于等待状态,这就使得资源利用率不高。

若能减短hdfs的等待时间;在执行task之前就开始把数据读到将要执行该task的tasktracker上,减少数据传输时间,那么将会显得高效很多。未解决此类问题,有这样几种解决方案:重叠I/O和CPU阶段(pipelining),task预取(task prefetching),数据预取(data prefetching)等。

4.2. 可移植性

Hadoop是Java写的,所以可移植性相对较高。由于它屏蔽了底层文件系统,所以无法使用底层api来优化数据的读写。在活跃度较高的集群里(例如共享集群),大量并发读写会增加磁盘的随机寻道时间,这会降低读写效率;在大并发写的场景下,还会增加大量的磁盘碎片,这样将会大大的增加了读数据的成本,hdfs更适合文件顺序读取。

对于上述问题,可以尝试使用下面的解决方案:

tasktracker现在的线程模型是:one thread per client,即每个client连接都是由一个线程处理的(包括接受请求、处理请求,返回结果)。那么这一块一个拆分成两个部分来做,一组线程来处理和client的通信(Client Threads),一组用于数据的读写(Disk Threads)。

想要解决上述两个问题,暂时没有十全十美的办法,只能尽可能的权衡保证调度延迟相对较低+可移植性相对较高。

4.3. 优化策略:Prefetching与preshuffling

a) Prefetching包括Block-intra prefetching和Block-inter prefetching:

Block-intra prefetching:对block内部数据处理方式进行了优化,即一边进行计算,一边预读将要用到的数据。这种方式需要解决两个难题:一个是计算和预取同步,另一个是确定合适的预取率。前者可以使用进度条(processing bar)的概念,进度条主要是记录计算数据和预读数据的进度,当同步被打破时发出同步失效的通知。后者是要根据实际情况来设定,可采用重复试验的方法来确定。

Block-inter prefetching:在block层面上预读数据,在某个Task正在处理数据块A1的时候,预测器能预测接下来将要读取的数据块A2、A3、A4,然后把数据块A2、A3、A4预读到Task所在的rack上。

b) preshuffling数据被map task处理之前,由预测器判断每条记录将要被哪个reduce task处理,将这些数据交给靠近reduce task的map task来处理。

参考资料:

cloudera官方文档

http://blog.cloudera.com/blog/2009/12/7-tips-for-improving-mapreduce-performance/

AMD白皮书(较为实用)

http://www.admin-magazine.com/HPC/content/download/9408/73372/file/Hadoop_Tuning_Guide-Version5.pdf

国内博客(大部分内容都是AMD白皮书上的翻译):

http://dongxicheng.org/mapreduce/hadoop-optimization-0/

http://dongxicheng.org/mapreduce/hadoop-optimization-1/

正文到此结束
Loading...