Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark系列: Transformations算子讲解 #9

Open
shilinlee opened this issue Jun 30, 2019 · 1 comment
Open

Spark系列: Transformations算子讲解 #9

shilinlee opened this issue Jun 30, 2019 · 1 comment
Labels
RDD Resilient Distributed Dataset Spark Apache Spark 大数据 整个大数据体系

Comments

@shilinlee
Copy link
Owner

shilinlee commented Jun 30, 2019

Value数据类型的Transformation 

输入分区与输出分区一对一型

map

将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素。源码中 map 算子相当于初始化一个 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。

下图每个方框表示一个 RDD 分区,左侧的分区经过用户自定义函数 f:T->U 映射为右侧的新 RDD 分区。但是,实际只有等到 Action算子触发后,这个 f 函数才会和其他函数在一个stage 中对数据进行运算。

image

flatMap

将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将生成的 RDD 的每个集合中的元素合并为一个集合,内部创建 FlatMappedRDD(this,sc.clean(f))。

下图 表 示 RDD 的 一 个 分 区 ,进 行 flatMap函 数 操 作, flatMap 中 传 入 的 函 数 为 f:T->U, T和 U 可以是任意的数据类型。将分区中的数据通过用户自定义函数 f 转换为新的数据。外部大方框可以认为是一个 RDD 分区,小方框代表一个集合。 V1、 V2、 V3 在一个集合作为 RDD 的一个数据项,可能存储为数组或其他容器,转换为V’1、 V’2、 V’3 后,将原来的数组或容器结合拆散,拆散的数据形成为 RDD 中的数据项。

image

mapPartitions

mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 行 操 作。 内 部 实 现 是 生 成MapPartitionsRDD。

下图中的方框代表一个 RDD 分区。图中,用户通过函数 f (iter)=>iter.filter(_>=3) 对分区中所有数据进行过滤,大于和等于 3 的数据保留。一个方块代表一个 RDD 分区,含有 1、 2、 3 的分区过滤只剩下元素 3。

image

glom

glom函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。

图中的每个方框代表一个RDD分区。该图表示含有V1、 V2、 V3的分区通过函数glom形成一数组Array[(V1),(V2),(V3)]

image

输入分区与输出分区多对一型

union

使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重可以使用 distinct()。同时 Spark 还提供更为简洁的使用 union 的 API,通过 ++ 符号相当于 union 函数操作。

图中左侧大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。含有V1、V2、U1、U2、U3、U4的RDD和含有V1、V8、U5、U6、U7、U8的RDD合并所有元素形成一个RDD。V1、V1、V2、V8形成一个分区,U1、U2、U3、U4、U5、U6、U7、U8形成一个分区。

image

>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd1
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
>>> rdd2 = sc.parallelize([4, 5, 6])
>>> rdd3 = sc.parallelize([7, 8, 9])
>>> rdd = sc.union([rdd1, rdd2, rdd3])
>>> rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> first = rdd1.union(rdd2)
>>> first.collect()
[1, 2, 3, 4, 5, 6]

cartesian

对 两 个 RDD 内 的 所 有 元 素 进 行 笛 卡 尔 积 操 作。 操 作 后, 内 部 实 现 返 回CartesianRDD

图中左侧大方框代表两个RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。

例 如: V1 和 另 一 个 RDD 中 的 W1、 W2、 Q5 进 行 笛 卡 尔 积 运 算 形 成 (V1,W1)、(V1,W2)、 (V1,Q5)。

image

>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd2 = sc.parallelize([4, 5, 6])
>>> rdd = rdd1.cartesian(rdd2)
>>> rdd.collect()
[(1, 4), (1, 5), (1, 6), (2, 4), (2, 5), (2, 6), (3, 4), (3, 5), (3, 6)]

输入分区与输出分区多对多型

groupByKey(Avoid)

将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。
函数实现如下:

  • 将用户函数预处理:val cleanF = sc.clean(f)

  • 对数据进行map函数操作,最后再进行 groupByKey 分组操作。

    this.map(t => (cleanF(t), t)).groupByKey(p) 其中, p 确定了分区个数和分区函数,也就决定了并行化的程度。

    image

输出分区为输入分区子集型

filter

filter 函数功能是对元素进行过滤,对每个元素应用 f 函数, 返回值为true的元素在RDD中保留,返回值为false 的元素将被过滤掉。内部实现相当于生成FilteredRDD(this,sc.clean(f))。下面代码为函数的本质实现:

def filter(f:T=>Boolean):
	RDD[T] = new FilteredRDD(this, sc.clean(f))

image

distinct

distinct将RDD中的元素进行去重操作。

image

subtract

subtract相当于进行集合的操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。

