RDD(Resilient Distributed Dataset)译作弹性分布式数据集,是Spark中最常用的数据抽象,是一个只可读、可分区、可并行计算的数据集合。RDD允许将工作集缓存在内存中进行复用,大大地提升了查询速度。
RDD简介
1、MapReduce 在面对日益复杂的业务逻辑时已经表现出严重的不足:
1.1)维护成本高昂,每一次数据处理都需要编写复杂的Map和Reduce步骤,中间某一步骤出错就要重试以处理异常;
1.2)难以上手,造成处理性能低;
2、因此人们提出用有向无环图(DAG)来抽象表达复杂的数据处理逻辑,各个数据处理步骤表示成图中的节点与边依赖关系,形成数据流的抽象表示,而把复杂的性能优化提交给后台自动处理;
3、RDD也即分布式对象集合,是一个只读的分区记录集合,每个RDD可以划分成多个分区,每个分区就是数据集的一部分,同时不同分区可以存储在集群中不同的节点上,从而利用集群节点优势进行并行计算;
4、RDD提供了丰富的操作以支持常见的数据处理,即“转换”(Transformation)和“行动”(Action)
两种类型操作:
- **转换操作:指定RDD的依赖关系,通过接受RDD并返回RDD,即从一个RDD转换到另外一个RDD;**
- **行动操作:执行计算并指定输出的形式,通过接受RDD返回输出值或结果(非RDD);**
5、通过Spark的API可以使用不同的语言调用RDD的操作,常见过程流程如下:
- 从各种数据源创建RDD;
- 对RDD指定一系列的转换操作;
- 最后调用行动操作,输出结果或写入外部数据源;
6、RDD操作的惰性机制,是指在RDD执行操作时,只有触发行动操作
才会做真正的计算,而在行动前的所有转换操作都只是记录下相互的依赖关系,形成数据流的管道化(pipeline),而不会做真正的计算;
RDD的创建
通过数据集合转化为RDD
1 | sc = SparkContext("local", "create_rdd") |
从HDFS数据源或本地文件创建
从一篇CNN新闻报道 Uber and Lyft may look the same, but their visions are not 中抽取新闻主体并放到文件news_sep.txt
中,一段话为一行。
1 | At first blush, it can be hard to tell Uber and ... |
通过指定文件路径读取为RDD:
1 | sc = SparkContext("local", "create_rdd") |
从其他数据库读取数据创建
(待更新)
使用数据流创建
结合流数据处理技术,如Spark Streaming、Kafka以及flume等,通过接收实时的输入数据流创建RDD。
一般RDD的转换操作(Transformation)
官方API文档详细列出转换操作函数,下面简单介绍RDD常用的转换操作:
flatMap()
flatMap(func)
:对于每一个输入元素,通过指定函数映射到0或多个元素,输出新的RDD。
1 | # 将上述新闻文本的每一行每一段话根据空格进行分词 |
map()
map(func)
:对于每一个输入元素,通过执行指定函数映射到唯一输出(1v1关系),产生新的RDD。
map()类似于Python中的map,针对RDD对应的列表的每一个元素,进行map()函数里面的尼玛函数(这个函数是map函数的一个参数)对应的操作,返回的仍然是一个RDD对象;
reduce()则是针对RDD对应的列表中的元素,递归地选择第一个和第二个元素进行操作,操作的结果作为一个元素用来替换这两个元素,注意,reduce返回的是一个Python可以识别的对象,非RDD对象。
1 | # 对上述分词结果的RDD中每一个单词去除标点符号,并转化为小写 |
filter()
filter(func)
:是对RDD元素进行过滤,把经过指定函数后返回值为true的元素组成一个新的RDD。
1 | # 对上述结果筛选掉一些常见的介词,如 |
distinct()
distinct([numPartitions])
:对数据进行去重,返回一个新的RDD,numPartitions参数用于设置任务并行数。
1 | # 输出新闻中所有提及过的单词 |
sample()
sample(withReplacement,fraction,seed=None)
:对数据进行采样,withReplacement参数表示是否放回抽样;fraction参数表示抽样比例;seed表示随机种子。
1 | # 随机抽取新闻提及部分单词 |
union()
union(otherRDD)
:可以与另一个RDD数据集合并,合并后并不会去掉重复的数值,它返回一个新的RDD。
1 | rdd1 = sc.parallelize(["union","other","rdd"]) |
intersection()
intersection(otherRDD)
:可以与另一个RDD数据集进行求交集计算,返回新的RDD。
1 | rdd1 = sc.parallelize([1,2,3,4,5]) |
subtract()
subtract(otherRDD,[numPartitions])
:是对otherRDD进行减法操作,将原始RDD的元素减去新输入RDD的元素,将差值返回新RDD。
1 | rdd1 = sc.parallelize([1,2,3,4,5]) |
cartesian()
cartesian(otherRDD)
:可以对两个RDD数据集U,V求笛卡尔积,返回一个新的RDD数据集,其中每个元素为(u,v)。
1 | rdd1 = sc.parallelize([1,2]) |
键值对RDD的转换操作(Transformation)
map()
map(func)
:操作可以将一般RDD转换为键值对RDD,元素变成(K,V)。
1 | # 对上述新闻分词结果的RDD中每一个单词转化为(w,1)键值对 |
reduceByKey()
reduceByKey(func,[numPartitions])
:可以对具有相同键的值进行合并,返回一个新的键值对RDD,numPartitions用于设置任务并行数。
需要区分的是:
reduce()最终只返回一个值,reduceByKey()和reduceByKeyLocally()均是将Key相同的元素合并。
区别在于,reduce()和reduceByKeyLocally()函数均是将RDD转化为非RDD对象,而reduceByKey()将RDD对象转化为另一个RDD对象,需要collect()函数才能输出。
1 | # 对新闻出现过的单词进行词频统计 |
groupByKey()
groupByKey([numPartitions])
:可以对具有相同键的值进行分组,返回一个元素为(K,[Iterable])的键值对RDD,numPartitions用于指定任务并行数,默认为8。
1 | # 对新闻出现过的单词进行词频统计 |
aggregateByKey()
aggregateByKey(zeroValue,seqFunc,combFunc,[numPartitions])
:可以对具有相同键的值进行聚合,把(K,V)键值对RDD转换为新的(K,U)键值对RDD,其中U由给定的combFunc和中立零值zeroValue聚合而成,U可以有与V不一致的形式;
- zeroValue 可以是0如果聚合的目的是求和,可以是List如果目的是对值进行统合,可以是Set如果目的是聚合唯一值;
- seqFunc: (U,V) => U 对分区内的元素进行聚合(操作发生在每个分区内部);
- combFunc: (U,U) => U 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
- numPartitions 用于设置任务并行数;
为什么使用两个函数?见Apache Spark aggregateByKey Example
1 | # 同样对新闻出现过的单词进行词频统计 |
combineByKey()
combineByKey(createCombiner,mergeValue,mergeCombiners,[numPartitions])
:可以对具有相同键的值按照自定义函数的逻辑进行聚合,把(K,V)键值对RDD转换为新的(K,U)键值对RDD,U可以有与V不一致的形式;
- createCombiner: V => C 创建新的聚合器方便后续步骤操作,对原始值进行附加操作并返回,跟flatMap()类似;
- mergeValue: (C,V) => C 对分区内的元素进行聚合(操作发生在每个分区内部);
- mergeCombiners: (C,C) => C 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
- numPartitions 用于设置任务并行数;
groupByKey(), groupByKey(), aggregateByKey() 等都不同程度上依赖于 combineByKey() 操作
1 | # 对词频统计的结果求频数均值,例如 |
sortByKey()
sortByKey(ascending,[numPartitions])
:可以对键值对RDD按照键进行排序操作,其中K需要实现Ordered方法。
- ascending 决定RDD中的元素按升序还是降序排序,默认是True升序;
- numPartitions 用于设置任务并行数;
1 | # 对词频统计结果按降序排序(先把K-V值互换) |
keys()与values()
keys(),values()
:分别把键值对RDD的key和value返回形成一个新的RDD。
1 | # 获得上一步排序结果的key或value |
mapValues()
mapValues(func)
:可以对键值对RDD每个元素的value加载到预定义函数进行操作,而不改变key。
1 | # 例如对词频统计结果值都减去1 |
join()
join(otherRDD,[numPartitions])
:与关系数据库查询一样,表示内连接,给定两个键值对RDD如(K,V1)和(K,V2),对于两个数据集都存在的key才对其输出,得到一个新的RDD,元素为(K,(V1,V2))。除此以外,还包括其他情形:
- fullOuterJoin(otherRDD,[numPartitions]) 全连接
- leftOuterJoin(otherRDD,[numPartitions]) 左外连接
- rightOuterJoin(otherRDD,[numPartitions]) 右外连接
1 | rdd1 = sc.parallelize([('spark',1),('hadoop',1)]) |
RDD的行动操作(Action)
官方API文档详细列出行动操作函数,下面简单介绍常用的行动操作:
count()
count()
:返回RDD数据集中元素的个数。
collect()
collect()
:以数组的形式返回RDD数据集的所有元素。
first()
first()
:返回RDD数据集的第一个元素。
top()
top(num,key=None)
:以数组的形式返回RDD数据集的前num个元素,默认按降序,或者通过key函数指定。
1 | rdd = sc.parallelize([5,24,3,12,46]) |
take()
take(num)
:以数组的形式返回RDD数据集的前num个元素。
takeOrdered()
takeOrdered(num,key=None)
:以数组的形式返回RDD数据集的前nu
m个元素,默认按升序排序,或者通过key函数指定。
1 | rdd = sc.parallelize([5,2,3,1,4]) |
takeSample()
takeSample(withReplacement,num,seed=None)
:对RDD数据集进行采样,并以数组的形式返回,withReplacement参数表示是否放回抽样;num参数表示抽样个数;seed表示随机种子。
1 | rdd = sc.parallelize([5,2,3,1,4]) |
lookup()
lookup(key)
:以数组的形式返回键值对RDD中键为key的所有值,如果RDD数据集经过特定转换操作按照key进行了分区,那么此行动操作效率会很高。
foreach()
foreach(func)
:将RDD数据集中的每个元素加载到指定函数进行操作,无返回值。
1 | # 对转换结果逐个输出 |
reduce()
reduce(func)
:通过指定函数(如求和、统计)对RDD数据集元素进行聚合。
1 | # 对所有元素进行求和 |
aggregate()
aggregate(zeroValue,seqOp,combOp)
:对RDD数据集的元素进行聚合,不要求返回值类型与RDD类型一致;
- zeroValue: U 给定初始值,形式与最终返回值U一致;
- seqOp: (U,V) => U 对分区内的元素进行聚合(操作发生在每个分区内部);
- combOp: (U,U) => U 对不同分区的聚合结果做进一步的聚合(操作发生在全部分区的聚合结果间);
1 | # 求一个数组元素的均值 |
countByKey()
countByKey()
:以字典的形式返回键值对RDD数据集中每个键的元素的统计数,即(K,count)
1 | rdd = sc.parallelize([("rdd",1),("rdd",2),("spark",2)]) |
countByValue()
countByValue()
:以字典的形式返回RDD数据集中每个元素的统计数,即(V,count)
1 | rdd = sc.parallelize([2,2,3,1,1]) |
saveAsTextFile()
saveAsTextFile(path, compressionCodecClass=None)
:把RDD数据集保存为文本文件,并可以指定是否压缩。
1 | f = NamedTemporaryFile(delete=True) |
RDD的持久化
- 由于RDD采用惰性机制,每次遇到行动操作都会根据DAG的依赖关系从头开始执行计算,如果遇到迭代计算,需要重复调用中间数据,会造成极大的计算开销;
- 可以通过持久化操作来解决以上的问题,用
persist()
方法对需要重复使用的RDD标记为持久化,当遇到第一次行动操作后,会把计算结果持久化,保存在计算节点的内存备用;
persist()
persist(storageLevel)
:storageLevel参数表示持久化级别,通过使用不同的级别可以把数据缓存到不同的位置,详见 RDD Persistence;其中使用cache()
函数会调用默认的持久化方法,即persist(MEMORY_ONLY)
将RDD作为反序列化的对象存储在JVM中;而unpersist()
方法则可以把持久化的RDD从缓存中删除。
1 | rdd = sc.parallelize(['spark','rdd','hadoop']) |
Reference: