spark本来已经是一个分布式的计算平台,按说不应该手工去处理并行/异步的事情。但是,最近我实现的一个spark任务,需要一次写出数十个分区的数据,虽然这些分区的数据之间完全独立,但坑爹的是,基础数据平台提供的写数据接口只支持 同步 的一次写一个分区的数据。这样造成的结果就是,用循环来实现时,虽然我有很多个计算节点,数据(RDD)也分布于各个节点之上,但是我只能等一个分区写完成后,再写下一个分区:因为“写分区”这个任务的下发是同步阻塞的。
partitions .map(part => writeToDisk(data.filter(part), part))
这里要感谢scala提供的Future方案。它可以方便的将同步的阻塞操作包装成异步操作并行下发。
配合Await.ready操作来等待所有future完成,我们可以将上面的代码改写为:
partitions .map(part => Future { writeToDisk(data.filter(data.part == part), part) }) .map(f => Await.ready(f, Duration.Inf))
在spark中,我们知道使用cache/persist可以避免数据流的重复计算。在这里也是一样,Future之前需要将data用cache/persist打个点。
但是这样还!不!够!
在这里我们希望发生的事情是data在future之前先计算好(只计算一次),然后异步的分发下去写对应的分区。
但是由于spark的惰性计算特性,使用Future之后,多个job并行下发,每个job在执行时data都还没有计算出来,也就没有cache的数据。反应到spark ui上的jobs页面的情况就是,看上去多个job并行执行了,但是cache操作并没有带来tasks skipped。
这时,我们需要在future之前,强制把data计算出来并cache住。这里其实只需要调用一些不影响数据的action算子即可,例如data.count()。
最终的结果,在使用上面的改进措施之后,我的这个spark任务执行时间缩短了约60%。
推荐阅读:
使用双buffer无锁化
不要拷贝
一个新朋友 Git Hooks转载请注明出处: http://blog.guoyb.com/2018/04/21/spark-scala-future/
欢迎使用微信扫描下方二维码,关注我的微信公众号TechTalking,技术·生活·思考: