Skip to content

四火的唠叨

一个纯正程序员的啰嗦

Menu
  • 所有文章
  • About Me
  • 关于四火
  • 旅行映像
  • 独立游戏
  • 资源链接
Menu

Spark 性能优化——和 shuffle 搏斗

Posted on 05/22/201606/23/2019 by 四火

Spark

Spark 的性能分析和调优很有意思,今天再写一篇。主要话题是 shuffle,当然也牵涉一些其他代码上的小把戏。

以前写过一篇文章,比较了几种不同场景的性能优化,包括 portal 的性能优化,web service 的性能优化,还有 Spark job 的性能优化。Spark 的性能优化有一些特殊的地方,比如实时性一般不在考虑范围之内,通常我们用 Spark 来处理的数据,都是要求异步得到结果的数据;再比如数据量一般都很大,要不然也没有必要在集群上操纵这么一个大家伙,等等。事实上,我们都知道没有银弹,但是每一种性能优化场景都有一些特定的 “大 boss”,通常抓住和解决大 boss 以后,能解决其中一大部分问题。比如对于 portal 来说,是页面静态化,对于 web service 来说,是高并发(当然,这两种可以说并不确切,这只是针对我参与的项目总结的经验而已),而对于 Spark 来说,这个大 boss 就是 shuffle。

首先要明确什么是 shuffle。Shuffle 指的是从 map 阶段到 reduce 阶段转换的时候,即 map 的 output 向着 reduce 的 input 映射的时候,并非节点一一对应的,即干 map 工作的 slave A,它的输出可能要分散跑到 reduce 节点 A、B、C、D …… X、Y、Z 去,就好像 shuffle 的字面意思 “洗牌” 一样,这些 map 的输出数据要打散然后根据新的路由算法(比如对 key 进行某种 hash 算法),发送到不同的 reduce 节点上去。(下面这幅图来自 《Spark Architecture: Shuffle》)

shuffle

为什么说 shuffle 是 Spark job 的大 boss,就是因为 Spark 本身的计算通常都是在内存中完成的,比如这样一个 map 结构的 RDD:(String, Seq),key 是字符串,value 是一个 Seq,如果只是对 value 进行一一映射的 map 操作,比如(1)先计算 Seq 的长度,(2)再把这个长度作为元素添加到 Seq 里面去。这两步计算,都可以在 local 完成,而事实上也是在内存中操作完成的,换言之,不需要跑到别的 node 上去拿数据,因此执行的速度是非常快的。但是,如果对于一个大的 rdd,shuffle 发生的时候,就会因为网络传输、数据序列化/反序列化产生大量的磁盘 IO 和 CPU 开销。这个性能上的损失是非常巨大的。

要减少 shuffle 的开销,主要有两个思路:

  1. 减少 shuffle 次数,尽量不改变 key,把数据处理在 local 完成;
  2. 减少 shuffle 的数据规模。

先去重,再合并

比如有 A、B 这样两个规模比较大的 RDD,如果各自内部有大量重复,那么二者一合并,再去重:

A.union(B).distinct()

这样的操作固然正确,但是如果可以先各自去重,再合并,再去重,可以大幅度减小 shuffle 的开销(注意 Spark 的默认 union 和 Oracle 里面的 “union all” 很像——不去重):

A.distinct().union(B.distinct()).distinct()

看起来变复杂了对不对,但是当时我解决这个问题的时候,用第二种方法时间开销从 3 个小时减到 20 分钟。

如果中间结果 rdd 如果被调用多次,可以显式调用 cache() 和 persist(),以告知 Spark,保留当前 rdd。当然,即便不这么做,Spark 依然存放不久前计算过的结果(以下来自官方指南):

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

数据量大,并不一定慢。通常情况下,由于 Spark 的 job 是放到内存里面进行运算的,因此一个复杂的 map 操作不一定执行起来很慢。但是如果牵涉到 shuffle,这里面有网络传输和序列化的问题,就有可能非常慢。

类似地,还有 filter 等等操作,目的也是要先对大的 RDD 进行 “瘦身” 操作,然后在做其他操作。

mapValues 比 map 好

明确 key 不会变的 map,就用 mapValues 来替代,因为这样可以保证 Spark 不会 shuffle 你的数据:

A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}

改成:

A.mapValues{case ((B, C), (D, E)) => (B, C, E)}

用 broadcast + filter 来代替 join

这种优化是一种特定场景的神器,就是拿大的 RDD A 去 join 一个小的 RDD B,比如有这样两个 RDD:

  • A 的结构为 (name, age, sex),表示全国人民的 RDD,超大
  • B 的结果为 (age, title),表示 “年龄 -> 称号” 的映射,比如 60 岁有称号 “花甲之年”,70 岁则是 “古稀之年”,这个 RDD 显然很小,因为人的年龄范围在 0~200 岁之间,而且有的 “年龄” 还没有 “称号”

现在我要从全国人民中找出这些有称号的人来。如果直接写成:

A.map{case (name, age, sex) => (age, (name, sex))}
 .join(B)
 .map{case (age, ((name, sex), title)) => (name, age, sex)}

你就可以想象,执行的时候超大的 A 被打散和分发到各个节点去。而且更要命的是,为了恢复一开始的 (name, age, sex) 的结构,又做了一次 map,而这次 map 一样导致 shuffle。两次 shuffle,太疯狂了。但是如果这样写:

val b = sc.broadcast(B.collectAsMap)
A.filter{case (name, age, sex) => b.values.contains(age)}

一次 shuffle 都没有,A 老老实实待着不动,等着全量的 B 被分发过来。

另外,在 Spark SQL 里面直接有 BroadcastHashJoin,也是把小的 rdd 广播出去。

不均匀的 shuffle

在工作中遇到这样一个问题,需要转换成这样一个非常巨大的 RDD A,结构是 (countryId, product),key 是国家 id,value 是商品的具体信息。当时在 shuffle 的时候,这个 hash 算法是根据 key 来选择节点的,但是事实上这个 countryId 的分布是极其不均匀的,大部分商品都在美国(countryId=1),于是我们通过 Ganglia 看到,其中一台 slave 的 CPU 特别高,计算全部聚集到那一台去了。

找到原因以后,问题解决就容易了,要么避免这个 shuffle,要么改进一下 key,让它的 shuffle 能够均匀分布(比如可以拿 countryId+商品名称的 tuple 作 key,甚至生成一个随机串)。

明确哪些操作必须在 master 完成

如果想打印一些东西到 stdout 里去:

A.foreach(println)

想把 RDD 的内容逐条打印出来,但是结果却没有出现在 stdout 里面,因为这一步操作被放到 slave 上面去执行了。其实只需要 collect 一下,这些内容就被加载到 master 的内存中打印了:

A.collect.foreach(println)

再比如,如果遇到 RDD 操作嵌套的情况,通常考虑优化掉,因为只有 master 才能去理解和执行 RDD 的操作,slave 只能处理被分配的 task 而已。比如:

A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}

就可以用 join 来代替:

A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}

用 reduceByKey 代替 groupByKey

这一条应该是比较经典的了。reduceByKey 会在当前节点(local)中做 reduce 操作,也就是说,会在 shuffle 前,尽可能地减小数据量。而 groupByKey 则不是,它会不做任何处理而直接去 shuffle。当然,有一些场景下,功能上二者并不能互相替换。因为 reduceByKey 要求参与运算的 value,并且和输出的 value 类型要一样,但是 groupByKey 则没有这个要求。

有一些类似的 xxxByKey 操作,都比 groupByKey 好,比如 foldByKey 和 aggregateByKey。

另外,还有一条类似的是用 treeReduce 来代替 reduce,主要是用于单个 reduce 操作开销比较大,可以条件 treeReduce 的深度来控制每次 reduce 的规模。

文章未经特殊标明皆为本人原创,未经许可不得用于任何商业用途,转载请保持完整性并注明来源链接 《四火的唠叨》

×Scan to share with WeChat

你可能也喜欢看:

  1. Notes: Spark metrics
  2. Hadoop 的 Map-side join 和 Reduce-side join
  3. Spark 的性能调优
  4. 三次性能优化经历
  5. 从淘汰 Oracle 数据库的事情说起

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

订阅·联系

四火,啰嗦的程序员一枚,现居西雅图

