本篇文章是对Spark RDD论文的总结,中间会穿插一些Spark的内部实现总结,对应Spark版本为2.0。
RDD
Motivation
传统的分布式计算框架(如MapReduce)在执行计算任务时,中间结果通常会存于磁盘中,这样带来的IO消耗是非常大的,尤其是对于各种机器学习算法,它们需要复用上次计算的结果进行迭代,如果每次结果都存到磁盘上再从磁盘读取,耗时会很大。因此Spark这篇论文提出了一种新的分布式数据抽象 —— RDD。
设计思想及特点
Resilient Distributed Dataset(RDD)是Apache Spark中数据的核心抽象,它是一种只读的、分区的数据记录集合。
RDD的特点:
- Lazy evaluation,只在需要的时候才进行计算
- RDD里面的数据是分区的,每一块数据都可能分布在集群内不同的节点上;支持并行计算
- Resilient: 借助RDD lineage graph,Spark可以重新执行之前失败的计算任务而不用整体上重新计算,保证了容错性而且非常灵活,实现了fault-tolerance
那么如何操作、处理数据呢?Spark提供了一组函数式编程风格的API,可以很方便地对RDD进行操作、变换,就像操作集合一样。比如:
|
|
并且开发者可以根据需要自己编写相应的RDD以及RDD之间的操作,非常方便。可以这么理解,RDD就相当于抽象的数据表示,而operation就相当于一套DSL用于对RDD进行变换或者求值。
RDD的表示
Spark中的RDD主要包含五部分信息:
partitions()
: partition集合dependencies()
: 当前RDD的dependency集合iterator(split, context)
: 对每个partition进行计算或读取操作的函数partitioner()
: 分区方式,如HashPartitioner
和RangePartitioner
preferredLocations(split)
: 访问某个partition最快的节点
所有的RDD都继承抽象类RDD
。几种常见的操作:
sc#textFile
: 生成HadoopRDD
,代表可以从HDFS中读取数据的RDDsc#parallelize
: 生成ParallelCollectionRDD
,代表从Scala集合中生成的RDD
map
,flatMap
,filter
: 生成MapPartitionsRDD
,其partition与parent RDD一致,同时会对parent RDD中iterator
函数返回的数据进行对应的操作(lazy)union
: 生成UnionRDD
或PartitionerAwareUnionRDD
reduceByKey
,groupByKey
: 生成ShuffledRDD
,需要进行shuffle操作cogroup
,join
: 生成CoGroupedRDD
Operations
Spark里面对RDD的操作分为两种:transformation 和 action。
- transformation是lazy的,仅仅会保存计算步骤并返回一个新的RDD,而不会立刻执行计算操作
- action会依次执行计算操作并且得到结果
这些transformation和action在FP中应该是很常见的,如map
, flatMap
, filter
, reduce
, count
, sum
。
对单个数据操作的transformation函数都在RDD
抽象类内,而对tuple操作的transformation都在PairRDDFunctions
包装类中。RDD
可以通过implicit函数在符合类型要求的时候自动转换为PairRDDFunctions
类,从而可以进行reduceByKey
之类的操作。对应的implicit函数:
|
|
Dependency
上面我们提到,RDD只会在需要的时候计算结果,调用那些transformation方法以后,对应的transformation信息只是被简单地存储起来,直到调用某个action才会真正地去执行计算。Spark中RDD之间是有联系的,RDD之间会形成依赖关系,也就是形成lineage graph(依赖图)。Dependency大致分两种:narrow dependency和wide dependency。
- Narrow dependency(
NarrowDependency
): Parent RDD中的每个partition最多被child RDD中的一个partition使用,即一对一的关系。比如map
,flatMap
,filter
等transformation都是narrow dependency - Wide dependency(
ShuffleDependency
):Parent RDD中的每个partition会被child RDD中的多个partition使用,即一对多的关系。比如join
生成的RDD一般是wide dependency(不同的partitioner)
论文中的图例很直观地表示了RDD间的依赖关系:
这样划分dependency的原因:
- Narrow dependency可以方便地以流水线的形式执行计算,即从头到尾一串chain下来。而wide dependency必须要等所有的parent RDD的结果都准备好以后再执行计算
- Narrow dependency失败以后,Spark只需要重新计算失败的parent RDD即可;而对于wide dependency来说,一失败可能导致某些分区丢失,必须整体重新进行计算
Shuffle
Spark中的shuffle操作与MapReduce中类似,在计算wide dependency对应的RDD的时候(即ShuffleMapStage)会触发。
首先来回顾一下为什么要进行shuffle操作。以reduceByKey
操作为例,Spark要按照key把这些具有相同key的tuple聚集到一块然后进行计算操作。然而这些tuple可能在不同的partition中,甚至在不同的集群节点中,要想计算必须先把它们聚集起来。因此,Spark用一组map task来将每个分区写入到临时文件中,然后下一个stage端(reduce task)会根据编号获取临时文件,然后将partition中的tuple按照key聚集起来并且进行相应的操作。这里面还包括着排序操作(可能在map side也可能在reduce side进行)。
Shuffle是Spark的主要性能瓶颈之一(涉及磁盘IO,数据序列化和网络IO),其优化一直是个难题。
- Shuffle write(map task):
SortShuffleWriter#write
- Shuffle read(reduce task):
ShuffleRDD#compute
Persistence
Checkpointing
Checkpoint的目的是保存那些计算耗时较长的RDD数据(long lineage chains),执行Checkpoint的时候会新提交一个Job,因此最好先persist
后checkpoint
。
Cache/Persist
cache
和persist
用于缓存一些经常使用的RDD结果(但是不能太大)。
persist
方法的主要作用是改变StorageLevel
以在compute
的时候通过BlockManager
进行相应的持久化操作cache
方法相当于设置存储级别为MEMORY_ONLY
Job Scheduling
简单来说,Spark会将提交的计算划分为不同的stages,形成一个有向无环图(DAG)。Spark的调度器会按照DAG的次序依次进行计算每个stage,最终得到计算结果。执行计算的几个重要的类或接口如下:
DAGScheduler
ActiveJob
Stage
Task
TaskScheduler
SchedulerBackend
这里面最为重要的就是 DAGScheduler 了,它会将逻辑执行计划(即RDD lineage)转化为物理执行计划(stage/task)。之前我们提到过,当开发者对某个RDD执行action的时候,Spark才会执行真正的计算过程。当开发者执行action的时候,SparkContext
会将当前的逻辑执行计划传给DAGScheduler
,DAGScheduler
会根据给定的逻辑执行计划生成一个Job(对应ActiveJob
类)并提交。每执行一个acton都会生成一个ActiveJob
。
提交Job的过程中,DAGScheduler
会进行stage的划分。Spark里是按照shuffle
操作来划分stage的,也就是说stage之间都是wide dependency,每个stage之内的dependency都是narrow dependency。这样划分的好处是尽可能地把多个narrow dependency的RDD放到同一个stage之内以便于进行pipeline计算,而wide dependency中child RDD必须等待所有的parent RDD计算完成并且shuffle
以后才能接着计算,因此这样划分stage是最合适的。
划分好的stages会形成一个DAG,DAGScheduler
会根据DAG中的顺序先提交parent stages(如果存在的话),再提交当前stage,以此类推,最先提交的是没有parent stage的stage。从执行角度来讲,一个stage的parent stages执行完以后,该stage才可以被执行。最后一个stage是产生最终结果的stage,对应ResultStage
,而其余的stage都是ShuffleMapStage
。下面是论文中stage划分的一个图例,非常直观:
提交stage的时候,Spark会根据stage的类型生成一组对应类型的Task
(ResultTask
或ShuffleMapTask
),然后将这些Task
包装成TaskSet
提交到TaskScheduler
中。一个Task
对应某个RDD中的某一个partition,即一个Task
只负责某个partition的计算:
|
|
TaskScheduler
会向执行任务的后端(SchedulerBackend
,可以是Local, Mesos, Hadoop YARN或者其它集群管理组件)发送ReviveOffers
消息,对应的执行后端接收到消息以后会将Task
封装成TaskRunner
(Runnable
接口的实例),然后提交到底层的Executor
中,并行执行计算任务。
Executor
中的线程池定义如下:
|
|
|
|
可以看到底层执行task的线程池实际上是JUC中的CachedThreadPool
,按需创建新线程,同时会复用线程池中已经建好的线程。
最后用一幅图总结一下Job, Stage和Task的关系(图来自 Mastering Apache Spark 2.0):
整个Spark Context执行task的步骤图:
Memory Management
Spark中RDD的存储方式有两种:in memory和on disk,默认是in memory的。进行分布式计算的时候通常会读入大量的数据,并且通常还需要重用这些数据,如果简单地把内存管理交给GC的话,很容易导致回收失败从而cause full GC,影响性能。
Spark 1.5开始不再通过GC管理内存。Spark 1.5实现了一个内存管理器用于手动管理内存(Project Tungsten),底层通过Unsafe
类来直接分配和回收内存。
另外,分布式计算系统的GC方面还可以参考OSDI 2016的一篇论文: Yak: A High-Performance Big-Data-Friendly Garbage Collector。
PageRank实例
下面在Spark中跑一个PageRank来观察一下生成的Stage DAG。PageRank的公式比较简单:
$$PageRank (p_i) = \frac{1-d}{N} + d \sum_{p_j \in M(p_i)} \frac{PageRank (p_j)}{L(p_j)} $$
这里我们选择damping factor=0.85,初始的rank值为1.0;PageRank算法可以用马尔科夫矩阵进行优化,但是这里迭代次数较小,可以直接进行迭代计算。对应代码:
|
|
对应的Stage DAG:
其中Stage 3中的RDD dependencies如下: