189 8069 5689

Spark中的RDD到底是什么

这篇文章主要讲解了“Spark中的RDD到底是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Spark中的RDD到底是什么”吧!

创新互联专业为企业提供西平网站建设、西平做网站、西平网站设计、西平网站制作等企业网站建设、网页设计与制作、西平企业网站模板建站服务,10年西平做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

Spark是开源的分布式计算引擎,基于RDD来构造数据处理流程,并在集群间调度任务,通过分区数据管理机制来划分任务的并行度,并在任务之间交换分区数据,实现分布式的数据处理。

RDD是Spark中最重要的概念,理解了RDD是什么,基本也就理解了一半Spark的内部机密了。

1、RDD基类

RDD是Spark中表示数据集的基类,是可序列号的对象,因此RDD可在Spark节点中复制。RDD定义了数据迭代器来循环读取数据,以及在数据集上定义各类转换操作,生成新的RDD。

RDD的各种算子会触发生成新的RDD。如:

map操作生成MapPartitionsRDD。

filter操作也生成MapPartitionsRDD,filter操作其实是在之前的RDD迭代器上封装了一层filter操作,其实还是第一个迭代器,只不过这个迭代器会抛弃掉一些不满足的记录。

RDD的计算过程是通过compute方法来触发的。

1.1 RDD触发任务

submit过程是提交spark程序到集群,这时候会触发application事件和driver事件等,并通过master节点选择对应的node来创建app和driver,同时在node上执行spark jar包里的main方法。但task的真正执行要等到RDD的compute动作来触发的。

RDD通过compute触发任务,提交FinalStage给Dag执行。如collect(),count()等方法都会触发compute过程,间接提交任务。

RDD.compute()=> finalStage => dag.submitJob()=> submitMissingStage() .

dag.submitJob()=> scheduleImpl.launchTask()=>scheduleBackend => executorBackend=> executor.launchTask()=> executorBackend.taskComplete msg => scheduleBackend.taskCompleted=>dag.stageFinished()=> ...

上面是RDD提交任务的大致流程。Compute函数是触发函数,这会导致最后一个RDD被执行,也是finalStage;finalStage调用DAG的submitJob函数提交stage,这里的stage就是finalStage。

Stage是从源头到finalStage串起来的,执行的时候是反向寻找的,这句话要好好体会,这个过程其实就是RDD的秘密了。

Spark中的RDD到底是什么

我们先看下RDD的经典图例。图中中间的部分Transformation是RDD的计算过程,左边的HDFS示意数据源,右边的HDFS示意RDD的finalStage执行的操作(图中的操作是写入hdfs,当然也可以是print操作等等,就看你怎么写了)。

Stage1和stage2是窄依赖,map和union都是窄依赖;stage3是宽依赖,这里是join操作。窄依赖的意思就是操作只依赖一个stage的数据,宽依赖的意思是依赖于多个stage,对这多个stage的数据要做全连接操作。

1.2、RDD执行示例

RDD通过runJob调用来获得执行,如下:

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

Sc是SparkContext。

对每个分区执行func操作,返回结果是一个长度等于分区数的Array。

Sc.runJob再调dagScheduler.runJob方法。具体可以看DagScheduler的作业执行步骤,这里先不说,看笔者的专门论述DagScheduler的文章。

1.3、迭代器

RDD实际执行是通过迭代器读取数据的。

RDD是抽象类,定义了几个接口:

分别是getPartitions、compute、getPreferredLocations。RDD数据是分区存储,每一个分区可能分布在申请spark资源的任何位置。这三个接口可以描述RDD的全部信息,其中getPreferredLocations这个方法是和计算本地化有关的,这里我们就先忽略它,不影响我们理解RDD的原理。

override protected def getPartitions: Array[Partition] = {}
override def compute(split: Partition, context: TaskContext): Iterator[java.lang.Integer] = new NextIterator[Integer] {}

getPartitions方法我们也不用太关注,它的作用是返回一个分区列表,表示这个RDD有几个分区,实际运行的时候RDD的每个分区会被安排到单独的节点上运行,这样来实现分布式计算的。

我们最关心的是compute的方法,这个方法返回一个迭代器,这个迭代器就是这个RDD的split这个分区的数据集。至于这个迭代器的数据是什么,是在compute方法体中写代码来生成的。我们可以定义自己的RDD,只要写代码实现这几个方法就可以了!

自定义RDD有什么好处呢?最大的好处就是可以把自己的数据集纳入到Spark的分布式计算体系中,帮助你实现数据分区,任务分配,和其他RDD执行全连接汇聚操作等。

言归正传,回到compute方法本身。

怎么获得Iterator[T],对ShuffleRDD来说是从BlockManager获取迭代器Iterator[T]。这种迭代器是blockResult,是ShuffleMapTask执行结果的保存格式;另一种就是直接获得iter,这种是ResultTask的执行结果的数据。

第一种情况,看BlockManager能否找到本RDD的partition的BlockResult。看看getOrElseUpdate方法还传递了一个函数作为最后一个入参,如果不存在指定的BlockResult,则返回入参函数来计算得到iter,方法体定义如下:

() => {
  readCachedBlock = false
  computeOrReadCheckpoint(partition, context)
}

主要就是调用computeOrReadCheckpoint方法计算分区。

def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

computeOrReadCheckpoint得到Iterator,如果是checkpoint的那么调用第一个父类的iterator方法得到Iterator,这里父类就是CheckpointRDD;否则就是调用compute方法得到Iterator。

所以,RDD的迭代器的实际获取分成两步:

首先,判断是否存在该RDD指定partition的BlockResult,如果存在则将BlockResult作为Iterator结果,此时表示该RDD是shuffleRDD之类。

然后如果上述不满足,则又分两种情况,第一种这是checkpoint的RDD,则调用父RDD的iterator方法(此时父RDD就是CheckpointRDD);否则调用compute方法来获得Iterator。

2、Stage划分

我们知道RDD的提交Spark集群执行是分阶段划分Stage提交的。从最后一个Stage开始,依次循环递归判断是否要调用依赖RDD的Stage,Stage的划分是根据是否要Shuffle作为分界点的。

如果某个RDD的依赖(dep)是ShuffleDependency,则次RDD作为ShuffleMapTask任务提交,否则最后一个RDD作为ResultTask提交。

递归提交Stage,对ShuffleMapTask类型的RDD,会一直递归判断该RDD是否存在前置的ShuffleDependency,如果存在则递归提交前依赖RDD。

整个Spark作业是RDD串接的,如果不存在Shuffle依赖,则提交最后一个RDD,并且只有这一个RDD被提交。在计算最后一个RDD的iterator时,被调用到父RDD的iterator方法,此时父RDD一般都是MapPartitionsRDD。在MapPartitionsRDD中有进一步叙述。

3、RDD子类

RDD含有多个子类,如MapPartitionRDD,HadoopRDD、CoGroupedRDD等等。笔者这里就找几个例子简单说明一下他们的内部逻辑。 

3.1 MapPartitionsRDD

MapPartitionsRDD是RDD的子类,前面看到RDD的诸多算子都会生成新的MapPartitionRDD。

MapPartitionsRDD的构造函数需要入参f,它是一个函数抽象类或者叫做泛类。

f: (TaskContext, Int, Iterator[T]) => Iterator[U]

f的入参有三个:

(1) TaskContext:是任务上下文

(2) Int:是分区编码

(3) Iterator[T]是分区迭代器

f的输出也是一个Iterator迭代器。可以看出,f是一个抽象的从一个迭代器生成另一个迭代器的自定义函数。对数据的处理逻辑就是体现在f上。

MapPartitionRDD中触发计算的compute方法定义如下:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

这里的f是MapPartitionRDD的构造函数中传进入的入参,是用户自定义的map函数。这样,通过RDD的map、flatmap等算子和MapPartitionRDD,可以将RDD上的一系列操作不停的串联下去。

3.2 CoalescedRDD

CoalescedRDD将M个分区的RDD重新分成N个分区,形成新的RDD。在计算过程中,会引起Shuffle工程。

首先CoalescedRDD需要一个重新分区算法,将M个分区如何划分到N个分区,这里M>N。重新分区的结果是N的每个分区对应了M的多个分区,用List来表示,List中每个Int表示父RDD中M个分区之一的编号。

如果CoalescedRDD没有指定自己的重新分区算法,则用DefaultPartitionCoalescer来做重新分区计算。

CoalescedRDD的compute过程如下:

override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
  partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { 
    parentPartition => firstParent[T].iterator(parentPartition, context)
  }
}

partition.parents是指CoalescedRDD的第partition分区所对应的父RDD的分区列表,对分区列表的每个分区,执行:

firstParent[T].iterator(parentPartition, context)

然后得到最终的Iterator[T]。这段应该不难理解。

需要留意的是,这里得到的Iterator[T]最终是要写到Shuffle的,因为CoalescedRDD对应的ShuffleMapTask而不是ResultTask。

对于理解Spark计算流程来说,理解了Shuffle的过程,也就解决了一半的疑惑了。

感谢各位的阅读,以上就是“Spark中的RDD到底是什么”的内容了,经过本文的学习后,相信大家对Spark中的RDD到底是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


名称栏目:Spark中的RDD到底是什么
网页网址:http://cdxtjz.cn/article/gdpdoi.html

其他资讯