2016/05/27 Spark 调优

Spark 调优最重要的就是序列化和内存调优。通常情况下,使用 Kryo 序列化库就可以解决大多数的性能问题,但有时可能还需要配合其他手段。

内存调优

Spark 是基于内存的,所以合理地管理内存会提高程序的性能。想要有效地管理内存,就要知道对象占用了多少内存,访问对象的开销以及 GC 的执行情况。

首先,要知道对象占了多少内存,我们需要清楚有哪些地方会消耗掉内存。默认情况下,Java 可以快速访问对象,但是需要消耗的空间是原生数据类型2-5倍。Java 对象为什么会比原生数据类型多消耗这么多空间呢?原因有以下几点:

  • 每个 Java 对象都有一个“对象头”,对象头大约 16 个字节,包含类似指向类的指针信息。如果一个对象很小,对象头可能比对象包含的数据还要大。
  • Java 的 String 在原始的 String 数据上会额外占用大约 40 字节 ,用来存储如长度之类的信息。由于 Java 使用 UTF-16 编码,所以每个字符会占用 2 个字节。一个 10 个字符的 String 大概会占用 60 个字节。
  • HashMap 和 LinkedList 这样常见的数据结构使用了链表数据结构,每一个 entry 都是一个包装对象,这些对象不光包含对象头,还包含了指向下一个对象的引用,这个引用会额外占用 8 个字节。
  • 常见的原始数据类型集合都会以包装类形式存储,比如 Integer。

因此,降低内存消耗的一种方法是不使用带有头和额外开销的数据结构。比如

  • 使用数组和原始数据类型代替集合和包装类。
  • 尽量避免嵌套的数据结构。
  • 使用数值型和枚举型代替字符串作为 id 。
  • 对于内存小于 32 G的机器,使用 JVM 的 XX:+UseCompressedOops 参数限制引用长度为 4 字节。

在 Spark 中内存被用来执行计算(shuffle、join、sort、aggregations)和存储数据(缓存和广播数据)。执行计算和存储数据使用的是同一块区域(M),如果执行计算的内存没有被使用,存储数据可以占用这些内存,反之,执行计算也可以使用没有被存储占用的内存。如果一个程序没有使用缓存,那么程序可以将所有内存用来执行计算。虽然执行计算可以释放存储以获取更多的内存,但是缓存在不超过阈值(R)的情况下,才会被释放。

关于内存的划分,Spark 提供了两个参数,但大多数场景下,用户是不需要调整 Spark 的配置的。

  • spark.memory.fraction 默认为 0.6 ,即 M 的 60% 。剩余的 40% 空间用来存储。
  • spark.memory.storageFraction 默认为 0.5 ,即 R 占 M 的 50% 。

GC

当您的程序中存储的 RDDs 有很大的“搅动”时,JVM 垃圾收集可能会成为一个问题。当 Java 需要清除旧对象为新对象腾出空间时,它需要跟踪所有的 Java 对象并找到未使用的对象。这里要记住的要点是,垃圾收集的成本与 Java 对象的数量成比例,所以使用较少对象的数据结构(例如,一个 Ints 数组而不是一个 LinkedList )可以大大降低这个成本。一种更好的方法是将以序列化的形式存储对象,如上所述:现在每个RDD分区只会有一个对象(一个字节数组)。在尝试其他技术之前,如果GC是一个问题,首先要尝试的是使用序列化缓存。

GC调优的第一步是收集有关垃圾收集发生频率和花费的时间的统计信息。这可以通过添加 -verbose:gc -XX:+PrintGCDetails -XX:+printgctimetstamp 选项来完成。

在进行调优之前,需要了解 Java 的 GC 模型。

  • Java 堆空间分为两个区域,分别是 Young 和 Old。Young 用来保存寿命较短的对象,而 Old 则用来保存具有较长寿命的对象。
  • Young 被进一步划分为三个区域:Eden, Survivor1, Survivor2 。
  • 当 Eden 满时,会运行一次小的 GC ,Eden 和 Survivor1 中没有被回收的对象被复制到 Survivor2 。当对象寿命足够长或 Survivor2 满了,对象将被移到 Old 。最后,当 Old 接近饱和时,将调用 full GC 。

在Spark中,GC 调优的目标是确保长寿的 RDD 只存储在 Old 中,同时 Young 有足够的空间来存储寿命较短的 RDD 。这样有助于避免在执行期间调用 full GC 来回收临时对象。以下方法可以参考:

  • 通过收集 GC 统计信息检查是否有太多的 GC。如果在任务执行过程中多次出现 full GC,说明没有足够的内存可用来执行任务。
  • 如果有大量的小 GC ,可以调整 Eden 的大小。可以将 Eden 的大小设置为任务所需要内存的最大值。如果将 Eden 的大小确定为 E ,那么 Young 的大小可以设置为 -Xmn=4/3 E。例如,从 HDFS 上读取数据,可以根据 HDFS 使用的数据块来估算需要的内存。如果有 3-4 个 HDFS 数据块,每个块的大小是 128 M,那么需要的 Eden 大小大概是 4 * 3 * 128 M 。(压缩块的大小是普通块大小的 2-3 倍)
  • 在打印的 GC 统计数据中,如果 Old 快满了,那么通过降低 spark.memory.fraction 来减少用于缓存的内存的数量或者减小 -Xmn 的值。如果无效,可以尝试调大 JVM 的 NewRatio 参数。
  • 使用 G1 垃圾回收器。设置参数 -XX:+UseG1GC 。使用 -XX: g1heap 增加 G1 的大小。

剩下的工作就是在我们修改 GC 的参数后,GC 统计信息是否有变化。

并行度

Spark 的 shuffle 操作会在每一个任务中创建散列表进行计算,所以结果会很大,有时候会出现 OutOfMemoryError 。这时我们可以增加任务的并行度使每个 shuffle 的输入更小,执行的更快。shuffle 操作的默认使用父 RDD 的分区数量,可以通过修改 spark.default.parallelism 或方法输入参数来改变默认数量。一般推荐的数量是每个 CPU 2-3 个任务。

数据位置