Distributed System | Spark RDD 论文总结

本篇文章是对Spark RDD论文的总结,中间会穿插一些Spark的内部实现总结,对应Spark版本为2.0。

RDD

Motivation

传统的分布式计算框架(如MapReduce)在执行计算任务时,中间结果通常会存于磁盘中,这样带来的IO消耗是非常大的,尤其是对于各种机器学习算法,它们需要复用上次计算的结果进行迭代,如果每次结果都存到磁盘上再从磁盘读取,耗时会很大。因此Spark这篇论文提出了一种新的分布式数据抽象 —— RDD

设计思想及特点

Resilient Distributed Dataset(RDD)是Apache Spark中数据的核心抽象,它是一种只读的、分区的数据记录集合。

RDD的特点:

  • Lazy evaluation,只在需要的时候才进行计算
  • RDD里面的数据是分区的,每一块数据都可能分布在集群内不同的节点上;支持并行计算
  • Resilient: 借助RDD lineage graph,Spark可以重新执行之前失败的计算任务而不用整体上重新计算,保证了容错性而且非常灵活,实现了fault-tolerance

那么如何操作、处理数据呢?Spark提供了一组函数式编程风格的API,可以很方便地对RDD进行操作、变换,就像操作集合一样。比如:

1
2
3
4
5
6
7
val rdd = sc.parallelize(1 to 100)
val result = rdd.map(_ + 10)
.filter(_ > 15)
.map(x => (x, 1))
.reduceByKey(_+_)
.collect

并且开发者可以根据需要自己编写相应的RDD以及RDD之间的操作,非常方便。可以这么理解,RDD就相当于抽象的数据表示,而operation就相当于一套DSL用于对RDD进行变换或者求值。

RDD的表示

Spark中的RDD主要包含五部分信息:

  • partitions(): partition集合
  • dependencies(): 当前RDD的dependency集合
  • iterator(split, context): 对每个partition进行计算或读取操作的函数
  • partitioner(): 分区方式,如HashPartitionerRangePartitioner
  • preferredLocations(split): 访问某个partition最快的节点

所有的RDD都继承抽象类RDD。几种常见的操作:

  • sc#textFile: 生成HadoopRDD,代表可以从HDFS中读取数据的RDD
  • sc#parallelize: 生成ParallelCollectionRDD,代表从Scala集合中生成的RDD
  • map, flatMap, filter: 生成MapPartitionsRDD,其partition与parent RDD一致,同时会对parent RDD中iterator函数返回的数据进行对应的操作(lazy)
  • union: 生成UnionRDDPartitionerAwareUnionRDD
  • reduceByKey, groupByKey: 生成ShuffledRDD,需要进行shuffle操作
  • cogroup, join: 生成CoGroupedRDD

Operations

Spark里面对RDD的操作分为两种:transformationaction

  • transformation是lazy的,仅仅会保存计算步骤并返回一个新的RDD,而不会立刻执行计算操作
  • action会依次执行计算操作并且得到结果

这些transformation和action在FP中应该是很常见的,如map, flatMap, filter, reduce, count, sum

对单个数据操作的transformation函数都在RDD抽象类内,而对tuple操作的transformation都在PairRDDFunctions包装类中。RDD可以通过implicit函数在符合类型要求的时候自动转换为PairRDDFunctions类,从而可以进行reduceByKey之类的操作。对应的implicit函数:

1
2
3
4
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}

Dependency

上面我们提到,RDD只会在需要的时候计算结果,调用那些transformation方法以后,对应的transformation信息只是被简单地存储起来,直到调用某个action才会真正地去执行计算。Spark中RDD之间是有联系的,RDD之间会形成依赖关系,也就是形成lineage graph(依赖图)。Dependency大致分两种:narrow dependency和wide dependency。

  • Narrow dependency(NarrowDependency): Parent RDD中的每个partition最多被child RDD中的一个partition使用,即一对一的关系。比如map, flatMap, filter等transformation都是narrow dependency
  • Wide dependency(ShuffleDependency):Parent RDD中的每个partition会被child RDD中的多个partition使用,即一对多的关系。比如join生成的RDD一般是wide dependency(不同的partitioner)

论文中的图例很直观地表示了RDD间的依赖关系:

Spark RDD Dependency

这样划分dependency的原因:

  1. Narrow dependency可以方便地以流水线的形式执行计算,即从头到尾一串chain下来。而wide dependency必须要等所有的parent RDD的结果都准备好以后再执行计算
  2. Narrow dependency失败以后,Spark只需要重新计算失败的parent RDD即可;而对于wide dependency来说,一失败可能导致某些分区丢失,必须整体重新进行计算

Shuffle

Spark中的shuffle操作与MapReduce中类似,在计算wide dependency对应的RDD的时候(即ShuffleMapStage)会触发。

首先来回顾一下为什么要进行shuffle操作。以reduceByKey操作为例,Spark要按照key把这些具有相同key的tuple聚集到一块然后进行计算操作。然而这些tuple可能在不同的partition中,甚至在不同的集群节点中,要想计算必须先把它们聚集起来。因此,Spark用一组map task来将每个分区写入到临时文件中,然后下一个stage端(reduce task)会根据编号获取临时文件,然后将partition中的tuple按照key聚集起来并且进行相应的操作。这里面还包括着排序操作(可能在map side也可能在reduce side进行)。

Shuffle是Spark的主要性能瓶颈之一(涉及磁盘IO,数据序列化和网络IO),其优化一直是个难题。

  • Shuffle write(map task): SortShuffleWriter#write
  • Shuffle read(reduce task): ShuffleRDD#compute

Persistence

Checkpointing

Checkpoint的目的是保存那些计算耗时较长的RDD数据(long lineage chains),执行Checkpoint的时候会新提交一个Job,因此最好先persistcheckpoint

Cache/Persist

cachepersist用于缓存一些经常使用的RDD结果(但是不能太大)。

  • persist方法的主要作用是改变StorageLevel以在compute的时候通过BlockManager进行相应的持久化操作
  • cache方法相当于设置存储级别为MEMORY_ONLY

Job Scheduling

简单来说,Spark会将提交的计算划分为不同的stages,形成一个有向无环图(DAG)。Spark的调度器会按照DAG的次序依次进行计算每个stage,最终得到计算结果。执行计算的几个重要的类或接口如下:

  • DAGScheduler
  • ActiveJob
  • Stage
  • Task
  • TaskScheduler
  • SchedulerBackend

这里面最为重要的就是 DAGScheduler 了,它会将逻辑执行计划(即RDD lineage)转化为物理执行计划(stage/task)。之前我们提到过,当开发者对某个RDD执行action的时候,Spark才会执行真正的计算过程。当开发者执行action的时候,SparkContext会将当前的逻辑执行计划传给DAGSchedulerDAGScheduler会根据给定的逻辑执行计划生成一个Job(对应ActiveJob类)并提交。每执行一个acton都会生成一个ActiveJob

提交Job的过程中,DAGScheduler会进行stage的划分。Spark里是按照shuffle操作来划分stage的,也就是说stage之间都是wide dependency,每个stage之内的dependency都是narrow dependency。这样划分的好处是尽可能地把多个narrow dependency的RDD放到同一个stage之内以便于进行pipeline计算,而wide dependency中child RDD必须等待所有的parent RDD计算完成并且shuffle以后才能接着计算,因此这样划分stage是最合适的。

划分好的stages会形成一个DAG,DAGScheduler会根据DAG中的顺序先提交parent stages(如果存在的话),再提交当前stage,以此类推,最先提交的是没有parent stage的stage。从执行角度来讲,一个stage的parent stages执行完以后,该stage才可以被执行。最后一个stage是产生最终结果的stage,对应ResultStage,而其余的stage都是ShuffleMapStage。下面是论文中stage划分的一个图例,非常直观:

DAG of stages

提交stage的时候,Spark会根据stage的类型生成一组对应类型的Task(ResultTaskShuffleMapTask),然后将这些Task包装成TaskSet提交到TaskScheduler中。一个Task对应某个RDD中的某一个partition,即一个Task只负责某个partition的计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
}
case stage: ResultStage =>
val job = stage.activeJob.get
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
}
}
} catch {
// 此处代码略...
}

TaskScheduler会向执行任务的后端(SchedulerBackend,可以是Local, Mesos, Hadoop YARN或者其它集群管理组件)发送ReviveOffers消息,对应的执行后端接收到消息以后会将Task封装成TaskRunner(Runnable接口的实例),然后提交到底层的Executor中,并行执行计算任务。

Executor中的线程池定义如下:

1
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
1
2
3
4
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

可以看到底层执行task的线程池实际上是JUC中的CachedThreadPool,按需创建新线程,同时会复用线程池中已经建好的线程。

最后用一幅图总结一下Job, Stage和Task的关系(图来自 Mastering Apache Spark 2.0):

Stage, Job and Task in Spark

整个Spark Context执行task的步骤图:

Memory Management

Spark中RDD的存储方式有两种:in memory和on disk,默认是in memory的。进行分布式计算的时候通常会读入大量的数据,并且通常还需要重用这些数据,如果简单地把内存管理交给GC的话,很容易导致回收失败从而cause full GC,影响性能。

Spark 1.5开始不再通过GC管理内存。Spark 1.5实现了一个内存管理器用于手动管理内存(Project Tungsten),底层通过Unsafe类来直接分配和回收内存。

另外,分布式计算系统的GC方面还可以参考OSDI 2016的一篇论文: Yak: A High-Performance Big-Data-Friendly Garbage Collector

PageRank实例

下面在Spark中跑一个PageRank来观察一下生成的Stage DAG。PageRank的公式比较简单:

$$PageRank (p_i) = \frac{1-d}{N} + d \sum_{p_j \in M(p_i)} \frac{PageRank (p_j)}{L(p_j)} $$

这里我们选择damping factor=0.85,初始的rank值为1.0;PageRank算法可以用马尔科夫矩阵进行优化,但是这里迭代次数较小,可以直接进行迭代计算。对应代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val iters = 10
val data = sc.textFile("data.txt")
val links = data.map { s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
val output = ranks.collect()

对应的Stage DAG:

DAG of stages in PageRank Algorithm

其中Stage 3中的RDD dependencies如下:

One stage in PageRank Algorithm


References

文章目录
  1. 1. RDD
    1. 1.1. Motivation
    2. 1.2. 设计思想及特点
    3. 1.3. RDD的表示
  2. 2. Operations
  3. 3. Dependency
  4. 4. Shuffle
  5. 5. Persistence
    1. 5.1. Checkpointing
    2. 5.2. Cache/Persist
  6. 6. Job Scheduling
  7. 7. Memory Management
  8. 8. PageRank实例
  9. 9. References