Spark(一)Spark RDD

Spark RDD

Posted by Spencer on May 22, 2017

一. RDD简介

What is Spark RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可以并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性

RDD的属性

Internally, each RDD is characterized by five main properties

  • A list of partitions. RDD由一系列partition组成,对RDD来说,每个partition都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定partition个数,如果没有指定,默认采用程序所分配到的Cpu Core的数量

  • A function for computing each split. RDD的计算是以分片(split)为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果

  • A list of dependencies on other RDDs. RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算

  • Optionally, a Partitioner for key-value RDDs. 分区器作用于k,v格式的RDD上,当前Spark中实现了两种类型的分区器,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner

  • Optionally, a list of preferred location to compute each split on. RDD提供一系列最佳计算位置

如何创建RDD

有两种创建RDD的方式

  • Parallelizing an existing collection in your driver program

  • Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat

RDD概念图

rdd1.png

二. RDD的依赖关系

RDD和它依赖的父RDD的关系有两种不同的类型,即shuffle依赖(shuffle dependency)和窄依赖(narrow dependency)

窄依赖

如果RDD与上游RDD分区是一对一的关系,那么RDD和其上游RDD之间的依赖关系属于窄依赖(NarrowDependency)。NarrowDependency一共有两个子类,分别为

  • OneToOneDependency
  • RangeDependency

shuffle依赖

RDD与上游RDD的分区如果不是一对一的关系,或者RDD的分区依赖于上游RDD的多个分区,那么这种依赖关系就叫做Shuffle依赖

三. RDD缓存机制

cache

  • cache作用 cache 能够让重复数据在同一个 application 中的 jobs 间共享。RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY

  • Application如何读取cache 下次计算(一般是同一 application 的下一个 job 计算)时如果用到 cached RDD,task 会直接去 blockManager 的 memoryStore 中读取。具体地讲,当要计算某个 rdd 中的 partition 时候(通过调用 rdd.iterator())会先去 blockManager 里面查找是否已经被 cache 了,如果 partition 被 cache 在本地,就直接使用 blockManager.getLocal() 去本地 memoryStore 里读取。如果该 partition 被其他节点上 blockManager cache 了,会通过 blockManager.getRemote() 去其他节点上读取

persist

  • persist持久化级别
    object StorageLevel {
    val NONE = new StorageLevel(false, false, false, false)
    val DISK_ONLY = new StorageLevel(true, false, false, false)
    val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
    val MEMORY_ONLY = new StorageLevel(false, true, false, true)
    val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
    // 如果数据在内存中放不下,则溢写到磁盘上
    val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
    // 如果数据在内存中放不下,则溢写到磁盘上,在内存中存放序列化后的数据
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
    val OFF_HEAP = new StorageLevel(false, false, true, false)
    }
    

    Scala和Java中,默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中

需要注意的情况

  • 如果某个需要cache的数据大小超过executor-memory,spark并不会报错,Exectutor会缓存部分数据,缓存不了的数据在计算时会重新拉取

  • 如果缓存的数据太多,内存中装不下,Spark会自动利用最近最少使用(LRU)缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算。但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘

四. CheckPoint

checkPoint与cache区别

cache把 RDD 计算出来然后放在内存中, 但是RDD 的依赖链也不能丢掉, 当某个点某个 executor 宕了, 上面cache 的RDD就会丢掉, 需要通过依赖链重新计算出来;而 checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链,因为checkpoint是需要把 job 重新从头算一遍, 最好先cache一下, checkpoint就可以直接保存缓存中的 RDD 了, 就不需要重头计算一遍了, 对性能有极大的提升。

这里值得注意的是:cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了。但 checkpoint 没有使用这种第一次计算得到就存储的方法,而是等到 job 结束后另外启动专门的 job 去完成 checkpoint 。也就是说需要 checkpoint 的 RDD 会被计算两次。因此,在使用 rdd.checkpoint() 的时候,建议加上 rdd.cache(),这样第二次运行的 job 就不用再去计算该 rdd 了,直接读取 cache 写磁盘。

checkPoint和persist区别

rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 区别的是:前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉( 话说怎么 remove checkpoint 过的 RDD? ),是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用

如何使用checkPoint

// 首先需要设置checkpoint目录
sc.setCheckpointDir("hdfs://10.211.55.7:8020/sparkCheckPointTestDir")
val rdd1 = sc.textFile("....")
rdd1.checkpoint
rdd1.flapMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).saveAsTextFile("...")
// 再一次使用rdd1时会从checkPointDir中读取数据
rdd1.map(...)