图中左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。 右侧大方框
代表合并后的RDD,大方框内的小方框代表分区。 V1在两个RDD中均有,根据差集运算规则,新RDD不保留,V2在第一个RDD有,第二个RDD没有,则在新RDD元素中包含V2。

image

sample

sample 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。

函数参数设置:

  • withReplacement=true,表示有放回的抽样。
  • withReplacement=false,表示无放回的抽样。

通 过 sample 函 数, 采 样 50% 的 数 据。V1、 V2、 U1、 U2、U3、U4 采样出数据 V1 和 U1、 U2 形成新的 RDD。

image

takeSample

takeSample函数和上面的sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组。

通过takeSample对数据采样,设置为采样一份数据,返回结果为V1。

image

Cache型

cache

cache 将 RDD 元素从磁盘缓存到内存。相当于persist(MEMORY_ONLY) 函数的功能。

图中每个方框代表一个 RDD 分区,左侧相当于数据分区都存储在磁盘,通过 cache 算子将数据缓存在内存。

image

persist

persist 函数对 RDD 进行缓存操作。数据缓存在哪里依据 StorageLevel 这个枚举类型进行确定。 有以下几种类型的组合: DISK 代表磁盘,MEMORY 代表内存, SER 代表数据是否进行序列化存储。

下面为函数定义, StorageLevel 是枚举类型,代表存储模式,用户可以通过图 按需进行选择。 persist(newLevel:StorageLevel)
例如,MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他同理。

image

Key-Value数据类型的Transfromation

输入分区与输出分区一对一(???)

mapValues

针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。

图中的方框代表 RDD 分区。 a=>a+2 代表对 (V1,1) 这样的(Key Value)数据对,数据只对 Value 中的 1 进行加 2 操作,返回结果为 3。

image

对单个RDD聚集(PairRDDFunctions)

combineByKey

手动标星

查看源码

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    	combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      	partitioner, mapSideCombine, serializer)(null)
  }

说明:

  • createCombiner: V => C: C 不存在的情况下,比如通过 V 创建 seq C。

  • mergeValue: (C, V) => C: 当 C 已经存在的情况下,需要 merge,比如把 item V 加到 seq C 中,或者叠加。

  • mergeCombiners: (C, C) => C,合并两个 C。

  • partitioner: Partitioner:Shuffle 时需要的 Partitioner。

  • mapSideCombine : Boolean = true: 为了减小传输量,很多 combine 可以在 map端先做,比如叠加,可以先在一个 partition 中把所有相同的 key 的 value 叠加,再 shuffle。

  • serializerClass: String = null,传输需要序列化,用户可以自定义序列化类

    image

reduceByKey

查看源码

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =     
	self.withScope {
    	combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

reduceByKey 是比 combineByKey 更简单的一种情况,只是两个值合并成一个值,( Int, Int V)to (Int, Int C),比如叠加。所以 createCombiner reduceBykey 很简单,就是直接返回 v,而 mergeValue和 mergeCombiners 逻辑是相同的,没有区别。

image

partitionBy

查看源码

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
}

对RDD进行分区操作。

如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。

图的方框代表RDD分区。 通过新的分区策略将原来在不同分区的V1、 V2数据都合并到了一个分区。

image

对两个RDD聚集

cogroup

查看源码

def cogroup[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
      cogroup(other, new HashPartitioner(numPartitions))
}

连接(PairRDDFunctions)

join

查看源码

join 对两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。

下面代码为 join 的函数实现,本质是通 过 cogroup 算 子 先 进 行 协 同 划 分, 再通过flatMapValues将合并的数据打散。

this.cogroup(other,partitioner).f latMapValues{case(vs,ws) => for(v<-vs;w<-ws)yield(v,w) }

图 是对两个 RDD 的 join 操作示意图。大方框代表 RDD,小方框代表 RDD 中的分区。函数对相同 key 的元素,如 V1 为 key 做连接后结果为 (V1,(1,1)) 和 (V1,(1,2))。

image

leftOutJoin 和 rightOutJoin

@shilinlee shilinlee changed the title Spark系列: transformation算子讲解 Spark系列: Transformations算子讲解 Jun 30, 2019
@shilinlee shilinlee added Spark Apache Spark 大数据 整个大数据体系 labels Jun 30, 2019
@shilinlee shilinlee added the RDD Resilient Distributed Dataset label Jul 3, 2019
@lllong33
Copy link

ref: 并不进行去重操作,保存所有元素。如果想去重可以使用 distinct()
spark3 可以做到自身去重的, 将所有RDD合并成一个新的去重

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
RDD Resilient Distributed Dataset Spark Apache Spark 大数据 整个大数据体系
Projects
None yet
Development

No branches or pull requests

2 participants