【翻译】spark 调优
官网地址:https://spark.apache.org/docs/2.1.0/tuning.html
摘要:鉴于 Spark 基于内存计算这一天性,以下集群资源可能会造成 Spark 程序的瓶颈:CPU,带宽和内存。通常情况下,如果内存足够的情况下,瓶颈就是网络带宽,但有时,你也需要做一些优化,例如以序列化的格式存储RDD,来减少内存使用。本指南将涵盖两个主要主题:数据序列化(这对于良好的网络性能至关重要,并且还可以减少内存使用)、内存调优。同时也会讨论一些较小的主题。
一、数据序列化
序列化在分布式系统中扮演着重要的角色,那些会让对象序列化过程缓慢,或是会消耗大量字节存储的序列化格式会大大降低计算速率。通常这是用户在优化 Spark 应用程序中应该调整的第一件事。Spark旨在在便利性(允许您使用操作中的任何 Java 类型)和性能之间取得平衡。它提供了下面两种序列化库:
- Java serialization 默认情况下,Spark 序列化一个对象时使用 Java 自带的
ObjectOutputStream
框架,对于任何实现了java.io.Serializable
接口的类都有效。也可以通过继承java.io.Externalizable
来自定义你的序列化过程。Java 序列化很灵活但性能差速度很慢,并导致许多类的大型序列化格式。 - Kryo serialization Spark 也可以使用更快的序列化类库 Kryo library(version 2)来序列化对象。相比 Java 序列化,Kryo 更快和更加紧凑(通常提供 10 倍于 Java 序列化的效率)。但对于所有可序列化的类型不是全部都支持,因此为了更好的效率,你需要提前为你的程序注册这些类。
可以通过设置初始化时的 SparkConf
和调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
来切换到 Kryo 模式。这项配置不仅可以对 worker 节点之间的 shuffle 数据起作用,还可以在将RDD序列化到硬盘上时起作用。Kryo 之所以没有成为默认设置是因为使用者需要自行注册一些类,但是我们建议在一些网络密集型应用中尝试使用 kryo 序列化。从 Spark2.0.0 开始,当对于简单类型,简单类型数组和字符串类型的 RDD 进行 shuffling,Spark 已经使用 Kryo 进行了内部整合。
Spark 自动对许多在 Twitter chill 库中的 AllScalaRegistrar 所覆盖的许多常用核心 Scala 类注册了 Kryo。
要使自定义类应用 Kryo 注册,你需要 registerKryoClasses
方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo文档 介绍了更多高级注册选项,例如添加自定义序列化代码。
如果你的对象很大,你可能需要增大 spark.kryoserializer.buffer
配置。这个值需要足够大以至于能够容纳下被序列化的最大的对象。
最后,如果你没有注册你的自定义类,Kryo 仍然可以工作,但它将不得不为每个对象存储完整的类名,这是浪费的。
二、内存调优
在对内存的使用进行调优时有三个考虑点:用户对象的内存使用量(用户可能希望整个数据集都保存在内存中),访问这些对象的开销和垃圾回收的开销(如果用户的对象周转率很高)
默认情况下,java 对象的访问是很快的,但很容易就会消耗比字段中原始数据多2-5倍的空间。这是以下几个原因导致的:
- 每个不同的Java对象都有一个“object header”,这个头部大概会占用 16bytes 的空间并且会包含指向类的指针等信息。对于一个数据量很小的对象(例如一个Int对象),这些“对象头”信息占用的内存空间会比数据占用的空间更大。
- Java 字符串比原始字符串数据多了大约 40 个字节的开销(因为它们是以
Char
s 数据的形式存储的,并且保存了一些例如length
的额外信息),并且由于字符串内部的 UTF-16 编码,会将它存储为两个 bytes。所以一个有 10 个字符的字符串会很容易消耗 60bytes。 - 常见的集合类,例如
HashMap
和LinkedList
,使用了链式结构,它对于每个 entry(例如Map.Entry
)会有一个"wrapper"对象。这个对象不仅包含了“头信息”,而且存储了指向下一个对象的指针(通常会占用 8bytes)。 - 原始基本数据类型的集合对象在存储每一个基本类型时还是用了包装类对象,例如
java.lang.Integer
本节会以 Spark 的内存管理机制的概述开始,然后讨论用户能在应用程序中采用的更有效的内存策略。我们将着重描述如何确定对象的内存占用和如何改变数据结构和序列化方式来降低内存占用。然后,我们会介绍如何优化 Spark 的缓存大小和 Java 垃圾回收。
2.1 内存管理概述
Spark 的内存使用基本上可以分为两大类:执行内存和存储内存。执行内存指的是在 shuffle,join,和 aggregation 计算中使用的内存,存储内存指的是集群中缓存和传播内部数据使用的内存。在 Spark 中,执行和存储共享一个统一的区域 M。当没有执行内存使用时,存储可以获得全部的可用内存,反之亦然。执行在必要的时候可能会驱逐存储,但只有在总存储内存使用量低于某个阈值 R 时才会触发。用另一句话来说,R 描述在统一内存 M 中一定不会被驱逐的缓存 block 子集。由于实现的复杂性,存储内存不会驱逐执行内存。
这种设计方案确保了几个令人满意的特性。首先,不使用缓存的应用可以使用全部内存来用于执行,从而消除不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小的不受驱逐的数据库存储空间 R。最后,这种方法为各种工作负载提供了合理的开箱即用性能,不需要用户了解内存如何内部划分的专门知识。
尽管有两个相关的配置,但是通常用户不需要对它们进行调整,因为默认值适用于大多数工作负载:
spark.memory.fraction
:代表了上文中的 M,表示内存占用(JVM heap space-300MB)比率(默认值 0.6)。剩余的 40% 的空间主要用来存储数据结构、内部元数据并预防由稀疏、大记录引发 OOMspark.memory.storageFraction
:表示上文提到的 R,表示从 M 中划分出 R 大小的一个区域(默认值 0.5),这个被划分出的区域中的缓存数据块对于计算模块的驱逐是免疫的
spark.memory.fraction
的值应该设置为可以适配 JVM 的老年代或终身代的使用。具体可以参考下面的 GC 章节。
2.2 内存消耗确定
判断一个数据集到底消耗多少内存的最佳方式是:将数据集加载到 RDD 并将其缓存下来,然后去 Spark Web UI 查看“Storage”页面。这个页面将告诉你,你的 RDD 正在申请多大的内存。
估算某一个特定对象的内存消耗,可以使用 SizeEstimator
的 estimate
方法,这对于尝试不同的数据布局来减少内存使用,以及确定一个广播变量将占用每个执行器堆的空间量是很有用的。
2.3 数据结构调优
减少内存消耗的首选方法是避免使用会增加开销的 java 特性,例如基于指针的数据结构和包装器对象。下面是集中解决方法:
- 使用对象数组和原始类型来构造你的数据结构,而不是使用标准的 Java 和 Scala 集合类(例如 HashMap)。fastutil 库提供了针对原始类型的便捷的集合类,这些类兼容 Java 标准库。
- 避免使用包含过多小对象和指针的嵌套结构。
- 考虑使用数字和枚举对象代替字符串作为键值。
- 如果你使用的随机内存少于 32G,设置 JVM 的标志
-XX:+UseCompressedOops
来使引用只占用 4 字节而不是 8 字节。你可以在 spark-env.sh 中添加这个配置项
2.4 序列化RDD存储
当尽管进行了调优,但你的对象仍然太大,无法有效存储时,一个更简单的方法是使用序列化的格式来存储它们以此来减少内存的使用,使用 RDD persistance API 来设置序列化的存储级别,例如 MEMORY_ONLY_SER
。Spark 将 RDD 的每一个分区作为一个大的字节数组进行存储。以序列化格式存储数据的唯一缺点是访问速度较慢,因为不得不在使用中反序列化每一个对象。如果您想以序列化的形式缓存数据,那么我们强烈建议 使用 Kryo,因为它比 Java 序列化(当然也要比原始 Java 对象)小得多。
2.5 垃圾回收调优
当你的程序中存储的 RDD 有大量的替换和变更时,JVM 垃圾回收可能会造成问题(当对一个 RDD 仅读取一次,然后在其上进行多次操作时并不会带来问题)。当 Java 需要将旧对象驱逐出去来为新对象腾出空间时,它需要跟踪所有的 Java 对象来找到未引用的对象。这里需要记住的要点是,垃圾收集的成本与 Java 对象的数量成正比,因此使用较少对象的数据结构(例如使用 int
的数组而不是 LinkedList
)会极大地减少消耗。一个更好的方法是以序列化的形式持久化对象,如上所述:每个 RDD 的分区只会有一个对象(一个字节数组)。所以当存在 GC 问题时,在尝试其他技巧前,首先要尝试的是使用序列化的缓存。
由于任务的工作内存(运行任务所需的空间量)和在节点上缓存的 RDDs 之间的干扰, GC 也可能是一个问题。我们将讨论如何控制分配给 RDD 缓存的空间以减轻这个问题。
2.5.1 测量GC的影响
GC 调优的第一步是收集 GC 发生频率和 GC 时间的统计。可以通过增加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
的 Java 选项来实现。http://spark.apache.org/docs/latest/configuration.html#Dynamically-Loading-Spark-Properties 中详细描述了将 Java 参数传递给 Spark Job 的方法。下次 Spark 应用程序运行时,就可以看到 Woker 节点的 log 会打印出 GC 信息。注意这些 log 是在集群中的 workder 节点,而不是 driver 程序中。
2.5.2 GC调优
为了进一步优化垃圾收集,我们首先需要了解 JVM 中关于内存管理的一些基本信息:
- Java 对内存被分为两个区域,新生代和老年代。新生代是为了保存寿命较短的对象,而老年代是为了保持寿命更长的对象。
- 新生代被进一步划分为三个区域: Eden,Survivor1,Survivor2
- 垃圾收集过程的简化描述:当 Eden 区使用占满时,一个 minor GC 会在 Eden 中发生,仍然存活的对象会从 Eden 和 Survivor1 区域中复制到 Survivor2。如果一个对象存活的时间够久或是 Survivor2 区域空间占满时,它会移动到老年代。最后当老年空间接近占满时,会触发 full GC。
Spark 中的 GC 调优的目的是为了确保只有长期存在的 RDD 会存储在老年代中,新生代有足够大的空间来存储短期对象。这有助于在任务执行期间避免收集临时对象造成的 full GC。下面是一些可用步骤:
- 通过收集 GC 状态来检查是否有太多 GC。如果在一个任务完成之前触发了好几次 full GC,意味着任务执行的可用内存不足。
- 如果有许多 minor GC 但是没有太多 major GC,可以为 Eden 分配更多内存。可以通过估计任务的来村来设置 Eden 的大小。如果 Eden 的大小被设定为 E,可以通过
-Xmn=4/3*E
来设置新生代的大小。(4/3 的比例是为了 Survivor 使用的空间) - 在打印出来的 GC 状态中,如果老年代接近占满,可以通过减低
spark.memory.fraction
来减少用于缓存的内存。缓存较少相比减慢任务执行速率要好。另外,也可以考虑减少新生代的大小。这意味着降低-Xmn
的设置。或者尝试获取 JVM 的NewRatio
参数,许多 JVM 默认设置为 2,意味着老年代占据了 2/3 的堆内存。它应该足够大,一直未这个比例超过了spark.memory.fraction
- 通过设置
-XX:+UseG1GC
来使用 G1GC 垃圾回收器。在某些情况,垃圾收集是一个瓶颈,它可以提高性能。注意,在堆内存够大时,需要通过-XX:G1HeapRegionSize
来增大 G1区域大小。 - 如果你的任务是从 HDFS 中读取数据,可以使用从 HDFS 读取的数据块的大小来估计任务所使用的内存数量。注意,解压缩块的大小通常是块大小的 2-3 倍,因此,如果我们希望获得 3-4 个任务空间,而 HDFS 的块大小是 128MB,我们可以估计 Eden 的大小为
4*3*128MB
。 - 更改设置后持续监视 GC 的频率和时间
我们的经验表明,GC 调优的效果取决于您的应用程序和可用内存的数量。在网上有更多的调优选项,管理频繁的 GC 发生的频率可以帮助减少开销。
执行器的 GC 调整标志可以通过设置作业配置中的 spark.executor.extraJavaOptions
来指定。
三、其他因素
3.1 并行级别
除非每一个操作的并行度都设置的足够高,要不然集群不会被充分利用。Spark 自动根据文件的大小设定了运行在其上的 map 任务的数量(也可以通过 SparkContext.textFile
参数来控制),并且对于分布式的 reduce 操作,例如 groupBykey
和 reduceByKey
,它会使用父 RDD 中最大的分区数量。你可以将并行度作为一个次级参数(参考文档 spark.PairRDDFunctions),或是设置在配置文件 spark.default.parallelism
来改变默认配置。通常情况下,我们推荐为集群中的每个 CPU 分配 2-3 个任务。
3.2 Reduce任务的内存使用
有些时候,你会因为 task 中的数据集,例如 groupByKey
,太大而造成 OutOfMemoryError,而不是 RDD 和内存不匹配。Spark 的 shuffle 操作(sortByKey
,groupByKey
,reduceByKey
,join
等等)会在每个任务中创建一个 hash table 来执行 grouping 操作,这个操作经常会很大。最简单的处理方案是增加并行度,让每个任务获取到的数据集更小。Spark 对于短于 200ms 的任务执行的很好,因为它在多个任务中重用一个 executor JVM,任务的启动成本很低,因此,你可以安全地将并行级别增加到您的集群中的核心数量。
3.3 广播大变量
使用 SparkContext
中的广播特性,你可以极大地减少序列化任务的大小,和集群中的启动任务开销。如果你的任务用到了 driver 中的一个大的对象(例如一个 static lookup table),可以考虑将它变为广播变量。Spark 将每个任务的序列化大小打印在主服务器上,因此您可以查看它来决定您的任务是否太大;一般来说,大于 20kb 的任务很可能是值得优化的。
3.4 数据本地化
数据本地化对于 Spark 任务的性能有很大的影响。如果数据和操作的代码在一起,那么计算往往很快。但是由于代码和数据是分离开的,它们中总会有一方要向另一方传递。通常,将序列化的代码从一个地方发送到另一个地方比传输数据块要快,因为代码的大小比数据要小得多。Spark 构建了它围绕数据局部性原则的调度。
数据本地化是数据和处理它的代码之间的距离。下面有基于数据当前维值的几种本地化设置。通过选取最短距离来达成最快的处理速度:
PROCESS_LOCAL
数据在运行代码的同一个 JVM 中。这是最优选择NODE_LOCAL
数据在同一个节点上。例如可能在同一个节点上的 HDFS 上,或是在同一个节点上的另一个处理器中。这比 PROCESS_LOCAL 稍微慢一点,因为这涉及到进程间的数据通信NO_PREF
数据可以从任何地方同样快速地访问,并且没有本地偏好RACK_LOCAL
数据位于相同的服务器机架上。数据在同一个机架上的另一台服务器上,所以需要通过网络发送,通常需要通过一个网关ANY
数据是在网络上的其他地方,而不是在同一个机架上
Spark 希望把所有的任务都安排在最合适的位置上,但这并不会总是可行的。在没有任何空闲执行机的情况下,Spark 会切换到较低的局部性。有两种选择:a. 在同一个服务器上等待 CPU 空闲,再提交任务 b. 立即在一个其他执行机上开始执行任务,并将数据移动过去。
Spark 通常情况下会等待 CPU 空闲。一旦等待时间超时,它会开始移动数据到较远的空闲 CPU 上。每个级别之间的等待超时可以单独配置,也可以在一个参数中组合在一起。具体配置参考 spark.locality
。默认配置通常效果较好,可以根据任务特性来修改这些配置。
四、总结
本文是针对 Spark 应用程序调优中需要注意的主要问题的一个简单指南,主要关注数据序列化和内存调优。对大多数应用来说,切换到 Kryo 序列化并 persist 序列化数据可以解决大多数性能问题。