Amazon Google Groovy Hadoop Haskell Java JavaScript LeetCode Oracle Spark 互联网 亚马逊 前端 华为 历史 同步 团队 图解笔记 基础设施 工作 工作流 工具 工程师 应用系统 异步 微博 思考 技术 数据库 曼联 测试 生活 眼界 程序员 管理 系统设计 缓存 编程范型 美股 英语 西雅图 设计 问题 面向对象 面试

分类

  • Algorithm and Data Structure (30)
  • Concurrency and Asynchronization (6)
  • System Architecture and Design (43)
  • Distributed System (18)
  • Tools Frameworks and Libs (13)
  • Storage and Data Access (8)
  • Front-end Development (33)
  • Programming Languages and Paradigms (55)
  • Testing and Quality Assurance (4)
  • Network and Communication (6)
  • Authentication and Authorization (6)
  • Automation and Operation Excellence (13)
  • Machine Learning and Artificial Intelligence (6)
  • Product Design (7)
  • Hiring and Interviews (14)
  • Project and Team Management (14)
  • Engineering Culture (17)
  • Critical Thinking (25)
  • Career Growth (57)
  • Life Experience and Thoughts (45)

推荐文章

  • 聊一聊分布式系统中的时间
  • 谈谈分布式锁
  • 常见分布式系统设计图解(汇总)
  • 系统设计中的快速估算技巧
  • 从链表存在环的问题说起
  • 技术面试中,什么样的问题才是好问题?
  • 从物理时钟到逻辑时钟
  • 近期面试观摩的一些思考
  • RSA 背后的算法
  • 谈谈 Ops(汇总 + 最终篇):工具和实践
  • 不要让业务牵着鼻子走
  • 倔强的程序员
  • 谈谈微信的信息流
  • 评审的艺术——谈谈现实中的代码评审
  • Blog 安全问题小记
  • 求第 K 个数的问题
  • 一些前端框架的比较(下)——Ember.js 和 React
  • 一些前端框架的比较(上)——GWT、AngularJS 和 Backbone.js
  • 工作流系统的设计
  • Spark 的性能调优
  • “残酷” 的事实
  • 七年工作,几个故事
  • 从 Java 和 JavaScript 来学习 Haskell 和 Groovy(汇总)
  • 一道随机数题目的求解
  • 层次
  • Dynamo 的实现技术和去中心化
  • 也谈谈全栈工程师
  • 多重继承的演变
  • 编程范型:工具的选择
  • GWT 初体验
  • java.util.concurrent 并发包诸类概览
  • 从 DCL 的对象安全发布谈起
  • 不同团队的困惑
  • 不适合 Hadoop 解决的问题
  • 留心那些潜在的系统设计问题
  • 再谈大楼扔鸡蛋的问题
  • 几种华丽无比的开发方式
  • 我眼中的工程师文化
  • 观点的碰撞
  • 谈谈盗版软件问题
  • 对几个软件开发传统观点的质疑和反驳
  • MVC 框架的映射和解耦
  • 编程的未来
  • DAO 的演进
  • 致那些自嘲码农的苦逼程序员
  • Java 多线程发展简史
  • 珍爱生命,远离微博
  • 网站性能优化的三重境界
  • OSCache 框架源码解析
  • “ 你不适合做程序员”
  • 画圆画方的故事

近期评论

  • Ticket: TRANSACTION 1.922915 BTC. Go to withdrawal >> https://yandex.com/poll/enter/BXidu5Ewa8hnAFoFznqSi9?hs=20bd550f65c6e03103876b28cabc4da6& on 倔强的程序员
  • panshenlian.com on 初涉 ML Workflow 系统:Kubeflow Pipelines、Flyte 和 Metaflow
  • panzhixiang on 关于近期求职的近况和思考
  • Anonymous on 闲聊投资:亲自体验和护城河
  • 四火 on 关于近期求职的近况和思考
  • YC on 关于近期求职的近况和思考
  • mafulong on 常见分布式基础设施系统设计图解(四):分布式工作流系统
  • 四火 on 常见分布式基础设施系统设计图解(八):分布式键值存储系统
  • Anonymous on 我裸辞了
  • https://umlcn.com on 资源链接
© 2025 四火的唠叨 | Powered by Minimalist Blog WordPress Theme